From 9bb08e772a3ac946466906d89cf460d852e6144d Mon Sep 17 00:00:00 2001 From: Dumitru Ceara Date: Wed, 27 Nov 2019 14:15:22 +0100 Subject: [PATCH ovn 2/4] ovn-controller: Add per node states to I-P engine. This commit transforms the 'changed' field in struct engine_node in a 'state' field. Possible node states are: - "Stale": data in the node is not up to date with the DB. - "Updated": data in the node is valid but was updated during the last run of the engine. - "Valid": data in the node is valid and didn't change during the last run of the engine. - "Aborted": during the last run, processing was aborted for this node. This commit also further refactors the I-P engine: - instead of recursively performing all the engine processing a preprocessing stage is added (engine_get_nodes()) before the main processing loop is executed in order to topologically sort nodes in the engine such that all inputs of a given node appear in the sorted array before the node itself. This simplifies a bit the code in engine_run(). - remove the need for using an engine_run_id by using the newly added states. - turn the global 'engine_abort_recompute' into an argument to be passed to engine_run(). It's relevant only in the current run context anyway as we reset it before every call to engine_run(). Signed-off-by: Dumitru Ceara Signed-off-by: Han Zhou (cherry picked from upstream commit 5ed53faecef12c09330ced445418c961cb1f8caf) Change-Id: I9c3322fe8eb51ffcad62797f5f2a7306bb0c53f4 --- ovn/controller/ovn-controller.c | 101 +++++++++--------- ovn/lib/inc-proc-eng.c | 231 ++++++++++++++++++++++++++++------------ ovn/lib/inc-proc-eng.h | 74 +++++++++---- 3 files changed, 268 insertions(+), 138 deletions(-) diff --git a/ovn/controller/ovn-controller.c b/ovn/controller/ovn-controller.c index ad5b067..8c474e9 100644 --- a/ovn/controller/ovn-controller.c +++ b/ovn/controller/ovn-controller.c @@ -722,10 +722,10 @@ en_ofctrl_is_connected_run(struct engine_node *node) (struct ed_type_ofctrl_is_connected *)node->data; if (data->connected != ofctrl_is_connected()) { data->connected = !data->connected; - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return; } - node->changed = false; + engine_set_node_state(node, EN_VALID); } struct ed_type_addr_sets { @@ -775,7 +775,7 @@ en_addr_sets_run(struct engine_node *node) addr_sets_init(as_table, &as->addr_sets); as->change_tracked = false; - node->changed = true; + engine_set_node_state(node, EN_UPDATED); } static bool @@ -794,11 +794,14 @@ addr_sets_sb_address_set_handler(struct engine_node *node) addr_sets_update(as_table, &as->addr_sets, &as->new, &as->deleted, &as->updated); - node->changed = !sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) - || !sset_is_empty(&as->updated); + if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) || + !sset_is_empty(&as->updated)) { + engine_set_node_state(node, EN_UPDATED); + } else { + engine_set_node_state(node, EN_VALID); + } as->change_tracked = true; - node->changed = true; return true; } @@ -849,7 +852,7 @@ en_port_groups_run(struct engine_node *node) port_groups_init(pg_table, &pg->port_groups); pg->change_tracked = false; - node->changed = true; + engine_set_node_state(node, EN_UPDATED); } static bool @@ -868,11 +871,14 @@ port_groups_sb_port_group_handler(struct engine_node *node) port_groups_update(pg_table, &pg->port_groups, &pg->new, &pg->deleted, &pg->updated); - node->changed = !sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) - || !sset_is_empty(&pg->updated); + if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) || + !sset_is_empty(&pg->updated)) { + engine_set_node_state(node, EN_UPDATED); + } else { + engine_set_node_state(node, EN_VALID); + } pg->change_tracked = true; - node->changed = true; return true; } @@ -1053,7 +1059,7 @@ en_runtime_data_run(struct engine_node *node) update_ct_zones(local_lports, local_datapaths, ct_zones, ct_zone_bitmap, pending_ct_zones); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); } static bool @@ -1119,10 +1125,10 @@ en_mff_ovn_geneve_run(struct engine_node *node) enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id(); if (data->mff_ovn_geneve != mff_ovn_geneve) { data->mff_ovn_geneve = mff_ovn_geneve; - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return; } - node->changed = false; + engine_set_node_state(node, EN_VALID); } struct ed_type_flow_output { @@ -1284,7 +1290,7 @@ en_flow_output_run(struct engine_node *node) active_tunnels, flow_table); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); } static bool @@ -1366,7 +1372,7 @@ flow_output_sb_logical_flow_handler(struct engine_node *node) flow_table, group_table, meter_table, lfrr, conj_id_ofs); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return handled; } @@ -1389,7 +1395,7 @@ flow_output_sb_mac_binding_handler(struct engine_node *node) lflow_handle_changed_neighbors(sbrec_port_binding_by_name, mac_binding_table, flow_table); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return true; } @@ -1493,7 +1499,7 @@ flow_output_sb_port_binding_handler(struct engine_node *node) chassis, ct_zones, local_datapaths, active_tunnels, flow_table); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return true; } @@ -1542,7 +1548,7 @@ flow_output_sb_multicast_group_handler(struct engine_node *node) mff_ovn_geneve, chassis, ct_zones, local_datapaths, flow_table); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return true; } @@ -1656,7 +1662,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, conj_id_ofs, &changed)) { return false; } - node->changed = changed || node->changed; + if (changed) { + engine_set_node_state(node, EN_UPDATED); + } } SSET_FOR_EACH (ref_name, updated) { if (!lflow_handle_changed_ref(ref_type, ref_name, @@ -1669,7 +1677,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, conj_id_ofs, &changed)) { return false; } - node->changed = changed || node->changed; + if (changed) { + engine_set_node_state(node, EN_UPDATED); + } } SSET_FOR_EACH (ref_name, new) { if (!lflow_handle_changed_ref(ref_type, ref_name, @@ -1682,7 +1692,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, conj_id_ofs, &changed)) { return false; } - node->changed = changed || node->changed; + if (changed) { + engine_set_node_state(node, EN_UPDATED); + } } return true; @@ -1904,9 +1916,6 @@ main(int argc, char *argv[]) unixctl_command_register("recompute", "", 0, 0, engine_recompute_cmd, NULL); - uint64_t engine_run_id = 0; - bool engine_run_done = true; - unsigned int ovs_cond_seqno = UINT_MAX; unsigned int ovnsb_cond_seqno = UINT_MAX; @@ -1914,7 +1923,7 @@ main(int argc, char *argv[]) exiting = false; restart = false; while (!exiting) { - engine_run_id++; + engine_init_run(); update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl); update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl)); @@ -2007,15 +2016,9 @@ main(int argc, char *argv[]) * this round of engine_run and continue processing * acculated changes incrementally later when * ofctrl_can_put() returns true. */ - if (engine_run_done) { - engine_set_abort_recompute(true); - engine_run_done = engine_run(&en_flow_output, - engine_run_id); - } + engine_run(false); } else { - engine_set_abort_recompute(false); - engine_run_done = true; - engine_run(&en_flow_output, engine_run_id); + engine_run(true); } } stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME, @@ -2034,7 +2037,7 @@ main(int argc, char *argv[]) sbrec_meter_table_get(ovnsb_idl_loop.idl), get_nb_cfg(sbrec_sb_global_table_get( ovnsb_idl_loop.idl)), - en_flow_output.changed); + engine_node_changed(&en_flow_output)); pinctrl_run(ovnsb_idl_txn, sbrec_datapath_binding_by_key, sbrec_port_binding_by_datapath, @@ -2052,7 +2055,7 @@ main(int argc, char *argv[]) &ed_runtime_data.local_datapaths, &ed_runtime_data.active_tunnels); - if (en_runtime_data.changed) { + if (engine_node_changed(&en_runtime_data)) { update_sb_monitors(ovnsb_idl_loop.idl, chassis, &ed_runtime_data.local_lports, &ed_runtime_data.local_datapaths); @@ -2060,20 +2063,23 @@ main(int argc, char *argv[]) } } - if (engine_need_run(&en_flow_output, engine_run_id)) { - VLOG_DBG("engine did not run, force recompute next time: " - "br_int %p, chassis %p", br_int, chassis); - engine_set_force_recompute(true); - poll_immediate_wake(); - } else if (!engine_run_done) { + + if (!engine_has_run()) { + if (engine_need_run()) { + VLOG_DBG("engine did not run, force recompute next time: " + "br_int %p, chassis %p", br_int, chassis); + engine_set_force_recompute(true); + poll_immediate_wake(); + } else { + VLOG_DBG("engine did not run, and it was not needed" + " either: br_int %p, chassis %p", + br_int, chassis); + } + } else if (engine_aborted()) { VLOG_DBG("engine was aborted, force recompute next time: " "br_int %p, chassis %p", br_int, chassis); engine_set_force_recompute(true); poll_immediate_wake(); - } else if (!engine_has_run(&en_flow_output, engine_run_id)) { - VLOG_DBG("engine did not run, and it was not needed" - " either: br_int %p, chassis %p", - br_int, chassis); } else { engine_set_force_recompute(false); } @@ -2098,8 +2104,7 @@ main(int argc, char *argv[]) } } else { VLOG_DBG("Pending_pkt conn but br_int %p or chassis " - "%p not ready. run-id: %"PRIu64, br_int, - chassis, engine_run_id); + "%p not ready.", br_int, chassis); unixctl_command_reply_error(pending_pkt.conn, "ovn-controller not ready."); } @@ -2148,7 +2153,7 @@ main(int argc, char *argv[]) } engine_set_context(NULL); - engine_cleanup(&en_flow_output); + engine_cleanup(); /* It's time to exit. Clean up the databases if we are not restarting */ if (!restart) { diff --git a/ovn/lib/inc-proc-eng.c b/ovn/lib/inc-proc-eng.c index ff07ad9..59b5cac 100644 --- a/ovn/lib/inc-proc-eng.c +++ b/ovn/lib/inc-proc-eng.c @@ -31,21 +31,25 @@ VLOG_DEFINE_THIS_MODULE(inc_proc_eng); static bool engine_force_recompute = false; -static bool engine_abort_recompute = false; +static bool engine_run_aborted = false; static const struct engine_context *engine_context; +static struct engine_node **engine_nodes; +static size_t engine_n_nodes; + +static const char *engine_node_state_name[EN_STATE_MAX] = { + [EN_STALE] = "Stale", + [EN_UPDATED] = "Updated", + [EN_VALID] = "Valid", + [EN_ABORTED] = "Aborted", +}; + void engine_set_force_recompute(bool val) { engine_force_recompute = val; } -void -engine_set_abort_recompute(bool val) -{ - engine_abort_recompute = val; -} - const struct engine_context * engine_get_context(void) { @@ -58,26 +62,69 @@ engine_set_context(const struct engine_context *ctx) engine_context = ctx; } -void -engine_init(struct engine_node *node) +/* Builds the topologically sorted 'sorted_nodes' array starting from + * 'node'. + */ +static struct engine_node ** +engine_topo_sort(struct engine_node *node, struct engine_node **sorted_nodes, + size_t *n_count, size_t *n_size) { + /* It's not so efficient to walk the array of already sorted nodes but + * we know that sorting is done only once at startup so it's ok for now. + */ + for (size_t i = 0; i < *n_count; i++) { + if (sorted_nodes[i] == node) { + return sorted_nodes; + } + } + for (size_t i = 0; i < node->n_inputs; i++) { - engine_init(node->inputs[i].node); + sorted_nodes = engine_topo_sort(node->inputs[i].node, sorted_nodes, + n_count, n_size); } - if (node->init) { - node->init(node); + if (*n_count == *n_size) { + sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof *sorted_nodes); } + sorted_nodes[(*n_count)] = node; + (*n_count)++; + return sorted_nodes; +} + +/* Return the array of topologically sorted nodes when starting from + * 'node'. Stores the number of nodes in 'n_count'. + */ +static struct engine_node ** +engine_get_nodes(struct engine_node *node, size_t *n_count) +{ + size_t n_size = 0; + + *n_count = 0; + return engine_topo_sort(node, NULL, n_count, &n_size); } void -engine_cleanup(struct engine_node *node) +engine_init(struct engine_node *node) { - for (size_t i = 0; i < node->n_inputs; i++) { - engine_cleanup(node->inputs[i].node); + engine_nodes = engine_get_nodes(node, &engine_n_nodes); + + for (size_t i = 0; i < engine_n_nodes; i++) { + if (engine_nodes[i]->init) { + engine_nodes[i]->init(engine_nodes[i]); + } } - if (node->cleanup) { - node->cleanup(node); +} + +void +engine_cleanup(void) +{ + for (size_t i = 0; i < engine_n_nodes; i++) { + if (engine_nodes[i]->cleanup) { + engine_nodes[i]->cleanup(engine_nodes[i]); + } } + free(engine_nodes); + engine_nodes = NULL; + engine_n_nodes = 0; } struct engine_node * @@ -128,16 +175,59 @@ engine_ovsdb_node_add_index(struct engine_node *node, const char *name, ed->n_indexes ++; } +void +engine_set_node_state_at(struct engine_node *node, + enum engine_node_state state, + const char *where) +{ + if (node->state == state) { + return; + } + + VLOG_DBG("%s: node: %s, old_state %s, new_state %s", + where, node->name, + engine_node_state_name[node->state], + engine_node_state_name[state]); + + node->state = state; +} + +bool +engine_node_changed(struct engine_node *node) +{ + return node->state == EN_UPDATED; +} + bool -engine_has_run(struct engine_node *node, uint64_t run_id) +engine_has_run(void) { - return node->run_id == run_id; + for (size_t i = 0; i < engine_n_nodes; i++) { + if (engine_nodes[i]->state != EN_STALE) { + return true; + } + } + return false; +} + +bool +engine_aborted(void) +{ + return engine_run_aborted; +} + +void +engine_init_run(void) +{ + VLOG_DBG("Initializing new run"); + for (size_t i = 0; i < engine_n_nodes; i++) { + engine_set_node_state(engine_nodes[i], EN_STALE); + } } /* Do a full recompute (or at least try). If we're not allowed then * mark the node as "aborted". */ -static bool +static void engine_recompute(struct engine_node *node, bool forced, bool allowed) { VLOG_DBG("node: %s, recompute (%s)", node->name, @@ -145,12 +235,12 @@ engine_recompute(struct engine_node *node, bool forced, bool allowed) if (!allowed) { VLOG_DBG("node: %s, recompute aborted", node->name); - return false; + engine_set_node_state(node, EN_ABORTED); + return; } + /* Run the node handler which might change state. */ node->run(node); - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); - return true; } /* Return true if the node could be computed, false otherwise. */ @@ -159,7 +249,7 @@ engine_compute(struct engine_node *node, bool recompute_allowed) { for (size_t i = 0; i < node->n_inputs; i++) { /* If the input node data changed call its change handler. */ - if (node->inputs[i].node->changed) { + if (node->inputs[i].node->state == EN_UPDATED) { VLOG_DBG("node: %s, handle change for input %s", node->name, node->inputs[i].node->name); @@ -170,55 +260,40 @@ engine_compute(struct engine_node *node, bool recompute_allowed) VLOG_DBG("node: %s, can't handle change for input %s, " "fall back to recompute", node->name, node->inputs[i].node->name); - return engine_recompute(node, false, recompute_allowed); + engine_recompute(node, false, recompute_allowed); + return (node->state != EN_ABORTED); } } } - return true; } -bool engine_run(struct engine_node *node, uint64_t run_id) +static void +engine_run_node(struct engine_node *node, bool recompute_allowed) { - if (node->run_id == run_id) { - /* The node was already updated in this run (could be input for - * multiple other nodes). Stop processing. - */ - return true; - } - - /* Initialize the node for this run. */ - node->run_id = run_id; - node->changed = false; - if (!node->n_inputs) { + /* Run the node handler which might change state. */ node->run(node); - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); - return true; - } - - for (size_t i = 0; i < node->n_inputs; i++) { - if (!engine_run(node->inputs[i].node, run_id)) { - return false; - } + return; } - bool need_compute = false; - if (engine_force_recompute) { - return engine_recompute(node, true, !engine_abort_recompute); + engine_recompute(node, true, recompute_allowed); + return; } /* If any of the inputs updated data but there is no change_handler, then * recompute the current node too. */ + bool need_compute = false; for (size_t i = 0; i < node->n_inputs; i++) { - if (node->inputs[i].node->changed) { + if (node->inputs[i].node->state == EN_UPDATED) { need_compute = true; /* Trigger a recompute if we don't have a change handler. */ if (!node->inputs[i].change_handler) { - return engine_recompute(node, false, !engine_abort_recompute); + engine_recompute(node, false, recompute_allowed); + return; } } } @@ -227,33 +302,55 @@ bool engine_run(struct engine_node *node, uint64_t run_id) /* If we couldn't compute the node we either aborted or triggered * a full recompute. In any case, stop processing. */ - return engine_compute(node, !engine_abort_recompute); + if (!engine_compute(node, recompute_allowed)) { + return; + } } - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); - return true; + /* If we reached this point, either the node was updated or its state is + * still valid. + */ + if (!engine_node_changed(node)) { + engine_set_node_state(node, EN_VALID); + } } -bool -engine_need_run(struct engine_node *node, uint64_t run_id) +void +engine_run(bool recompute_allowed) { - size_t i; - - if (node->run_id == run_id) { - return false; + /* If the last run was aborted skip the incremental run because a + * recompute is needed first. + */ + if (!recompute_allowed && engine_run_aborted) { + return; } - if (!node->n_inputs) { - node->run(node); - VLOG_DBG("input node: %s, changed: %d", node->name, node->changed); - return node->changed; + engine_run_aborted = false; + for (size_t i = 0; i < engine_n_nodes; i++) { + engine_run_node(engine_nodes[i], recompute_allowed); + + if (engine_nodes[i]->state == EN_ABORTED) { + engine_run_aborted = true; + return; + } } +} - for (i = 0; i < node->n_inputs; i++) { - if (engine_need_run(node->inputs[i].node, run_id)) { +bool +engine_need_run(void) +{ + for (size_t i = 0; i < engine_n_nodes; i++) { + /* Check only leaf nodes for updates. */ + if (engine_nodes[i]->n_inputs) { + continue; + } + + engine_nodes[i]->run(engine_nodes[i]); + VLOG_DBG("input node: %s, state: %s", engine_nodes[i]->name, + engine_node_state_name[engine_nodes[i]->state]); + if (engine_nodes[i]->state == EN_UPDATED) { return true; } } - return false; } diff --git a/ovn/lib/inc-proc-eng.h b/ovn/lib/inc-proc-eng.h index a4151ca..2f3ff62 100644 --- a/ovn/lib/inc-proc-eng.h +++ b/ovn/lib/inc-proc-eng.h @@ -82,10 +82,21 @@ struct engine_node_input { bool (*change_handler)(struct engine_node *node); }; -struct engine_node { - /* A unique id to distinguish each iteration of the engine_run(). */ - uint64_t run_id; +enum engine_node_state { + EN_STALE, /* Data in the node is not up to date with the DB. */ + EN_UPDATED, /* Data in the node is valid but was updated during the + * last run. + */ + EN_VALID, /* Data in the node is valid and didn't change during the + * last run. + */ + EN_ABORTED, /* During the last run, processing was aborted for + * this node. + */ + EN_STATE_MAX, +}; +struct engine_node { /* A unique name for each node. */ char *name; @@ -102,8 +113,8 @@ struct engine_node { * node. */ void *data; - /* Whether the data changed in the last engine run. */ - bool changed; + /* State of the node after the last engine run. */ + enum engine_node_state state; /* Method to initialize data. It may be NULL. */ void (*init)(struct engine_node *); @@ -116,23 +127,29 @@ struct engine_node { void (*run)(struct engine_node *); }; -/* Initialize the data for the engine nodes recursively. It calls each node's +/* Initialize the data for the engine nodes. It calls each node's * init() method if not NULL. It should be called before the main loop. */ -void engine_init(struct engine_node *); +void engine_init(struct engine_node *node); -/* Execute the processing recursively, which should be called in the main - * loop. Returns true if the execution is compelte, false if it is aborted, - * which could happen when engine_abort_recompute is set. */ -bool engine_run(struct engine_node *, uint64_t run_id); +/* Initialize the engine nodes for a new run. It should be called in the + * main processing loop before every potential engine_run(). + */ +void engine_init_run(void); + +/* Execute the processing, which should be called in the main loop. + * Updates the engine node's states accordingly. If 'recompute_allowed' is + * false and a recompute is required by the current engine run then the engine + * aborts. + */ +void engine_run(bool recompute_allowed); -/* Clean up the data for the engine nodes recursively. It calls each node's +/* Clean up the data for the engine nodes. It calls each node's * cleanup() method if not NULL. It should be called before the program * terminates. */ -void engine_cleanup(struct engine_node *); +void engine_cleanup(void); /* Check if engine needs to run but didn't. */ -bool -engine_need_run(struct engine_node *, uint64_t run_id); +bool engine_need_run(void); /* Get the input node with for */ struct engine_node * engine_get_input(const char *input_name, @@ -151,16 +168,26 @@ void engine_add_input(struct engine_node *node, struct engine_node *input, * iteration, and the change can't be tracked across iterations */ void engine_set_force_recompute(bool val); -/* Set the flag to cause engine execution to be aborted when there - * is any recompute to be triggered in any node. */ -void engine_set_abort_recompute(bool val); - const struct engine_context * engine_get_context(void); void engine_set_context(const struct engine_context *); -/* Return true if the engine has run for 'node' in the 'run_id' iteration. */ -bool engine_has_run(struct engine_node *node, uint64_t run_id); +void engine_set_node_state_at(struct engine_node *node, + enum engine_node_state state, + const char *where); + +/* Return true if during the last iteration the node's data was updated. */ +bool engine_node_changed(struct engine_node *node); + +/* Return true if the engine has run in the last iteration. */ +bool engine_has_run(void); + +/* Returns true if during the last engine run we had to abort processing. */ +bool engine_aborted(void); + +/* Set the state of the node and log changes. */ +#define engine_set_node_state(node, state) \ + engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR) struct ed_ovsdb_index { const char *name; @@ -187,6 +214,7 @@ void engine_ovsdb_node_add_index(struct engine_node *, const char *name, struct engine_node en_##NAME = { \ .name = NAME_STR, \ .data = &ed_##NAME, \ + .state = EN_STALE, \ .init = en_##NAME##_init, \ .run = en_##NAME##_run, \ .cleanup = en_##NAME##_cleanup, \ @@ -201,10 +229,10 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \ const struct DB_NAME##rec_##TBL_NAME##_table *table = \ EN_OVSDB_GET(node); \ if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \ - node->changed = true; \ + engine_set_node_state(node, EN_UPDATED); \ return; \ } \ - node->changed = false; \ + engine_set_node_state(node, EN_VALID); \ } \ static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \ = NULL; \ -- 1.8.3.1