[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Gnash-commit] /srv/bzr/gnash/rtmp r9913: fix usage of boost::shared_ptr
From: rob
Subject: [Gnash-commit] /srv/bzr/gnash/rtmp r9913: fix usage of boost::shared_ptr.
Date: Sat, 27 Dec 2008 18:44:57 -0700
User-agent: Bazaar (1.5)
------------------------------------------------------------
revno: 9913
committer: address@hidden
branch nick: rtmp
timestamp: Sat 2008-12-27 18:44:57 -0700
message:
fix usage of boost::shared_ptr.
refactor heavily to deal with the new network engine.
completely change how sendMsg() and recvMsg() work.
modified:
libnet/rtmp.cpp
libnet/rtmp.h
=== modified file 'libnet/rtmp.cpp'
--- a/libnet/rtmp.cpp 2008-12-20 17:11:55 +0000
+++ b/libnet/rtmp.cpp 2008-12-28 01:44:57 +0000
@@ -210,58 +210,59 @@
}
void
-RTMP::addProperty(boost::shared_ptr<amf::Element> el)
+RTMP::addProperty(amf::Element &el)
{
// GNASH_REPORT_FUNCTION;
- _properties[el->getName()] = el;
+ _properties[el.getName()] = el;
}
void
-RTMP::addProperty(char *name, boost::shared_ptr<amf::Element> el)
+RTMP::addProperty(char *name, amf::Element &el)
{
// GNASH_REPORT_FUNCTION;
_properties[name] = el;
}
-boost::shared_ptr<amf::Element>
+amf::Element &
RTMP::getProperty(const std::string &name)
{
// GNASH_REPORT_FUNCTION;
// return _properties[name.c_str()];
- map<const char *, boost::shared_ptr<amf::Element> >::iterator it;
+ map<const char *, amf::Element &>::iterator it;
for (it = _properties.begin(); it != _properties.end(); it++) {
const char *title = it->first;
- boost::shared_ptr<amf::Element> el = it->second;
+ amf::Element &el = it->second;
if (name == title) {
// log_debug("found variable in RTMP packet: %s", name);
return el;
}
}
- boost::shared_ptr<amf::Element> el;
- return el;
}
-RTMP::rtmp_head_t *
-RTMP::decodeHeader(boost::shared_ptr<amf::Buffer> buf)
+boost::shared_ptr<RTMP::rtmp_head_t>
+RTMP::decodeHeader(amf::Buffer &buf)
{
// GNASH_REPORT_FUNCTION;
- return decodeHeader(buf->reference());
+ return decodeHeader(buf.reference());
}
-RTMP::rtmp_head_t *
+boost::shared_ptr<RTMP::rtmp_head_t>
RTMP::decodeHeader(boost::uint8_t *in)
{
GNASH_REPORT_FUNCTION;
-
+
+ boost::shared_ptr<RTMP::rtmp_head_t> head(new RTMP::rtmp_head_t);
boost::uint8_t *tmpptr = in;
-
- _header.channel = *tmpptr & RTMP_INDEX_MASK;
- log_debug (_("The AMF channel index is %d"), _header.channel);
-
- _header.head_size = headerSize(*tmpptr++);
- log_debug (_("The header size is %d"), _header.head_size);
-
- if (_header.head_size >= 4) {
+
+ head->channel = *tmpptr & RTMP_INDEX_MASK;
+ log_debug (_("The AMF channel index is %d"), head->channel);
+
+ head->head_size = headerSize(*tmpptr++);
+ log_debug (_("The header size is %d"), head->head_size);
+
+ cerr << "FIXME3: " << hexify(in, head->head_size, false) << endl;
+
+ if (head->head_size >= 4) {
_mystery_word = *tmpptr++;
_mystery_word = (_mystery_word << 8) + *tmpptr++;
_mystery_word = (_mystery_word << 8) + *tmpptr++;
@@ -269,36 +270,36 @@
log_debug(_("The mystery word is: %d"), _mystery_word);
}
- if (_header.head_size >= 8) {
- _header.bodysize = *tmpptr++;
- _header.bodysize = (_header.bodysize << 8) + *tmpptr++;
- _header.bodysize = (_header.bodysize << 8) + *tmpptr++;
- _header.bodysize = _header.bodysize & 0xffffff;
- log_debug(_("The body size is: %d"), _header.bodysize);
+ if (head->head_size >= 8) {
+ head->bodysize = *tmpptr++;
+ head->bodysize = (head->bodysize << 8) + *tmpptr++;
+ head->bodysize = (head->bodysize << 8) + *tmpptr++;
+ head->bodysize = head->bodysize & 0xffffff;
+ log_debug(_("The body size is: %d"), head->bodysize);
}
- if (_header.head_size >= 8) {
+ if (head->head_size >= 8) {
boost::uint8_t byte = *tmpptr;
- _header.type = (content_types_e)byte;
+ head->type = (content_types_e)byte;
tmpptr++;
- if (_header.type <= RTMP::INVOKE ) {
- log_debug(_("The type is: %s"), content_str[_header.type]);
+ if (head->type <= RTMP::INVOKE ) {
+ log_debug(_("The type is: %s"), content_str[head->type]);
} else {
- log_debug(_("The type is: 0x%x"), _header.type);
+ log_debug(_("The type is: 0x%x"), head->type);
}
}
- if (_header.head_size == 1) {
- if (_header.channel == RTMP_SYSTEM_CHANNEL) {
- _header.bodysize = sizeof(boost::uint16_t) * 3;
- log_debug("Got a one byte header system message: %s", hexify(in,
_header.bodysize, false));
+ if (head->head_size == 1) {
+ if (head->channel == RTMP_SYSTEM_CHANNEL) {
+ head->bodysize = sizeof(boost::uint16_t) * 3;
+ log_debug("Got a one byte header system message: %s", hexify(in,
head->bodysize, false));
} else {
- log_debug("Got a continuation packet for channel #%d",
_header.channel);
- _header.bodysize = 0;
+ log_debug("Got a continuation packet for channel #%d",
head->channel);
+ head->bodysize = 0;
}
}
-// switch(_header.type) {
+// switch(head->type) {
// case CHUNK_SIZE:
// case BYTES_READ:
// case PING:
@@ -318,13 +319,14 @@
// break;
// };
- if (_header.head_size == 12) {
- _header.src_dest = *(reinterpret_cast<RTMPMsg::rtmp_source_e
*>(tmpptr));
+ if (head->head_size == 12) {
+ head->src_dest = *(reinterpret_cast<RTMPMsg::rtmp_source_e *>(tmpptr));
tmpptr += sizeof(unsigned int);
- log_debug(_("The source/destination is: %x"), _header.src_dest);
+ log_debug(_("The source/destination is: %x"), head->src_dest);
}
- return &_header;
+ return head;
+
}
/// \brief \ Each RTMP header consists of the following:
@@ -412,36 +414,39 @@
return buf;
}
+#if 0
bool
-RTMP::packetRead(boost::shared_ptr<amf::Buffer> buf)
+RTMP::packetRead(amf::Buffer &buf)
{
GNASH_REPORT_FUNCTION;
// int packetsize = 0;
size_t amf_index, headersize;
- boost::uint8_t *ptr = buf->reference();
- boost::uint8_t *tooFar = ptr+buf->size();
+ boost::uint8_t *ptr = buf.reference();
+ boost::uint8_t *tooFar = ptr+buf.size();
AMF amf;
+
+ ptr += 1; // skip past the RTMP header byte
- amf_index = *buf->reference() & RTMP_INDEX_MASK;
- headersize = headerSize(*buf->reference());
+ amf_index = *ptr & RTMP_INDEX_MASK;
+ headersize = headerSize(*buf.reference());
log_debug (_("The Header size is: %d"), headersize);
log_debug (_("The AMF index is: 0x%x"), amf_index);
-// if (headersize > 1) {
-// packetsize = decodeHeader(ptr);
+ if (headersize > 1) {
+ RTMP::rtmp_head_t *rthead = decodeHeader(ptr);
// if (packetsize) {
// log_debug (_("Read first RTMP packet header of size %d"),
packetsize);
// } else {
// log_error (_("Couldn't read first RTMP packet header"));
// return false;
// }
-// }
+ }
#if 1
- boost::uint8_t *end = buf->remove(0xc3);
+ boost::uint8_t *end = buf.remove(0xc3);
#else
- boost::uint8_t *end = buf->find(0xc3);
+ boost::uint8_t *end = buf.find(0xc3);
log_debug("END is %x", (void *)end);
*end = '*';
#endif
@@ -462,13 +467,13 @@
}
ptr += 1;
size_t actual_size = static_cast<size_t>(_header.bodysize -
AMF_HEADER_SIZE);
- log_debug("Total size in header is %d, buffer size is: %d",
_header.bodysize, buf->size());
+ log_debug("Total size in header is %d, buffer size is: %d",
_header.bodysize, buf.size());
// buf->dump();
- if (buf->size() < actual_size) {
+ if (buf.size() < actual_size) {
log_debug("FIXME: MERGING");
// buf = _handler->merge(buf); FIXME needs to use shared_ptr
}
- while ((ptr - buf->begin()) < static_cast<int>(actual_size)) {
+ while ((ptr - buf.begin()) < static_cast<int>(actual_size)) {
boost::shared_ptr<amf::Element> el = amf.extractProperty(ptr, tooFar);
addProperty(el);
// el->dump(); // FIXME: dump the AMF objects as they are read
in
@@ -492,16 +497,17 @@
return true;
}
+#endif
void
RTMP::dump()
{
cerr << "RTMP packet contains " << _properties.size() << " variables." <<
endl;
- map<const char *, boost::shared_ptr<amf::Element> >::iterator it;
+ map<const char *, amf::Element &>::iterator it;
for (it = _properties.begin(); it != _properties.end(); it++) {
// const char *name = it->first;
- boost::shared_ptr<amf::Element> el = it->second;
- el->dump();
+ amf::Element &el = it->second;
+ el.dump();
}
}
@@ -527,14 +533,13 @@
// two paramters are required.
// This seems to be a ping message, 12 byte header, system channel 2
// 02 00 00 00 00 00 06 04 00 00 00 00 00 00 00 00 00 00
-RTMP::rtmp_ping_t *
+boost::shared_ptr<RTMP::rtmp_ping_t>
RTMP::decodePing(boost::uint8_t *data)
{
GNASH_REPORT_FUNCTION;
boost::uint8_t *ptr = reinterpret_cast<boost::uint8_t *>(data);
- rtmp_ping_t *ping = new rtmp_ping_t;
- memset(ping, 0, sizeof(rtmp_ping_t));
+ boost::shared_ptr<rtmp_ping_t> ping(new rtmp_ping_t);
// All the data fields in a ping message are 2 bytes long.
boost::uint16_t type = ntohs(*reinterpret_cast<boost::uint16_t *>(ptr));
@@ -555,11 +560,11 @@
return ping;
}
-RTMP::rtmp_ping_t *
-RTMP::decodePing(boost::shared_ptr<amf::Buffer> buf)
+boost::shared_ptr<RTMP::rtmp_ping_t>
+RTMP::decodePing(amf::Buffer &buf)
{
GNASH_REPORT_FUNCTION;
- return decodePing(buf->reference());
+ return decodePing(buf.reference());
}
// Decode the result we get from the server after we've made a request.
@@ -635,7 +640,7 @@
// automatically deallocated.
RTMPMsg *msg = new RTMPMsg;
// memset(msg, 0, sizeof(RTMPMsg));
-
+
msg->setMethodName(name->to_string());
double swapped = streamid->to_number();
// swapBytes(&swapped, amf::AMF0_NUMBER_SIZE);
@@ -665,10 +670,10 @@
}
RTMPMsg *
-RTMP::decodeMsgBody(boost::shared_ptr<amf::Buffer> buf)
+RTMP::decodeMsgBody(amf::Buffer &buf)
{
// GNASH_REPORT_FUNCTION;
- return decodeMsgBody(buf->reference(), buf->size());
+ return decodeMsgBody(buf.reference(), buf.size());
}
boost::shared_ptr<amf::Buffer>
@@ -807,42 +812,49 @@
// Send a message, usually a single ActionScript object. This message
// may be broken down into a series of packets on a regular byte
-// interval. (128 bytes for video data). Each message main contain
-// multiple packets.
+// interval. The byte boundary defaults to 128 bytes (video data), but can
+// be changed by the ChunkSize() command.
bool
-RTMP::sendMsg(boost::shared_ptr<amf::Buffer> data)
+RTMP::sendMsg(int fd, int channel, rtmp_headersize_e head_size,
+ size_t total_size, content_types_e type,
+ RTMPMsg::rtmp_source_e routing, amf::Buffer &data)
{
GNASH_REPORT_FUNCTION;
+ boost::shared_ptr<amf::Buffer> head = encodeHeader(channel, head_size,
total_size,
+ type, routing);
size_t partial = RTMP_VIDEO_PACKET_SIZE;
size_t nbytes = 0;
- boost::uint8_t header = 0xc3;
-
- while (nbytes <= data->size()) {
- if ((data->size() - nbytes) < static_cast<signed
int>(RTMP_VIDEO_PACKET_SIZE)) {
- partial = data->size() - nbytes;
- }
- writeNet(data->reference() + nbytes, partial);
- if (partial == static_cast<signed int>(RTMP_VIDEO_PACKET_SIZE)) {
- writeNet(&header, 1);
+
+// boost::uint8_t head = 0xc3; // FIXME: this won't always be 0xc3 !
+
+ // now send the data
+ while (nbytes <= data.allocated()) {
+ // Send the header first. There is one between every packet.
+ int ret = writeNet(*head);
+ // The last bit of data is usually less than the packet size, so we
write less data
+ if ((data.allocated() - nbytes) < static_cast<signed
int>(RTMP_VIDEO_PACKET_SIZE)) {
+ partial = data.allocated() - nbytes;
}
+ // write the data to the client
+ ret = writeNet(fd, data.reference() + nbytes, partial);
+ // adjust the accumulator.
nbytes += RTMP_VIDEO_PACKET_SIZE;
};
return true;
}
-
+
+#if 0
// Send a Msg, and expect a response back of some kind.
RTMPMsg *
-RTMP::sendRecvMsg(int amf_index, rtmp_headersize_e head_size,
- size_t total_size, content_types_e type,
- RTMPMsg::rtmp_source_e routing,
boost::shared_ptr<amf::Buffer> bufin)
+RTMP::sendRecvMsg(amf::Buffer &bufin)
{
GNASH_REPORT_FUNCTION;
// size_t total_size = buf2->size() - 6; // FIXME: why drop 6 bytes ?
boost::shared_ptr<amf::Buffer> head = encodeHeader(amf_index, head_size,
total_size,
type, routing);
// int ret = 0;
- int ret = writeNet(head->reference(), head->size());
+ int ret = writeNet(head->reference(), head->size()); // send the header
first
// if (netDebug()) {
// head->dump();
// bufin->dump();
@@ -982,6 +994,7 @@
return msg;
}
+#endif
// Receive a message, which is a series of AMF elements, seperated
// by a one byte header at regular byte intervals. (128 bytes for
@@ -990,7 +1003,7 @@
RTMP::recvMsg()
{
GNASH_REPORT_FUNCTION;
- return recvMsg(_timeout);
+ return recvMsg(getFileFd());
}
// Read big chunks of NETBUFSIZE, which is the default for a Buffer as it's
@@ -998,32 +1011,49 @@
// also include the RTMP header every _chunksize bytes, this raw data will
// need to be processed later on.
boost::shared_ptr<amf::Buffer>
-RTMP::recvMsg(int timeout)
+RTMP::recvMsg(int fd)
{
GNASH_REPORT_FUNCTION;
int ret = 0;
bool nopacket = true;
- boost::shared_ptr<amf::Buffer> buf(new Buffer(NETBUFSIZE));
- while (nopacket) {
- ret = readNet(buf->reference(), buf->size(), timeout);
- if (ret <= 0) {
- log_error("Never got any data at line %d", __LINE__);
- buf.reset(); // no point in returning a buffer, right?
- return buf;
+ // Read really big packets, they get split into the smaller ones when
'split'
+ boost::shared_ptr<amf::Buffer> buf(new Buffer(7096));
+ do {
+ ret = readNet(fd, buf->reference()+ret, buf->size()-ret, _timeout);
+ cerr << __PRETTY_FUNCTION__ << ": " << ret << endl;
+ // We got data. Resize the buffer if necessary.
+ if (ret > 0) {
+ buf->setSeekPointer(buf->reference() + ret);
+// cerr << "XXXXX: " << (char *)buf->reference() << endl;
}
+ // the read timed out as there was no data, but the socket is still
open.
+ if (ret == 0) {
+ log_debug("no data for fd #%d, done reading this packet...", fd);
+ break;
+ }
if ((ret == 1) && (*(buf->reference()) == 0xff)) {
log_debug("Got an empty packet from the server at line %d",
__LINE__);
continue;
}
- nopacket = false;
- }
- buf->resize(ret);
+ if (ret < 0) {
+ log_error("Never got any data at line %d", __LINE__);
+ break;
+ }
+ // ret is "no position" when the socket is closed from the other end of
the connection,
+ // so we're done.
+ if ((ret == static_cast<int>(string::npos)) || (ret == -1)) {
+ log_debug("socket for fd #%d was closed...", fd);
+ break;
+ }
+ } while (ret);
+
// if (netDebug()) {
// buf->dump();
// }
-
+
+ // RTMP::split pushes the data into seperate queues, one for each channel.
return buf;
}
@@ -1034,23 +1064,23 @@
// bytes another 1 byte RTMP header. The header itself is not part of the byte
// count.
RTMP::queues_t *
-RTMP::split(boost::shared_ptr<Buffer> buf)
+RTMP::split(amf::Buffer &buf)
{
GNASH_REPORT_FUNCTION;
- if (buf == 0) {
+ if (buf.reference() == 0) {
log_error("Buffer pointer is invalid.");
}
// split the buffer at the chunksize boundary
boost::uint8_t *ptr = 0;
- rtmp_head_t *rthead = 0;
+ boost::shared_ptr<rtmp_head_t> rthead(new rtmp_head_t);
size_t pktsize = 0;
size_t nbytes = 0;
- ptr = buf->reference();
+ ptr = buf.reference();
boost::shared_ptr<amf::Buffer> chunk;
- while ((ptr - buf->reference()) < buf->size()) {
+ while ((ptr - buf.reference()) < static_cast<int>(buf.size())) {
rthead = decodeHeader(ptr);
if (rthead->channel == RTMP_SYSTEM_CHANNEL) {
log_debug("Got a message on the system channel");
=== modified file 'libnet/rtmp.h'
--- a/libnet/rtmp.h 2008-12-20 17:11:55 +0000
+++ b/libnet/rtmp.h 2008-12-28 01:44:57 +0000
@@ -20,6 +20,7 @@
#include <boost/cstdint.hpp>
#include <boost/shared_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
#include <string>
#include <vector>
@@ -35,7 +36,7 @@
{
const boost::uint8_t RTMP_HANDSHAKE = 0x3;
-const int RTMP_BODY_SIZE = 1536;
+const int RTMP_HANDSHAKE_SIZE = 1536;
const int MAX_AMF_INDEXES = 64;
const int RTMP_HEADSIZE_MASK = 0xc0;
@@ -211,19 +212,16 @@
virtual ~RTMP();
// Decode
- rtmp_head_t *decodeHeader(boost::uint8_t *header);
- rtmp_head_t *decodeHeader(boost::shared_ptr<amf::Buffer> data);
+ boost::shared_ptr<rtmp_head_t> decodeHeader(boost::uint8_t *header);
+ boost::shared_ptr<rtmp_head_t> decodeHeader(amf::Buffer &data);
boost::shared_ptr<amf::Buffer> encodeHeader(int amf_index,
rtmp_headersize_e head_size,
size_t total_size, content_types_e type,
RTMPMsg::rtmp_source_e routing);
boost::shared_ptr<amf::Buffer> encodeHeader(int amf_index,
rtmp_headersize_e head_size);
- bool packetSend(boost::shared_ptr<amf::Buffer> buf);
- bool packetRead(boost::shared_ptr<amf::Buffer> buf);
-
- void addProperty(boost::shared_ptr<amf::Element> el);
- void addProperty(char *name, boost::shared_ptr<amf::Element> el);
- void addProperty(std::string &name, boost::shared_ptr<amf::Element> el);
- boost::shared_ptr<amf::Element> getProperty(const std::string &name);
+ void addProperty(amf::Element &el);
+ void addProperty(char *name, amf::Element &el);
+ void addProperty(std::string &name, amf::Element &el);
+ amf::Element &getProperty(const std::string &name);
void setHandler(Handler *hand) { _handler = hand; };
int headerSize(boost::uint8_t header);
@@ -237,10 +235,10 @@
// Decode an RTMP message
RTMPMsg *decodeMsgBody(boost::uint8_t *data, size_t size);
- RTMPMsg *decodeMsgBody(boost::shared_ptr<amf::Buffer> buf);
+ RTMPMsg *decodeMsgBody(amf::Buffer &buf);
- virtual rtmp_ping_t *decodePing(boost::uint8_t *data);
- rtmp_ping_t *decodePing(boost::shared_ptr<amf::Buffer> buf);
+ virtual boost::shared_ptr<rtmp_ping_t> decodePing(boost::uint8_t *data);
+ boost::shared_ptr<rtmp_ping_t> decodePing(amf::Buffer &buf);
// These are handlers for the various types
virtual boost::shared_ptr<amf::Buffer> encodeChunkSize();
@@ -271,31 +269,36 @@
// Receive a message, which is a series of AMF elements, seperated
// by a one byte header at regular byte intervals. (128 bytes for
- // video data by default). Each message main contain multiple packets.
+ // video data by default). Each message may contain multiple packets.
boost::shared_ptr<amf::Buffer> recvMsg();
- boost::shared_ptr<amf::Buffer> recvMsg(int timeout);
+ boost::shared_ptr<amf::Buffer> recvMsg(int fd);
// Send a message, usually a single ActionScript object. This message
// may be broken down into a series of packets on a regular byte
// interval. (128 bytes for video data by default). Each message main
// contain multiple packets.
- bool sendMsg(boost::shared_ptr<amf::Buffer> data);
+ bool sendMsg(amf::Buffer &data);
+ bool sendMsg(int fd, int channel, rtmp_headersize_e head_size,
+ size_t total_size, content_types_e type,
+ RTMPMsg::rtmp_source_e routing, amf::Buffer &data);
+#if 0
// Send a Msg, and expect a response back of some kind.
RTMPMsg *sendRecvMsg(int amf_index, rtmp_headersize_e head_size,
size_t total_size, content_types_e type,
- RTMPMsg::rtmp_source_e routing,
boost::shared_ptr<amf::Buffer> buf);
+ RTMPMsg::rtmp_source_e routing, amf::Buffer &buf);
+#endif
// Split a large buffer into multiple smaller ones of the default chunksize
// of 128 bytes. We read network data in big chunks because it's more
efficient,
// but RTMP uses a weird scheme of a standard header, and then every
chunksize
// bytes another 1 byte RTMP header. The header itself is not part of the
byte
// count.
- queues_t *split(boost::shared_ptr<amf::Buffer> buf);
+ queues_t *split(amf::Buffer &buf);
CQue &operator[] (size_t x) { return _queues[x]; }
void dump();
protected:
- std::map<const char *, boost::shared_ptr<amf::Element> > _properties;
+ std::map<const char *, amf::Element &> _properties;
amf::Buffer *_handshake;
Handler *_handler;
rtmp_head_t _header;
@@ -305,6 +308,7 @@
int _timeout;
CQue _queues[MAX_AMF_INDEXES];
queues_t _channels;
+ amf::Buffer _buffer;
};
} // end of gnash namespace
[Prev in Thread] | Current Thread | [Next in Thread]
- [Gnash-commit] /srv/bzr/gnash/rtmp r9913: fix usage of boost::shared_ptr., rob <=