commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 02/29: zmq: tags should now be serializing


From: git
Subject: [Commit-gnuradio] [gnuradio] 02/29: zmq: tags should now be serializing and deserializing correctly for pub_sink/sub_source
Date: Tue, 13 Jan 2015 01:04:26 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit ffad094471e749351aa09cbd149d53a7a0e77a61
Author: Tim O'Shea <address@hidden>
Date:   Mon Oct 27 16:05:43 2014 -0400

    zmq: tags should now be serializing and deserializing correctly for 
pub_sink/sub_source
---
 gr-zeromq/lib/pub_sink_impl.cc   |  1 +
 gr-zeromq/lib/sub_source_impl.cc | 33 +++++++++++++++++++++++++++------
 2 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index 5680203..8115d72 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -84,6 +84,7 @@ namespace gr {
         sb.str("");
         pmt::serialize( tags[i].key, sb );                                     
      // key
         pmt::serialize( tags[i].value, sb );                                   
      // value
+        pmt::serialize( tags[i].srcid, sb );                                   
      // srcid
         ss.write( sb.str().c_str() , sb.str().length() );   // offset
         }
       size_t headlen( ss.gcount() );
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 38ddc78..4cc9c25 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -81,16 +81,37 @@ namespace gr {
         // Receive data
         zmq::message_t msg;
         d_socket->recv(&msg);
-        // Copy to ouput buffer and return
-        if (msg.size() >= d_itemsize*d_vlen*noutput_items) {
-          memcpy(out, (void *)msg.data(), d_itemsize*d_vlen*noutput_items);
 
+        // Deserialize header data / tags
+        std::istringstream iss( std::string(static_cast<char*>(msg.data()), 
msg.size()));
+        uint64_t rcv_offset;
+        size_t   rcv_ntags;
+        iss.read( (char*)&rcv_offset, sizeof(uint64_t ) );
+        iss.read( (char*)&rcv_ntags,  sizeof(size_t   ) );
+        for(size_t i=0; i<rcv_ntags; i++){
+            uint64_t tag_offset;
+            iss.read( (char*)&tag_offset, sizeof(uint64_t ) );
+            std::stringbuf sb( iss.str() );
+            pmt::pmt_t key = pmt::deserialize( sb );
+            pmt::pmt_t val = pmt::deserialize( sb );
+            pmt::pmt_t src = pmt::deserialize( sb );
+            uint64_t new_tag_offset = tag_offset + nitems_read(0) - rcv_offset;
+            add_item_tag(0, new_tag_offset, key, val, src);
+            iss.str(sb.str());
+            }
+
+        // Pass sample data along
+        std::vector<char> samp(iss.gcount());
+        iss.read( &samp[0], iss.gcount() );
+
+        // Copy to ouput buffer and return
+        if (samp.size() >= d_itemsize*d_vlen*noutput_items) {
+          memcpy(out, (void *)&samp[0], d_itemsize*d_vlen*noutput_items);
           return noutput_items;
         }
         else {
-          memcpy(out, (void *)msg.data(), msg.size());
-
-          return msg.size()/(d_itemsize*d_vlen);
+          memcpy(out, (void *)&samp[0], samp.size());
+          return samp.size()/(d_itemsize*d_vlen);
         }
       }
       else {



reply via email to

[Prev in Thread] Current Thread [Next in Thread]