# # # patch "packet.cc" # from [f39c8a694ef3c87a5d262c712fb567c2cbaa54e3] # to [2d1b7869384bbb9c35573e3aa29d6f5fc81549d1] # # patch "packet.hh" # from [eb176ecd88759d676169b7809fdf6ada44ddfb17] # to [41eff9ec50dfb14b240cfbf962d95c84bf5474da] # ============================================================ --- packet.cc f39c8a694ef3c87a5d262c712fb567c2cbaa54e3 +++ packet.cc 2d1b7869384bbb9c35573e3aa29d6f5fc81549d1 @@ -92,104 +92,13 @@ // in this usage, we simply never add any prerequisites to any packet, and // just call apply_delayed_packet when the valve opens. -typedef enum - { - prereq_revision, - prereq_file - } -prereq_type; - -class delayed_packet; - -class -prerequisite -{ - hexenc ident; - prereq_type type; - set< shared_ptr > delayed; -public: - prerequisite(hexenc const & i, prereq_type pt) - : ident(i), type(pt) - {} - void add_dependent(shared_ptr p); - bool has_live_dependents(); - void satisfy(shared_ptr self, - packet_db_writer & pw); - bool operator<(prerequisite const & other) - { - return type < other.type || - (type == other.type && ident < other.ident); - } - // we need to be able to avoid circular dependencies between prerequisite and - // delayed_packet shared_ptrs. - void cleanup() { delayed.clear(); } -}; - -class +struct delayed_packet { - set< shared_ptr > unsatisfied_prereqs; - set< shared_ptr > satisfied_prereqs; -public: - void add_prerequisite(shared_ptr p); - bool all_prerequisites_satisfied(); - void prerequisite_satisfied(shared_ptr p, - packet_db_writer & pw); virtual void apply_delayed_packet(packet_db_writer & pw) = 0; virtual ~delayed_packet() {} }; -void -prerequisite::add_dependent(shared_ptr d) -{ - delayed.insert(d); -} - -void -prerequisite::satisfy(shared_ptr self, - packet_db_writer & pw) -{ - set< shared_ptr > dead; - for (set< shared_ptr >::const_iterator i = delayed.begin(); - i != delayed.end(); ++i) - { - (*i)->prerequisite_satisfied(self, pw); - if ((*i)->all_prerequisites_satisfied()) - dead.insert(*i); - } - for (set< shared_ptr >::const_iterator i = dead.begin(); - i != dead.end(); ++i) - { - delayed.erase(*i); - } -} - -void -delayed_packet::add_prerequisite(shared_ptr p) -{ - unsatisfied_prereqs.insert(p); -} - -bool -delayed_packet::all_prerequisites_satisfied() -{ - return unsatisfied_prereqs.empty(); -} - -void -delayed_packet::prerequisite_satisfied(shared_ptr p, - packet_db_writer & pw) -{ - I(unsatisfied_prereqs.find(p) != unsatisfied_prereqs.end()); - unsatisfied_prereqs.erase(p); - satisfied_prereqs.insert(p); - if (all_prerequisites_satisfied()) - { - apply_delayed_packet(pw); - } -} - - // concrete delayed packets class @@ -204,7 +113,6 @@ : ident(i), dat(md) {} virtual void apply_delayed_packet(packet_db_writer & pw); - virtual ~delayed_revision_data_packet(); }; @@ -220,7 +128,6 @@ : ident(i), dat(fd) {} virtual void apply_delayed_packet(packet_db_writer & pw); - virtual ~delayed_file_data_packet(); }; class @@ -241,7 +148,6 @@ : old_id(oi), new_id(ni), del(md), forward_delta(fwd), write_full(full) {} virtual void apply_delayed_packet(packet_db_writer & pw); - virtual ~delayed_file_delta_packet(); }; @@ -255,7 +161,6 @@ : c(c) {} virtual void apply_delayed_packet(packet_db_writer & pw); - virtual ~delayed_revision_cert_packet(); }; class @@ -270,7 +175,6 @@ : id(id), key(key) {} virtual void apply_delayed_packet(packet_db_writer & pw); - virtual ~delayed_public_key_packet(); }; class @@ -285,7 +189,6 @@ : id(id), kp(kp) {} virtual void apply_delayed_packet(packet_db_writer & pw); - virtual ~delayed_keypair_packet(); }; void @@ -295,13 +198,6 @@ pw.consume_revision_data(ident, dat); } -delayed_revision_data_packet::~delayed_revision_data_packet() -{ - if (!all_prerequisites_satisfied()) - W(F("discarding revision data packet %s with unmet dependencies\n") % ident); -} - - void delayed_file_data_packet::apply_delayed_packet(packet_db_writer & pw) { @@ -309,13 +205,6 @@ pw.consume_file_data(ident, dat); } -delayed_file_data_packet::~delayed_file_data_packet() -{ - // files have no prerequisites - I(all_prerequisites_satisfied()); -} - - void delayed_file_delta_packet::apply_delayed_packet(packet_db_writer & pw) { @@ -330,13 +219,6 @@ pw.consume_file_reverse_delta(new_id, old_id, del); } -delayed_file_delta_packet::~delayed_file_delta_packet() -{ - if (!all_prerequisites_satisfied()) - W(F("discarding file delta packet %s -> %s with unmet dependencies\n") - % old_id % new_id); -} - void delayed_revision_cert_packet::apply_delayed_packet(packet_db_writer & pw) { @@ -344,13 +226,6 @@ pw.consume_revision_cert(c); } -delayed_revision_cert_packet::~delayed_revision_cert_packet() -{ - if (!all_prerequisites_satisfied()) - W(F("discarding revision cert packet %s with unmet dependencies\n") - % c.inner().ident); -} - void delayed_public_key_packet::apply_delayed_packet(packet_db_writer & pw) { @@ -358,12 +233,6 @@ pw.consume_public_key(id, key); } -delayed_public_key_packet::~delayed_public_key_packet() -{ - // keys don't have dependencies - I(all_prerequisites_satisfied()); -} - void delayed_keypair_packet::apply_delayed_packet(packet_db_writer & pw) { @@ -371,182 +240,48 @@ pw.consume_key_pair(id, kp); } -delayed_keypair_packet::~delayed_keypair_packet() -{ - // keys don't have dependencies - I(all_prerequisites_satisfied()); -} - - void packet_consumer::set_on_revision_written(boost::function1 const & x) + revision_id> const & x) { on_revision_written=x; } void packet_consumer::set_on_cert_written(boost::function1 const & x) + cert const &> const & x) { on_cert_written=x; } void packet_consumer::set_on_pubkey_written(boost::function1 - const & x) + const & x) { on_pubkey_written=x; } void packet_consumer::set_on_keypair_written(boost::function1 - const & x) + const & x) { on_keypair_written=x; } -struct packet_db_writer::impl -{ - app_state & app; - bool take_keys; - size_t count; - - map > revision_prereqs; - map > file_prereqs; - - // ticker cert; - // ticker manc; - // ticker manw; - // ticker filec; - - bool revision_exists_in_db(revision_id const & r); - bool file_version_exists_in_db(file_id const & f); - - void get_revision_prereq(revision_id const & revision, shared_ptr & p); - void get_file_prereq(file_id const & file, shared_ptr & p); - - void accepted_revision(revision_id const & r, packet_db_writer & dbw); - void accepted_file(file_id const & f, packet_db_writer & dbw); - - impl(app_state & app, bool take_keys) - : app(app), take_keys(take_keys), count(0) - // cert("cert", 1), manc("manc", 1), manw("manw", 1), filec("filec", 1) - {} - - ~impl(); -}; - packet_db_writer::packet_db_writer(app_state & app, bool take_keys) - : pimpl(new impl(app, take_keys)) + : app(app), take_keys(take_keys) {} packet_db_writer::~packet_db_writer() {} -packet_db_writer::impl::~impl() -{ - - // break any circular dependencies for unsatisfied prerequisites - for (map >::const_iterator i = - revision_prereqs.begin(); i != revision_prereqs.end(); i++) - { - i->second->cleanup(); - } - for (map >::const_iterator i = - file_prereqs.begin(); i != file_prereqs.end(); i++) - { - i->second->cleanup(); - } -} - -bool -packet_db_writer::impl::revision_exists_in_db(revision_id const & r) -{ - return app.db.revision_exists(r); -} - - -bool -packet_db_writer::impl::file_version_exists_in_db(file_id const & f) -{ - return app.db.file_version_exists(f); -} - void -packet_db_writer::impl::get_file_prereq(file_id const & file, - shared_ptr & p) -{ - map >::const_iterator i; - i = file_prereqs.find(file); - if (i != file_prereqs.end()) - p = i->second; - else - { - p = shared_ptr(new prerequisite(file.inner(), prereq_file)); - file_prereqs.insert(make_pair(file, p)); - } -} - - -void -packet_db_writer::impl::get_revision_prereq(revision_id const & rev, - shared_ptr & p) -{ - map >::const_iterator i; - i = revision_prereqs.find(rev); - if (i != revision_prereqs.end()) - p = i->second; - else - { - p = shared_ptr(new prerequisite(rev.inner(), prereq_revision)); - revision_prereqs.insert(make_pair(rev, p)); - } -} - - -void -packet_db_writer::impl::accepted_revision(revision_id const & r, packet_db_writer & dbw) -{ - L(F("noting acceptence of revision %s\n") % r); - map >::iterator i = revision_prereqs.find(r); - if (i != revision_prereqs.end()) - { - shared_ptr prereq = i->second; - revision_prereqs.erase(i); - prereq->satisfy(prereq, dbw); - } -} - - -void -packet_db_writer::impl::accepted_file(file_id const & f, packet_db_writer & dbw) -{ - L(F("noting acceptence of file %s\n") % f); - map >::iterator i = file_prereqs.find(f); - if (i != file_prereqs.end()) - { - shared_ptr prereq = i->second; - file_prereqs.erase(i); - prereq->satisfy(prereq, dbw); - } -} - - -void packet_db_writer::consume_file_data(file_id const & ident, file_data const & dat) { - transaction_guard guard(pimpl->app.db); - if (! pimpl->file_version_exists_in_db(ident)) - { - pimpl->app.db.put_file(ident, dat); - pimpl->accepted_file(ident, *this); - } - else - L(F("skipping existing file version %s\n") % ident); - ++(pimpl->count); + transaction_guard guard(app.db); + app.db.put_file(ident, dat); guard.commit(); } @@ -564,45 +299,40 @@ file_delta const & del, bool write_full) { - transaction_guard guard(pimpl->app.db); - if (! pimpl->file_version_exists_in_db(new_id)) + transaction_guard guard(app.db); + + if (app.db.file_version_exists(new_id)) { - if (pimpl->file_version_exists_in_db(old_id)) - { - file_id confirm; - file_data old_dat; - data new_dat; - pimpl->app.db.get_file_version(old_id, old_dat); - patch(old_dat.inner(), del.inner(), new_dat); - calculate_ident(file_data(new_dat), confirm); - if (confirm == new_id) - { - if (!write_full) - pimpl->app.db.put_file_version(old_id, new_id, del); - else - pimpl->app.db.put_file(new_id, file_data(new_dat)); - pimpl->accepted_file(new_id, *this); - } - else - { - W(F("reconstructed file from delta '%s' -> '%s' has wrong id '%s'\n") - % old_id % new_id % confirm); - } - } + L(F("file version '%s' already exists in db\n") % new_id); + return; + } + + if (!app.db.file_version_exists(old_id)) + { + W(F("file preimage '%s' missing in db") % old_id); + W(F("dropping delta '%s' -> '%s'") % old_id % new_id); + return; + } + + file_id confirm; + file_data old_dat; + data new_dat; + app.db.get_file_version(old_id, old_dat); + patch(old_dat.inner(), del.inner(), new_dat); + calculate_ident(file_data(new_dat), confirm); + if (confirm == new_id) + { + if (!write_full) + app.db.put_file_version(old_id, new_id, del); else - { - L(F("delaying file delta %s -> %s for preimage\n") % old_id % new_id); - shared_ptr dp; - dp = shared_ptr(new delayed_file_delta_packet(old_id, new_id, del, true, write_full)); - shared_ptr fp; - pimpl->get_file_prereq(old_id, fp); - dp->add_prerequisite(fp); - fp->add_dependent(dp); - } + app.db.put_file(new_id, file_data(new_dat)); } else - L(F("skipping delta to existing file version %s\n") % new_id); - ++(pimpl->count); + { + W(F("reconstructed file from delta '%s' -> '%s' has wrong id '%s'\n") + % old_id % new_id % confirm); + } + guard.commit(); } @@ -611,42 +341,36 @@ file_id const & old_id, file_delta const & del) { - transaction_guard guard(pimpl->app.db); - if (! pimpl->file_version_exists_in_db(old_id)) + transaction_guard guard(app.db); + + if (app.db.file_version_exists(old_id)) { - if (pimpl->file_version_exists_in_db(new_id)) - { - file_id confirm; - file_data new_dat; - data old_dat; - pimpl->app.db.get_file_version(new_id, new_dat); - patch(new_dat.inner(), del.inner(), old_dat); - calculate_ident(file_data(old_dat), confirm); - if (confirm == old_id) - { - pimpl->app.db.put_file_reverse_version(new_id, old_id, del); - pimpl->accepted_file(old_id, *this); - } - else - { - W(F("reconstructed file from reverse delta '%s' -> '%s' has wrong id '%s'\n") - % new_id % old_id % confirm); - } - } - else - { - L(F("delaying reverse file delta %s -> %s for preimage\n") % new_id % old_id); - shared_ptr dp; - dp = shared_ptr(new delayed_file_delta_packet(old_id, new_id, del, false)); - shared_ptr fp; - pimpl->get_file_prereq(new_id, fp); - dp->add_prerequisite(fp); - fp->add_dependent(dp); - } + L(F("file version '%s' already exists in db\n") % old_id); + return; } + + if (!app.db.file_version_exists(new_id)) + { + W(F("file reverse-preimage '%s' missing in db") % new_id); + W(F("dropping reverse-delta '%s' -> '%s'") % new_id % old_id); + return; + } + + file_id confirm; + file_data new_dat; + data old_dat; + app.db.get_file_version(new_id, new_dat); + patch(new_dat.inner(), del.inner(), old_dat); + calculate_ident(file_data(old_dat), confirm); + if (confirm == old_id) + { + app.db.put_file_reverse_version(new_id, old_id, del); + } else - L(F("skipping reverse delta to existing file version %s\n") % old_id); - ++(pimpl->count); + { + W(F("reconstructed file from reverse delta '%s' -> '%s' has wrong id '%s'\n") + % new_id % old_id % confirm); + } guard.commit(); } @@ -654,114 +378,94 @@ packet_db_writer::consume_revision_data(revision_id const & ident, revision_data const & dat) { - transaction_guard guard(pimpl->app.db); - if (! pimpl->revision_exists_in_db(ident)) + transaction_guard guard(app.db); + if (app.db.revision_exists(ident)) { + L(F("revision '%s' already exists in db\n") % ident); + return; + } - shared_ptr dp; - dp = shared_ptr(new delayed_revision_data_packet(ident, dat)); + revision_set rev; + read_revision_set(dat, rev); - revision_set rev; - read_revision_set(dat, rev); + for (edge_map::const_iterator i = rev.edges.begin(); + i != rev.edges.end(); ++i) + { + if (!edge_old_revision(i).inner()().empty() + && !app.db.revision_exists(edge_old_revision(i))) + { + W(F("missing prerequisite revision '%s'\n") % edge_old_revision(i)); + W(F("dropping revision '%s'\n") % ident); + return; + } - for (edge_map::const_iterator i = rev.edges.begin(); - i != rev.edges.end(); ++i) + for (std::map::const_iterator a + = edge_changes(i).files_added.begin(); + a != edge_changes(i).files_added.end(); ++a) { - if (! (edge_old_revision(i).inner()().empty() - || pimpl->revision_exists_in_db(edge_old_revision(i)))) + if (! app.db.file_version_exists(a->second)) { - L(F("delaying revision %s for old revision %s\n") - % ident % edge_old_revision(i)); - shared_ptr fp; - pimpl->get_revision_prereq(edge_old_revision(i), fp); - dp->add_prerequisite(fp); - fp->add_dependent(dp); - } + W(F("missing prerequisite file '%s'\n") % a->second); + W(F("dropping revision '%s'\n") % ident); + return; + } + } - for (std::map::const_iterator a - = edge_changes(i).files_added.begin(); - a != edge_changes(i).files_added.end(); ++a) - { - if (! pimpl->file_version_exists_in_db(a->second)) - { - L(F("delaying revision %s for added file %s\n") - % ident % a->second); - shared_ptr fp; - pimpl->get_file_prereq(a->second, fp); - dp->add_prerequisite(fp); - fp->add_dependent(dp); - } - } + for (std::map >::const_iterator d + = edge_changes(i).deltas_applied.begin(); + d != edge_changes(i).deltas_applied.end(); ++d) + { + I(!delta_entry_src(d).inner()().empty()); + I(!delta_entry_dst(d).inner()().empty()); - for (std::map >::const_iterator d - = edge_changes(i).deltas_applied.begin(); - d != edge_changes(i).deltas_applied.end(); ++d) + if (! app.db.file_version_exists(delta_entry_src(d))) { - I(!delta_entry_src(d).inner()().empty()); - I(!delta_entry_dst(d).inner()().empty()); - if (! pimpl->file_version_exists_in_db(delta_entry_src(d))) - { - L(F("delaying revision %s for old file %s\n") - % ident % delta_entry_src(d)); - shared_ptr fp; - pimpl->get_file_prereq(delta_entry_src(d), fp); - dp->add_prerequisite(fp); - fp->add_dependent(dp); - } - if (! pimpl->file_version_exists_in_db(delta_entry_dst(d))) - { - L(F("delaying revision %s for new file %s\n") - % ident % delta_entry_dst(d)); - shared_ptr fp; - pimpl->get_file_prereq(delta_entry_dst(d), fp); - dp->add_prerequisite(fp); - fp->add_dependent(dp); - } - } - } + W(F("missing prerequisite file pre-delta '%s'\n") + % delta_entry_src(d)); + W(F("dropping revision '%s'\n") % ident); + return; + } + + if (! app.db.file_version_exists(delta_entry_dst(d))) + { + W(F("missing prerequisite file post-delta '%s'\n") + % delta_entry_dst(d)); + W(F("dropping revision '%s'\n") % ident); + return; + } + } + } - if (dp->all_prerequisites_satisfied()) - { - pimpl->app.db.put_revision(ident, dat); - if(on_revision_written) on_revision_written(ident); - pimpl->accepted_revision(ident, *this); - } - } - else - L(F("skipping existing revision %s\n") % ident); - ++(pimpl->count); + app.db.put_revision(ident, dat); + if (on_revision_written) + on_revision_written(ident); guard.commit(); } void packet_db_writer::consume_revision_cert(revision const & t) { - transaction_guard guard(pimpl->app.db); - if (! pimpl->app.db.revision_cert_exists(t)) + transaction_guard guard(app.db); + + if (app.db.revision_cert_exists(t)) { - if (pimpl->revision_exists_in_db(revision_id(t.inner().ident))) - { - pimpl->app.db.put_revision_cert(t); - if(on_cert_written) on_cert_written(t.inner()); - } - else - { - L(F("delaying revision cert on %s\n") % t.inner().ident); - shared_ptr dp; - dp = shared_ptr(new delayed_revision_cert_packet(t)); - shared_ptr fp; - pimpl->get_revision_prereq(revision_id(t.inner().ident), fp); - dp->add_prerequisite(fp); - fp->add_dependent(dp); - } + L(F("revision cert on '%s' already exists in db\n") + % t.inner().ident); + return; } - else + + if (!app.db.revision_exists(revision_id(t.inner().ident))) { - string s; - cert_signable_text(t.inner(), s); - L(F("skipping existing revision cert %s\n") % s); + W(F("cert revision '%s' does not exist in db\n") + % t.inner().ident); + W(F("dropping cert\n")); + return; } - ++(pimpl->count); + + app.db.put_revision_cert(t); + if (on_cert_written) + on_cert_written(t.inner()); + guard.commit(); } @@ -770,27 +474,29 @@ packet_db_writer::consume_public_key(rsa_keypair_id const & ident, base64< rsa_pub_key > const & k) { - transaction_guard guard(pimpl->app.db); - if (! pimpl->take_keys) + transaction_guard guard(app.db); + + if (! take_keys) { - P(F("skipping prohibited public key %s\n") % ident); + W(F("skipping prohibited public key %s\n") % ident); return; } - if (! pimpl->app.db.public_key_exists(ident)) + + if (app.db.public_key_exists(ident)) { - P(F("puttingskipping public key %s\n") % ident); - pimpl->app.db.put_key(ident, k); - if(on_pubkey_written) on_pubkey_written(ident); - } - else - { base64 tmp; - pimpl->app.db.get_key(ident, tmp); + app.db.get_key(ident, tmp); if (!keys_match(ident, tmp, ident, k)) W(F("key '%s' is not equal to key '%s' in database\n") % ident % ident); P(F("skipping existing public key %s\n") % ident); + return; } - ++(pimpl->count); + + P(F("putting public key %s\n") % ident); + app.db.put_key(ident, k); + if (on_pubkey_written) + on_pubkey_written(ident); + guard.commit(); } @@ -798,20 +504,23 @@ packet_db_writer::consume_key_pair(rsa_keypair_id const & ident, keypair const & kp) { - transaction_guard guard(pimpl->app.db); - if (! pimpl->take_keys) + transaction_guard guard(app.db); + if (! take_keys) { W(F("skipping prohibited key pair %s\n") % ident); return; } - if (! pimpl->app.keys.key_pair_exists(ident)) + + if (app.keys.key_pair_exists(ident)) { - pimpl->app.keys.put_key_pair(ident, kp); - if(on_keypair_written) on_keypair_written(ident); + L(F("skipping existing key pair %s\n") % ident); + return; } - else - L(F("skipping existing key pair %s\n") % ident); - ++(pimpl->count); + + app.keys.put_key_pair(ident, kp); + if (on_keypair_written) + on_keypair_written(ident); + guard.commit(); } ============================================================ --- packet.hh eb176ecd88759d676169b7809fdf6ada44ddfb17 +++ packet.hh 41eff9ec50dfb14b240cfbf962d95c84bf5474da @@ -104,10 +104,8 @@ struct packet_db_writer : public packet_consumer { -private: - struct impl; - std::auto_ptr pimpl; - + app_state & app; + bool take_keys; public: packet_db_writer(app_state & app, bool take_keys = false);