# # # patch "merkle_tree.cc" # from [9dac81fddfc596a6a6e55c205a69cd344eccba83] # to [7892d63a6614ece8b94d54e8674688eeb9dea008] # # patch "merkle_tree.hh" # from [cf513c05a9c90f5dbc1b8664175209230d03a36f] # to [92d813eec1d4aedec4a8f4aba15cb333372032bb] # # patch "netcmd.cc" # from [43a077525d23c4841f5bdaa6afb33150c543ca69] # to [226506a00486dcd7a64e1c912eb37d8d49aa92d2] # # patch "netcmd.hh" # from [2d51f4bf2ea58f075eef6536428a46b2023a9e3e] # to [22664387cea0916b87288113ec5b559c2c49eda5] # # patch "netsync.cc" # from [3983fb90d601e87ef41f34256f8547980344aee6] # to [d3eb91d64064c55ae2854a6c971cb011eb3ae59f] # # patch "refiner.cc" # from [826ecae4d4133f4622670a807d5d1055e35c1979] # to [6e80c65a1f4828f79ebe8aee7244137feb5959e0] # # patch "refiner.hh" # from [cdb2d13b9054ea56707690d422ee4349774df28c] # to [9f724877248fc986aed3c2de8fa39d5a27f934d1] # ============================================================ --- merkle_tree.cc 9dac81fddfc596a6a6e55c205a69cd344eccba83 +++ merkle_tree.cc 7892d63a6614ece8b94d54e8674688eeb9dea008 @@ -258,16 +258,17 @@ } string hash = raw_sha1(oss.str()); I(hash.size() == constants::merkle_hash_length_in_bytes); - outbuf = hash + oss.str(); + outbuf.append(hash); + outbuf.append(oss.str()); } void -read_node(string const & inbuf, merkle_node & out) +read_node(string const & inbuf, size_t & pos, merkle_node & out) { - size_t pos = 0; string hash = extract_substring(inbuf, pos, constants::merkle_hash_length_in_bytes, "node hash"); + size_t begin_pos = pos; out.type = static_cast(extract_datum_lsb(inbuf, pos, "node type")); out.level = extract_datum_uleb128(inbuf, pos, "node level"); @@ -303,9 +304,8 @@ out.set_raw_slot(slot, slot_val); } } - - assert_end_of_buffer(inbuf, pos, "node"); - string checkhash = raw_sha1(inbuf.substr(constants::merkle_hash_length_in_bytes)); + + string checkhash = raw_sha1(inbuf.substr(begin_pos, pos - begin_pos)); out.check_invariants(); if (hash != checkhash) throw bad_decode(F("mismatched node hash value %s, expected %s") ============================================================ --- merkle_tree.hh cf513c05a9c90f5dbc1b8664175209230d03a36f +++ merkle_tree.hh 92d813eec1d4aedec4a8f4aba15cb333372032bb @@ -88,7 +88,7 @@ size_t prefix_length_in_bits(size_t level); size_t prefix_length_in_bytes(size_t level); void write_node(merkle_node const & in, std::string & outbuf); -void read_node(std::string const & inbuf, merkle_node & out); +void read_node(std::string const & inbuf, size_t & pos, merkle_node & out); std::string raw_sha1(std::string const & in); ============================================================ --- netcmd.cc 43a077525d23c4841f5bdaa6afb33150c543ca69 +++ netcmd.cc 226506a00486dcd7a64e1c912eb37d8d49aa92d2 @@ -104,8 +104,6 @@ case static_cast(error_cmd): case static_cast(confirm_cmd): case static_cast(refine_cmd): - case static_cast(note_item_cmd): - case static_cast(note_shared_subtree_cmd): case static_cast(done_cmd): case static_cast(data_cmd): case static_cast(delta_cmd): @@ -354,92 +352,46 @@ } void -netcmd::read_refine_cmd(merkle_node & node) const +netcmd::read_refine_cmd(refinement_type & ty, merkle_node & node) const { - // syntax is: - read_node(payload, node); + // syntax is: + size_t pos = 0; + ty = static_cast + (extract_datum_lsb + (payload, pos, + "refine netcmd, refinement type")); + read_node(payload, pos, node); + assert_end_of_buffer(payload, pos, "refine cmd"); } void -netcmd::write_refine_cmd(merkle_node const & node) +netcmd::write_refine_cmd(refinement_type ty, merkle_node const & node) { cmd_code = refine_cmd; payload.clear(); + payload += static_cast(ty); write_node(node, payload); } void -netcmd::read_note_item_cmd(netcmd_item_type & type, id & item) const +netcmd::read_done_cmd(netcmd_item_type & type, size_t & n_items) const { size_t pos = 0; - // syntax is: - type = read_netcmd_item_type(payload, pos, "note_item netcmd, item type"); - item = id(extract_substring(payload, pos, - constants::merkle_hash_length_in_bytes, - "note_item netcmd, item identifier")); - assert_end_of_buffer(payload, pos, "note_item netcmd payload"); -} - -void -netcmd::write_note_item_cmd(netcmd_item_type type, id const & item) -{ - I(item().size() == constants::merkle_hash_length_in_bytes); - cmd_code = note_item_cmd; - payload.clear(); - payload += static_cast(type); - payload += item(); -} - -void -netcmd::read_note_shared_subtree_cmd(netcmd_item_type & type, - prefix & pref, - size_t & level) const -{ - size_t pos = 0; - // syntax is: - type = read_netcmd_item_type(payload, pos, - "note_shared_subtree netcmd, item type"); - string tmp; - extract_variable_length_string(payload, tmp, pos, - "note_shared_subtree netcmd, tree prefix"); - pref = tmp; - level = extract_datum_uleb128(payload, pos, - "note_shared_subtree netcmd, level number"); - assert_end_of_buffer(payload, pos, "note_shared_subtree netcmd payload"); -} - -void -netcmd::write_note_shared_subtree_cmd(netcmd_item_type type, - prefix const & pref, - size_t level) -{ - payload.clear(); - cmd_code = note_shared_subtree_cmd; - payload += static_cast(type); - insert_variable_length_string(pref(), payload); - insert_datum_uleb128(level, payload); -} - - -void -netcmd::read_done_cmd(size_t & level, netcmd_item_type & type) const -{ - size_t pos = 0; - // syntax is: - level = extract_datum_uleb128(payload, pos, - "done netcmd, level number"); + // syntax is: type = read_netcmd_item_type(payload, pos, "done netcmd, item type"); + n_items = extract_datum_uleb128(payload, pos, + "done netcmd, item-to-send count"); assert_end_of_buffer(payload, pos, "done netcmd payload"); } void -netcmd::write_done_cmd(size_t level, - netcmd_item_type type) +netcmd::write_done_cmd(netcmd_item_type type, + size_t n_items) { cmd_code = done_cmd; payload.clear(); - insert_datum_uleb128(level, payload); payload += static_cast(type); + insert_datum_uleb128(n_items, payload); } void @@ -759,6 +711,7 @@ L(boost::format("checking i/o round trip on refine_cmd\n")); netcmd out_cmd, in_cmd; string buf; + refinement_type out_ty (refinement_query), in_ty(refinement_response); merkle_node out_node, in_node; out_node.set_raw_slot(0, id(raw_sha1("The police pulled Kris Kringle over"))); @@ -769,60 +722,26 @@ out_node.set_slot_state(3, leaf_state); out_node.set_slot_state(15, subtree_state); - out_cmd.write_refine_cmd(out_node); + out_cmd.write_refine_cmd(out_ty, out_node); do_netcmd_roundtrip(out_cmd, in_cmd, buf); - in_cmd.read_refine_cmd(in_node); + in_cmd.read_refine_cmd(in_ty, in_node); + BOOST_CHECK(in_ty == out_ty); BOOST_CHECK(in_node == out_node); L(boost::format("refine_cmd test done, buffer was %d bytes\n") % buf.size()); } - // note_item_cmd - {s - L(boost::format("checking i/o round trip on note_item_cmd\n")); - netcmd out_cmd, in_cmd; - string buf; - netcmd_item_type out_ty = revision_item, in_ty; - id out_id(raw_sha1("gone fishin'")), in_id; - - out_cmd.write_note_item_cmd(out_ty, out_id); - do_netcmd_roundtrip(out_cmd, in_cmd, buf); - in_cmd.read_note_item_cmd(in_ty, in_id); - BOOST_CHECK(in_ty == out_ty); - BOOST_CHECK(in_id == out_id); - L(boost::format("note_item_cmd test done, buffer was %d bytes\n") % buf.size()); - } - - // note_shared_subtree_cmd - { - L(boost::format("checking i/o round trip on note_item_cmd\n")); - netcmd out_cmd, in_cmd; - string buf; - netcmd_item_type out_ty = revision_item, in_ty; - prefix out_pref("f00f"), in_pref; - size_t out_lev=4, in_lev=0; - - out_cmd.write_note_shared_subtree_cmd(out_ty, out_pref, out_lev); - do_netcmd_roundtrip(out_cmd, in_cmd, buf); - in_cmd.read_note_shared_subtree_cmd(in_ty, in_pref, in_lev); - BOOST_CHECK(in_ty == out_ty); - BOOST_CHECK(in_pref == out_pref); - BOOST_CHECK(in_lev == out_lev); - L(boost::format("note_shared_subtre_cmd test done, buffer was %d bytes\n") - % buf.size()); - } - // done_cmd { L(boost::format("checking i/o round trip on done_cmd\n")); netcmd out_cmd, in_cmd; - size_t out_level(12), in_level; + size_t out_n_items(12), in_n_items(0); netcmd_item_type out_type(key_item), in_type(revision_item); string buf; - out_cmd.write_done_cmd(out_level, out_type); + out_cmd.write_done_cmd(out_type, out_n_items); do_netcmd_roundtrip(out_cmd, in_cmd, buf); - in_cmd.read_done_cmd(in_level, in_type); - BOOST_CHECK(in_level == out_level); + in_cmd.read_done_cmd(in_type, in_n_items); + BOOST_CHECK(in_n_items == out_n_items); BOOST_CHECK(in_type == out_type); L(boost::format("done_cmd test done, buffer was %d bytes\n") % buf.size()); } ============================================================ --- netcmd.hh 2d51f4bf2ea58f075eef6536428a46b2023a9e3e +++ netcmd.hh 22664387cea0916b87288113ec5b559c2c49eda5 @@ -23,6 +23,13 @@ } protocol_role; +typedef enum + { + refinement_query = 0, + refinement_response = 1 + } +refinement_type; + typedef enum { // general commands @@ -37,13 +44,11 @@ // refinement commands refine_cmd = 6, - note_item_cmd = 7, - note_shared_subtree_cmd = 8, - done_cmd = 9, + done_cmd = 7, // transmission commands - data_cmd = 10, - delta_cmd = 11, + data_cmd = 8, + delta_cmd = 9, // usher commands // usher_cmd is sent by a server farm (or anyone else who wants to serve @@ -129,22 +134,12 @@ void read_confirm_cmd() const; void write_confirm_cmd(); - void read_refine_cmd(merkle_node & node) const; - void write_refine_cmd(merkle_node const & node); + void read_refine_cmd(refinement_type & ty, merkle_node & node) const; + void write_refine_cmd(refinement_type ty, merkle_node const & node); - void read_note_item_cmd(netcmd_item_type & type, id & item) const; - void write_note_item_cmd(netcmd_item_type type, id const & item); + void read_done_cmd(netcmd_item_type & type, size_t & n_items) const; + void write_done_cmd(netcmd_item_type type, size_t n_items); - void read_note_shared_subtree_cmd(netcmd_item_type & type, - prefix & pref, - size_t & level) const; - void write_note_shared_subtree_cmd(netcmd_item_type type, - prefix const & pref, - size_t level); - - void read_done_cmd(size_t & level, netcmd_item_type & type) const; - void write_done_cmd(size_t level, netcmd_item_type type); - void read_data_cmd(netcmd_item_type & type, id & item, std::string & dat) const; ============================================================ --- netsync.cc 3983fb90d601e87ef41f34256f8547980344aee6 +++ netsync.cc d3eb91d64064c55ae2854a6c971cb011eb3ae59f @@ -67,13 +67,6 @@ // its merkle trie. // -- add some sort of vhost field to the client's first packet, saying who // they expect to talk to -// -- connection teardown is still a bit unpleasant: -// -- subtle misdesign: "goodbye" packets indicate completion of data -// transfer. they do not indicate that data has been written to -// disk. there should be some way to indicate that data has been -// successfully written to disk. See message (and thread) -// on -// monotone-devel. // -- apparently we have a IANA approved port: 4691. I guess we should // switch to using that. @@ -188,26 +181,13 @@ // // Refinement is executed by "refiners"; there is a refiner for each // set of 'items' being exchanged: epochs, keys, certs, and revisions. -// When refinement starts, each party knows only their own set of items; -// when refinement completes, each party has learned of the complete set -// of items in its peer. The self-set and peer-set can then be used to -// calculate the set of items to send during the following transmission -// phase. +// When refinement starts, each party knows only their own set of +// items; when refinement completes, each party has learned of the +// complete set of items it needs to send, and a count of items it's +// expecting to receive. // -// Each refinement phase begins with a tramsission of the root merkle node -// in each refiner. When a refiner receives a node, it compares the -// corresponding node in its tree to the node received. Depending on the -// slot-by-slot comparisons (there are 12 cases, see refiner.cc), the -// refiner will respond with either a sub-node, a note about a leaf, a note -// about a shared subtree, or nothing. +// For more details on the refinement process, see refiner.cc. // -// Detecting the end of refinement is a bit subtle: after sending the -// refinement of the root node, a refiner sends a "done 0" command (queued -// behind all the other refinement traffic). When either peer receives a -// "done N" command it immediately responds with a "done N+1" command. When -// two done commands for a given merkle tree arrive with no interveining -// refinements, the entire merkle tree is considered complete. -// // // Transmission // ------------ @@ -391,7 +371,7 @@ // Outgoing queue-writers. void queue_bye_cmd(u8 phase); void queue_error_cmd(string const & errmsg); - void queue_done_cmd(size_t level, netcmd_item_type type); + void queue_done_cmd(netcmd_item_type type, size_t n_items); void queue_hello_cmd(rsa_keypair_id const & key_name, base64 const & pub_encoded, id const & nonce); @@ -409,11 +389,7 @@ string const & signature, base64 server_key_encoded); void queue_confirm_cmd(); - void queue_refine_cmd(merkle_node const & node); - void queue_note_item_cmd(netcmd_item_type ty, id item); - void queue_note_shared_subtree_cmd(netcmd_item_type ty, - prefix const & pref, - size_t level); + void queue_refine_cmd(refinement_type ty, merkle_node const & node); void queue_data_cmd(netcmd_item_type type, id const & item, string const & dat); @@ -438,13 +414,8 @@ id const & nonce1, string const & signature); bool process_confirm_cmd(string const & signature); - bool process_refine_cmd(merkle_node const & node); - bool process_done_cmd(size_t level, netcmd_item_type type); - bool process_note_item_cmd(netcmd_item_type ty, - id const & item); - bool process_note_shared_subtree_cmd(netcmd_item_type ty, - prefix const & pref, - size_t lev); + bool process_refine_cmd(refinement_type ty, merkle_node const & node); + bool process_done_cmd(netcmd_item_type type, size_t n_items); bool process_data_cmd(netcmd_item_type type, id const & item, string const & dat); @@ -506,10 +477,10 @@ dbw(app, true), protocol_state(working_state), encountered_error(false), - epoch_refiner(epoch_item, *this), - key_refiner(key_item, *this), - cert_refiner(cert_item, *this), - rev_refiner(revision_item, *this), + epoch_refiner(epoch_item, voice, *this), + key_refiner(key_item, voice, *this), + cert_refiner(cert_item, voice, *this), + rev_refiner(revision_item, voice, *this), rev_enumerator(*this, app) { dbw.set_on_revision_written(boost::bind(&session::rev_written_callback, @@ -744,10 +715,10 @@ bool session::done_all_refinements() { - bool all = rev_refiner.done() - && cert_refiner.done() - && key_refiner.done() - && epoch_refiner.done(); + bool all = rev_refiner.done + && cert_refiner.done + && key_refiner.done + && epoch_refiner.done; return all; } @@ -758,10 +729,10 @@ { if (role == source_role) return true; - bool all = rev_refiner.items_to_receive.empty() - && cert_refiner.items_to_receive.empty() - && key_refiner.items_to_receive.empty() - && epoch_refiner.items_to_receive.empty(); + bool all = rev_refiner.items_to_receive == 0 + && cert_refiner.items_to_receive == 0 + && key_refiner.items_to_receive == 0 + && epoch_refiner.items_to_receive == 0; return all; } @@ -792,11 +763,11 @@ session::maybe_note_epochs_finished() { // Maybe there are outstanding epoch requests. - if (!epoch_refiner.items_to_receive.empty()) + if (!epoch_refiner.items_to_receive == 0) return; // And maybe we haven't even finished the refinement. - if (!epoch_refiner.done()) + if (!epoch_refiner.done) return; // If we ran into an error -- say a mismatched epoch -- don't do any @@ -811,26 +782,39 @@ rev_refiner.begin_refinement(); } +static void +decrement_if_nonzero(netcmd_item_type ty, + size_t & n) +{ + if (n == 0) + { + string typestr; + netcmd_item_type_to_string(ty, typestr); + E(false, F("underflow on count of %s items to receive") % typestr); + } + --n; +} + void session::note_item_arrived(netcmd_item_type ty, id const & ident) { switch (ty) { case cert_item: - cert_refiner.items_to_receive.erase(ident); + decrement_if_nonzero(ty, cert_refiner.items_to_receive); if (cert_in_ticker.get() != NULL) ++(*cert_in_ticker); break; case revision_item: - rev_refiner.items_to_receive.erase(ident); + decrement_if_nonzero(ty, rev_refiner.items_to_receive); if (revision_in_ticker.get() != NULL) ++(*revision_in_ticker); break; case key_item: - key_refiner.items_to_receive.erase(ident); + decrement_if_nonzero(ty, key_refiner.items_to_receive); break; case epoch_item: - epoch_refiner.items_to_receive.erase(ident); + decrement_if_nonzero(ty, epoch_refiner.items_to_receive); break; default: // No ticker for other things. @@ -999,15 +983,15 @@ } void -session::queue_done_cmd(size_t level, - netcmd_item_type type) +session::queue_done_cmd(netcmd_item_type type, + size_t n_items) { string typestr; netcmd_item_type_to_string(type, typestr); - L(F("queueing 'done' command for %s level %s\n") - % typestr % level); + L(F("queueing 'done' command for %s (%d items)\n") + % typestr % n_items); netcmd cmd; - cmd.write_done_cmd(level, type); + cmd.write_done_cmd(type, n_items); write_netcmd_and_try_flush(cmd); } @@ -1068,47 +1052,21 @@ } void -session::queue_refine_cmd(merkle_node const & node) +session::queue_refine_cmd(refinement_type ty, merkle_node const & node) { string typestr; hexenc hpref; node.get_hex_prefix(hpref); netcmd_item_type_to_string(node.type, typestr); - L(F("queueing request for refinement of %s node '%s', level %d\n") + L(F("queueing refinement %s of %s node '%s', level %d\n") + % (ty == refinement_query ? "query" : "response") % typestr % hpref % static_cast(node.level)); netcmd cmd; - cmd.write_refine_cmd(node); + cmd.write_refine_cmd(ty, node); write_netcmd_and_try_flush(cmd); } void -session::queue_note_item_cmd(netcmd_item_type ty, id item) -{ - string typestr; - hexenc hitem; - encode_hexenc(item, hitem); - netcmd_item_type_to_string(ty, typestr); - L(F("queueing note about %s item '%s'") % typestr % hitem); - netcmd cmd; - cmd.write_note_item_cmd(ty, item); - write_netcmd_and_try_flush(cmd); -} - -void -session::queue_note_shared_subtree_cmd(netcmd_item_type ty, - prefix const & pref, - size_t level) -{ - string typestr; - netcmd_item_type_to_string(ty, typestr); - L(F("queueing note about shared %s subtree at level %d") - % typestr % level); - netcmd cmd; - cmd.write_note_shared_subtree_cmd(ty, pref, level); - write_netcmd_and_try_flush(cmd); -} - -void session::queue_data_cmd(netcmd_item_type type, id const & item, string const & dat) @@ -1556,7 +1514,7 @@ } bool -session::process_refine_cmd(merkle_node const & node) +session::process_refine_cmd(refinement_type ty, merkle_node const & node) { string typestr; netcmd_item_type_to_string(node.type, typestr); @@ -1570,19 +1528,19 @@ break; case key_item: - key_refiner.process_peer_node(node); + key_refiner.process_refinement_command(ty, node); break; case revision_item: - rev_refiner.process_peer_node(node); + rev_refiner.process_refinement_command(ty, node); break; case cert_item: - cert_refiner.process_peer_node(node); + cert_refiner.process_refinement_command(ty, node); break; case epoch_item: - epoch_refiner.process_peer_node(node); + epoch_refiner.process_refinement_command(ty, node); break; } return true; @@ -1670,7 +1628,7 @@ } bool -session::process_done_cmd(size_t level, netcmd_item_type type) +session::process_done_cmd(netcmd_item_type type, size_t n_items) { switch (type) { @@ -1679,22 +1637,22 @@ break; case key_item: - key_refiner.process_done_command(level); - if (key_refiner.done() && role != sink_role) + key_refiner.process_done_command(n_items); + if (key_refiner.done && role != sink_role) send_all_data(key_item, key_refiner.items_to_send); break; case revision_item: - rev_refiner.process_done_command(level); + rev_refiner.process_done_command(n_items); break; case cert_item: - cert_refiner.process_done_command(level); + cert_refiner.process_done_command(n_items); break; case epoch_item: - epoch_refiner.process_done_command(level); - if (epoch_refiner.done()) + epoch_refiner.process_done_command(n_items); + if (epoch_refiner.done) { send_all_data(epoch_item, epoch_refiner.items_to_send); maybe_note_epochs_finished(); @@ -1704,64 +1662,6 @@ return true; } -bool -session::process_note_item_cmd(netcmd_item_type ty, id const & item) -{ - switch (ty) - { - case file_item: - W(F("Unexpected 'note_item' command on non-refined item type\n")); - break; - - case key_item: - key_refiner.note_item_in_peer(item); - break; - - case revision_item: - rev_refiner.note_item_in_peer(item); - break; - - case cert_item: - cert_refiner.note_item_in_peer(item); - break; - - case epoch_item: - epoch_refiner.note_item_in_peer(item); - break; - } - return true; -} - -bool -session::process_note_shared_subtree_cmd(netcmd_item_type ty, - prefix const & pref, - size_t lev) -{ - switch (ty) - { - case file_item: - W(F("Unexpected 'note_item' command on non-refined item type\n")); - break; - - case key_item: - key_refiner.note_subtree_shared_with_peer(pref, lev); - break; - - case revision_item: - rev_refiner.note_subtree_shared_with_peer(pref, lev); - break; - - case cert_item: - cert_refiner.note_subtree_shared_with_peer(pref, lev); - break; - - case epoch_item: - epoch_refiner.note_subtree_shared_with_peer(pref, lev); - break; - } - return true; -} - void session::respond_to_confirm_cmd() { @@ -2182,42 +2082,22 @@ require(authenticated, "refine netcmd received when authenticated"); { merkle_node node; - cmd.read_refine_cmd(node); - return process_refine_cmd(node); + refinement_type ty; + cmd.read_refine_cmd(ty, node); + return process_refine_cmd(ty, node); } break; case done_cmd: require(authenticated, "done netcmd received when not authenticated"); { - size_t level; + size_t n_items; netcmd_item_type type; - cmd.read_done_cmd(level, type); - return process_done_cmd(level, type); + cmd.read_done_cmd(type, n_items); + return process_done_cmd(type, n_items); } break; - case note_item_cmd: - require(authenticated, "note_item netcmd received when not authenticated"); - { - netcmd_item_type ty; - id item; - cmd.read_note_item_cmd(ty, item); - return process_note_item_cmd(ty, item); - } - break; - - case note_shared_subtree_cmd: - require(authenticated, "note_shared_subtree netcmd received when not authenticated"); - { - netcmd_item_type ty; - prefix pref; - size_t lev; - cmd.read_note_shared_subtree_cmd(ty, pref, lev); - return process_note_shared_subtree_cmd(ty, pref, lev); - } - break; - case data_cmd: require(authenticated, "data netcmd received when not authenticated"); require(role == sink_role || ============================================================ --- refiner.cc 826ecae4d4133f4622670a807d5d1055e35c1979 +++ refiner.cc 6e80c65a1f4828f79ebe8aee7244137feb5959e0 @@ -15,20 +15,41 @@ #include "vocab.hh" #include "merkle_tree.hh" #include "netcmd.hh" +#include "netsync.hh" using std::string; using std::set; using std::make_pair; -// The previous incarnation of this algorithm had code related to sending -// decisions (and skippable transmissions) mixed in with the refinement -// actions. +// Our goal is to learn the complete set of items to send. To do this +// we exchange two types of refinement commands: queries and responses. // -// This incarnation is much simpler: our goal is only to learn the complete -// set of items in our peer's tree, and inform our peer of every item we -// have in our tree. To do this we must perform a complete refinement, and -// record the results in an in-memory table. We will decide what to send -// elsewhere, based on this knowledge. +// - On receiving a 'query' refinement for a node (p,l) you have: +// - Compare the query node to your node (p,l), noting all the leaves +// you must send as a result of what you learn in comparison. +// - For each slot, if you have a subtree where the peer does not +// (or you both do, and yours differs) send a sub-query for that +// node, incrementing your query-in-flight counter. +// - Send a 'response' refinement carrying your node (p,l) +// +// - On receiving a 'query' refinement for a node (p,l) you don't have: +// - Send a 'response' refinement carrying an empty synthetic node (p,l) +// +// - On receiving a 'response' refinement for (p,l) +// - Compare the query node to your node (p,l), noting all the leaves +// you must send as a result of what you learn in comparison. +// - Decrement your query-in-flight counter. +// +// The client kicks the process off by sending a query refinement for the +// root node. When the client's query-in-flight counter drops to zero, +// the client sends a done command, stating how many items it will be +// sending. +// +// When the server receives a done command, it echoes it back stating how +// many items *it* is going to send. +// +// When either side receives a done command, it transitions to +// streaming send mode, sending all the items it's calculated. void refiner::note_local_item(id const & item) @@ -37,71 +58,17 @@ insert_into_merkle_tree(table, type, item, 0); } - void refiner::reindex_local_items() { recalculate_merkle_codes(table, prefix(""), 0); } - -void -refiner::refine_synthetic_empty_subtree(merkle_node const & their_node, - size_t slot) -{ - // Our peer has a subtree, we have nothing. We want to explore their - // subtree but we have nothing real to send, so we synthesize an empty - // node and send it as a refinement request. - merkle_node our_fake_node; - their_node.extended_prefix(slot, our_fake_node.pref); - our_fake_node.level = their_node.level + 1; - our_fake_node.type = their_node.type; - cb.queue_refine_cmd(our_fake_node); - exchanged_data_since_last_done_cmd = true; -} - - -void -refiner::refine_synthetic_singleton_subtree(merkle_node const & their_node, - merkle_node const & our_node, - size_t slot) -{ - // Our peer has a subtree, we have a single leaf. We want to explore - // their subtree but we have nothing real to send, so we synthesize an - // empty subtree and push our leaf into it, sending a refinement request - // on the new (fake) subtree. - size_t subslot; - id our_slotval; - merkle_node our_fake_subtree; - our_node.get_raw_slot(slot, our_slotval); - pick_slot_and_prefix_for_value(our_slotval, our_node.level + 1, - subslot, our_fake_subtree.pref); - our_fake_subtree.type = their_node.type; - our_fake_subtree.level = our_node.level + 1; - our_fake_subtree.set_raw_slot(subslot, our_slotval); - our_fake_subtree.set_slot_state(subslot, our_node.get_slot_state(slot)); - cb.queue_refine_cmd(our_fake_subtree); - exchanged_data_since_last_done_cmd = true; -} - - -void -refiner::inform_peer_of_item_in_slot(merkle_node const & our_node, - size_t slot) -{ - id slotval; - string tmp; - our_node.get_raw_slot(slot, slotval); - cb.queue_note_item_cmd(type, slotval); - exchanged_data_since_last_done_cmd = true; -} - - void refiner::load_merkle_node(size_t level, prefix const & pref, merkle_ptr & node) { - merkle_table::const_iterator j = table.find(std::make_pair(pref, level)); + merkle_table::const_iterator j = table.find(std::make_pair(pref, level)); I(j != table.end()); node = j->second; } @@ -115,105 +82,52 @@ } void -refiner::calculate_items_to_send_and_receive() +refiner::calculate_items_to_send() { - if (calculated_items_to_send_and_receive) + if (calculated_items_to_send) return; items_to_send.clear(); - items_to_receive.clear(); + items_to_receive = 0; std::set_difference(local_items.begin(), local_items.end(), peer_items.begin(), peer_items.end(), std::inserter(items_to_send, items_to_send.begin())); - std::set_difference(peer_items.begin(), peer_items.end(), - local_items.begin(), local_items.end(), - std::inserter(items_to_receive, items_to_receive.begin())); string typestr; netcmd_item_type_to_string(type, typestr); - L(F("pid %d determined %d %s items to send\n") - % getpid() % items_to_send.size() % typestr); - L(F("pid %d determined %d %s items to receive\n") - % getpid() % items_to_receive.size() % typestr); - calculated_items_to_send_and_receive = true; + L(F("determined %d %s items to send") % items_to_send.size() % typestr); + calculated_items_to_send = true; } void -refiner::inform_peer_of_subtree_in_slot(merkle_node const & our_node, - size_t slot) +refiner::send_subquery(merkle_node const & our_node, size_t slot) { prefix subprefix; our_node.extended_raw_prefix(slot, subprefix); merkle_ptr our_subtree; load_merkle_node(our_node.level + 1, subprefix, our_subtree); - cb.queue_refine_cmd(*our_subtree); - exchanged_data_since_last_done_cmd = true; + cb.queue_refine_cmd(refinement_query, *our_subtree); + ++queries_in_flight; } void -refiner::note_subtree_shared_with_peer(merkle_node const & our_subtree) +refiner::note_subtree_shared_with_peer(merkle_node const & our_node, size_t slot) { prefix pref; - our_subtree.get_raw_prefix(pref); - collect_items_in_subtree(table, pref, our_subtree.level, peer_items); - exchanged_data_since_last_done_cmd = true; -} - -void -refiner::note_subtree_shared_with_peer(prefix const & pref, size_t lev) -{ - collect_items_in_subtree(table, pref, lev, peer_items); - exchanged_data_since_last_done_cmd = true; -} - - -void -refiner::compare_subtrees_and_maybe_refine(merkle_node const & their_node, - merkle_node const & our_node, - size_t slot) -{ - // Our peer has a subtree at slot, and so do we. - // - // There are three things to do here: - // - // 1. If we have the same subtree as the peer, for every item in our - // subtree, make a note to ourself that the peer has that item too. - // - // 2. If we have the same subtree, make sure our peer knows it, so - // they can perform #1 for themselves. - // - // 3. If we have different subtrees, refine. - - id our_slotval, their_slotval; - their_node.get_raw_slot(slot, their_slotval); - our_node.get_raw_slot(slot, our_slotval); - - prefix pref; our_node.extended_raw_prefix(slot, pref); - merkle_ptr our_subtree; - size_t level = our_node.level + 1; - load_merkle_node(level, pref, our_subtree); - - if (their_slotval == our_slotval) - { - cb.queue_note_shared_subtree_cmd(type, pref, level); - note_subtree_shared_with_peer(*our_subtree); - } - else - cb.queue_refine_cmd(*our_subtree); - - exchanged_data_since_last_done_cmd = true; + collect_items_in_subtree(table, pref, our_node.level+1, peer_items); } - -refiner::refiner(netcmd_item_type type, refiner_callbacks & cb) - : type(type), cb(cb), - exchanged_data_since_last_done_cmd(false), - finished_refinement(0), - calculated_items_to_send_and_receive(false) +refiner::refiner(netcmd_item_type type, protocol_voice voice, refiner_callbacks & cb) + : type(type), voice (voice), cb(cb), + sent_initial_query(false), + queries_in_flight(0), + calculated_items_to_send(false), + done(false), + items_to_receive(0) { merkle_ptr root = merkle_ptr(new merkle_node()); root->type = type; @@ -221,30 +135,18 @@ } void -refiner::note_item_in_peer(id const & item) +refiner::note_item_in_peer(merkle_node const & their_node, size_t slot) { - peer_items.insert(item); - exchanged_data_since_last_done_cmd = true; -} - - -void -refiner::note_item_in_peer(merkle_node const & their_node, - size_t slot) -{ I(slot < constants::merkle_num_slots); id slotval; - their_node.get_raw_slot(slot, slotval); + their_node.get_raw_slot(slot, slotval); + peer_items.insert(slotval); - note_item_in_peer(slotval); - // Write a debug message { hexenc hslotval; their_node.get_hex_slot(slot, hslotval); - - size_t lev = static_cast(their_node.level); - + hexenc hpref; their_node.get_hex_prefix(hpref); @@ -253,9 +155,8 @@ L(boost::format("peer has %s '%s' at slot %d " "(in node '%s', level %d)\n") - % typestr % hslotval % slot % hpref % lev); + % typestr % hslotval % slot % hpref % their_node.level); } - exchanged_data_since_last_done_cmd = true; } @@ -264,66 +165,36 @@ { merkle_ptr root; load_merkle_node(0, prefix(""), root); - cb.queue_refine_cmd(*root); - cb.queue_done_cmd(0, type); + cb.queue_refine_cmd(refinement_query, *root); + ++queries_in_flight; + sent_initial_query = true; } - void -refiner::process_done_command(size_t level) +refiner::process_done_command(size_t n_items) { string typestr; netcmd_item_type_to_string(type, typestr); - L(F("pid %d processing 'done' command on %s level %d\n") - % getpid() % typestr % level); - - if (!exchanged_data_since_last_done_cmd - || level >= 0xff) + calculate_items_to_send(); + items_to_receive = n_items; + + if (type == revision_item || type == cert_item) { - // Echo 'done' if we're shutting down - if (finished_refinement < 3) - { - L(F("pid %d processing 'done' command => echoing shut down of %s refinement\n") - % getpid() % typestr); - cb.queue_done_cmd(level+1, type); - } - L(F("pid %d processing 'done' command => shut down %s refinement\n") - % getpid() % typestr); - - // Mark ourselves shut down - finished_refinement++; - - // And prepare for queries from our host - calculate_items_to_send_and_receive(); + P(F("finished %s refinement: %d to send, %d to receive") + % typestr % items_to_send.size() % items_to_receive); } - else if (exchanged_data_since_last_done_cmd - && finished_refinement < 2) - { - // Echo 'done', we're still active. - L(F("pid %d processing 'done' command => continuing to %s level %d\n") - % getpid() % typestr % (level+1)); - cb.queue_done_cmd(level+1, type); - } - - // Reset exchanged_data_since_last_done_cmd - exchanged_data_since_last_done_cmd = false; -} -bool -refiner::done() const -{ - string typestr; - netcmd_item_type_to_string(type, typestr); + if (voice == server_voice) + cb.queue_done_cmd(type, items_to_send.size()); - L(F("%s refiner %s done\n") - % typestr % (finished_refinement >= 2 ? "is" : "is not")); - return finished_refinement >= 2; + done = true; } void -refiner::process_peer_node(merkle_node const & their_node) +refiner::process_refinement_command(refinement_type ty, + merkle_node const & their_node) { prefix pref; hexenc hpref; @@ -334,115 +205,71 @@ netcmd_item_type_to_string(their_node.type, typestr); size_t lev = static_cast(their_node.level); - L(F("received 'refine' netcmd on %s node '%s', level %d\n") - % typestr % hpref % lev); + L(F("received refinement %s netcmd on %s node '%s', level %d") + % (ty == refinement_query ? "query" : "response") % typestr % hpref % lev); - if (!merkle_node_exists(their_node.level, pref)) + merkle_ptr our_node; + + if (merkle_node_exists(their_node.level, pref)) + load_merkle_node(their_node.level, pref, our_node); + else { - L(F("no corresponding %s merkle node for prefix '%s', level %d\n") - % typestr % hpref % lev); + // Synthesize empty node if we don't have one. + our_node = merkle_ptr(new merkle_node); + our_node->pref = their_node.pref; + our_node->level = their_node.level; + our_node->type = their_node.type; + } + + for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot) + { + // Note any leaves they have. + if (their_node.get_slot_state(slot) == leaf_state) + note_item_in_peer(their_node, slot); - for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot) - { - switch (their_node.get_slot_state(slot)) - { - case empty_state: - // We agree, this slot is empty. - break; + // Compare any subtrees, if we both have subtrees. + if (our_node->get_slot_state(slot) == subtree_state + && their_node.get_slot_state(slot) == subtree_state) + { + id our_slotval, their_slotval; + their_node.get_raw_slot(slot, their_slotval); + our_node->get_raw_slot(slot, our_slotval); + + // Always note when you share a subtree. + if (their_slotval == our_slotval) + note_subtree_shared_with_peer(*our_node, slot); + + // Send subqueries when you have a different subtree + // and you're answering a query message. + else if (ty == refinement_query) + send_subquery(*our_node, slot); + } - case leaf_state: - note_item_in_peer(their_node, slot); - break; + // Note: if they had a leaf (or empty) where I had a subtree, I + // will have noted the leaf and will not send it. They will not + // have any of the *other* parts of my subtree, so it's ok if I + // eventually wind up sending the subtree-minus-their-leaf. + } + + if (ty == refinement_response) + { + E((queries_in_flight > 0), + F("underflow on query-in-flight counter")); + --queries_in_flight; - case subtree_state: - refine_synthetic_empty_subtree(their_node, slot); - break; - } - } + // Possibly this signals the end of refinement. + if (voice == client_voice && queries_in_flight == 0) + { + calculate_items_to_send(); + cb.queue_done_cmd(type, items_to_send.size()); + } } else { - // We have a corresponding merkle node. There are 9 branches - // to the following switch condition. It is awful. Sorry. - L(F("found corresponding %s merkle node for prefix '%s', level %d\n") - % typestr % hpref % lev); - merkle_ptr our_node; - load_merkle_node(their_node.level, pref, our_node); - - for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot) - { - switch (their_node.get_slot_state(slot)) - { - case empty_state: - switch (our_node->get_slot_state(slot)) - { - - case empty_state: - // 1: theirs == empty, ours == empty - break; - - case leaf_state: - // 2: theirs == empty, ours == leaf - inform_peer_of_item_in_slot(*our_node, slot); - break; - - case subtree_state: - // 3: theirs == empty, ours == subtree - inform_peer_of_subtree_in_slot(*our_node, slot); - break; - - } - break; - - - case leaf_state: - switch (our_node->get_slot_state(slot)) - { - - case empty_state: - // 4: theirs == leaf, ours == empty - note_item_in_peer(their_node, slot); - break; - - case leaf_state: - // 5: theirs == leaf, ours == leaf - note_item_in_peer(their_node, slot); - inform_peer_of_item_in_slot(*our_node, slot); - break; - - case subtree_state: - // 6: theirs == leaf, ours == subtree - note_item_in_peer(their_node, slot); - inform_peer_of_subtree_in_slot(*our_node, slot); - break; - } - break; - - case subtree_state: - switch (our_node->get_slot_state(slot)) - { - case empty_state: - // 7: theirs == subtree, ours == empty - refine_synthetic_empty_subtree(their_node, slot); - break; - - case leaf_state: - // 14: theirs == subtree, ours == leaf - refine_synthetic_singleton_subtree(their_node, - *our_node, slot); - break; - - case subtree_state: - // 16: theirs == subtree, ours == subtree - compare_subtrees_and_maybe_refine(their_node, - *our_node, slot); - break; - } - break; - } - } + // Always reply to every query with the current node. + I(ty == refinement_query); + cb.queue_refine_cmd(refinement_response, *our_node); } - exchanged_data_since_last_done_cmd = true; } ============================================================ --- refiner.hh cdb2d13b9054ea56707690d422ee4349774df28c +++ refiner.hh 9f724877248fc986aed3c2de8fa39d5a27f934d1 @@ -11,6 +11,7 @@ #include "vocab.hh" #include "merkle_tree.hh" #include "netcmd.hh" +#include "netsync.hh" // This file defines the "refiner" class, which is a helper encapsulating // the main tricky part of the netsync algorithm. You must construct a @@ -31,17 +32,13 @@ // 5. When done, stop refining and examine the sets of local and peer // items you've determined the existence of during refinement. - struct refiner_callbacks { - virtual void queue_refine_cmd(merkle_node const & our_node) = 0; - virtual void queue_note_item_cmd(netcmd_item_type ty, id item) = 0; - virtual void queue_note_shared_subtree_cmd(netcmd_item_type ty, - prefix const & pref, - size_t level) = 0; - virtual void queue_done_cmd(size_t level, - netcmd_item_type ty) = 0; + virtual void queue_refine_cmd(refinement_type ty, + merkle_node const & our_node) = 0; + virtual void queue_done_cmd(netcmd_item_type ty, + size_t n_items) = 0; virtual ~refiner_callbacks() {} }; @@ -49,11 +46,13 @@ refiner { netcmd_item_type type; + protocol_voice voice; refiner_callbacks & cb; - bool exchanged_data_since_last_done_cmd; - size_t finished_refinement; - bool calculated_items_to_send_and_receive; + bool sent_initial_query; + size_t queries_in_flight; + bool calculated_items_to_send; + std::set local_items; std::set peer_items; merkle_table table; @@ -63,33 +62,27 @@ void refine_synthetic_singleton_subtree(merkle_node const & their_node, merkle_node const & our_node, size_t slot); - void inform_peer_of_item_in_slot(merkle_node const & our_node, size_t slot); - void inform_peer_of_subtree_in_slot(merkle_node const & our_node, size_t slot); - void note_subtree_shared_with_peer(merkle_node const & our_subtree); - void compare_subtrees_and_maybe_refine(merkle_node const & their_node, - merkle_node const & our_node, - size_t slot); + void note_subtree_shared_with_peer(merkle_node const & our_node, size_t slot); + void send_subquery(merkle_node const & our_node, size_t slot); void note_item_in_peer(merkle_node const & their_node, size_t slot); void load_merkle_node(size_t level, prefix const & pref, merkle_ptr & node); bool merkle_node_exists(size_t level, prefix const & pref); - void calculate_items_to_send_and_receive(); + void calculate_items_to_send(); public: - refiner(netcmd_item_type type, refiner_callbacks & cb); - void note_item_in_peer(id const & item); - void note_subtree_shared_with_peer(prefix const & pref, size_t lev); + refiner(netcmd_item_type type, protocol_voice voice, refiner_callbacks & cb); void note_local_item(id const & item); void reindex_local_items(); void begin_refinement(); - bool done() const; - void process_done_command(size_t level); - void process_peer_node(merkle_node const & their_node); + void process_done_command(size_t n_items); + void process_refinement_command(refinement_type ty, merkle_node const & their_node); // These are populated as the 'done' packets arrive. + bool done; std::set items_to_send; - std::set items_to_receive; + size_t items_to_receive; };