[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Gnash-commit] /srv/bzr/gnash/rtmp r9713: add new dispatch thread for a
From: |
rob |
Subject: |
[Gnash-commit] /srv/bzr/gnash/rtmp r9713: add new dispatch thread for a single I/O thread that can handle multiple |
Date: |
Thu, 06 Nov 2008 15:45:37 -0700 |
User-agent: |
Bazaar (1.5) |
------------------------------------------------------------
revno: 9713
committer: address@hidden
branch nick: rtmp
timestamp: Thu 2008-11-06 15:45:37 -0700
message:
add new dispatch thread for a single I/O thread that can handle multiple
connections to reduce thread craziness.
make work single threaded to help with debugging.
modified:
cygnal/cygnal.cpp
=== modified file 'cygnal/cygnal.cpp'
--- a/cygnal/cygnal.cpp 2008-11-01 14:58:30 +0000
+++ b/cygnal/cygnal.cpp 2008-11-06 22:45:37 +0000
@@ -60,6 +60,7 @@
#include "gmemory.h"
#include "diskstream.h"
#include "arg_parser.h"
+#include "GnashException.h"
// classes internal to Cygnal
#include "buffer.h"
@@ -84,16 +85,12 @@
using namespace cygnal;
using namespace amf;
-// Keep a list of all active network connections
-namespace gnash {
-extern map<int, gnash::Handler *> handlers;
-}
-
static void usage();
static void version_and_copyright();
static void cntrlc_handler(int sig);
void connection_handler(Handler::thread_params_t *args);
+void dispatch_handler(Handler::thread_params_t *args);
void admin_handler(Handler::thread_params_t *args);
LogFile& dbglogfile = LogFile::getDefaultInstance();
@@ -144,6 +141,9 @@
<< _(" -V, --version Print version information and exit") <<
endl
<< _(" -v, --verbose Output verbose debug info") << endl
<< _(" -p --port-offset RTMPT port offset") << endl
+ << _(" -n, --netdebug Turn on net debugging messages") << endl
+ << _(" -t, --testing Turn on special Gnash testing support") <<
endl
+ << _(" -a, --admin Enable the administration thread") << endl
<< endl;
}
@@ -262,7 +262,7 @@
rtmp_data.netfd = 0;
rtmp_data.filespec = docroot;
boost::thread rtmp_thread(boost::bind(&connection_handler, &rtmp_data));
-
+
// Admin handler
if (admin) {
Handler::thread_params_t admin_data;
@@ -388,6 +388,7 @@
ret = -1;
break;
case Handler::STATUS:
+#if 0
response << handlers.size() << " handlers are currently
active.";
for (hit = handlers.begin(); hit != handlers.end(); hit++) {
int fd = hit->first;
@@ -399,6 +400,7 @@
net.writeNet(response.str());
index++;
}
+#endif
index = 0;
break;
case Handler::POLL:
@@ -453,206 +455,147 @@
int fd = 0;
// list<Handler *> *handlers = reinterpret_cast<list<Handler *>
*>(args->handle);
+ static int tid = 0;
Network net;
if (netdebug) {
net.toggleDebug(true);
}
+ // Start a server on this tcp/ip port.
fd = net.createServer(args->port);
log_debug("Starting Connection Handler for fd #%d, port %hd", fd,
args->port);
// FIXME: this runs forever, we probably want a cleaner way to test for
the end of time.
do {
+ net.setPort(args->port);
+ if (netdebug) {
+ net.toggleDebug(true);
+ }
+ // Wait for a connection to this tcp/ip from a client. If set
+ // to true, this will block until a request comes in. If set
+ // to single threaded mode, this will only allow one client to
+ // connect at a time. This is to make it easier to debug
+ // things when you have a heavily threaded application.
+ args->netfd = net.newConnection(true, fd);
+ log_debug("New network connection for fd #%d", args->netfd);
+
+ // Get the number of cpus in this system. For multicore
+ // systems we'll get better load balancing if we keep all the
+ // cpus busy. So a pool of threads is started for each cpu,
+ // the default being just one. Each thread is responsible for
+ // handling part of the total active file descriptors.
+#ifdef HAVE_SYSCONF
+ long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
+ log_debug("This system has %d cpus.", ncpus);
+#endif
+ size_t nfds = crcfile.getFDThread();
+
+ log_debug("This system is configured for %d file descriptors to be
watched by each thread.", nfds);
+
+ // cap the number of threads
+ int cpu = (cpu % ncpus);
+
+ // Get the next thread ID to hand off handling this file
+ // descriptor to. If the limit for threads per cpu hasn't been
+ // set or is set to 0, assume one thread per processor by
+ // default. There won't even be threads for each cpu if
+ // threading has been disabled in the cygnal config file.
+ int spawn_limit = 0;
+ if (nfds == 0) {
+ spawn_limit = ncpus;
+ } else {
+ spawn_limit = ncpus * nfds;
+ }
+
+ // Rotate in a range of 0 to the limit.
+ tid = (tid + 1) % (spawn_limit + 1);
+ log_debug("thread ID %d for fd #%d", tid, args->netfd);
+
Handler *hand = new Handler;
- hand->setPort(args->port);
- if (netdebug) {
- hand->toggleDebug(true);
- }
- args->netfd = hand->newConnection(true, fd);
- args->handle = hand;
- log_debug("Adding handler: %x for fd #%d", (void *)hand, args->netfd);
- handlers[args->netfd] = hand;
-
+ struct pollfd fds;
+ fds.fd = args->netfd;
+ fds.events = POLLIN |POLLRDHUP;
+ if (args->port == (port_offset + RTMPT_PORT)) {
+ hand->addPollFD(fds, http_handler);
+ }
+ if (args->port == RTMP_PORT) {
+ hand->addPollFD(fds, rtmp_handler);
+ }
+ // if supporting multiple threads
if (crcfile.getThreadingFlag()) {
- hand->start(args);
- } else {
- log_debug(_("Starting Handlers for port %d, tid %ld"),
- args->port, get_thread_id());
-
+ hand = new Handler;
+ args->handler = hand;
+ log_debug("Starting handler: %x for fd #%d", (void *)hand,
args->netfd);
if (args->port == (port_offset + RTMPT_PORT)) {
boost::thread handler(boost::bind(&http_handler, args));
}
if (args->port == RTMP_PORT) {
boost::thread handler(boost::bind(&rtmp_handler, args));
}
+ dispatch_handler(args);
+// net.closeNet(args->netfd);
+ } else { // single threaded
+ log_debug("Single threaded mode for fd #%d", args->netfd);
+ args->handler = hand;
+ dispatch_handler(args);
}
+ //
log_debug("Restarting loop for next connection for port %d...",
args->port);
} while(1);
-
+
// All threads should exit now.
alldone.notify_all();
} // end of connection_handler
-extern "C" {
void
-http_handler(Handler::thread_params_t *args)
+dispatch_handler(Handler::thread_params_t *args)
{
GNASH_REPORT_FUNCTION;
-// struct thread_params thread_data;
- string url, filespec, parameters;
- string::size_type pos;
- Handler *hand = reinterpret_cast<Handler *>(args->handle);
- HTTP www;
- www.setHandler(hand);
-
- log_debug(_("Starting HTTP Handler for fd #%d, tid %ld"),
- args->netfd, get_thread_id());
-
- string docroot = args->filespec;
-
- log_debug("Starting to wait for data in net for fd #%d", args->netfd);
-
- // Wait for data, and when we get it, process it.
- do {
-#ifdef THREADED_IO
- hand->wait();
- if (hand->timetodie()) {
- log_debug("Not waiting no more, no more for more HTTP data for fd
#%d...", args->netfd);
- map<int, Handler *>::iterator hit = handlers.find(args->netfd);
- if ((*hit).second) {
- log_debug("Removing handle %x for HTTP on fd #%d",
- (void *)hand, args->netfd);
- handlers.erase(args->netfd);
- delete (*hit).second;
- }
- return;
- }
-#endif
-
-#ifdef USE_STATISTICS
- struct timespec start;
- clock_gettime (CLOCK_REALTIME, &start);
-#endif
-
-// conndata->statistics->setFileType(NetStats::RTMPT);
-// conndata->statistics->startClock();
-// args->netfd = www.getFileFd();
-// www.recvMsg(5);
- www.recvMsg(args->netfd);
-
- if (!www.processGetRequest()) {
- hand->die(); // tell all the threads for this connection to
die
- hand->notifyin();
- log_debug("Net HTTP done for fd #%d...", args->netfd);
-// hand->closeNet(args->netfd);
- return;
- }
- url = docroot;
- url += www.getURL();
- pos = url.find("?");
- filespec = url.substr(0, pos);
- parameters = url.substr(pos + 1, url.size());
- // Get the file size for the HTTP header
-
- if (www.getFileStats(filespec) == amf::AMF::FILETYPE_ERROR) {
- www.formatErrorResponse(HTTP::NOT_FOUND);
- }
- // Send the reply
- www.formatGetReply(HTTP::LIFE_IS_GOOD);
-// cerr << "Size = " << www.getHeader().size() << " " <<
www.getHeader() << endl;
-
- hand->Network::writeNet(args->netfd, (boost::uint8_t
*)www.getHeader().c_str(), www.getHeader().size());
-// hand->writeNet(args->netfd, www.getHeader(), www.getHeader().size());
-// strcpy(thread_data.filespec, filespec.c_str());
-// thread_data.statistics = conndata->statistics;
-
- // Keep track of the network statistics
-// conndata->statistics->stopClock();
-// log_debug (_("Bytes read: %d"), www.getBytesIn());
-// log_debug (_("Bytes written: %d"), www.getBytesOut());
-// st.setBytes(www.getBytesIn() + www.getBytesOut());
-// conndata->statistics->addStats();
-
- if (filespec[filespec.size()-1] == '/') {
- filespec += "index.html";
- }
-
-// DiskStream filestream;
-// filestream.open(filespec);
-#if 0
- if (url != docroot) {
- log_debug (_("File to load is: %s"), filespec.c_str());
- log_debug (_("Parameters are: %s"), parameters.c_str());
- struct stat st;
- int filefd;
- size_t ret;
-#ifdef USE_STATISTICS
- struct timespec start;
- clock_gettime (CLOCK_REALTIME, &start);
-#endif
- if (stat(filespec.c_str(), &st) == 0) {
- filefd = ::open(filespec.c_str(), O_RDONLY);
- log_debug (_("File \"%s\" is %lld bytes in size, disk fd #%d"),
filespec,
- st.st_size, filefd);
- do {
- boost::shared_ptr<amf::Buffer> buf(new amf::Buffer);
- ret = read(filefd, buf->reference(), buf->size());
- if (ret == 0) { // the file is done
- break;
- }
- if (ret != buf->size()) {
- buf->resize(ret);
- log_debug("Got last data block from disk file, size
%d", buf->size());
- }
- log_debug("Read %d bytes from %s.", ret, filespec);
-#if 0
- hand->pushout(buf);
- hand->notifyout();
-#else
- // Don't bother with the outgoing que
- if (ret > 0) {
- ret = hand->writeNet(buf);
- }
-#endif
- } while(ret > 0);
- log_debug("Done transferring %s to net fd #%d",
- filespec, args->netfd);
- ::close(filefd); // close the disk file
- // See if this is a persistant connection
-// if (!www.keepAlive()) {
-// log_debug("Keep-Alive is off", www.keepAlive());
-// // hand->closeConnection();
-// }
-#ifdef USE_STATISTICS
- struct timespec end;
- clock_gettime (CLOCK_REALTIME, &end);
- log_debug("Read %d bytes from \"%s\" in %f seconds",
- st.st_size, filespec,
- (float)((end.tv_sec - start.tv_sec) + ((end.tv_nsec -
start.tv_nsec)/1e9)));
-#endif
- }
-
-// memset(args->filespec, 0, 256);
-// memcpy(->filespec, filespec.c_str(), filespec.size());
-// boost::thread sendthr(boost::bind(&stream_thread, args));
-// sendthr.join();
- }
-#endif
-
-#ifdef USE_STATISTICS
- struct timespec end;
- clock_gettime (CLOCK_REALTIME, &end);
- log_debug("Processing time for GET request was %f seconds",
- (float)((end.tv_sec - start.tv_sec) + ((end.tv_nsec -
start.tv_nsec)/1e9)));
-#endif
-// conndata->statistics->dump();
-// }
- } while(!hand->timetodie());
-
- log_debug("httphandler all done now finally...");
-
-} // end of httphandler
-
-} // end of extern C
+
+ Handler *hand = reinterpret_cast<Handler *>(args->handler);
+ Network net;
+ int timeout = 5000;
+
+ while(!hand->timetodie()) {
+ int limit = hand->getPollFDSize();
+ net.setTimeout(timeout);
+ if (limit > 0) {
+ struct pollfd *fds = hand->getPollFDPtr();
+ boost::shared_ptr< vector<struct pollfd> > hits;
+ try {
+// boost::shared_ptr< vector< int > >
hits(net.waitForNetData(limit, fds));
+ hits = net.waitForNetData(limit, fds);
+ vector<struct pollfd>::iterator it;
+ cerr << "Hits: " << hits->size() << endl;
+ cerr << "Pollfds: " << hand->getPollFDSize() << endl;
+ for (it = hits->begin(); it != hits->end(); it++) {
+ if ((it->revents & POLLRDHUP) || (it->revents & POLLNVAL))
{
+ log_debug("Revents has a POLLRDHUP or POLLNVAL set to
%d for fd #%d",
+ it->revents, it->fd);
+ hand->erasePollFD(it);
+ net.closeNet(it->fd);
+ }
+ log_debug("Got something on fd #%d, 0x%x", it->fd,
it->revents);
+ hand->getEntry(it->fd)(args);
+ net.closeNet(it->fd);
+ }
+ } catch (std::exception& e) {
+ log_error("Network connection was dropped: %s", e.what());
+ vector<struct pollfd>::const_iterator it;
+ if (hits) {
+ for (it = hits->begin(); it != hits->end(); it++) {
+ log_debug("Need to disconnect fd #%d, it got an
error.", (*it).fd);
+// hand->erasePollFD(*it);
+// net.closeNet(*it);
+ }
+ }
+ }
+ } else {
+ log_debug("nothing to wait for.");
+ hand->die();
+ }
+ }
+} // end of dispatch_handler
+
// local Variables:
// mode: C++
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Gnash-commit] /srv/bzr/gnash/rtmp r9713: add new dispatch thread for a single I/O thread that can handle multiple,
rob <=