[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Gnash-commit] /srv/bzr/gnash/rtmp r9679: move http_handler to here. Add
From: |
rob |
Subject: |
[Gnash-commit] /srv/bzr/gnash/rtmp r9679: move http_handler to here. Add new options. |
Date: |
Sat, 01 Nov 2008 08:58:30 -0600 |
User-agent: |
Bazaar (1.5) |
------------------------------------------------------------
revno: 9679
committer: address@hidden
branch nick: rtmp
timestamp: Sat 2008-11-01 08:58:30 -0600
message:
move http_handler to here. Add new options.
modified:
cygnal/cygnal.cpp
=== modified file 'cygnal/cygnal.cpp'
--- a/cygnal/cygnal.cpp 2008-09-09 03:49:35 +0000
+++ b/cygnal/cygnal.cpp 2008-11-01 14:58:30 +0000
@@ -15,8 +15,7 @@
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
-//
-
+//
#ifdef HAVE_CONFIG_H
#include "gnashconfig.h"
@@ -51,12 +50,15 @@
#include "log.h"
#include "crc.h"
#include "rtmp.h"
+#include "rtmp_server.h"
#include "http.h"
+#include "utility.h"
#include "limits.h"
#include "netstats.h"
#include "statistics.h"
//#include "stream.h"
#include "gmemory.h"
+#include "diskstream.h"
#include "arg_parser.h"
// classes internal to Cygnal
@@ -73,13 +75,20 @@
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition.hpp>
-using gnash::log_debug;
+//using gnash::log_debug;
using namespace std;
+using namespace gnash;
using namespace cygnal;
-using namespace gnash;
using namespace amf;
+// Keep a list of all active network connections
+namespace gnash {
+extern map<int, gnash::Handler *> handlers;
+}
+
static void usage();
static void version_and_copyright();
static void cntrlc_handler(int sig);
@@ -89,9 +98,6 @@
LogFile& dbglogfile = LogFile::getDefaultInstance();
-// The rcfile is loaded and parsed here:
-CRcInitFile& crcfile = CRcInitFile::getDefaultInstance();
-
// Toggles very verbose debugging info from the network Network class
static bool netdebug = false;
@@ -113,16 +119,34 @@
// conflict with apache on the same machine.
static int port_offset = 0;
-// Keep a list of all active network connections
-namespace gnash {
-extern map<int, Handler *> handlers;
-}
+// Toggle the admin thread
+static bool admin = false;
// Admin commands are small
const int ADMINPKTSIZE = 80;
// end of globals
+// The rcfile is loaded and parsed here:
+CRcInitFile& crcfile = CRcInitFile::getDefaultInstance();
+
+// This mutex is used to signify when all the threads are done.
+static boost::condition alldone;
+static boost::mutex alldone_mutex;
+
+static void
+usage()
+{
+ cout << _("cygnal -- a streaming media server.") << endl
+ << endl
+ << _("Usage: cygnal [options...]") << endl
+ << _(" -h, --help Print this help and exit") << endl
+ << _(" -V, --version Print version information and exit") <<
endl
+ << _(" -v, --verbose Output verbose debug info") << endl
+ << _(" -p --port-offset RTMPT port offset") << endl
+ << endl;
+}
+
int
main(int argc, char *argv[])
{
@@ -140,7 +164,9 @@
{ 'p', "port-offset", Arg_parser::yes },
{ 'v', "verbose", Arg_parser::no },
{ 'd', "dump", Arg_parser::no },
- { 'n', "netdebug", Arg_parser::no }
+ { 'n', "netdebug", Arg_parser::no },
+ { 't', "testing", Arg_parser::no },
+ { 'a', "admin", Arg_parser::no }
};
Arg_parser parser(argc, argv, opts);
@@ -180,6 +206,12 @@
case 'V':
version_and_copyright();
exit(0);
+ case 't':
+ crcfile.setTestingFlag(true);
+ break;
+ case 'a':
+ admin = true;
+ break;
case 'v':
dbglogfile.setVerbosity();
log_debug (_("Verbose output turned on"));
@@ -204,44 +236,48 @@
act.sa_handler = cntrlc_handler;
sigaction (SIGINT, &act, NULL);
+ boost::mutex::scoped_lock lk(alldone_mutex);
+
// struct thread_params rtmp_data;
// struct thread_params ssl_data;
// rtmp_data.port = port_offset + 1935;
// boost::thread rtmp_port(boost::bind(&rtmp_thread, &rtmp_data));
-#if 1
- // Admin handler
- Handler::thread_params_t admin_data;
- admin_data.port = gnash::ADMIN_PORT;
- boost::thread adminhandler(boost::bind(&admin_handler, &admin_data));
-#endif
-
-#if 1
- // Incomming connection handler for port 80, HTTP and RTMPT. As port 80
requires
- // root access, cygnal supports a "port offset" for debugging and
development of
- // the server. Since this port offset changes the constant to test for
which protocol,
- // we pass the info to the start thread so it knows which handler to
invoke.
+ // Incomming connection handler for port 80, HTTP and
+ // RTMPT. As port 80 requires root access, cygnal supports a
+ // "port offset" for debugging and development of the
+ // server. Since this port offset changes the constant to test
+ // for which protocol, we pass the info to the start thread so
+ // it knows which handler to invoke.
Handler::thread_params_t http_data;
http_data.port = port_offset + gnash::RTMPT_PORT;
http_data.netfd = 0;
http_data.filespec = docroot;
boost::thread http_thread(boost::bind(&connection_handler, &http_data));
-#endif
-#if 1
- // Incomming connection handler for port 1935, RTMP. As RTMP is not a
priviledged port,
- // we just open it without an offset.
+ // Incomming connection handler for port 1935, RTMP. As RTMP
+ // is not a priviledged port, we just open it without an offset.
Handler::thread_params_t rtmp_data;
rtmp_data.port = gnash::RTMP_PORT;
rtmp_data.netfd = 0;
rtmp_data.filespec = docroot;
boost::thread rtmp_thread(boost::bind(&connection_handler, &rtmp_data));
-#endif
+
+ // Admin handler
+ if (admin) {
+ Handler::thread_params_t admin_data;
+ admin_data.port = gnash::ADMIN_PORT;
+ boost::thread admin_thread(boost::bind(&admin_handler, &admin_data));
+ admin_thread.join();
+ }
// wait for the thread to finish
-// adminhandler.join();
// http_thread.join();
- rtmp_thread.join();
+// rtmp_thread.join();
+
+ // Wait for all the threads to die
+ alldone.wait(lk);
+
log_debug (_("Cygnal done..."));
return(0);
@@ -298,20 +334,6 @@
<< endl;
}
-
-static void
-usage()
-{
- cout << _("cygnal -- a streaming media server.") << endl
- << endl
- << _("Usage: cygnal [options...]") << endl
- << _(" -h, --help Print this help and exit") << endl
- << _(" -V, --version Print version information and exit") <<
endl
- << _(" -v, --verbose Output verbose debug info") << endl
- << _(" -p --port-offset RTMPT port offset") << endl
- << endl;
-}
-
// FIXME: this function could be tweaked for better performance
void
admin_handler(Handler::thread_params_t *args)
@@ -419,6 +441,9 @@
net.closeNet(); // this shuts down this socket connection
}
net.closeConnection(); // this shuts down the server on this
connection
+
+ // All threads should exit now.
+ alldone.notify_all();
}
void
@@ -428,12 +453,12 @@
int fd = 0;
// list<Handler *> *handlers = reinterpret_cast<list<Handler *>
*>(args->handle);
- log_debug("Starting Connection Handler for fd #%d, port %hd", args->netfd,
args->port);
Network net;
if (netdebug) {
net.toggleDebug(true);
}
fd = net.createServer(args->port);
+ log_debug("Starting Connection Handler for fd #%d, port %hd", fd,
args->port);
// FIXME: this runs forever, we probably want a cleaner way to test for
the end of time.
do {
Handler *hand = new Handler;
@@ -445,12 +470,189 @@
args->handle = hand;
log_debug("Adding handler: %x for fd #%d", (void *)hand, args->netfd);
handlers[args->netfd] = hand;
- hand->start(args);
+
+ if (crcfile.getThreadingFlag()) {
+ hand->start(args);
+ } else {
+ log_debug(_("Starting Handlers for port %d, tid %ld"),
+ args->port, get_thread_id());
+
+ if (args->port == (port_offset + RTMPT_PORT)) {
+ boost::thread handler(boost::bind(&http_handler, args));
+ }
+ if (args->port == RTMP_PORT) {
+ boost::thread handler(boost::bind(&rtmp_handler, args));
+ }
+ }
log_debug("Restarting loop for next connection for port %d...",
args->port);
} while(1);
+
+ // All threads should exit now.
+ alldone.notify_all();
+
} // end of connection_handler
-//} // end of cygnal namespace
+extern "C" {
+void
+http_handler(Handler::thread_params_t *args)
+{
+ GNASH_REPORT_FUNCTION;
+// struct thread_params thread_data;
+ string url, filespec, parameters;
+ string::size_type pos;
+ Handler *hand = reinterpret_cast<Handler *>(args->handle);
+ HTTP www;
+ www.setHandler(hand);
+
+ log_debug(_("Starting HTTP Handler for fd #%d, tid %ld"),
+ args->netfd, get_thread_id());
+
+ string docroot = args->filespec;
+
+ log_debug("Starting to wait for data in net for fd #%d", args->netfd);
+
+ // Wait for data, and when we get it, process it.
+ do {
+#ifdef THREADED_IO
+ hand->wait();
+ if (hand->timetodie()) {
+ log_debug("Not waiting no more, no more for more HTTP data for fd
#%d...", args->netfd);
+ map<int, Handler *>::iterator hit = handlers.find(args->netfd);
+ if ((*hit).second) {
+ log_debug("Removing handle %x for HTTP on fd #%d",
+ (void *)hand, args->netfd);
+ handlers.erase(args->netfd);
+ delete (*hit).second;
+ }
+ return;
+ }
+#endif
+
+#ifdef USE_STATISTICS
+ struct timespec start;
+ clock_gettime (CLOCK_REALTIME, &start);
+#endif
+
+// conndata->statistics->setFileType(NetStats::RTMPT);
+// conndata->statistics->startClock();
+// args->netfd = www.getFileFd();
+// www.recvMsg(5);
+ www.recvMsg(args->netfd);
+
+ if (!www.processGetRequest()) {
+ hand->die(); // tell all the threads for this connection to
die
+ hand->notifyin();
+ log_debug("Net HTTP done for fd #%d...", args->netfd);
+// hand->closeNet(args->netfd);
+ return;
+ }
+ url = docroot;
+ url += www.getURL();
+ pos = url.find("?");
+ filespec = url.substr(0, pos);
+ parameters = url.substr(pos + 1, url.size());
+ // Get the file size for the HTTP header
+
+ if (www.getFileStats(filespec) == amf::AMF::FILETYPE_ERROR) {
+ www.formatErrorResponse(HTTP::NOT_FOUND);
+ }
+ // Send the reply
+ www.formatGetReply(HTTP::LIFE_IS_GOOD);
+// cerr << "Size = " << www.getHeader().size() << " " <<
www.getHeader() << endl;
+
+ hand->Network::writeNet(args->netfd, (boost::uint8_t
*)www.getHeader().c_str(), www.getHeader().size());
+// hand->writeNet(args->netfd, www.getHeader(), www.getHeader().size());
+// strcpy(thread_data.filespec, filespec.c_str());
+// thread_data.statistics = conndata->statistics;
+
+ // Keep track of the network statistics
+// conndata->statistics->stopClock();
+// log_debug (_("Bytes read: %d"), www.getBytesIn());
+// log_debug (_("Bytes written: %d"), www.getBytesOut());
+// st.setBytes(www.getBytesIn() + www.getBytesOut());
+// conndata->statistics->addStats();
+
+ if (filespec[filespec.size()-1] == '/') {
+ filespec += "index.html";
+ }
+
+// DiskStream filestream;
+// filestream.open(filespec);
+#if 0
+ if (url != docroot) {
+ log_debug (_("File to load is: %s"), filespec.c_str());
+ log_debug (_("Parameters are: %s"), parameters.c_str());
+ struct stat st;
+ int filefd;
+ size_t ret;
+#ifdef USE_STATISTICS
+ struct timespec start;
+ clock_gettime (CLOCK_REALTIME, &start);
+#endif
+ if (stat(filespec.c_str(), &st) == 0) {
+ filefd = ::open(filespec.c_str(), O_RDONLY);
+ log_debug (_("File \"%s\" is %lld bytes in size, disk fd #%d"),
filespec,
+ st.st_size, filefd);
+ do {
+ boost::shared_ptr<amf::Buffer> buf(new amf::Buffer);
+ ret = read(filefd, buf->reference(), buf->size());
+ if (ret == 0) { // the file is done
+ break;
+ }
+ if (ret != buf->size()) {
+ buf->resize(ret);
+ log_debug("Got last data block from disk file, size
%d", buf->size());
+ }
+ log_debug("Read %d bytes from %s.", ret, filespec);
+#if 0
+ hand->pushout(buf);
+ hand->notifyout();
+#else
+ // Don't bother with the outgoing que
+ if (ret > 0) {
+ ret = hand->writeNet(buf);
+ }
+#endif
+ } while(ret > 0);
+ log_debug("Done transferring %s to net fd #%d",
+ filespec, args->netfd);
+ ::close(filefd); // close the disk file
+ // See if this is a persistant connection
+// if (!www.keepAlive()) {
+// log_debug("Keep-Alive is off", www.keepAlive());
+// // hand->closeConnection();
+// }
+#ifdef USE_STATISTICS
+ struct timespec end;
+ clock_gettime (CLOCK_REALTIME, &end);
+ log_debug("Read %d bytes from \"%s\" in %f seconds",
+ st.st_size, filespec,
+ (float)((end.tv_sec - start.tv_sec) + ((end.tv_nsec -
start.tv_nsec)/1e9)));
+#endif
+ }
+
+// memset(args->filespec, 0, 256);
+// memcpy(->filespec, filespec.c_str(), filespec.size());
+// boost::thread sendthr(boost::bind(&stream_thread, args));
+// sendthr.join();
+ }
+#endif
+
+#ifdef USE_STATISTICS
+ struct timespec end;
+ clock_gettime (CLOCK_REALTIME, &end);
+ log_debug("Processing time for GET request was %f seconds",
+ (float)((end.tv_sec - start.tv_sec) + ((end.tv_nsec -
start.tv_nsec)/1e9)));
+#endif
+// conndata->statistics->dump();
+// }
+ } while(!hand->timetodie());
+
+ log_debug("httphandler all done now finally...");
+
+} // end of httphandler
+
+} // end of extern C
// local Variables:
// mode: C++
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Gnash-commit] /srv/bzr/gnash/rtmp r9679: move http_handler to here. Add new options.,
rob <=