# # # patch "netsync.cc" # from [bbf01b7c90245d9e8403aa2ba4bcabf5b5edd599] # to [591fee6b1242f90dd4679276fb1244c7699278c0] # # patch "netxx_pipe.cc" # from [21274ec223e9a3dedade6975b3f18cb9c0a175eb] # to [1212f6dce28e21811c42d88e081cbfe047e3ceb2] # # patch "netxx_pipe.hh" # from [2086562743c62a48b6062c2e7ece72ad89d49872] # to [e3cae796264771899b8ccf76aca8d7593419f66a] # # patch "netxx_pipe_stdio_main.cc" # from [50a1f32b8503d9e31034799a767899cb89ee368f] # to [97b4393bdddf54aa2009e0d2a7af23ad76f76817] # ============================================================ --- netsync.cc bbf01b7c90245d9e8403aa2ba4bcabf5b5edd599 +++ netsync.cc 591fee6b1242f90dd4679276fb1244c7699278c0 @@ -2287,6 +2287,9 @@ session::arm() return armed; } +// Return values: +// 'false' - Session encountered an error or is completed, and should be disconnected +// 'true' - Session is healthy and has more to do bool session::process(transaction_guard & guard) { if (encountered_error) @@ -2487,6 +2490,19 @@ call_server(protocol_role role, "peer %s, disconnecting") % sess.peer_id)); } + + // When the server connection is file:, we don't detect the server + // closing its socket. So check for protocol done here. + if (sess.protocol_state == session::confirmed_state) + { + // write last message + sess.write_some(); + + guard.commit(); + + P(F("successful exchange with %s") % sess.peer_id); + return; + } } } @@ -2498,6 +2514,7 @@ drop_session_associated_with_fd(map sess = i->second; fd = sess->str->get_socketfd(); + L(FL("disconnecting from fd %d (peer %s)") % fd % sess->peer_id); sessions.erase(fd); } @@ -2687,8 +2704,7 @@ reap_dead_sessions(map > & sessions, unsigned long timeout_seconds) { - // Kill any clients which haven't done any i/o inside the timeout period - // or who have exchanged all items and flushed their output buffers. + // Kill any clients which haven't done any i/o inside the timeout period. 
set dead_clients; time_t now = ::time(NULL); for (map >::const_iterator @@ -2971,10 +2987,11 @@ serve_single_connection(protocol_role ro app_state & app, unsigned long timeout_seconds) { - shared_ptr str(new Netxx::StdioStream); + shared_ptr str(new Netxx::StdioStream); shared_ptr sess (new session (role, server_voice, include_pattern, exclude_pattern, app, "stdio", str)); + bool single_fd; Netxx::StdioProbe probe; Netxx::Timeout @@ -2982,7 +2999,7 @@ serve_single_connection(protocol_role ro timeout(static_cast(timeout_seconds)), instant(0,1); - P(F("beginning service on %s") % sess->peer_id); + P(F("beginning service on %s (fd %d)") % sess->peer_id % str->get_readfd()); sess->begin_service(); @@ -2991,9 +3008,20 @@ serve_single_connection(protocol_role ro map > sessions; set armed_sessions; - sessions[sess->str->get_socketfd()]=sess; + // StdioStream has two file descriptors, although they may be the same. We + // need to register both, because probe.ready will return events on both. + // Therefore, we are careful to never use str->get_socketfd() except in + // informative messages. In particular, we don't call other functions that + // might use str->get_socketfd(). - while (!sessions.empty()) + sessions[str->get_readfd()] = sess; + + single_fd = str->get_readfd() == str->get_writefd(); + + if (!single_fd) + sessions[str->get_writefd()] = sess; + + while (true) { probe.clear(); armed_sessions.clear(); @@ -3009,8 +3037,13 @@ serve_single_connection(protocol_role ro if (fd == -1) { if (armed_sessions.empty()) - L(FL("timed out waiting for I/O (listening on %s)") - % sess->peer_id); + { + L(FL("timed out waiting for I/O (listening on %s)") % sess->peer_id); + + drop_session_associated_with_fd(sessions, fd); + // No need to drop the other entry in sessions, if any.
+ break; + } } // an existing session woke up @@ -3038,12 +3071,36 @@ serve_single_connection(protocol_role ro P(F("got some OOB data on fd %d (peer %s), disconnecting") % fd % sess->peer_id); drop_session_associated_with_fd(sessions, fd); + break; } } } - process_armed_sessions(sessions, armed_sessions, guard); - reap_dead_sessions(sessions, timeout_seconds); - } + + { + Netxx::socket_type fd = str->get_readfd(); + + // This is the same logic as process_armed_sessions, but we have only + // one session, and we need to terminate when it is done. + if (!sess->process(guard)) + { + P(F("peer %s processing finished, disconnecting") + % sess->peer_id); + drop_session_associated_with_fd(sessions, fd); + break; + } + + // This is the same logic as 'reap_dead_sessions', but we only have + // one session to check, and we terminate if it's timed out. + if (static_cast(sess->last_io_time + timeout_seconds) + < static_cast(::time(NULL))) + { + P(F("fd %d (peer %s) has been idle too long, disconnecting") + % fd % sess->peer_id); + drop_session_associated_with_fd(sessions, fd); + break; + } + } + } // end while } ============================================================ --- netxx_pipe.cc 21274ec223e9a3dedade6975b3f18cb9c0a175eb +++ netxx_pipe.cc 1212f6dce28e21811c42d88e081cbfe047e3ceb2 @@ -185,6 +185,7 @@ Netxx::StdioStream::close (void) Netxx::StdioStream::close (void) { // close socket so client knows we disconnected + L(FL("closing StdioStream")); #ifdef WIN32 // readfd, writefd are the same socket; only close it once if (readfd != -1) @@ -211,12 +212,21 @@ Netxx::StdioStream::get_socketfd (void) Netxx::socket_type Netxx::StdioStream::get_socketfd (void) const { - // This is used netsync only to register the session for deletion, so it - // doesn't matter whether we return readfd or writefd. 
The unit test needs - // readfd in netxx_pipe_stdio_main.cc + return readfd; // only used for informative messages +} + +Netxx::socket_type +Netxx::StdioStream::get_readfd (void) const +{ return readfd; } +Netxx::socket_type +Netxx::StdioStream::get_writefd (void) const +{ + return writefd; +} + const Netxx::ProbeInfo* Netxx::StdioStream::get_probe_info (void) const { @@ -377,9 +387,24 @@ Netxx::SpawnedStream::close (void) void Netxx::SpawnedStream::close (void) { - // We assume the child process has exited; there's no point in waiting for it. + // We need to wait for the child to exit, so it reads our last message, + // releases the database, closes redirected output files etc. before we do + // whatever is next. + L(FL("waiting for spawned child")); +#ifdef WIN32 + if (child != INVALID_HANDLE_VALUE) + WaitForSingleObject(child, INFINITE); + child = INVALID_HANDLE_VALUE; +#else + if (child != -1) + while (waitpid(child,0,0) == -1 && errno == EINTR); + child = -1; +#endif + L(FL("...done")); + Child_Socket.close(); Parent_Socket.close(); + } Netxx::socket_type @@ -489,7 +514,7 @@ UNIT_TEST(pipe, stdio_stream) probe.add(stream, Netxx::Probe::ready_read); probe_result = probe.ready(short_time); I(probe_result.second == Netxx::Probe::ready_read); - I(probe_result.first == stream.get_socketfd()); + I(probe_result.first == stream.get_readfd()); bytes_read = stream.read (stream_read_buffer, sizeof(stream_read_buffer)); I(bytes_read == 2); @@ -501,7 +526,7 @@ UNIT_TEST(pipe, stdio_stream) probe.add(stream, Netxx::Probe::ready_write); probe_result = probe.ready(short_time); I(probe_result.second & Netxx::Probe::ready_write); - I(probe_result.first == stream.get_socketfd()); + I(probe_result.first == stream.get_writefd()); bytes_written = stream.write (write_buffer, 2); I(bytes_written == 2); @@ -520,58 +545,57 @@ UNIT_TEST(pipe, stdio_stream) } -void unit_test_spawn (char *cmd, int will_disconnect) -{ try - { - // netxx_pipe_stdio_main uses StdioStream, StdioProbe - 
Netxx::SpawnedStream spawned (cmd, vector()); +UNIT_TEST(pipe, spawn_stdio) +{ + try + { + // netxx_pipe_stdio_main uses StdioStream, StdioProbe + Netxx::SpawnedStream spawned ("./netxx_pipe_stdio_main", vector()); - char write_buf[1024]; - char read_buf[1024]; - int bytes; - Netxx::Probe probe; - Netxx::Timeout timeout(2L), short_time(0,1000); + char write_buf[1024]; + char read_buf[1024]; + int bytes; + Netxx::Probe probe; + Netxx::Timeout timeout(2L), short_time(0,1000); - // time out because no data is available - probe.clear(); - probe.add(spawned, Netxx::Probe::ready_read); - Netxx::Probe::result_type res = probe.ready(short_time); - I(res.second==Netxx::Probe::ready_none); - I(res.first == -1); + // time out because no data is available + probe.clear(); + probe.add(spawned, Netxx::Probe::ready_read); + Netxx::Probe::result_type res = probe.ready(short_time); + I(res.second==Netxx::Probe::ready_none); + I(res.first == -1); - // write should be possible - probe.clear(); - probe.add(spawned, Netxx::Probe::ready_write); - res = probe.ready(short_time); - I(res.second & Netxx::Probe::ready_write); - I(res.first==spawned.get_socketfd()); + // write should be possible + probe.clear(); + probe.add(spawned, Netxx::Probe::ready_write); + res = probe.ready(short_time); + I(res.second & Netxx::Probe::ready_write); + I(res.first==spawned.get_socketfd()); - // test binary transparency, lots of cycles - for (int c = 0; c < 256; ++c) - { - string result; - write_buf[0] = c; - write_buf[1] = 255 - c; - spawned.write(write_buf, 2); + // test binary transparency, lots of cycles + for (int c = 0; c < 256; ++c) + { + string result; + write_buf[0] = c; + write_buf[1] = 255 - c; + spawned.write(write_buf, 2); - while (result.size() < 2) - { // wait for data to arrive - probe.clear(); - probe.add(spawned, Netxx::Probe::ready_read); - res = probe.ready(timeout); - E(res.second & Netxx::Probe::ready_read, F("timeout reading data %d") % c); - I(res.first == spawned.get_socketfd()); + 
while (result.size() < 2) + { // wait for data to arrive + probe.clear(); + probe.add(spawned, Netxx::Probe::ready_read); + res = probe.ready(timeout); + E(res.second & Netxx::Probe::ready_read, F("timeout reading data %d") % c); + I(res.first == spawned.get_socketfd()); - bytes = spawned.read(read_buf, sizeof(read_buf)); - result += string(read_buf, bytes); + bytes = spawned.read(read_buf, sizeof(read_buf)); + result += string(read_buf, bytes); + } + I(result.size() == 2); + I(static_cast(result[0]) == c); + I(static_cast(result[1]) == 255 - c); } - I(result.size() == 2); - I(static_cast(result[0]) == c); - I(static_cast(result[1]) == 255 - c); - } - if (will_disconnect) - { // Tell netxx_pipe_stdio_main to quit, closing its socket write_buf[0] = 'q'; write_buf[1] = 'u'; @@ -579,38 +603,23 @@ void unit_test_spawn (char *cmd, int wil write_buf[3] = 't'; spawned.write(write_buf, 4); - // Wait for socket to close; should be reported as ready_read, _not_ as timeout + // Note that probe.ready here returns timeout, _not_ ready_read. 
probe.clear(); probe.add(spawned, Netxx::Probe::ready_read); res = probe.ready(timeout); - I(res.second==Netxx::Probe::ready_read); - I(res.first == spawned.get_socketfd()); + I(res.second==Netxx::Probe::ready_none); + I(res.first == -1); - // now read should return 0 bytes - bytes = spawned.read(read_buf, sizeof(read_buf)); - I(bytes == 0); + // This waits for the child to exit + spawned.close(); } - - spawned.close(); - - } catch (informative_failure &e) - { - W(F("Failure %s") % e.what()); - throw; - } + { + W(F("Failure %s") % e.what()); + throw; + } } -UNIT_TEST(pipe, spawn_cat) -{ - unit_test_spawn ("cat", 0); -} - -UNIT_TEST(pipe, spawn_stdio) -{ - unit_test_spawn ("./netxx_pipe_stdio_main", 1); -} - #endif // Local Variables: ============================================================ --- netxx_pipe.hh 2086562743c62a48b6062c2e7ece72ad89d49872 +++ netxx_pipe.hh e3cae796264771899b8ccf76aca8d7593419f66a @@ -110,9 +110,14 @@ namespace Netxx virtual signed_size_type read (void *buffer, size_type length); virtual signed_size_type write (const void *buffer, size_type length); virtual void close (void); - virtual socket_type get_socketfd (void) const; virtual const ProbeInfo* get_probe_info (void) const; + // In general, we have two file descriptors that netsync needs to know + // about. Netsync should never call get_socketfd to identify this + // stream; it returns only readfd and is meant for informative messages.
+ virtual socket_type get_socketfd (void) const; + virtual socket_type get_writefd (void) const; + virtual socket_type get_readfd (void) const; private: friend class StdioStreamTest; // Unit test facilities ============================================================ --- netxx_pipe_stdio_main.cc 50a1f32b8503d9e31034799a767899cb89ee368f +++ netxx_pipe_stdio_main.cc 97b4393bdddf54aa2009e0d2a7af23ad76f76817 @@ -69,9 +69,9 @@ int main (int argc, char *argv[]) quit = 1; continue; } - else if (stream.get_socketfd() != probe_result.first) + else if (stream.get_readfd() != probe_result.first) { - fprintf (stderr, "ready returned other socket\n"); + fprintf (stderr, "ready returned unknown socket\n"); quit = 1; continue; }