[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Gnash-commit] /srv/bzr/gnash/rtmp r9768: handle persistant and non pers
From: |
rob |
Subject: |
[Gnash-commit] /srv/bzr/gnash/rtmp r9768: handle persistant and non persistant network connections based on the Keep-Alive fields. |
Date: |
Fri, 21 Nov 2008 22:12:52 -0700 |
User-agent: |
Bazaar (1.5) |
------------------------------------------------------------
revno: 9768
committer: address@hidden
branch nick: rtmp
timestamp: Fri 2008-11-21 22:12:52 -0700
message:
handle persistant and non persistant network connections based on the
Keep-Alive fields.
modified:
cygnal/cygnal.cpp
libnet/http.cpp
=== modified file 'cygnal/cygnal.cpp'
--- a/cygnal/cygnal.cpp 2008-11-21 16:41:31 +0000
+++ b/cygnal/cygnal.cpp 2008-11-22 05:12:52 +0000
@@ -175,7 +175,7 @@
{ 't', "testing", Arg_parser::no },
{ 'a', "admin", Arg_parser::no },
{ 'r', "root", Arg_parser::yes },
- { 'c', "threads", Arg_parser::no }
+ { 'm', "multithreaded", Arg_parser::no }
};
Arg_parser parser(argc, argv, opts);
@@ -234,7 +234,7 @@
case 'r':
docroot = parser.argument(i).c_str();
break;
- case 'c':
+ case 'm':
crcfile.setThreadingFlag(true);
break;
case 'n':
@@ -497,7 +497,50 @@
// Start a server on this tcp/ip port.
fd = net.createServer(args->port);
log_debug("Starting Connection Handler for fd #%d, port %hd", fd,
args->port);
- // FIXME: this runs forever, we probably want a cleaner way to test for
the end of time.
+
+ // Get the number of cpus in this system. For multicore
+ // systems we'll get better load balancing if we keep all the
+ // cpus busy. So a pool of threrads is started for each cpu,
+ // the default being just one. Each thread is reponsible for
+ // handling part of the total active file descriptors.
+#ifdef HAVE_SYSCONF
+ long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
+ log_debug("This system has %d cpus.", ncpus);
+#endif
+ size_t nfds = crcfile.getFDThread();
+
+ log_debug("This system is configured for %d file descriptors to be watched
by each thread.", nfds);
+
+ // cap the number of threads
+ int cpu = 0;
+ cpu = (cpu % ncpus);
+
+ // Get the next thread ID to hand off handling this file
+ // descriptor to. If the limit for threads per cpu hasn't been
+ // set or is set to 0, assume one thread per processor by
+ // default. There won't even be threads for each cpu if
+ // threading has been disabled in the cygnal config file.
+ int spawn_limit = 0;
+ if (nfds == 0) {
+ spawn_limit = ncpus;
+ } else {
+ spawn_limit = ncpus * nfds;
+ }
+
+ // Rotate in a range of 0 to the limit.
+ tid = (tid + 1) % (spawn_limit + 1);
+ log_debug("thread ID %d for fd #%d", tid, args->netfd);
+
+ Handler *hand = new Handler;
+ args->handler = hand;
+ if (crcfile.getThreadingFlag()) {
+ log_debug("Multi-threaded mode for server on fd #%d", fd);
+// log_debug("Starting handler: %x for fd #%d", (void *)hand,
args->netfd);
+ boost::thread handler(boost::bind(&dispatch_handler, args));
+ }
+
+ // FIXME: this runs forever, we probably want a cleaner way to
+ // test for the end of time.
do {
net.setPort(args->port);
if (netdebug) {
@@ -511,39 +554,6 @@
args->netfd = net.newConnection(true, fd);
log_debug("New network connection for fd #%d", args->netfd);
- // Get the number of cpus in this system. For multicore
- // systems we'll get better load balancing if we keep all the
- // cpus busy. So a pool of threrads is started for each cpu,
- // the default being just one. Each thread is reponsible for
- // handling part of the total active file descriptors.
-#ifdef HAVE_SYSCONF
- long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
- log_debug("This system has %d cpus.", ncpus);
-#endif
- size_t nfds = crcfile.getFDThread();
-
- log_debug("This system is configured for %d file descriptors to be
watched by each thread.", nfds);
-
- // cap the number of threads
- int cpu = (cpu % ncpus);
-
- // Get the next thread ID to hand off handling this file
- // descriptor to. If the limit for threads per cpu hasn't been
- // set or is set to 0, assume one thread per processor by
- // default. There won't even be threads for each cpu if
- // threading has been disabled in the cygnal config file.
- int spawn_limit = 0;
- if (nfds == 0) {
- spawn_limit = ncpus;
- } else {
- spawn_limit = ncpus * nfds;
- }
-
- // Rotate in a range of 0 to the limit.
- tid = (tid + 1) % (spawn_limit + 1);
- log_debug("thread ID %d for fd #%d", tid, args->netfd);
-
- Handler *hand = new Handler;
struct pollfd fds;
fds.fd = args->netfd;
fds.events = POLLIN |POLLRDHUP;
@@ -554,28 +564,24 @@
// hand->addPollFD(fds, rtmp_handler);
// }
// if supporting multiple threads
- args->handler = hand;
- if (crcfile.getThreadingFlag()) {
- log_debug("Multi-threaded mode for fd #%d", args->netfd);
- log_debug("Starting handler: %x for fd #%d", (void *)hand,
args->netfd);
+ if (!crcfile.getThreadingFlag()) {
+ log_debug("Single threaded mode for fd #%d", args->netfd);
+ dispatch_handler(args);
#if 0
- if (args->port == (port_offset + RTMPT_PORT)) {
- boost::thread handler(boost::bind(&http_handler, args));
- }
- if (args->port == (port_offset + RTMP_PORT)) {
- boost::thread handler(boost::bind(&rtmp_handler, args));
- }
-#endif
- boost::thread handler(boost::bind(&dispatch_handler, args));
+ if (args->port == (port_offset + RTMPT_PORT)) {
+ boost::thread handler(boost::bind(&http_handler, args));
+ }
+ if (args->port == (port_offset + RTMP_PORT)) {
+ boost::thread handler(boost::bind(&rtmp_handler, args));
+ }
} else { // single threaded
- log_debug("Single threaded mode for fd #%d", args->netfd);
- dispatch_handler(args);
+#endif
}
// net.closeNet(args->netfd); // this shuts down this socket
connection
log_debug("Restarting loop for next connection for port %d...",
args->port);
} while(!done);
- // All threads should exit now.
+ // All threads should wake up now.
alldone.notify_all();
} // end of connection_handler
@@ -592,6 +598,7 @@
while(!hand->timetodie()) {
int limit = hand->getPollFDSize();
net.setTimeout(timeout);
+ cerr << "LIMIT is: " << limit << endl;
if (limit > 0) {
struct pollfd *fds = hand->getPollFDPtr();
boost::shared_ptr< vector<struct pollfd> > hits;
@@ -607,10 +614,14 @@
it->revents, it->fd);
hand->erasePollFD(it);
net.closeNet(it->fd);
+ continue;
}
log_debug("Got something on fd #%d, 0x%x", it->fd,
it->revents);
hand->getEntry(it->fd)(args);
- hand->erasePollFD(it);
+// if (!crcfile.getThreadingFlag()) {
+// hand->die();
+// }
+ hand->erasePollFD(it->fd);
net.closeNet(it->fd);
}
} catch (std::exception& e) {
@@ -625,8 +636,10 @@
}
}
} else {
- log_debug("nothing to wait for.");
- hand->die();
+ log_debug("nothing to wait for...");
+ sleep(1);
+ return;
+ //hand->wait();
}
}
} // end of dispatch_handler
=== modified file 'libnet/http.cpp'
--- a/libnet/http.cpp 2008-11-22 00:28:11 +0000
+++ b/libnet/http.cpp 2008-11-22 05:12:52 +0000
@@ -1501,11 +1501,11 @@
void
http_handler(Handler::thread_params_t *args)
{
- GNASH_REPORT_FUNCTION;
+// GNASH_REPORT_FUNCTION;
// struct thread_params thread_data;
string url, filespec, parameters;
string::size_type pos;
- Network *net = reinterpret_cast<Network *>(args->handler);
+ Handler *hand = reinterpret_cast<Handler *>(args->handler);
HTTP www;
bool done = false;
// www.setHandler(net);
@@ -1519,30 +1519,12 @@
// Wait for data, and when we get it, process it.
do {
-#ifdef THREADED_IO
- hand->wait();
- if (hand->timetodie()) {
- log_debug("Not waiting no more, no more for more HTTP data for fd
#%d...", args->netfd);
- map<int, Handler *>::iterator hit = handlers.find(args->netfd);
- if ((*hit).second) {
- log_debug("Removing handle %x for HTTP on fd #%d",
- (void *)hand, args->netfd);
- handlers.erase(args->netfd);
- delete (*hit).second;
- }
- return;
- }
-#endif
#ifdef USE_STATISTICS
struct timespec start;
clock_gettime (CLOCK_REALTIME, &start);
#endif
-// conndata->statistics->setFileType(NetStats::RTMPT);
-// conndata->statistics->startClock();
-// args->netfd = www.getFileFd();
-// www.recvMsg(5);
www.recvMsg(args->netfd);
if (!www.processGetRequest()) {
@@ -1655,11 +1637,12 @@
}
log_debug("http_handler all done transferring requested file...");
// cache.dump();
- done = true;
+// done = true;
+ // Unless the Keep-Alive flag is set, this isn't a persisant network
+ // connection.
if (!www.keepAlive()) {
log_debug("Keep-Alive is off", www.keepAlive());
-// www.closeConnection();
done = true;
}
#if 0
@@ -1679,7 +1662,7 @@
// See if this is a persistant connection
// if (!www.keepAlive()) {
// log_debug("Keep-Alive is off", www.keepAlive());
- hand->closeConnection();
+ hand->closeNet();
// }
}
}
@@ -1698,6 +1681,10 @@
// } while(!hand->timetodie());
} while(done != true);
+// www.closeNet(args->netfd);
+// hand->erasePollFD(args->netfd);
+ hand->notify();
+
log_debug("http_handler all done now finally...");
} // end of httphandler
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Gnash-commit] /srv/bzr/gnash/rtmp r9768: handle persistant and non persistant network connections based on the Keep-Alive fields.,
rob <=