commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 03/29: zmq: default to not pass tags (compatible wire format)


From: git
Subject: [Commit-gnuradio] [gnuradio] 03/29: zmq: default to not pass tags (compatible wire format)
Date: Tue, 13 Jan 2015 01:04:26 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 6a3efa633309fff2834321dfbadb343064b0ab50
Author: Tim O'Shea <address@hidden>
Date:   Mon Oct 27 16:25:16 2014 -0400

    zmq: default to not pass tags (compatible wire format)
---
 gr-zeromq/include/gnuradio/zeromq/pub_sink.h   |  2 +-
 gr-zeromq/include/gnuradio/zeromq/sub_source.h |  2 +-
 gr-zeromq/lib/pub_sink_impl.cc                 | 16 ++++++++++------
 gr-zeromq/lib/pub_sink_impl.h                  |  3 ++-
 gr-zeromq/lib/sub_source_impl.cc               |  9 ++++++---
 gr-zeromq/lib/sub_source_impl.h                |  3 ++-
 6 files changed, 22 insertions(+), 13 deletions(-)

diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h 
b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
index a60fb15..11f251e 100644
--- a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
@@ -53,7 +53,7 @@ namespace gr {
        * \param address  ZMQ socket address specifier
        * \param timeout  Receive timeout in seconds, default is 100ms, 1us 
increments
        */
-      static sptr make(size_t itemsize, size_t vlen, char *address, int 
timeout=100);
+      static sptr make(size_t itemsize, size_t vlen, char *address, int 
timeout=100, bool pass_tags=false);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_source.h 
b/gr-zeromq/include/gnuradio/zeromq/sub_source.h
index 9deaa7f..f97dc5a 100644
--- a/gr-zeromq/include/gnuradio/zeromq/sub_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/sub_source.h
@@ -51,7 +51,7 @@ namespace gr {
        * \param timeout  Receive timeout in seconds, default is 100ms, 1us 
increments
        *
        */
-      static sptr make(size_t itemsize, size_t vlen, char *address, int 
timeout=100);
+      static sptr make(size_t itemsize, size_t vlen, char *address, int 
timeout=100, bool pass_tags=false);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index 8115d72..7b57e27 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -33,17 +33,17 @@ namespace gr {
   namespace zeromq {
 
     pub_sink::sptr
-    pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout)
+    pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags)
     {
       return gnuradio::get_initial_sptr
         (new pub_sink_impl(itemsize, vlen, address, timeout));
     }
 
-    pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, 
int timeout)
+    pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, 
int timeout, bool pass_tags)
       : gr::sync_block("pub_sink",
                        gr::io_signature::make(1, 1, itemsize * vlen),
                        gr::io_signature::make(0, 0, 0)),
-        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), 
d_pass_tags(pass_tags)
     {
       int major, minor, patch;
       zmq::version (&major, &minor, &patch);
@@ -72,9 +72,11 @@ namespace gr {
       const char *in = (const char *)input_items[0];
 
       // encode the current offset, # tags, and tags into header
+    size_t headlen(0);
+    std::stringstream ss;
+    if(d_pass_tags){
       std::vector<gr::tag_t> tags;
       get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
-      std::stringstream ss;
       size_t ntags = tags.size();
       ss.write( reinterpret_cast< const char* >( nitems_read(0) ), 
sizeof(uint64_t) );  // offset
       ss.write( reinterpret_cast< const char* >( &ntags ), sizeof(size_t) );   
   // num tags
@@ -87,11 +89,13 @@ namespace gr {
         pmt::serialize( tags[i].srcid, sb );                                   
      // srcid
         ss.write( sb.str().c_str() , sb.str().length() );   // offset
         }
-      size_t headlen( ss.gcount() );
+      headlen = ss.gcount();
+      }
 
       // create message copy and send
       zmq::message_t msg(headlen + d_itemsize*d_vlen*noutput_items);
-      memcpy((void*) msg.data(), ss.str().c_str(), ss.str().length() );
+      if(d_pass_tags)
+        memcpy((void*) msg.data(), ss.str().c_str(), ss.str().length() );
       memcpy((uint8_t *)msg.data() + headlen, in, 
d_itemsize*d_vlen*noutput_items);
       d_socket->send(msg);
 
diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h
index 9c956ef..049e587 100644
--- a/gr-zeromq/lib/pub_sink_impl.h
+++ b/gr-zeromq/lib/pub_sink_impl.h
@@ -37,9 +37,10 @@ namespace gr {
       float           d_timeout;
       zmq::context_t  *d_context;
       zmq::socket_t   *d_socket;
+      bool            d_pass_tags;
 
     public:
-      pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+      pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags=false);
       ~pub_sink_impl();
 
       int work(int noutput_items,
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 4cc9c25..6c9da1b 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -31,17 +31,17 @@ namespace gr {
   namespace zeromq {
 
     sub_source::sptr
-    sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout)
+    sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags)
     {
       return gnuradio::get_initial_sptr
         (new sub_source_impl(itemsize, vlen, address, timeout));
     }
 
-    sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char 
*address, int timeout)
+    sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char 
*address, int timeout, bool pass_tags)
       : gr::sync_block("sub_source",
                        gr::io_signature::make(0, 0, 0),
                        gr::io_signature::make(1, 1, itemsize * vlen)),
-        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), 
d_pass_tags(pass_tags)
     {
       int major, minor, patch;
       zmq::version (&major, &minor, &patch);
@@ -84,6 +84,8 @@ namespace gr {
 
         // Deserialize header data / tags
         std::istringstream iss( std::string(static_cast<char*>(msg.data()), 
msg.size()));
+
+        if(d_pass_tags){
         uint64_t rcv_offset;
         size_t   rcv_ntags;
         iss.read( (char*)&rcv_offset, sizeof(uint64_t ) );
@@ -99,6 +101,7 @@ namespace gr {
             add_item_tag(0, new_tag_offset, key, val, src);
             iss.str(sb.str());
             }
+        }
 
         // Pass sample data along
         std::vector<char> samp(iss.gcount());
diff --git a/gr-zeromq/lib/sub_source_impl.h b/gr-zeromq/lib/sub_source_impl.h
index 4464752..f528294 100644
--- a/gr-zeromq/lib/sub_source_impl.h
+++ b/gr-zeromq/lib/sub_source_impl.h
@@ -37,9 +37,10 @@ namespace gr {
       int             d_timeout; // microseconds, -1 is blocking
       zmq::context_t  *d_context;
       zmq::socket_t   *d_socket;
+      bool            d_pass_tags;
 
      public:
-      sub_source_impl(size_t itemsize, size_t vlen, char *address, int 
timeout);
+      sub_source_impl(size_t itemsize, size_t vlen, char *address, int 
timeout, bool pass_tags=false);
       ~sub_source_impl();
 
       int work(int noutput_items,



reply via email to

[Prev in Thread] Current Thread [Next in Thread]