[Top][All Lists]

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Gnash-commit] /srv/bzr/gnash/rtmp r9713: add new dispatch thread for a

From: rob
Subject: [Gnash-commit] /srv/bzr/gnash/rtmp r9713: add new dispatch thread for a single I/O thread that can handle multiple
Date: Thu, 06 Nov 2008 15:45:37 -0700
User-agent: Bazaar (1.5)

revno: 9713
committer: address@hidden
branch nick: rtmp
timestamp: Thu 2008-11-06 15:45:37 -0700
  Add a new dispatch thread: a single I/O thread that can handle multiple
  connections, to reduce thread craziness.
  Make it work single-threaded to help with debugging.
=== modified file 'cygnal/cygnal.cpp'
--- a/cygnal/cygnal.cpp 2008-11-01 14:58:30 +0000
+++ b/cygnal/cygnal.cpp 2008-11-06 22:45:37 +0000
@@ -60,6 +60,7 @@
 #include "gmemory.h"
 #include "diskstream.h"
 #include "arg_parser.h"
+#include "GnashException.h"
 // classes internal to Cygnal
 #include "buffer.h"
@@ -84,16 +85,12 @@
 using namespace cygnal;
 using namespace amf;
-// Keep a list of all active network connections
-namespace gnash {
-extern map<int, gnash::Handler *> handlers;
 static void usage();
 static void version_and_copyright();
 static void cntrlc_handler(int sig);
 void connection_handler(Handler::thread_params_t *args);
+void dispatch_handler(Handler::thread_params_t *args);
 void admin_handler(Handler::thread_params_t *args);
 LogFile& dbglogfile = LogFile::getDefaultInstance();
@@ -144,6 +141,9 @@
        << _("  -V,  --version       Print version information and exit") << 
        << _("  -v,  --verbose       Output verbose debug info") << endl
        << _("  -p   --port-offset   RTMPT port offset") << endl
+       << _("  -n,  --netdebug      Turn on net debugging messages") << endl
+       << _("  -t,  --testing       Turn on special Gnash testing support") << 
+       << _("  -a,  --admin         Enable the administration thread") << endl
        << endl;
@@ -262,7 +262,7 @@
     rtmp_data.netfd = 0;
     rtmp_data.filespec = docroot;
     boost::thread rtmp_thread(boost::bind(&connection_handler, &rtmp_data));
     // Admin handler
     if (admin) {
        Handler::thread_params_t admin_data;
@@ -388,6 +388,7 @@
                  ret = -1;
              case Handler::STATUS:
+#if 0
                  response << handlers.size() << " handlers are currently 
                  for (hit = handlers.begin(); hit != handlers.end(); hit++) {
                      int fd = hit->first;
@@ -399,6 +400,7 @@
                  index = 0;
              case Handler::POLL:
@@ -453,206 +455,147 @@
     int fd = 0;
 //    list<Handler *> *handlers = reinterpret_cast<list<Handler *> 
+    static int tid = 0;
     Network net;
     if (netdebug) {
+    // Start a server on this tcp/ip port.
     fd = net.createServer(args->port);
     log_debug("Starting Connection Handler for fd #%d, port %hd", fd, 
     // FIXME: this runs forever, we probably want a cleaner way to test for 
the end of time.
     do {
+       net.setPort(args->port);
+       if (netdebug) {
+           net.toggleDebug(true);
+       }
+       // Wait for a connection to this tcp/ip from a client. If set
+       // to true, this will block until a request comes in. If set
+       // to single threaded mode, this will only allow one client to
+       // connect at a time. This is to make it easier to debug
+       // things when you have a heavily threaded application.
+       args->netfd = net.newConnection(true, fd);
+       log_debug("New network connection for fd #%d", args->netfd);
+       // Get the number of cpus in this system. For multicore
+       // systems we'll get better load balancing if we keep all the
+       // cpus busy. So a pool of threads is started for each cpu,
+       // the default being just one. Each thread is responsible for
+       // handling part of the total active file descriptors.
+       long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
+       log_debug("This system has %d cpus.", ncpus);
+       size_t nfds = crcfile.getFDThread();
+       log_debug("This system is configured for %d file descriptors to be 
watched by each thread.", nfds);
+       // cap the number of threads
+       int cpu = (cpu % ncpus);
+       // Get the next thread ID to hand off handling this file
+       // descriptor to. If the limit for threads per cpu hasn't been
+       // set or is set to 0, assume one thread per processor by
+       // default. There won't even be threads for each cpu if
+       // threading has been disabled in the cygnal config file.
+       int spawn_limit = 0;
+       if (nfds == 0) {
+           spawn_limit = ncpus;
+       } else {
+           spawn_limit = ncpus * nfds;
+       }
+       // Rotate in a range of 0 to the limit.
+       tid = (tid + 1) % (spawn_limit + 1);
+       log_debug("thread ID %d for fd #%d", tid, args->netfd);
        Handler *hand = new Handler;
-       hand->setPort(args->port);
-       if (netdebug) {
-           hand->toggleDebug(true);
-       }
-       args->netfd = hand->newConnection(true, fd);
-       args->handle = hand;
-       log_debug("Adding handler: %x for fd #%d", (void *)hand, args->netfd);
-       handlers[args->netfd] = hand;
+       struct pollfd fds;
+       fds.fd = args->netfd;
+       if (args->port == (port_offset + RTMPT_PORT)) {
+           hand->addPollFD(fds, http_handler);
+       }
+       if (args->port == RTMP_PORT) {
+           hand->addPollFD(fds, rtmp_handler);
+       }
+       // if supporting multiple threads
        if (crcfile.getThreadingFlag()) {
-           hand->start(args);
-       } else {
-           log_debug(_("Starting Handlers for port %d, tid %ld"),
-                     args->port, get_thread_id());
+           hand = new Handler;
+           args->handler = hand;
+           log_debug("Starting handler: %x for fd #%d", (void *)hand, 
            if (args->port == (port_offset + RTMPT_PORT)) {
                boost::thread handler(boost::bind(&http_handler, args));
            if (args->port == RTMP_PORT) {
                boost::thread handler(boost::bind(&rtmp_handler, args));
+           dispatch_handler(args);
+//         net.closeNet(args->netfd);
+       } else {                // single threaded
+           log_debug("Single threaded mode for fd #%d", args->netfd);
+           args->handler = hand;
+           dispatch_handler(args);
+       //
        log_debug("Restarting loop for next connection for port %d...", 
     } while(1);
     // All threads should exit now.
 } // end of connection_handler
-extern "C" {
-http_handler(Handler::thread_params_t *args)
+dispatch_handler(Handler::thread_params_t *args)
-//    struct thread_params thread_data;
-    string url, filespec, parameters;
-    string::size_type pos;
-    Handler *hand = reinterpret_cast<Handler *>(args->handle);
-    HTTP www;
-    www.setHandler(hand);
-    log_debug(_("Starting HTTP Handler for fd #%d, tid %ld"),
-             args->netfd, get_thread_id());
-    string docroot = args->filespec;
-    log_debug("Starting to wait for data in net for fd #%d", args->netfd);
-    // Wait for data, and when we get it, process it.
-    do {
-       hand->wait();
-       if (hand->timetodie()) {
-           log_debug("Not waiting no more, no more for more HTTP data for fd 
#%d...", args->netfd);
-           map<int, Handler *>::iterator hit = handlers.find(args->netfd);
-           if ((*hit).second) {
-               log_debug("Removing handle %x for HTTP on fd #%d",
-                         (void *)hand, args->netfd);
-               handlers.erase(args->netfd);
-               delete (*hit).second;
-           }
-           return;
-       }
-       struct timespec start;
-       clock_gettime (CLOCK_REALTIME, &start);
-//     conndata->statistics->setFileType(NetStats::RTMPT);
-//     conndata->statistics->startClock();
-//     args->netfd = www.getFileFd();
-//     www.recvMsg(5);
-       www.recvMsg(args->netfd);
-       if (!www.processGetRequest()) {
-           hand->die();        // tell all the threads for this connection to 
-           hand->notifyin();
-           log_debug("Net HTTP done for fd #%d...", args->netfd);
-//         hand->closeNet(args->netfd);
-           return;
-       }
-       url = docroot;
-       url += www.getURL();
-       pos = url.find("?");
-       filespec = url.substr(0, pos);
-       parameters = url.substr(pos + 1, url.size());
-       // Get the file size for the HTTP header
-       if (www.getFileStats(filespec) == amf::AMF::FILETYPE_ERROR) {
-           www.formatErrorResponse(HTTP::NOT_FOUND);
-       }
-       // Send the reply
-       www.formatGetReply(HTTP::LIFE_IS_GOOD);
-//     cerr << "Size = " << www.getHeader().size() << "        " << 
www.getHeader() << endl;
-       hand->Network::writeNet(args->netfd, (boost::uint8_t 
*)www.getHeader().c_str(), www.getHeader().size());
-//     hand->writeNet(args->netfd, www.getHeader(), www.getHeader().size());
-//     strcpy(thread_data.filespec, filespec.c_str());
-//     thread_data.statistics = conndata->statistics;
-       // Keep track of the network statistics
-//     conndata->statistics->stopClock();
-//     log_debug (_("Bytes read: %d"), www.getBytesIn());
-//     log_debug (_("Bytes written: %d"), www.getBytesOut());
-//     st.setBytes(www.getBytesIn() + www.getBytesOut());
-//     conndata->statistics->addStats();
-       if (filespec[filespec.size()-1] == '/') {
-           filespec += "index.html";
-       }
-//     DiskStream filestream;
-#if 0
-       if (url != docroot) {
-           log_debug (_("File to load is: %s"), filespec.c_str());
-           log_debug (_("Parameters are: %s"), parameters.c_str());
-           struct stat st;
-           int filefd;
-           size_t ret;
-           struct timespec start;
-           clock_gettime (CLOCK_REALTIME, &start);
-           if (stat(filespec.c_str(), &st) == 0) {
-               filefd = ::open(filespec.c_str(), O_RDONLY);
-               log_debug (_("File \"%s\" is %lld bytes in size, disk fd #%d"), 
-                          st.st_size, filefd);
-               do {
-                   boost::shared_ptr<amf::Buffer> buf(new amf::Buffer);
-                   ret = read(filefd, buf->reference(), buf->size());
-                   if (ret == 0) { // the file is done
-                       break;
-                   }
-                   if (ret != buf->size()) {
-                       buf->resize(ret);
-                       log_debug("Got last data block from disk file, size 
%d", buf->size());
-                   }
-                   log_debug("Read %d bytes from %s.", ret, filespec);
-#if 0
-                   hand->pushout(buf);
-                   hand->notifyout();
-                   // Don't bother with the outgoing que
-                   if (ret > 0) {
-                       ret = hand->writeNet(buf);
-                   }
-               } while(ret > 0);
-               log_debug("Done transferring %s to net fd #%d",
-                         filespec, args->netfd);
-               ::close(filefd); // close the disk file
-               // See if this is a persistant connection
-//             if (!www.keepAlive()) {
-//                 log_debug("Keep-Alive is off", www.keepAlive());
-// //              hand->closeConnection();
-//             }
-               struct timespec end;
-               clock_gettime (CLOCK_REALTIME, &end);
-               log_debug("Read %d bytes from \"%s\" in %f seconds",
-                         st.st_size, filespec,
-                         (float)((end.tv_sec - start.tv_sec) + ((end.tv_nsec - 
-           }
-//         memset(args->filespec, 0, 256);
-//         memcpy(->filespec, filespec.c_str(), filespec.size());
-//         boost::thread sendthr(boost::bind(&stream_thread, args));
-//         sendthr.join();
-       }
-       struct timespec end;
-       clock_gettime (CLOCK_REALTIME, &end);
-       log_debug("Processing time for GET request was %f seconds",
-                 (float)((end.tv_sec - start.tv_sec) + ((end.tv_nsec - 
-//     conndata->statistics->dump();
-//    }
-    } while(!hand->timetodie());
-    log_debug("httphandler all done now finally...");
-} // end of httphandler
-} // end of extern C
+    Handler *hand = reinterpret_cast<Handler *>(args->handler);
+    Network net;
+    int timeout = 5000;
+    while(!hand->timetodie()) {    
+       int limit = hand->getPollFDSize();
+       net.setTimeout(timeout);
+       if (limit > 0) {
+           struct pollfd *fds = hand->getPollFDPtr();
+           boost::shared_ptr< vector<struct pollfd> > hits;
+           try {
+//                boost::shared_ptr< vector< int > > 
hits(net.waitForNetData(limit, fds));
+                hits = net.waitForNetData(limit, fds);
+               vector<struct pollfd>::iterator it;
+               cerr << "Hits: " << hits->size() << endl;
+               cerr << "Pollfds: " << hand->getPollFDSize() << endl;
+               for (it = hits->begin(); it != hits->end(); it++) {
+                   if ((it->revents & POLLRDHUP) || (it->revents & POLLNVAL))  
+                       log_debug("Revents has a POLLRDHUP or POLLNVAL set to 
%d for fd #%d",
+                                 it->revents, it->fd);
+                       hand->erasePollFD(it);
+                       net.closeNet(it->fd);
+                   }
+                   log_debug("Got something on fd #%d, 0x%x", it->fd, 
+                   hand->getEntry(it->fd)(args);
+                   net.closeNet(it->fd);
+               }
+           } catch (std::exception& e) {
+               log_error("Network connection was dropped:  %s", e.what());
+               vector<struct pollfd>::const_iterator it;
+               if (hits) {
+                   for (it = hits->begin(); it != hits->end(); it++) {
+                       log_debug("Need to disconnect fd #%d, it got an 
error.", (*it).fd);
+//                 hand->erasePollFD(*it);
+//                 net.closeNet(*it);
+                   }
+               }
+           }
+        } else {
+           log_debug("nothing to wait for.");
+           hand->die();
+        }
+    }
+} // end of dispatch_handler
 // local Variables:
 // mode: C++

reply via email to

[Prev in Thread] Current Thread [Next in Thread]