# NOTE(review): monotone changeset touching enumerator.cc/.hh, netcmd.cc/.hh,
#   netsync.cc and refiner.cc. It removes the bye_cmd/"goodbye" handshake and
#   instead treats a session as finished when session::finished_exchange_ok()
#   holds; outgoing items are produced incrementally by
#   revision_enumerator::step(), driven from session::maybe_step().
# NOTE(review): in this copy the line breaks inside each file diff appear to
#   have been collapsed into spaces (and template/include angle-bracket text
#   looks stripped), so it will not apply with patch(1) as-is. Regenerate it
#   from the revisions listed below rather than hand-editing the hunks.
# NOTE(review): many L() trace calls become P(), and new P() traces are added
#   ("stepping...", "step examining rev ...", "pid %d ..." via getpid()). These
#   look like temporary debugging output; consider L() before merging.
# NOTE(review): enumerator.cc formats revision ids with '%d'; '%s' was
#   presumably intended - confirm.
# NOTE(review): revision_enumerator::step() keeps looping without returning
#   while revisions are skipped (process_this_rev() false or terminal), so a
#   single call may walk a large part of the revision graph.
# NOTE(review): bye_cmd (value 1) is removed from the netcmd_code enum and
#   from the command-code switch in netcmd.cc, so a peer still sending it will
#   no longer be accepted there. No protocol version bump is visible in this
#   patch - confirm constants::netcmd_current_protocol_version changes elsewhere.
# NOTE(review): netsync.cc, after guard.commit(): the new early
#   `if (finished_exchange_ok()) return true;` runs before `if (!ret)`, so a
#   failed dispatch is reported as success in that case.
# NOTE(review): netsync.cc read-failure path drops its trailing `return;`
#   after E(false, ...); this is only safe if E() always throws when its
#   condition is false (presumably so - confirm).
# NOTE(review): netsync.cc server loop: `sessions.erase(j);` is now followed
#   by `sess->maybe_step();`. If sess does not hold its own owning reference,
#   this is a use after erase; either way it steps a session that was just
#   dropped. The earlier hunk uses `else sess->maybe_step();` - consider the
#   same shape here.
# NOTE(review): refiner.cc calls getpid(); the added `#include` line has its
#   header name missing in this copy - verify it is <unistd.h> (POSIX).
# # # patch "enumerator.cc" # from [7a30cef784137174aa75ccdbe6a412b81e58cae6] # to [db11df61b79edbfa3d59b65cdeee1c3c0a102f9a] # # patch "enumerator.hh" # from [b7e467ace9dd121e55ceeb8ef787ba4c03c4b735] # to [566bc3989489ca714e5c47a71c1f07f094340923] # # patch "netcmd.cc" # from [05d3d94130753605cc408a7369ee9396dd12bf13] # to [354be8e5391634f749c53b7b33e693235b254daa] # # patch "netcmd.hh" # from [889ca3e00418abd466236fb58c89bc6e26c160a5] # to [88a2a49cc8638b5f98366be83cb53bce8ef21e99] # # patch "netsync.cc" # from [b3e1fe14931d87e44a7542b52dba4445a895377e] # to [c83bebf13c42335eab22a6dab1b94c3cbd6c0900] # # patch "refiner.cc" # from [e584a3e6eeeed3c9b009364df62720b9c34ef07c] # to [3fe6d5d6fc1573591fe2d8b080920a6d55747f7d] # ============================================================ --- enumerator.cc 7a30cef784137174aa75ccdbe6a412b81e58cae6 +++ enumerator.cc db11df61b79edbfa3d59b65cdeee1c3c0a102f9a @@ -34,34 +34,44 @@ : cb(cb), app(app) { revision_id root; - set initial; - app.db.get_revision_children(root, initial); - for (set::const_iterator i = initial.begin(); - i != initial.end(); ++i) - revs.push_back(*i); + revs.push_back(root); } +bool +revision_enumerator::done() +{ + return revs.empty() && items.empty(); +} + void revision_enumerator::step() { - // It's ok if this method simply does nothing. 
- - if (items.empty()) + P(F("stepping...\n")); + while (!done()) { - if (!revs.empty()) + if (items.empty() && !revs.empty()) { revision_id r = revs.front(); revs.pop_front(); - set children; - app.db.get_revision_children(r, children); - for (set::const_iterator i = children.begin(); - i != children.end(); ++i) - revs.push_back(*i); - terminal_nodes.erase(r); - + P(F("step examining rev '%d'\n") % r); + + if (terminal_nodes.find(r) == terminal_nodes.end()) + { + set children; + app.db.get_revision_children(r, children); + P(F("step expanding %d children of rev '%d'\n") % children.size() % r); + for (set::const_iterator i = children.begin(); + i != children.end(); ++i) + revs.push_back(*i); + } + + if (null_id(r)) + continue; + if (cb.process_this_rev(r)) { + P(F("step expanding contents of rev '%d'\n") % r); revision_set rs; app.db.get_revision(r, rs); @@ -114,32 +124,36 @@ } } } - } - else - { - enumerator_item i = items.front(); - items.pop_front(); - I(!null_id(i.ident_a)); - switch (i.tag) + if (!items.empty()) { - case enumerator_item::fdata: - cb.note_file_data(file_id(i.ident_a)); - break; + P(F("step extracting item\n")); - case enumerator_item::fdelta: - I(!null_id(i.ident_b)); - cb.note_file_delta(file_id(i.ident_a), - file_id(i.ident_b)); + enumerator_item i = items.front(); + items.pop_front(); + I(!null_id(i.ident_a)); + + switch (i.tag) + { + case enumerator_item::fdata: + cb.note_file_data(file_id(i.ident_a)); + break; + + case enumerator_item::fdelta: + I(!null_id(i.ident_b)); + cb.note_file_delta(file_id(i.ident_a), + file_id(i.ident_b)); + break; + + case enumerator_item::rev: + cb.note_rev(revision_id(i.ident_a)); + break; + + case enumerator_item::cert: + cb.note_cert(i.ident_a); + break; + } break; - - case enumerator_item::rev: - cb.note_rev(revision_id(i.ident_a)); - break; - - case enumerator_item::cert: - cb.note_cert(i.ident_a); - break; } } } ============================================================ --- enumerator.hh 
b7e467ace9dd121e55ceeb8ef787ba4c03c4b735 +++ enumerator.hh 566bc3989489ca714e5c47a71c1f07f094340923 @@ -56,6 +56,7 @@ revision_enumerator(enumerator_callbacks & cb, app_state & app); void step(); + bool done(); }; #endif // __ENUMERATOR_H__ ============================================================ --- netcmd.cc 05d3d94130753605cc408a7369ee9396dd12bf13 +++ netcmd.cc 354be8e5391634f749c53b7b33e693235b254daa @@ -43,7 +43,7 @@ } netcmd::netcmd() : version(constants::netcmd_current_protocol_version), - cmd_code(bye_cmd) + cmd_code(error_cmd) {} size_t netcmd::encoded_size() @@ -101,9 +101,10 @@ case static_cast(anonymous_cmd): case static_cast(auth_cmd): case static_cast(error_cmd): - case static_cast(bye_cmd): case static_cast(confirm_cmd): case static_cast(refine_cmd): + case static_cast(note_item_cmd): + case static_cast(note_shared_subtree_cmd): case static_cast(done_cmd): case static_cast(data_cmd): case static_cast(delta_cmd): @@ -637,15 +638,6 @@ BOOST_CHECK(in_errmsg == out_errmsg); L(boost::format("errmsg_cmd test done, buffer was %d bytes\n") % buf.size()); } - - // bye_cmd - { - L(boost::format("checking i/o round trip on bye_cmd\n")); - netcmd out_cmd, in_cmd; - string buf; - do_netcmd_roundtrip(out_cmd, in_cmd, buf); - L(boost::format("bye_cmd test done, buffer was %d bytes\n") % buf.size()); - } // hello_cmd { ============================================================ --- netcmd.hh 889ca3e00418abd466236fb58c89bc6e26c160a5 +++ netcmd.hh 88a2a49cc8638b5f98366be83cb53bce8ef21e99 @@ -27,7 +27,6 @@ { // general commands error_cmd = 0, - bye_cmd = 1, // authentication commands hello_cmd = 2, @@ -92,9 +91,6 @@ void read_error_cmd(std::string & errmsg) const; void write_error_cmd(std::string const & errmsg); -//void read_bye_cmd() {} - void write_bye_cmd() {cmd_code = bye_cmd;} - void read_hello_cmd(rsa_keypair_id & server_keyname, rsa_pub_key & server_key, id & nonce) const; ============================================================ --- netsync.cc 
b3e1fe14931d87e44a7542b52dba4445a895377e +++ netsync.cc c83bebf13c42335eab22a6dab1b94c3cbd6c0900 @@ -296,9 +296,6 @@ vector written_certs; id saved_nonce; - bool received_goodbye; - bool sent_goodbye; - packet_db_valve dbw; bool encountered_error; @@ -342,8 +339,10 @@ void setup_client_tickers(); bool done_all_refinements(); - bool got_all_data(); - void maybe_say_goodbye(); + bool queued_all_items(); + bool received_all_items(); + bool finished_exchange_ok(); + void maybe_step(); void note_item_arrived(netcmd_item_type ty, id const & i); void maybe_note_epochs_finished(); @@ -392,7 +391,6 @@ delta const & del); // Incoming dispatch-called methods. - bool process_bye_cmd(); bool process_error_cmd(string const & errmsg); bool process_hello_cmd(rsa_keypair_id const & server_keyname, rsa_pub_key const & server_key, @@ -472,8 +470,6 @@ revision_out_ticker(NULL), revision_checked_ticker(NULL), saved_nonce(""), - received_goodbye(false), - sent_goodbye(false), dbw(app, true), encountered_error(false), epoch_refiner(epoch_item, *this), @@ -704,7 +700,7 @@ bool -session::got_all_data() +session::received_all_items() { return rev_refiner.items_to_receive.empty() && cert_refiner.items_to_receive.empty() @@ -712,7 +708,26 @@ && epoch_refiner.items_to_receive.empty(); } +bool +session::finished_exchange_ok() +{ + return done_all_refinements() + && received_all_items() + && queued_all_items() + && rev_enumerator.done() + && outbuf.empty(); +} +bool +session::queued_all_items() +{ + return rev_refiner.items_to_send.empty() + && cert_refiner.items_to_send.empty() + && key_refiner.items_to_send.empty() + && epoch_refiner.items_to_send.empty(); +} + + void session::maybe_note_epochs_finished() { @@ -898,23 +913,12 @@ // senders void -session::queue_bye_cmd() -{ - L(F("queueing 'bye' command\n")); - netcmd cmd; - cmd.write_bye_cmd(); - write_netcmd_and_try_flush(cmd); - this->sent_goodbye = true; -} - -void session::queue_error_cmd(string const & errmsg) { L(F("queueing 'error' 
command\n")); netcmd cmd; cmd.write_error_cmd(errmsg); write_netcmd_and_try_flush(cmd); - this->sent_goodbye = true; } void @@ -923,7 +927,7 @@ { string typestr; netcmd_item_type_to_string(type, typestr); - L(F("queueing 'done' command for %s level %s\n") % typestr % level); + P(F("queueing 'done' command for %s level %s\n") % typestr % level); netcmd cmd; cmd.write_done_cmd(level, type); write_netcmd_and_try_flush(cmd); @@ -992,7 +996,7 @@ hexenc hpref; node.get_hex_prefix(hpref); netcmd_item_type_to_string(node.type, typestr); - L(F("queueing request for refinement of %s node '%s', level %d\n") + P(F("queueing request for refinement of %s node '%s', level %d\n") % typestr % hpref % static_cast(node.level)); netcmd cmd; cmd.write_refine_cmd(node); @@ -1006,7 +1010,7 @@ hexenc hitem; encode_hexenc(item, hitem); netcmd_item_type_to_string(ty, typestr); - L(F("queueing note about %s item '%s'") % typestr % hitem); + P(F("queueing note about %s item '%s'") % typestr % hitem); netcmd cmd; cmd.write_note_item_cmd(ty, item); write_netcmd_and_try_flush(cmd); @@ -1043,7 +1047,7 @@ return; } - L(F("queueing %d bytes of data for %s item '%s'\n") + P(F("queueing %d bytes of data for %s item '%s'\n") % dat.size() % typestr % hid); netcmd cmd; @@ -1082,7 +1086,7 @@ return; } - L(F("queueing %s delta '%s' -> '%s'\n") + P(F("queueing %s delta '%s' -> '%s'\n") % typestr % base_hid % ident_hid); netcmd cmd; cmd.write_delta_cmd(type, base, ident, del); @@ -1094,14 +1098,6 @@ // processors bool -session::process_bye_cmd() -{ - L(F("received 'bye' netcmd\n")); - this->received_goodbye = true; - return true; -} - -bool session::process_error_cmd(string const & errmsg) { throw bad_decode(F("received network error: %s") % errmsg); @@ -1484,6 +1480,11 @@ bool session::process_refine_cmd(merkle_node const & node) { + string typestr; + netcmd_item_type_to_string(node.type, typestr); + P(F("processing refine cmd for %s node at level %d\n") + % typestr % node.level); + switch (node.type) { 
case file_item: @@ -1909,10 +1910,6 @@ switch (cmd.get_cmd_code()) { - - case bye_cmd: - return process_bye_cmd(); - break; case error_cmd: { @@ -2019,6 +2016,7 @@ size_t level; netcmd_item_type type; cmd.read_done_cmd(level, type); + return process_done_cmd(level, type); } break; @@ -2094,11 +2092,15 @@ } void -session::maybe_say_goodbye() +session::maybe_step() { - if (done_all_refinements() && - got_all_data() && !sent_goodbye) - queue_bye_cmd(); + if (done_all_refinements() + && !rev_enumerator.done() + && outbuf_size < constants::bufsz * 10) + { + P(F("stepping enumerator\n")); + rev_enumerator.step(); + } } bool @@ -2133,7 +2135,10 @@ if (inbuf.size() >= constants::netcmd_maxsz) W(F("input buffer for peer %s is overfull after netcmd dispatch\n") % peer_id); guard.commit(); - maybe_say_goodbye(); + + if (finished_exchange_ok()) + return true; + if (!ret) P(F("failed to process '%s' packet") % cmd.get_cmd_code()); return ret; @@ -2212,11 +2217,7 @@ } else { - if (sess.sent_goodbye) - P(F("read from fd %d (peer %s) closed OK after goodbye\n") % fd % sess.peer_id); - else - E(false, F("read from fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id); - return; + E(false, F("read from fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id); } } @@ -2224,10 +2225,7 @@ { if (! 
sess.write_some()) { - if (sess.sent_goodbye) - P(F("write on fd %d (peer %s) closed OK after goodbye\n") % fd % sess.peer_id); - else - E(false, F("write on fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id); + E(false, F("write on fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id); return; } } @@ -2246,9 +2244,10 @@ % sess.peer_id); return; } + sess.maybe_step(); } - if (sess.sent_goodbye && sess.outbuf.empty() && sess.received_goodbye) + if (sess.finished_exchange_ok()) { P(F("successful exchange with %s\n") % sess.peer_id); @@ -2367,6 +2366,8 @@ sessions.erase(fd); live_p = false; } + else + sess->maybe_step(); } static void @@ -2390,6 +2391,7 @@ % fd % sess->peer_id); sessions.erase(j); } + sess->maybe_step(); } } } @@ -2399,7 +2401,7 @@ unsigned long timeout_seconds) { // kill any clients which haven't done any i/o inside the timeout period - // or who have said goodbye and flushed their output buffers + // or who have exchanged all items and flushed their output buffers set dead_clients; time_t now = ::time(NULL); for (map >::const_iterator i = sessions.begin(); @@ -2412,9 +2414,9 @@ % i->first % i->second->peer_id); dead_clients.insert(i->first); } - if (i->second->sent_goodbye && i->second->outbuf.empty() && i->second->received_goodbye) + if (i->second->finished_exchange_ok()) { - P(F("fd %d (peer %s) exchanged goodbyes and flushed output, disconnecting\n") + P(F("fd %d (peer %s) exchanged all items and flushed output, disconnecting\n") % i->first % i->second->peer_id); dead_clients.insert(i->first); } @@ -2533,21 +2535,22 @@ void -insert_with_parents(revision_id rev, refiner & ref, +insert_with_parents(revision_id rev, + refiner & ref, + set & revs, app_state & app, ticker & revisions_ticker) { deque work; - set seen; work.push_back(rev); while (!work.empty()) { revision_id rid = work.front(); work.pop_front(); - if (!null_id(rid) && seen.find(rid) == seen.end()) + if (!null_id(rid) && revs.find(rid) == revs.end()) { - 
seen.insert(rid); + revs.insert(rid); ++revisions_ticker; id rev_item; decode_hexenc(rid.inner(), rev_item); @@ -2599,7 +2602,7 @@ j != certs.end(); j++) { insert_with_parents(revision_id(j->inner().ident), - rev_refiner, app, revisions_ticker); + rev_refiner, revision_ids, app, revisions_ticker); // branch certs go in here, others later on hexenc tmp; id item; ============================================================ --- refiner.cc e584a3e6eeeed3c9b009364df62720b9c34ef07c +++ refiner.cc 3fe6d5d6fc1573591fe2d8b080920a6d55747f7d @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -17,6 +18,7 @@ using std::string; using std::set; +using std::make_pair; // The previous incarnation of this algorithm had code related to sending // decisions (and skippable transmissions) mixed in with the refinement @@ -31,6 +33,7 @@ void refiner::note_local_item(id const & item) { + local_items.insert(item); insert_into_merkle_tree(table, type, item, 0); } @@ -54,6 +57,7 @@ our_fake_node.level = their_node.level + 1; our_fake_node.type = their_node.type; cb.queue_refine_cmd(our_fake_node); + exchanged_data_since_last_done_cmd = true; } @@ -77,6 +81,7 @@ our_fake_subtree.set_raw_slot(subslot, our_slotval); our_fake_subtree.set_slot_state(subslot, our_node.get_slot_state(slot)); cb.queue_refine_cmd(our_fake_subtree); + exchanged_data_since_last_done_cmd = true; } @@ -88,6 +93,7 @@ string tmp; our_node.get_raw_slot(slot, slotval); cb.queue_note_item_cmd(type, slotval); + exchanged_data_since_last_done_cmd = true; } @@ -121,6 +127,13 @@ std::set_difference(peer_items.begin(), peer_items.end(), local_items.begin(), local_items.end(), std::inserter(items_to_receive, items_to_receive.begin())); + string typestr; + netcmd_item_type_to_string(type, typestr); + + P(F("pid %d determined %d %s items to send\n") + % getpid() % items_to_send.size() % typestr); + P(F("pid %d determined %d %s items to receive\n") + % getpid() % items_to_receive.size() % typestr); } @@ -133,6 +146,7 @@ 
merkle_ptr our_subtree; load_merkle_node(our_node.level + 1, subprefix, our_subtree); cb.queue_refine_cmd(*our_subtree); + exchanged_data_since_last_done_cmd = true; } void @@ -184,6 +198,8 @@ } else cb.queue_refine_cmd(*our_subtree); + + exchanged_data_since_last_done_cmd = true; } @@ -191,7 +207,11 @@ : type(type), cb(cb), exchanged_data_since_last_done_cmd(false), finished_refinement(false) -{} +{ + merkle_ptr root = merkle_ptr(new merkle_node()); + root->type = type; + table.insert(make_pair(make_pair(prefix(""), 0), root)); +} void refiner::note_item_in_peer(id const & item) @@ -243,12 +263,24 @@ void refiner::process_done_command(size_t level) { + string typestr; + netcmd_item_type_to_string(type, typestr); + + P(F("pid %d processing 'done' command on %s level %d\n") + % getpid() % typestr % level); + if (!exchanged_data_since_last_done_cmd || level >= 0xff) { // Echo 'done' if we're shutting down if (!finished_refinement) - cb.queue_done_cmd(level+1, type); + { + P(F("pid %d processing 'done' command => echoing shut down of %s refinement\n") + % getpid() % typestr); + cb.queue_done_cmd(level+1, type); + } + P(F("pid %d processing 'done' command => shut down %s refinement\n") + % getpid() % typestr); // Mark ourselves shut down finished_refinement = true; @@ -260,6 +292,8 @@ && !finished_refinement) { // Echo 'done', we're still active. + P(F("pid %d processing 'done' command => continuing to %s level %d\n") + % getpid() % typestr % (level+1)); cb.queue_done_cmd(level+1, type); }