[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Gnash-commit] /srv/bzr/gnash/rtmp r9701: add support for an array of pollfds, needed for poll().
From: |
rob |
Subject: |
[Gnash-commit] /srv/bzr/gnash/rtmp r9701: add support for an array of pollfds, needed for poll(). |
Date: |
Wed, 05 Nov 2008 19:31:49 -0700 |
User-agent: |
Bazaar (1.5) |
------------------------------------------------------------
revno: 9701
committer: address@hidden
branch nick: rtmp
timestamp: Wed 2008-11-05 19:31:49 -0700
message:
add support for an array of pollfds, needed for poll().
modified:
libnet/handler.cpp
libnet/handler.h
=== modified file 'libnet/handler.cpp'
--- a/libnet/handler.cpp 2008-11-01 15:02:59 +0000
+++ b/libnet/handler.cpp 2008-11-06 02:31:49 +0000
@@ -59,7 +59,7 @@
Handler::~Handler()
{
// GNASH_REPORT_FUNCTION;
- closeConnection();
+// closeConnection();
_die = true;
notifyout();
notifyin();
@@ -160,6 +160,26 @@
}
}
+void
+Handler::addPollFD(struct pollfd &fd)
+{
+ boost::mutex::scoped_lock lock(_poll_mutex);
+ return _pollfds.push_back(fd);
+}
+
+struct pollfd
+&Handler::getPollFD(int index)
+{
+ boost::mutex::scoped_lock lock(_poll_mutex);
+ return _pollfds[index];
+}
+struct pollfd *
+Handler::getPollFDPtr()
+{
+ boost::mutex::scoped_lock lock(_poll_mutex);
+ return &_pollfds[0];
+};
+
// Dump internal data.
void
Handler::dump()
@@ -169,6 +189,7 @@
_outgoing.dump();
}
+#if 0
size_t
Handler::readPacket(int fd)
{
@@ -205,6 +226,7 @@
}
return ret;
}
+#endif
// start the two thread handlers for the queues
bool
@@ -219,8 +241,28 @@
log_debug(_("Starting Handlers for port %d, tid %ld"),
args->port, get_thread_id());
+ struct pollfd *fds;
+ int nfds = 1;
+ Network net;
+ boost::shared_ptr<vector<int> > hits = net.waitForNetData(nfds, fds);
+ vector<int>::const_iterator it;
+#if 0
+ for (it = _pollfds.begin(); it != _pollfds.end(); it++) {
+// Buffer buf;
+// net.readNet(*it, buf.reference(), buf.size());
+ args->netfd = *it;
+ if (crcfile.getThreadingFlag()) {
+ if (args->port == port_offset + gnash::RTMPT_PORT) {
+ boost::thread handler(boost::bind(&http_handler, args));
+ }
+ } else {
+ callback[*it](args);
+ }
+ }
+#endif
+
// boost::thread outport(boost::bind(&netout_handler, args));
- boost::thread inport(boost::bind(&netin_handler, args));
+// boost::thread inport(boost::bind(&netin_handler, args));
#if 0
if (args->port == 4080) { // FIXME: hack alert!
@@ -231,15 +273,6 @@
}
#endif
-// We don't want to wait for the threads to complete, we
-// want to return to the main program so it can spawn another
-// thread for the next incoming connection.
-// inport.join();
-// outport.join();
-// handler.join();
-// if (_die) {
-// log_debug("Handler done...");
-// }
return true;
}
@@ -249,13 +282,14 @@
{
GNASH_REPORT_FUNCTION;
- Handler *hand = reinterpret_cast<Handler *>(args->handle);
+ Network *net = reinterpret_cast<Network *>(args->handler);
+ size_t ret;
log_debug("Starting to wait for data in net for fd #%d", args->netfd);
do {
boost::shared_ptr<amf::Buffer> buf(new amf::Buffer);
- size_t ret = hand->readNet(args->netfd, buf->reference(), buf->size(),
1);
+// ret = hand->readNet(args->netfd, buf->reference(), buf->size(), 1);
// cerr << (char *)buf->reference() << endl;
// the read timed out as there was no data, but the socket is still
open.
@@ -275,18 +309,19 @@
// if (ret < NETBUFSIZE) {
// buf->resize(ret);
// }
- hand->push(buf);
- hand->notify();
+// hand->push(buf);
+// hand->notify();
} else {
log_debug("no more data for fd #%d, exiting...", args->netfd);
- hand->die();
+// hand->die();
break;
}
- } while (!hand->timetodie());
+// } while (!hand->timetodie());
+ } while (ret > 0);
// We're done. Notify the other threads the socket is closed, and tell
them to die.
log_debug("Net In handler done for fd #%d...", args->netfd);
- hand->notify();
- hand->closeNet(args->netfd);
+// hand->notify();
+// hand->closeNet(args->netfd);
// hand->dump();
}
=== modified file 'libnet/handler.h'
--- a/libnet/handler.h 2008-11-01 15:02:59 +0000
+++ b/libnet/handler.h 2008-11-06 02:31:49 +0000
@@ -19,10 +19,24 @@
#ifndef __HANDLER_H__
#define __HANDLER_H__ 1
+#ifdef HAVE_CONFIG_H
+#include "gnashconfig.h"
+#endif
+
#include <boost/cstdint.hpp>
+#include <boost/thread/mutex.hpp>
//#include <boost/thread/condition.hpp>
#include <string>
#include <deque>
+#include <map>
+
+#ifdef HAVE_POLL
+# include <sys/poll.h>
+#else
+# ifdef HAVE_EPOLL
+# include <sys/epoll.h>
+# endif
+#endif
#include "log.h"
#include "network.h"
@@ -35,9 +49,11 @@
namespace gnash
{
-class Handler : public gnash::Network
+
+class Handler
{
public:
+
DSOEXPORT Handler();
~Handler();
@@ -53,10 +69,12 @@
typedef struct {
int netfd;
int port;
- void *handle;
+ void *handler;
std::string filespec;
} thread_params_t ;
+ typedef void entry_t (thread_params_t *);
+
// Specify which queue should be used
typedef enum { INCOMING, OUTGOING } fifo_e;
@@ -124,11 +142,12 @@
void waitin() { _incoming.wait(); };
void waitout() { _outgoing.wait(); };
- size_t readPacket(int fd);
+// size_t readPacket(int fd);
// start the two thread handlers for the queues
bool DSOEXPORT start(thread_params_t *args);
+#if 0
/// \brief Write a Buffer the network connection.
///
/// @param fd The file descriptor to write the data too.
@@ -146,7 +165,8 @@
/// @return The number of bytes sent
int DSOEXPORT writeNet(boost::shared_ptr<amf::Buffer> &buf)
{ return Network::writeNet(buf->reference(), buf->size()); };
-
+#endif
+
// Dump internal data.
void dump();
@@ -156,12 +176,24 @@
#endif
void die() { _die = true; _outgoing.notify(); };
bool timetodie() { return _die; };
+
+ void addPollFD(struct pollfd &fd);
+ struct pollfd &getPollFD(int index);
+ struct pollfd *getPollFDPtr();
private:
- bool _die;
- int _netfd;
- CQue _incoming;
- CQue _outgoing;
+ bool _die;
+ int _netfd;
+ CQue _incoming;
+ CQue _outgoing;
+ /// \var Handler::_handlers
+ /// Keep a list of all active network connections
+ std::map<int, entry_t *> _handlers;
+#ifdef HAVE_POLL
+ // This is the mutex that controls access to the que.
+ std::vector<struct pollfd> _pollfds;
+ boost::mutex _poll_mutex;
+#endif
};
// This is the thread for all incoming network connections, which
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Gnash-commit] /srv/bzr/gnash/rtmp r9701: add support for an array of pollfds, needed for poll().,
rob <=