commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 08/29: zmq: sync blocks now all support tag


From: git
Subject: [Commit-gnuradio] [gnuradio] 08/29: zmq: sync blocks now all support tag headers
Date: Tue, 13 Jan 2015 01:04:27 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit cb4e8856fdb7b1d0f30d01852d57566efe692cd2
Author: Tim O'Shea <address@hidden>
Date:   Mon Oct 27 21:31:33 2014 -0400

    zmq: sync blocks now all support tag headers
---
 gr-zeromq/include/gnuradio/zeromq/push_sink.h |  2 +-
 gr-zeromq/include/gnuradio/zeromq/rep_sink.h  |  2 +-
 gr-zeromq/lib/pub_sink_impl.cc                |  2 --
 gr-zeromq/lib/push_sink_impl.cc               | 30 +++++++++++++++------
 gr-zeromq/lib/push_sink_impl.h                |  3 ++-
 gr-zeromq/lib/rep_sink_impl.cc                | 38 +++++++++++++++------------
 gr-zeromq/lib/rep_sink_impl.h                 |  3 ++-
 7 files changed, 49 insertions(+), 31 deletions(-)

diff --git a/gr-zeromq/include/gnuradio/zeromq/push_sink.h 
b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
index b54a1e4..1b8999e 100644
--- a/gr-zeromq/include/gnuradio/zeromq/push_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
@@ -55,7 +55,7 @@ namespace gr {
        * \param timeout  Receive timeout in seconds, default is 100ms, 1us 
increments
        *
        */
-      static sptr make(size_t itemsize, size_t vlen, char *address, int 
timeout=100);
+      static sptr make(size_t itemsize, size_t vlen, char *address, int 
timeout=100, bool pass_tags=false);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h 
b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
index 1da3252..6d3c47b 100644
--- a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
@@ -53,7 +53,7 @@ namespace gr {
        * \param timeout  Receive timeout in seconds, default is 100ms, 1us 
increments
        *
        */
-      static sptr make(size_t itemsize, size_t vlen, char *address, int 
timeout=100);
+      static sptr make(size_t itemsize, size_t vlen, char *address, int 
timeout=100, bool pass_tags=false);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index 43819f3..4573990 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -27,8 +27,6 @@
 #include <gnuradio/io_signature.h>
 #include "pub_sink_impl.h"
 #include "tag_headers.h"
-#include <sstream>
-#include <cstring>
 
 namespace gr {
   namespace zeromq {
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index d949a7f..4cc9ab9 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -26,22 +26,23 @@
 
 #include <gnuradio/io_signature.h>
 #include "push_sink_impl.h"
+#include "tag_headers.h"
 
 namespace gr {
   namespace zeromq {
 
     push_sink::sptr
-    push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout)
+    push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags)
     {
       return gnuradio::get_initial_sptr
-        (new push_sink_impl(itemsize, vlen, address, timeout));
+        (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags));
     }
 
-    push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char 
*address, int timeout)
+    push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char 
*address, int timeout, bool pass_tags)
       : gr::sync_block("push_sink",
                        gr::io_signature::make(1, 1, itemsize * vlen),
                        gr::io_signature::make(0, 0, 0)),
-        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), 
d_pass_tags(pass_tags)
     {
       int major, minor, patch;
       zmq::version (&major, &minor, &patch);
@@ -74,10 +75,23 @@ namespace gr {
 
       //  If we got a reply, process
       if (itemsout[0].revents & ZMQ_POLLOUT) {
-        // create message copy and send
-        zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
-        memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
-        d_socket->send(msg);
+
+      // encode the current offset, # tags, and tags into header
+      std::string header("");
+    
+      if(d_pass_tags){
+        uint64_t offset = nitems_read(0);
+        std::vector<gr::tag_t> tags;
+        get_tags_in_range(tags, 0, nitems_read(0), 
nitems_read(0)+noutput_items);
+        header = gen_tag_header( offset, tags );
+        }
+
+      // create message copy and send
+      zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items);
+      if(d_pass_tags)
+        memcpy((void*) msg.data(), header.c_str(), header.length() );
+      memcpy((uint8_t *)msg.data() + header.length(), in, 
d_itemsize*d_vlen*noutput_items);
+      d_socket->send(msg);
 
         return noutput_items;
       }
diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h
index 9a10065..2590a7f 100644
--- a/gr-zeromq/lib/push_sink_impl.h
+++ b/gr-zeromq/lib/push_sink_impl.h
@@ -37,9 +37,10 @@ namespace gr {
       float           d_timeout;
       zmq::context_t  *d_context;
       zmq::socket_t   *d_socket;
+      bool            d_pass_tags;
 
     public:
-      push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+      push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags=false);
       ~push_sink_impl();
 
       int work(int noutput_items,
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index a8fd588..88ed6c1 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -26,22 +26,23 @@
 
 #include <gnuradio/io_signature.h>
 #include "rep_sink_impl.h"
+#include "tag_headers.h"
 
 namespace gr {
   namespace zeromq {
 
     rep_sink::sptr
-    rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout)
+    rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags)
     {
       return gnuradio::get_initial_sptr
-        (new rep_sink_impl(itemsize, vlen, address, timeout));
+        (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags));
     }
 
-    rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, 
int timeout)
+    rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, 
int timeout, bool pass_tags)
       : gr::sync_block("rep_sink",
                        gr::io_signature::make(1, 1, itemsize * vlen),
                        gr::io_signature::make(0, 0, 0)),
-        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), 
d_pass_tags(pass_tags)
     {
       int major, minor, patch;
       zmq::version (&major, &minor, &patch);
@@ -78,22 +79,25 @@ namespace gr {
         zmq::message_t request;
         d_socket->recv(&request);
         int req_output_items = *(static_cast<int*>(request.data()));
+        int nitems_send = std::min(noutput_items, req_output_items);
+        
+        // encode the current offset, # tags, and tags into header
+        std::string header("");
+        if(d_pass_tags){
+            uint64_t offset = nitems_read(0);
+            std::vector<gr::tag_t> tags;
+            get_tags_in_range(tags, 0, nitems_read(0), 
nitems_read(0)+noutput_items);
+            header = gen_tag_header( offset, tags );
+            }
 
         // create message copy and send
-        if (noutput_items < req_output_items) {
-          zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
-          memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
-          d_socket->send(msg);
+        zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send);
+        if(d_pass_tags)
+            memcpy((void*) msg.data(), header.c_str(), header.length() );
+        memcpy((uint8_t *)msg.data() + header.length(), in, 
d_itemsize*d_vlen*nitems_send);
+        d_socket->send(msg);
 
-          return noutput_items;
-        }
-        else {
-          zmq::message_t msg(d_itemsize*d_vlen*req_output_items);
-          memcpy((void *)msg.data(), in, d_itemsize*d_vlen*req_output_items);
-          d_socket->send(msg);
-
-          return req_output_items;
-        }
+        return nitems_send;
       }
 
       return 0;
diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h
index ff69735..68bb9eb 100644
--- a/gr-zeromq/lib/rep_sink_impl.h
+++ b/gr-zeromq/lib/rep_sink_impl.h
@@ -37,9 +37,10 @@ namespace gr {
       int             d_timeout;
       zmq::context_t  *d_context;
       zmq::socket_t   *d_socket;
+      bool            d_pass_tags;
 
     public:
-      rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+      rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags=false);
       ~rep_sink_impl();
 
       int work(int noutput_items,



reply via email to

[Prev in Thread] Current Thread [Next in Thread]