commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 01/01: gr-zeromq: Big rework for performanc


From: git
Subject: [Commit-gnuradio] [gnuradio] 01/01: gr-zeromq: Big rework for performance and correctness
Date: Wed, 27 Jan 2016 21:14:37 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch maint
in repository gnuradio.

commit 6e482c5bb6bf49f000f6b8d35a1ca84127e38c46
Author: Sylvain Munaut <address@hidden>
Date:   Wed Jan 27 12:58:50 2016 +0100

    gr-zeromq: Big rework for performance and correctness
    
     - Use class hierarchy trying to maximize code re-use.
     - Dont' drop samples on receive if the output buffer doesn't have
       enough space.
     - Don't drop tags on receive by putting tags in the future.
     - Better metadata creation/parsing avoiding copying lots data.
     - Always do as much work as possible in a single call to work()
       to avoid scheduler overhead as long as possible.
     - Allow setting the  high watermark to avoid older version of
       zeromq's default of buffering infinite messages and causing a
       paging thrash to/from disk when the flow graph can't keep up.
    
    Signed-off-by: Sylvain Munaut <address@hidden>
---
 gr-zeromq/grc/zeromq_pub_sink.xml               |  20 ++-
 gr-zeromq/grc/zeromq_pull_source.xml            |  20 ++-
 gr-zeromq/grc/zeromq_push_sink.xml              |  20 ++-
 gr-zeromq/grc/zeromq_rep_sink.xml               |  20 ++-
 gr-zeromq/grc/zeromq_req_source.xml             |  20 ++-
 gr-zeromq/grc/zeromq_sub_source.xml             |  20 ++-
 gr-zeromq/include/gnuradio/zeromq/pub_sink.h    |   3 +-
 gr-zeromq/include/gnuradio/zeromq/pull_source.h |   3 +-
 gr-zeromq/include/gnuradio/zeromq/push_sink.h   |   3 +-
 gr-zeromq/include/gnuradio/zeromq/rep_sink.h    |   3 +-
 gr-zeromq/include/gnuradio/zeromq/req_source.h  |   3 +-
 gr-zeromq/include/gnuradio/zeromq/sub_source.h  |   3 +-
 gr-zeromq/lib/CMakeLists.txt                    |   1 +
 gr-zeromq/lib/base_impl.cc                      | 198 ++++++++++++++++++++++++
 gr-zeromq/lib/base_impl.h                       |  77 +++++++++
 gr-zeromq/lib/pub_sink_impl.cc                  |  55 +------
 gr-zeromq/lib/pub_sink_impl.h                   |  15 +-
 gr-zeromq/lib/pull_source_impl.cc               |  93 ++++-------
 gr-zeromq/lib/pull_source_impl.h                |  15 +-
 gr-zeromq/lib/push_sink_impl.cc                 |  69 ++-------
 gr-zeromq/lib/push_sink_impl.h                  |  15 +-
 gr-zeromq/lib/rep_sink_impl.cc                  |  91 +++++------
 gr-zeromq/lib/rep_sink_impl.h                   |  15 +-
 gr-zeromq/lib/req_source_impl.cc                | 107 ++++++-------
 gr-zeromq/lib/req_source_impl.h                 |  18 +--
 gr-zeromq/lib/sub_source_impl.cc                |  94 ++++-------
 gr-zeromq/lib/sub_source_impl.h                 |  23 +--
 gr-zeromq/lib/tag_headers.cc                    |  96 +++++++-----
 gr-zeromq/lib/tag_headers.h                     |   5 +-
 29 files changed, 651 insertions(+), 474 deletions(-)

diff --git a/gr-zeromq/grc/zeromq_pub_sink.xml 
b/gr-zeromq/grc/zeromq_pub_sink.xml
index 7babc9e..1b2f9ec 100644
--- a/gr-zeromq/grc/zeromq_pub_sink.xml
+++ b/gr-zeromq/grc/zeromq_pub_sink.xml
@@ -4,7 +4,7 @@
   <key>zeromq_pub_sink</key>
   <category>ZeroMQ Interfaces</category>
   <import>from gnuradio import zeromq</import>
-  <make>zeromq.pub_sink($type.itemsize, $vlen, $address, $timeout, 
$pass_tags)</make>
+  <make>zeromq.pub_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags, 
$hwm)</make>
 
   <param>
     <name>IO Type</name>
@@ -61,7 +61,23 @@
     <name>Pass Tags</name>
     <key>pass_tags</key>
     <value>False</value>
-    <type>bool</type>
+    <type>enum</type>
+    <option>
+      <name>Yes</name>
+      <key>True</key>
+    </option>
+    <option>
+      <name>No</name>
+      <key>False</key>
+    </option>
+  </param>
+
+  <param>
+    <name>High Watermark</name>
+    <key>hwm</key>
+    <value>-1</value>
+    <type>int</type>
+    <hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
   </param>
 
   <sink>
diff --git a/gr-zeromq/grc/zeromq_pull_source.xml 
b/gr-zeromq/grc/zeromq_pull_source.xml
index c8a7b89..8158b47 100644
--- a/gr-zeromq/grc/zeromq_pull_source.xml
+++ b/gr-zeromq/grc/zeromq_pull_source.xml
@@ -4,7 +4,7 @@
   <key>zeromq_pull_source</key>
   <category>ZeroMQ Interfaces</category>
   <import>from gnuradio import zeromq</import>
-  <make>zeromq.pull_source($type.itemsize, $vlen, $address, $timeout, 
$pass_tags)</make>
+  <make>zeromq.pull_source($type.itemsize, $vlen, $address, $timeout, 
$pass_tags, $hwm)</make>
 
   <param>
     <name>IO Type</name>
@@ -61,7 +61,23 @@
     <name>Pass Tags</name>
     <key>pass_tags</key>
     <value>False</value>
-    <type>bool</type>
+    <type>enum</type>
+    <option>
+      <name>Yes</name>
+      <key>True</key>
+    </option>
+    <option>
+      <name>No</name>
+      <key>False</key>
+    </option>
+  </param>
+
+  <param>
+    <name>High Watermark</name>
+    <key>hwm</key>
+    <value>-1</value>
+    <type>int</type>
+    <hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
   </param>
 
   <source>
diff --git a/gr-zeromq/grc/zeromq_push_sink.xml 
b/gr-zeromq/grc/zeromq_push_sink.xml
index eb6ead5..528da94 100644
--- a/gr-zeromq/grc/zeromq_push_sink.xml
+++ b/gr-zeromq/grc/zeromq_push_sink.xml
@@ -4,7 +4,7 @@
   <key>zeromq_push_sink</key>
   <category>ZeroMQ Interfaces</category>
   <import>from gnuradio import zeromq</import>
-  <make>zeromq.push_sink($type.itemsize, $vlen, $address, $timeout, 
$pass_tags)</make>
+  <make>zeromq.push_sink($type.itemsize, $vlen, $address, $timeout, 
$pass_tags, $hwm)</make>
 
   <param>
     <name>IO Type</name>
@@ -61,7 +61,23 @@
     <name>Pass Tags</name>
     <key>pass_tags</key>
     <value>False</value>
-    <type>bool</type>
+    <type>enum</type>
+    <option>
+      <name>Yes</name>
+      <key>True</key>
+    </option>
+    <option>
+      <name>No</name>
+      <key>False</key>
+    </option>
+  </param>
+
+  <param>
+    <name>High Watermark</name>
+    <key>hwm</key>
+    <value>-1</value>
+    <type>int</type>
+    <hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
   </param>
 
   <sink>
diff --git a/gr-zeromq/grc/zeromq_rep_sink.xml 
b/gr-zeromq/grc/zeromq_rep_sink.xml
index 2209b4f..db735a3 100644
--- a/gr-zeromq/grc/zeromq_rep_sink.xml
+++ b/gr-zeromq/grc/zeromq_rep_sink.xml
@@ -4,7 +4,7 @@
   <key>zeromq_rep_sink</key>
   <category>ZeroMQ Interfaces</category>
   <import>from gnuradio import zeromq</import>
-  <make>zeromq.rep_sink($type.itemsize, $vlen, $address, $timeout, 
$pass_tags)</make>
+  <make>zeromq.rep_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags, 
$hwm)</make>
 
   <param>
     <name>IO Type</name>
@@ -61,7 +61,23 @@
     <name>Pass Tags</name>
     <key>pass_tags</key>
     <value>False</value>
-    <type>bool</type>
+    <type>enum</type>
+    <option>
+      <name>Yes</name>
+      <key>True</key>
+    </option>
+    <option>
+      <name>No</name>
+      <key>False</key>
+    </option>
+  </param>
+
+  <param>
+    <name>High Watermark</name>
+    <key>hwm</key>
+    <value>-1</value>
+    <type>int</type>
+    <hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
   </param>
 
   <sink>
diff --git a/gr-zeromq/grc/zeromq_req_source.xml 
b/gr-zeromq/grc/zeromq_req_source.xml
index 050718c..2ef2243 100644
--- a/gr-zeromq/grc/zeromq_req_source.xml
+++ b/gr-zeromq/grc/zeromq_req_source.xml
@@ -4,7 +4,7 @@
   <key>zeromq_req_source</key>
   <category>ZeroMQ Interfaces</category>
   <import>from gnuradio import zeromq</import>
-  <make>zeromq.req_source($type.itemsize, $vlen, $address, $timeout, 
$pass_tags)</make>
+  <make>zeromq.req_source($type.itemsize, $vlen, $address, $timeout, 
$pass_tags, $hwm)</make>
 
   <param>
     <name>IO Type</name>
@@ -61,7 +61,23 @@
     <name>Pass Tags</name>
     <key>pass_tags</key>
     <value>False</value>
-    <type>bool</type>
+    <type>enum</type>
+    <option>
+      <name>Yes</name>
+      <key>True</key>
+    </option>
+    <option>
+      <name>No</name>
+      <key>False</key>
+    </option>
+  </param>
+
+  <param>
+    <name>High Watermark</name>
+    <key>hwm</key>
+    <value>-1</value>
+    <type>int</type>
+    <hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
   </param>
 
   <source>
diff --git a/gr-zeromq/grc/zeromq_sub_source.xml 
b/gr-zeromq/grc/zeromq_sub_source.xml
index 86af506..268a893 100644
--- a/gr-zeromq/grc/zeromq_sub_source.xml
+++ b/gr-zeromq/grc/zeromq_sub_source.xml
@@ -4,7 +4,7 @@
   <key>zeromq_sub_source</key>
   <category>ZeroMQ Interfaces</category>
   <import>from gnuradio import zeromq</import>
-  <make>zeromq.sub_source($type.itemsize, $vlen, $address, $timeout, 
$pass_tags)</make>
+  <make>zeromq.sub_source($type.itemsize, $vlen, $address, $timeout, 
$pass_tags, $hwm)</make>
 
   <param>
     <name>IO Type</name>
@@ -61,7 +61,23 @@
     <name>Pass Tags</name>
     <key>pass_tags</key>
     <value>False</value>
-    <type>bool</type>
+    <type>enum</type>
+    <option>
+      <name>Yes</name>
+      <key>True</key>
+    </option>
+    <option>
+      <name>No</name>
+      <key>False</key>
+    </option>
+  </param>
+
+  <param>
+    <name>High Watermark</name>
+    <key>hwm</key>
+    <value>-1</value>
+    <type>int</type>
+    <hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
   </param>
 
   <source>
diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h 
b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
index e8871c2..e87c552 100644
--- a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
@@ -53,9 +53,10 @@ namespace gr {
        * \param address  ZMQ socket address specifier.
        * \param timeout  Receive timeout in seconds, default is 100ms, 1us 
increments.
        * \param pass_tags Whether sink will serialize and pass tags over the 
link.
+       * \param hwm High Watermark to configure the socket to (-1 => zmq's 
default)
        */
       static sptr make(size_t itemsize, size_t vlen, char *address,
-                       int timeout=100, bool pass_tags=false);
+                       int timeout=100, bool pass_tags=false, int hwm=-1);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/pull_source.h 
b/gr-zeromq/include/gnuradio/zeromq/pull_source.h
index ca7b407..07cf6af 100644
--- a/gr-zeromq/include/gnuradio/zeromq/pull_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/pull_source.h
@@ -50,9 +50,10 @@ namespace gr {
        * \param address  ZMQ socket address specifier.
        * \param timeout  Receive timeout in seconds, default is 100ms, 1us 
increments.
        * \param pass_tags Whether source will look for and deserialize tags.
+       * \param hwm High Watermark to configure the socket to (-1 => zmq's 
default)
        */
       static sptr make(size_t itemsize, size_t vlen, char *address,
-                       int timeout=100, bool pass_tags=false);
+                       int timeout=100, bool pass_tags=false, int hwm=-1);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/push_sink.h 
b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
index 0f21b44..e2260aa 100644
--- a/gr-zeromq/include/gnuradio/zeromq/push_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
@@ -54,9 +54,10 @@ namespace gr {
        * \param address  ZMQ socket address specifier.
        * \param timeout  Receive timeout in seconds, default is 100ms, 1us 
increments.
        * \param pass_tags Whether sink will serialize and pass tags over the 
link.
+       * \param hwm High Watermark to configure the socket to (-1 => zmq's 
default)
        */
       static sptr make(size_t itemsize, size_t vlen, char *address,
-                       int timeout=100, bool pass_tags=false);
+                       int timeout=100, bool pass_tags=false, int hwm=-1);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h 
b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
index 33fd38b..220bd34 100644
--- a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
@@ -52,9 +52,10 @@ namespace gr {
        * \param address  ZMQ socket address specifier.
        * \param timeout  Receive timeout in seconds, default is 100ms, 1us 
increments.
        * \param pass_tags Whether sink will serialize and pass tags over the 
link.
+       * \param hwm High Watermark to configure the socket to (-1 => zmq's 
default)
        */
       static sptr make(size_t itemsize, size_t vlen, char *address,
-                       int timeout=100, bool pass_tags=false);
+                       int timeout=100, bool pass_tags=false, int hwm=-1);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/req_source.h 
b/gr-zeromq/include/gnuradio/zeromq/req_source.h
index 9936406..461f653 100644
--- a/gr-zeromq/include/gnuradio/zeromq/req_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/req_source.h
@@ -50,9 +50,10 @@ namespace gr {
        * \param address  ZMQ socket address specifier.
        * \param timeout  Receive timeout in seconds, default is 100ms, 1us 
increments.
        * \param pass_tags Whether source will look for and deserialize tags.
+       * \param hwm High Watermark to configure the socket to (-1 => zmq's 
default)
        */
       static sptr make(size_t itemsize, size_t vlen, char *address,
-                       int timeout=100, bool pass_tags=false);
+                       int timeout=100, bool pass_tags=false, int hwm=-1);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_source.h 
b/gr-zeromq/include/gnuradio/zeromq/sub_source.h
index 5fdd893..def3a70 100644
--- a/gr-zeromq/include/gnuradio/zeromq/sub_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/sub_source.h
@@ -50,9 +50,10 @@ namespace gr {
        * \param address  ZMQ socket address specifier.
        * \param timeout  Receive timeout in seconds, default is 100ms, 1us 
increments.
        * \param pass_tags Whether source will look for and deserialize tags.
+       * \param hwm High Watermark to configure the socket to (-1 => zmq's 
default)
        */
       static sptr make(size_t itemsize, size_t vlen, char *address,
-                       int timeout=100, bool pass_tags=false);
+                       int timeout=100, bool pass_tags=false, int hwm=-1);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/lib/CMakeLists.txt b/gr-zeromq/lib/CMakeLists.txt
index 941e5ff..d7b03fa 100644
--- a/gr-zeromq/lib/CMakeLists.txt
+++ b/gr-zeromq/lib/CMakeLists.txt
@@ -37,6 +37,7 @@ endif(ENABLE_GR_CTRLPORT)
 # Setup library
 ########################################################################
 list(APPEND zeromq_sources
+  base_impl.cc
   pub_sink_impl.cc
   pub_msg_sink_impl.cc
   sub_source_impl.cc
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc
new file mode 100644
index 0000000..f41e5cb
--- /dev/null
+++ b/gr-zeromq/lib/base_impl.cc
@@ -0,0 +1,198 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2016 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gnuradio/io_signature.h>
+#include "base_impl.h"
+#include "tag_headers.h"
+
+namespace gr {
+  namespace zeromq {
+
+    base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, 
bool pass_tags)
+      : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags)
+    {
+      /* "Fix" timeout value (ms for new API, us for old API) */
+      int major, minor, patch;
+      zmq::version (&major, &minor, &patch);
+
+      if (major < 3) {
+        d_timeout *= 1000;
+      }
+
+      /* Create context & socket */
+      d_context = new zmq::context_t(1);
+      d_socket = new zmq::socket_t(*d_context, type);
+    }
+
+    base_impl::~base_impl()
+    {
+        d_socket->close();
+        delete d_socket;
+        delete d_context;
+    }
+
+
+    base_sink_impl::base_sink_impl(int type, size_t itemsize, size_t vlen, 
char *address, int timeout, bool pass_tags, int hwm)
+        : base_impl(type, itemsize, vlen, timeout, pass_tags)
+    {
+      /* Set high watermark */
+      if (hwm >= 0) {
+#ifdef ZMQ_SNDHWM
+        d_socket->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
+#else // major < 3
+        uint64_t tmp = hwm;
+        d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
+#endif
+      }
+
+      /* Bind */
+      d_socket->bind(address);
+    }
+
+    int
+    base_sink_impl::send_message(const void *in_buf, const int in_nitems, 
const uint64_t in_offset)
+    {
+      /* Meta-data header */
+      std::string header("");
+      if(d_pass_tags){
+        std::vector<gr::tag_t> tags;
+        get_tags_in_range(tags, 0, in_offset, in_offset + in_nitems);
+        header = gen_tag_header(in_offset, tags);
+      }
+
+      /* Create message */
+      size_t payload_len = in_nitems * d_vsize;
+      size_t msg_len = d_pass_tags ? payload_len + header.length() : 
payload_len;
+      zmq::message_t msg(msg_len);
+
+      if(d_pass_tags){
+        memcpy(msg.data(), header.c_str(), header.length());
+        memcpy((uint8_t*)msg.data() + header.length(), in_buf, payload_len);
+      } else {
+        memcpy(msg.data(), in_buf, payload_len);
+      }
+
+      /* Send */
+      d_socket->send(msg);
+
+      /* Report back */
+      return in_nitems;
+    }
+
+    base_source_impl::base_source_impl(int type, size_t itemsize, size_t vlen, 
char *address, int timeout, bool pass_tags, int hwm)
+        : base_impl(type, itemsize, vlen, timeout, pass_tags),
+          d_consumed_bytes(0), d_consumed_items(0)
+    {
+      /* Set high watermark */
+      if (hwm >= 0) {
+#ifdef ZMQ_RCVHWM
+        d_socket->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
+#else // major < 3
+        uint64_t tmp = hwm;
+        d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
+#endif
+      }
+
+      /* Connect */
+      d_socket->connect(address);
+    }
+
+    bool
+    base_source_impl::has_pending()
+    {
+      return d_msg.size() > d_consumed_bytes;
+    }
+
+    int
+    base_source_impl::flush_pending(void *out_buf, const int out_nitems, const 
uint64_t out_offset)
+    {
+      /* How much to copy in this call */
+      int to_copy_items = std::min(out_nitems, (int)((d_msg.size() - 
d_consumed_bytes) / d_vsize));
+      int to_copy_bytes = d_vsize * to_copy_items;
+
+      /* Copy actual data */
+      memcpy(out_buf, (uint8_t*)d_msg.data() + d_consumed_bytes, 
to_copy_bytes);
+
+      /* Add tags matching this segment of samples */
+      for (unsigned int i=0; i<d_tags.size(); i++)
+      {
+        if ((d_tags[i].offset >= (uint64_t)d_consumed_items) &&
+            (d_tags[i].offset  < (uint64_t)d_consumed_items + to_copy_items))
+        {
+          gr::tag_t nt = d_tags[i];
+          nt.offset += out_offset - d_consumed_items;
+          add_item_tag(0, nt);
+        }
+      }
+
+      /* Update pointer */
+      d_consumed_items += to_copy_items;
+      d_consumed_bytes += to_copy_bytes;
+
+      return to_copy_items;
+    }
+
+    bool
+    base_source_impl::load_message(bool wait)
+    {
+      /* Poll for input */
+      zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+      zmq::poll(&items[0], 1, wait ? d_timeout : 0);
+
+      if (!(items[0].revents & ZMQ_POLLIN))
+        return false;
+
+      /* Reset */
+      d_msg.rebuild();
+      d_tags.clear();
+      d_consumed_items = 0;
+      d_consumed_bytes = 0;
+
+      /* Get the message */
+      d_socket->recv(&d_msg);
+
+      /* Parse header */
+      if (d_pass_tags)
+      {
+        uint64_t rcv_offset;
+
+        /* Parse header */
+        d_consumed_bytes = parse_tag_header(d_msg, rcv_offset, d_tags);
+
+        /* Fixup the tags offset to be relative to the start of this message */
+        for (unsigned int i=0; i<d_tags.size(); i++) {
+          d_tags[i].offset -= rcv_offset;
+        }
+      }
+
+      /* We got one ! */
+      return true;
+    }
+
+  } /* namespace zeromq */
+} /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/base_impl.h b/gr-zeromq/lib/base_impl.h
new file mode 100644
index 0000000..ed16951
--- /dev/null
+++ b/gr-zeromq/lib/base_impl.h
@@ -0,0 +1,77 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2016 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_BASE_IMPL_H
+#define INCLUDED_ZEROMQ_BASE_IMPL_H
+
+#include <zmq.hpp>
+
+#include <gnuradio/sync_block.h>
+
+namespace gr {
+  namespace zeromq {
+
+    class base_impl : public virtual gr::sync_block
+    {
+    public:
+      base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool 
pass_tags);
+      virtual ~base_impl();
+
+    protected:
+      zmq::context_t  *d_context;
+      zmq::socket_t   *d_socket;
+      size_t          d_vsize;
+      int             d_timeout;
+      bool            d_pass_tags;
+    };
+
+    class base_sink_impl : public base_impl
+    {
+    public:
+      base_sink_impl(int type, size_t itemsize, size_t vlen, char *address, 
int timeout, bool pass_tags, int hwm);
+
+    protected:
+      int send_message(const void *in_buf, const int in_nitems, const uint64_t 
in_offset);
+    };
+
+    class base_source_impl : public base_impl
+    {
+    public:
+      base_source_impl(int type, size_t itemsize, size_t vlen, char *address, 
int timeout, bool pass_tags, int hwm);
+
+    protected:
+      zmq::message_t d_msg;
+      std::vector<gr::tag_t> d_tags;
+      size_t d_consumed_bytes;
+      int    d_consumed_items;
+
+      bool has_pending();
+      int  flush_pending(void *out_buf, const int out_nitems, const uint64_t 
out_offset);
+      bool load_message(bool wait);
+    };
+
+  } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_BASE_IMPL_H */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index c103069..b602bc8 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -32,35 +32,19 @@ namespace gr {
   namespace zeromq {
 
     pub_sink::sptr
-    pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags)
+    pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags, int hwm)
     {
       return gnuradio::get_initial_sptr
-        (new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags));
+        (new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
     }
 
-    pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, 
int timeout, bool pass_tags)
+    pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, 
int timeout, bool pass_tags, int hwm)
       : gr::sync_block("pub_sink",
                        gr::io_signature::make(1, 1, itemsize * vlen),
                        gr::io_signature::make(0, 0, 0)),
-        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), 
d_pass_tags(pass_tags)
+        base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, 
hwm)
     {
-      int major, minor, patch;
-      zmq::version (&major, &minor, &patch);
-      if (major < 3) {
-        d_timeout = timeout*1000;
-      }
-      d_context = new zmq::context_t(1);
-      d_socket = new zmq::socket_t(*d_context, ZMQ_PUB);
-      int time = 0;
-      d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
-      d_socket->bind(address);
-    }
-
-    pub_sink_impl::~pub_sink_impl()
-    {
-      d_socket->close();
-      delete d_socket;
-      delete d_context;
+      /* All is delegated */
     }
 
     int
@@ -68,33 +52,10 @@ namespace gr {
                         gr_vector_const_void_star &input_items,
                         gr_vector_void_star &output_items)
     {
-      const char *in = (const char *)input_items[0];
-
-      // encode the current offset, # tags, and tags into header
-    std::string header("");
-    if(d_pass_tags){
-      uint64_t offset = nitems_read(0);
-      std::vector<gr::tag_t> tags;
-      get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
-      header = gen_tag_header( offset, tags );  
-      }
-
-      // create message copy and send
-      int payloadlen = d_itemsize * d_vlen * noutput_items;
-      int msglen = d_pass_tags ? payloadlen + header.length() : payloadlen;
-      zmq::message_t msg(msglen);
-
-      if(d_pass_tags){
-        memcpy((void*) msg.data(), header.c_str(), header.length() );
-        memcpy((uint8_t *)msg.data() + header.length(), in, 
d_itemsize*d_vlen*noutput_items);
-        } else {
-        memcpy((uint8_t *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
-        }
-    
-      d_socket->send(msg);
-
-      return noutput_items;
+      return send_message(input_items[0], noutput_items, nitems_read(0));
     }
 
   } /* namespace zeromq */
 } /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h
index 100b0f5..8637c35 100644
--- a/gr-zeromq/lib/pub_sink_impl.h
+++ b/gr-zeromq/lib/pub_sink_impl.h
@@ -26,22 +26,15 @@
 #include <gnuradio/zeromq/pub_sink.h>
 #include <zmq.hpp>
 
+#include "base_impl.h"
+
 namespace gr {
   namespace zeromq {
 
-    class pub_sink_impl : public pub_sink
+    class pub_sink_impl : public pub_sink, public base_sink_impl
     {
-    private:
-      size_t          d_itemsize;
-      size_t          d_vlen;
-      float           d_timeout;
-      zmq::context_t  *d_context;
-      zmq::socket_t   *d_socket;
-      bool            d_pass_tags;
-
     public:
-      pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags);
-      ~pub_sink_impl();
+      pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags, int hwm);
 
       int work(int noutput_items,
                gr_vector_const_void_star &input_items,
diff --git a/gr-zeromq/lib/pull_source_impl.cc 
b/gr-zeromq/lib/pull_source_impl.cc
index 3215096..4045dd7 100644
--- a/gr-zeromq/lib/pull_source_impl.cc
+++ b/gr-zeromq/lib/pull_source_impl.cc
@@ -32,41 +32,19 @@ namespace gr {
   namespace zeromq {
 
     pull_source::sptr
-    pull_source::make(size_t itemsize, size_t vlen, char *address, int 
timeout, bool pass_tags)
+    pull_source::make(size_t itemsize, size_t vlen, char *address, int 
timeout, bool pass_tags, int hwm)
     {
       return gnuradio::get_initial_sptr
-        (new pull_source_impl(itemsize, vlen, address, timeout, pass_tags));
+        (new pull_source_impl(itemsize, vlen, address, timeout, pass_tags, 
hwm));
     }
 
-    pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char 
*address, int timeout, bool pass_tags)
+    pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char 
*address, int timeout, bool pass_tags, int hwm)
       : gr::sync_block("pull_source",
                        gr::io_signature::make(0, 0, 0),
                        gr::io_signature::make(1, 1, itemsize * vlen)),
-        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), 
d_pass_tags(pass_tags)
+        base_source_impl(ZMQ_PULL, itemsize, vlen, address, timeout, 
pass_tags, hwm)
     {
-      int major, minor, patch;
-      zmq::version (&major, &minor, &patch);
-
-      if (major < 3) {
-        d_timeout = timeout*1000;
-      }
-
-      d_context = new zmq::context_t(1);
-      d_socket = new zmq::socket_t(*d_context, ZMQ_PULL);
-
-      int time = 0;
-      d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
-      d_socket->connect (address);
-    }
-
-    /*
-     * Our virtual destructor.
-     */
-    pull_source_impl::~pull_source_impl()
-    {
-      d_socket->close();
-      delete d_socket;
-      delete d_context;
+      /* All is delegated */
     }
 
     int
@@ -74,44 +52,37 @@ namespace gr {
                            gr_vector_const_void_star &input_items,
                            gr_vector_void_star &output_items)
     {
-      char *out = (char*)output_items[0];
-
-      zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
-      zmq::poll (&items[0], 1, d_timeout);
-
-      //  If we got a reply, process
-      if (items[0].revents & ZMQ_POLLIN) {
-
-        // Receive data
-        zmq::message_t msg;
-        d_socket->recv(&msg);
-
-        // check header for tags...
-        std::string buf(static_cast<char*>(msg.data()), msg.size());
-        if(d_pass_tags){
-          uint64_t rcv_offset;
-          std::vector<gr::tag_t> tags;
-          buf = parse_tag_header(buf, rcv_offset, tags);
-          for(size_t i=0; i<tags.size(); i++){
-            tags[i].offset -= rcv_offset - nitems_written(0);
-            add_item_tag(0, tags[i]);
-          }
-        }
-
-        // Copy to ouput buffer and return
-        if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
-          memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items);
-          return noutput_items;
+      uint8_t *out = (uint8_t *) output_items[0];
+      bool first = true;
+      int done = 0;
+
+      /* Process as much as we can */
+      while (1)
+      {
+        if (has_pending())
+        {
+          /* Flush anything pending */
+          done += flush_pending(out + (done * d_vsize), noutput_items - done, 
nitems_written(0) + done);
+
+          /* No more space ? */
+          if (done == noutput_items)
+            break;
         }
-        else {
-          memcpy(out, (void *)&buf[0], buf.size());
-          return buf.size()/(d_itemsize*d_vlen);
+        else
+        {
+          /* Try to get the next message */
+          if (!load_message(first))
+            break;  /* No message, we're done for now */
+
+          /* Not the first anymore */
+          first = false;
         }
       }
-      else {
-        return 0; // FIXME: someday when the scheduler does all the 
poll/selects
-      }
+
+      return done;
     }
 
   } /* namespace zeromq */
 } /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/pull_source_impl.h b/gr-zeromq/lib/pull_source_impl.h
index 7578679..7d8ab53 100644
--- a/gr-zeromq/lib/pull_source_impl.h
+++ b/gr-zeromq/lib/pull_source_impl.h
@@ -26,22 +26,15 @@
 #include <gnuradio/zeromq/pull_source.h>
 #include <zmq.hpp>
 
+#include "base_impl.h"
+
 namespace gr {
   namespace zeromq {
 
-    class pull_source_impl : public pull_source
+    class pull_source_impl : public pull_source, public base_source_impl
     {
-    private:
-      size_t          d_itemsize;
-      size_t          d_vlen;
-      int             d_timeout; // microseconds, -1 is blocking
-      zmq::context_t  *d_context;
-      zmq::socket_t   *d_socket;
-      bool            d_pass_tags;
-
     public:
-      pull_source_impl(size_t itemsize, size_t vlen, char *address, int 
timeout, bool pass_tags);
-      ~pull_source_impl();
+      pull_source_impl(size_t itemsize, size_t vlen, char *address, int 
timeout, bool pass_tags, int hwm);
 
       int work(int noutput_items,
                gr_vector_const_void_star &input_items,
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index 7c06dc5..a5aec2c 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -32,38 +32,19 @@ namespace gr {
   namespace zeromq {
 
     push_sink::sptr
-    push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags)
+    push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags, int hwm)
     {
       return gnuradio::get_initial_sptr
-        (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags));
+        (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
     }
 
-    push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char 
*address, int timeout, bool pass_tags)
+    push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char 
*address, int timeout, bool pass_tags, int hwm)
       : gr::sync_block("push_sink",
                        gr::io_signature::make(1, 1, itemsize * vlen),
                        gr::io_signature::make(0, 0, 0)),
-        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), 
d_pass_tags(pass_tags)
+        base_sink_impl(ZMQ_PUSH, itemsize, vlen, address, timeout, pass_tags, 
hwm)
     {
-      int major, minor, patch;
-      zmq::version (&major, &minor, &patch);
-
-      if (major < 3) {
-        d_timeout = timeout*1000;
-      }
-
-      d_context = new zmq::context_t(1);
-      d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
-
-      int time = 0;
-      d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
-      d_socket->bind (address);
-    }
-
-    push_sink_impl::~push_sink_impl()
-    {
-      d_socket->close();
-      delete d_socket;
-      delete d_context;
+      /* All is delegated */
     }
 
     int
@@ -71,43 +52,19 @@ namespace gr {
                          gr_vector_const_void_star &input_items,
                          gr_vector_void_star &output_items)
     {
-      const char *in = (const char *) input_items[0];
-
+      // Poll with a timeout (FIXME: scheduler can't wait for us)
       zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
       zmq::poll(&itemsout[0], 1, d_timeout);
 
-      //  If we got a reply, process
-      if (itemsout[0].revents & ZMQ_POLLOUT) {
-
-        // encode the current offset, # tags, and tags into header
-        std::string header("");
-
-        if(d_pass_tags){
-          uint64_t offset = nitems_read(0);
-          std::vector<gr::tag_t> tags;
-          get_tags_in_range(tags, 0, nitems_read(0), 
nitems_read(0)+noutput_items);
-          header = gen_tag_header(offset, tags);
-        }
+      // If we can send something, do it
+      if (itemsout[0].revents & ZMQ_POLLOUT)
+        return send_message(input_items[0], noutput_items, nitems_read(0));
 
-        // create message copy and send
-        int payloadlen = d_itemsize * d_vlen * noutput_items;
-        int msglen = d_pass_tags ? payloadlen + header.length() : payloadlen;
-        zmq::message_t msg(msglen);
-
-        if(d_pass_tags){
-          memcpy((void*) msg.data(), header.c_str(), header.length() );
-          memcpy((uint8_t *)msg.data() + header.length(), in, 
d_itemsize*d_vlen*noutput_items);
-          } else {
-          memcpy((uint8_t *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
-          }
-
-        d_socket->send(msg);
-        return noutput_items;
-      }
-      else {
-        return 0; // FIXME: when scheduler supports return blocking
-      }
+      // If not, do nothing
+      return 0;
     }
 
   } /* namespace zeromq */
 } /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h
index 924dee3..0a5de10 100644
--- a/gr-zeromq/lib/push_sink_impl.h
+++ b/gr-zeromq/lib/push_sink_impl.h
@@ -26,22 +26,15 @@
 #include <gnuradio/zeromq/push_sink.h>
 #include <zmq.hpp>
 
+#include "base_impl.h"
+
 namespace gr {
   namespace zeromq {
 
-    class push_sink_impl : public push_sink
+    class push_sink_impl : public push_sink, public base_sink_impl
     {
-    private:
-      size_t          d_itemsize;
-      size_t          d_vlen;
-      float           d_timeout;
-      zmq::context_t  *d_context;
-      zmq::socket_t   *d_socket;
-      bool            d_pass_tags;
-
     public:
-      push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags);
-      ~push_sink_impl();
+      push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags, int hwm);
 
       int work(int noutput_items,
                gr_vector_const_void_star &input_items,
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index 034a5b0..ac6fc9c 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -32,38 +32,19 @@ namespace gr {
   namespace zeromq {
 
     rep_sink::sptr
-    rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags)
+    rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags, int hwm)
     {
       return gnuradio::get_initial_sptr
-        (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags));
+        (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
     }
 
-    rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, 
int timeout, bool pass_tags)
+    rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, 
int timeout, bool pass_tags, int hwm)
       : gr::sync_block("rep_sink",
                        gr::io_signature::make(1, 1, itemsize * vlen),
                        gr::io_signature::make(0, 0, 0)),
-        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), 
d_pass_tags(pass_tags)
+        base_sink_impl(ZMQ_REP, itemsize, vlen, address, timeout, pass_tags, 
hwm)
     {
-      int major, minor, patch;
-      zmq::version (&major, &minor, &patch);
-
-      if (major < 3) {
-        d_timeout = timeout*1000;
-      }
-
-      d_context = new zmq::context_t(1);
-      d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
-
-      int time = 0;
-      d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
-      d_socket->bind (address);
-    }
-
-    rep_sink_impl::~rep_sink_impl()
-    {
-      d_socket->close();
-      delete d_socket;
-      delete d_context;
+      /* All is delegated */
     }
 
     int
@@ -71,46 +52,44 @@ namespace gr {
                         gr_vector_const_void_star &input_items,
                         gr_vector_void_star &output_items)
     {
-      const char *in = (const char *) input_items[0];
-
-      zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
-      zmq::poll(&items[0], 1, d_timeout);
-
-      //  If we got a reply, process
-      if (items[0].revents & ZMQ_POLLIN) {
-        // receive data request
+      const uint8_t *in = (const uint8_t *) input_items[0];
+      bool first = true;
+      int done = 0;
+
+      /* Process as much as we can */
+      while (1)
+      {
+        /* Wait for a small time (FIXME: scheduler can't wait for us) */
+          /* We only wait if its the first iteration, for the others we'll
+           * let the scheduler retry */
+        zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+        zmq::poll(&items[0], 1, first ? d_timeout : 0);
+
+          /* If we dont have anything, we're done */
+        if (!(items[0].revents & ZMQ_POLLIN))
+          break;
+
+        /* Get and parse the request */
         zmq::message_t request;
         d_socket->recv(&request);
-        int req_output_items = *(static_cast<int*>(request.data()));
-        int nitems_send = std::min(noutput_items, req_output_items);
 
-        // encode the current offset, # tags, and tags into header
-        std::string header("");
-        if(d_pass_tags){
-          uint64_t offset = nitems_read(0);
-          std::vector<gr::tag_t> tags;
-          get_tags_in_range(tags, 0, nitems_read(0), 
nitems_read(0)+noutput_items);
-          header = gen_tag_header( offset, tags );
+        int nitems_send = noutput_items - done;
+        if (request.size() >= sizeof(uint32_t))
+        {
+          int req = (int)*(static_cast<uint32_t*>(request.data()));
+          nitems_send = std::min(nitems_send, req);
         }
 
+        /* Delegate the actual send */
+        done += send_message(in + (done * d_vsize), nitems_send, 
nitems_read(0) + done);
 
-        // create message copy and send
-        int payloadlen = d_itemsize * d_vlen * noutput_items;
-        int msglen = d_pass_tags ? payloadlen + header.length() : payloadlen;
-        zmq::message_t msg(msglen);
-
-        if(d_pass_tags){
-          memcpy((void*) msg.data(), header.c_str(), header.length() );
-          memcpy((uint8_t *)msg.data() + header.length(), in, 
d_itemsize*d_vlen*noutput_items);
-          } else {
-          memcpy((uint8_t *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
-          }
-        d_socket->send(msg);
-
-        return nitems_send;
+        /* Not the first anymore */
+        first = false;
       }
 
-      return 0;
+      return done;
     }
   } /* namespace zeromq */
 } /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h
index 55ebb69..012fc45 100644
--- a/gr-zeromq/lib/rep_sink_impl.h
+++ b/gr-zeromq/lib/rep_sink_impl.h
@@ -26,22 +26,15 @@
 #include <gnuradio/zeromq/rep_sink.h>
 #include <zmq.hpp>
 
+#include "base_impl.h"
+
 namespace gr {
   namespace zeromq {
 
-    class rep_sink_impl : public rep_sink
+    class rep_sink_impl : public rep_sink, public base_sink_impl
     {
-    private:
-      size_t          d_itemsize;
-      size_t          d_vlen;
-      int             d_timeout;
-      zmq::context_t  *d_context;
-      zmq::socket_t   *d_socket;
-      bool            d_pass_tags;
-
     public:
-      rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags);
-      ~rep_sink_impl();
+      rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags, int hwm);
 
       int work(int noutput_items,
                gr_vector_const_void_star &input_items,
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc
index f69d447..5267363 100644
--- a/gr-zeromq/lib/req_source_impl.cc
+++ b/gr-zeromq/lib/req_source_impl.cc
@@ -32,38 +32,20 @@ namespace gr {
   namespace zeromq {
 
     req_source::sptr
-    req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags)
+    req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags, int hwm)
     {
       return gnuradio::get_initial_sptr
-        (new req_source_impl(itemsize, vlen, address, timeout, pass_tags));
+        (new req_source_impl(itemsize, vlen, address, timeout, pass_tags, 
hwm));
     }
 
-    req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char 
*address, int timeout, bool pass_tags)
+    req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char 
*address, int timeout, bool pass_tags, int hwm)
       : gr::sync_block("req_source",
                        gr::io_signature::make(0, 0, 0),
                        gr::io_signature::make(1, 1, itemsize * vlen)),
-        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), 
d_pass_tags(pass_tags)
+        base_source_impl(ZMQ_REQ, itemsize, vlen, address, timeout, pass_tags, 
hwm),
+        d_req_pending(false)
     {
-      int major, minor, patch;
-      zmq::version (&major, &minor, &patch);
-
-      if (major < 3) {
-        d_timeout = timeout*1000;
-      }
-
-      d_context = new zmq::context_t(1);
-      d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
-
-      int time = 0;
-      d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
-      d_socket->connect (address);
-    }
-
-    req_source_impl::~req_source_impl()
-    {
-      d_socket->close();
-      delete d_socket;
-      delete d_context;
+      /* All is delegated */
     }
 
     int
@@ -71,49 +53,56 @@ namespace gr {
                           gr_vector_const_void_star &input_items,
                           gr_vector_void_star &output_items)
     {
-      char *out = (char*)output_items[0];
+#if 0
+#endif
+      uint8_t *out = (uint8_t *) output_items[0];
+      bool first = true;
+      int done = 0;
+
+      /* Process as much as we can */
+      while (1)
+      {
+        if (has_pending())
+        {
+          /* Flush anything pending */
+          done += flush_pending(out + (done * d_vsize), noutput_items - done, 
nitems_written(0) + done);
+
+          /* No more space ? */
+          if (done == noutput_items)
+            break;
+        }
+        else
+        {
+          /* Send request if needed */
+          if (!d_req_pending)
+          {
+            /* The REP/REQ pattern state machine guarantees we can send at 
this point */
+            uint32_t req_len = noutput_items - done;
+            zmq::message_t request(sizeof(uint32_t));
+            memcpy ((void *) request.data (), &req_len, sizeof(uint32_t));
+            d_socket->send(request);
+
+            d_req_pending = true;
+          }
 
-      zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
-      zmq::poll (&itemsout[0], 1, d_timeout);
+          /* Try to get the next message */
+          if (!load_message(first))
+            break;  /* No message, we're done for now */
 
-      //  If we got a reply, process
-      if (itemsout[0].revents & ZMQ_POLLOUT) {
-        // Request data, FIXME non portable?
-        zmq::message_t request(sizeof(int));
-        memcpy ((void *) request.data (), &noutput_items, sizeof(int));
-        d_socket->send(request);
-      }
+          /* Got response */
+          d_req_pending = false;
 
-      zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
-      zmq::poll(&itemsin[0], 1, d_timeout);
-
-      //  If we got a reply, process
-      if (itemsin[0].revents & ZMQ_POLLIN) {
-        // Receive data
-        zmq::message_t reply;
-        d_socket->recv(&reply);
-
-        // Deserialize header data / tags
-        std::string buf(static_cast<char*>(reply.data()), reply.size());
-
-        if(d_pass_tags){
-          uint64_t rcv_offset;
-          std::vector<gr::tag_t> tags;
-          buf = parse_tag_header(buf, rcv_offset, tags);
-          for(size_t i=0; i<tags.size(); i++){
-            tags[i].offset -= rcv_offset - nitems_written(0);
-            add_item_tag(0, tags[i]);
-          }
+          /* Not the first anymore */
+          first = false;
         }
-
-
-        // Copy to ouput buffer and return
-        memcpy(out, (void *)&buf[0], buf.size());
-        return buf.size()/(d_itemsize*d_vlen);
       }
 
+      return done;
+
       return 0;
     }
 
   } /* namespace zeromq */
 } /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/req_source_impl.h b/gr-zeromq/lib/req_source_impl.h
index 7c6bc53..8bdbd33 100644
--- a/gr-zeromq/lib/req_source_impl.h
+++ b/gr-zeromq/lib/req_source_impl.h
@@ -26,26 +26,22 @@
 #include <gnuradio/zeromq/req_source.h>
 #include <zmq.hpp>
 
+#include "base_impl.h"
+
 namespace gr {
   namespace zeromq {
 
-    class req_source_impl : public req_source
+    class req_source_impl : public req_source, public base_source_impl
     {
-    private:
-      size_t          d_itemsize;
-      size_t          d_vlen;
-      int             d_timeout;
-      zmq::context_t  *d_context;
-      zmq::socket_t   *d_socket;
-      bool            d_pass_tags;
-
     public:
-      req_source_impl(size_t itemsize, size_t vlen, char *address, int 
timeout, bool pass_tags);
-      ~req_source_impl();
+      req_source_impl(size_t itemsize, size_t vlen, char *address, int 
timeout, bool pass_tags, int hwm);
 
       int work(int noutput_items,
                gr_vector_const_void_star &input_items,
                gr_vector_void_star &output_items);
+
+    private:
+      bool d_req_pending;
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 1242688..9a2e0bf 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -32,40 +32,20 @@ namespace gr {
   namespace zeromq {
 
     sub_source::sptr
-    sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags)
+    sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool pass_tags, int hwm)
     {
       return gnuradio::get_initial_sptr
-        (new sub_source_impl(itemsize, vlen, address, timeout, pass_tags));
+        (new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, 
hwm));
     }
 
-    sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char 
*address, int timeout, bool pass_tags)
+    sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char 
*address, int timeout, bool pass_tags, int hwm)
       : gr::sync_block("sub_source",
                        gr::io_signature::make(0, 0, 0),
                        gr::io_signature::make(1, 1, itemsize * vlen)),
-        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), 
d_pass_tags(pass_tags)
+        base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, 
hwm)
     {
-      int major, minor, patch;
-      zmq::version (&major, &minor, &patch);
-
-      if (major < 3) {
-        d_timeout = timeout*1000;
-      }
-
-      d_context = new zmq::context_t(1);
-      d_socket = new zmq::socket_t(*d_context, ZMQ_SUB);
-
+      /* Subscribe */
       d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
-      d_socket->connect (address);
-    }
-
-    /*
-     * Our virtual destructor.
-     */
-    sub_source_impl::~sub_source_impl()
-    {
-      d_socket->close();
-      delete d_socket;
-      delete d_context;
     }
 
     int
@@ -73,47 +53,37 @@ namespace gr {
                            gr_vector_const_void_star &input_items,
                            gr_vector_void_star &output_items)
     {
-      char *out = (char*)output_items[0];
-
-      zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
-      zmq::poll (&items[0], 1, d_timeout);
-
-      //  If we got a reply, process
-      if (items[0].revents & ZMQ_POLLIN) {
-
-        // Receive data
-        zmq::message_t msg;
-        d_socket->recv(&msg);
-
-        // Deserialize header data / tags
-        std::string buf(static_cast<char*>(msg.data()), msg.size());
-
-        if(d_pass_tags){
-          uint64_t rcv_offset;
-          std::vector<gr::tag_t> tags;
-
-          buf = parse_tag_header(buf, rcv_offset, tags);
-
-          for(size_t i=0; i<tags.size(); i++) {
-            tags[i].offset -= rcv_offset - nitems_written(0);
-            add_item_tag(0, tags[i]);
-          }
+      uint8_t *out = (uint8_t *) output_items[0];
+      bool first = true;
+      int done = 0;
+
+      /* Process as much as we can */
+      while (1)
+      {
+        if (has_pending())
+        {
+          /* Flush anything pending */
+          done += flush_pending(out + (done * d_vsize), noutput_items - done, 
nitems_written(0) + done);
+
+          /* No more space ? */
+          if (done == noutput_items)
+            break;
         }
-
-        // Copy to ouput buffer and return
-        if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
-          memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items);
-          return noutput_items;
+        else
+        {
+          /* Try to get the next message */
+          if (!load_message(first))
+            break;  /* No message, we're done for now */
+
+          /* Not the first anymore */
+          first = false;
         }
-        else {
-          memcpy(out, (void *)&buf[0], buf.size());
-          return buf.size()/(d_itemsize*d_vlen);
-        }
-      }
-      else {
-        return 0; // FIXME: someday when the scheduler does all the 
poll/selects
       }
+
+      return done;
     }
 
   } /* namespace zeromq */
 } /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/sub_source_impl.h b/gr-zeromq/lib/sub_source_impl.h
index 0fa8d17..8f82a9a 100644
--- a/gr-zeromq/lib/sub_source_impl.h
+++ b/gr-zeromq/lib/sub_source_impl.h
@@ -24,28 +24,21 @@
 #define INCLUDED_ZEROMQ_SUB_SOURCE_IMPL_H
 
 #include <gnuradio/zeromq/sub_source.h>
-#include "zmq.hpp"
+#include <zmq.hpp>
+
+#include "base_impl.h"
 
 namespace gr {
   namespace zeromq {
 
-    class sub_source_impl : public sub_source
+    class sub_source_impl : public sub_source,  public base_source_impl
     {
-     private:
-      size_t          d_itemsize;
-      size_t          d_vlen;
-      int             d_timeout; // microseconds, -1 is blocking
-      zmq::context_t  *d_context;
-      zmq::socket_t   *d_socket;
-      bool            d_pass_tags;
-
-     public:
-      sub_source_impl(size_t itemsize, size_t vlen, char *address, int 
timeout, bool pass_tags);
-      ~sub_source_impl();
+    public:
+      sub_source_impl(size_t itemsize, size_t vlen, char *address, int 
timeout, bool pass_tags, int hwm);
 
       int work(int noutput_items,
-                  gr_vector_const_void_star &input_items,
-                  gr_vector_void_star &output_items);
+               gr_vector_const_void_star &input_items,
+               gr_vector_void_star &output_items);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/lib/tag_headers.cc b/gr-zeromq/lib/tag_headers.cc
index c970666..5a5a417 100644
--- a/gr-zeromq/lib/tag_headers.cc
+++ b/gr-zeromq/lib/tag_headers.cc
@@ -24,78 +24,88 @@
 #include <gnuradio/block.h>
 #include <sstream>
 #include <cstring>
+#include <zmq.hpp>
 
-#define GR_HEADER_MAGIC 0x5FF0
+#define GR_HEADER_MAGIC   0x5FF0
 #define GR_HEADER_VERSION 0x01
 
 namespace gr {
   namespace zeromq {
 
+    struct membuf: std::streambuf
+    {
+      membuf(void *b, size_t len)
+      {
+        char *bc = static_cast<char*>(b);
+        this->setg(bc, bc, bc+len);
+      }
+    };
+
     std::string 
-    gen_tag_header(uint64_t &offset, std::vector<gr::tag_t> &tags) {
+    gen_tag_header(uint64_t offset, std::vector<gr::tag_t> &tags)
+    {
+      std::stringbuf sb("");
+      std::ostream ss(&sb);
 
       uint16_t header_magic = GR_HEADER_MAGIC;
       uint8_t  header_version = GR_HEADER_VERSION;
+      uint64_t ntags = (uint64_t)tags.size();
 
-      std::stringstream ss;
-      size_t ntags = tags.size();
-      ss.write( reinterpret_cast< const char* >( &header_magic ), 
sizeof(uint16_t) );
-      ss.write( reinterpret_cast< const char* >( &header_version ), 
sizeof(uint8_t) );
-    
-      ss.write( reinterpret_cast< const char* >( &offset ), sizeof(uint64_t) 
);  // offset
-      ss.write( reinterpret_cast< const char* >( &ntags ), sizeof(size_t) );   
   // num tags
-      std::stringbuf sb("");
-      //std::cout << "TX TAGS: (offset="<<offset<<" ntags="<<ntags<<")\n";
-      for(size_t i=0; i<tags.size(); i++){
-        //std::cout << "TX TAG: (" << tags[i].offset << ", " << tags[i].key << 
", " << tags[i].value << ", " << tags[i].srcid << ")\n";
-        ss.write( reinterpret_cast< const char* >( &tags[i].offset ), 
sizeof(uint64_t) );   // offset
-        sb.str("");
-        pmt::serialize( tags[i].key, sb );                                     
      // key
-        pmt::serialize( tags[i].value, sb );                                   
      // value
-        pmt::serialize( tags[i].srcid, sb );                                   
      // srcid
-        ss.write( sb.str().c_str() , sb.str().length() );
+      ss.write( (const char*)&header_magic,   sizeof(uint16_t) );
+      ss.write( (const char*)&header_version, sizeof(uint8_t) );
+      ss.write( (const char*)&offset,         sizeof(uint64_t) );
+      ss.write( (const char*)&ntags,          sizeof(uint64_t) );
+
+      for(size_t i=0; i<tags.size(); i++)
+      {
+        ss.write( (const char *)&tags[i].offset, sizeof(uint64_t) );
+        pmt::serialize( tags[i].key, sb );
+        pmt::serialize( tags[i].value, sb );
+        pmt::serialize( tags[i].srcid, sb );
       }
 
-      return ss.str();
+      return sb.str();
     }
 
-    std::string
-    parse_tag_header(std::string &buf_in, uint64_t &offset_out, 
std::vector<gr::tag_t> &tags_out) {
+    size_t
+    parse_tag_header(zmq::message_t &msg, uint64_t &offset_out, 
std::vector<gr::tag_t> &tags_out)
+    {
+      membuf sb(msg.data(), msg.size());
+      std::istream iss(&sb);
 
-      std::istringstream iss( buf_in );
-      size_t   rcv_ntags;
+      size_t min_len = sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) + 
sizeof(uint64_t);
+      if (msg.size() < min_len)
+        throw std::runtime_error("incoming zmq msg too small to hold gr tag 
header!");
 
       uint16_t header_magic;
-      uint8_t header_version;
+      uint8_t  header_version;
+      uint64_t rcv_ntags;
+
+      iss.read( (char*)&header_magic,   sizeof(uint16_t) );
+      iss.read( (char*)&header_version, sizeof(uint8_t) );
 
-      iss.read( (char*)&header_magic, sizeof(uint16_t ) );
-      iss.read( (char*)&header_version, sizeof(uint8_t ) );
-      if(header_magic != GR_HEADER_MAGIC){
+      if(header_magic != GR_HEADER_MAGIC)
         throw std::runtime_error("gr header magic does not match!");
-        }
-      if(header_version != 1){
+
+      if(header_version != 1)
         throw std::runtime_error("gr header version too high!");
-        }
 
-      iss.read( (char*)&offset_out, sizeof(uint64_t ) );
-      iss.read( (char*)&rcv_ntags,  sizeof(size_t   ) );
-      //std::cout << "RX TAGS: (offset="<<offset_out<<" 
ntags="<<rcv_ntags<<")\n";
-      int rd_offset = sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) + 
sizeof(size_t);
-      std::stringbuf sb( iss.str().substr(rd_offset) );
+      iss.read( (char*)&offset_out, sizeof(uint64_t) );
+      iss.read( (char*)&rcv_ntags,  sizeof(uint64_t) );
 
-      for(size_t i=0; i<rcv_ntags; i++){
-        gr::tag_t newtag;       
-        sb.sgetn( (char*) &(newtag.offset), sizeof(uint64_t) );
+      for(size_t i=0; i<rcv_ntags; i++)
+      {
+        gr::tag_t newtag;
+        sb.sgetn( (char*)&(newtag.offset), sizeof(uint64_t) );
         newtag.key   = pmt::deserialize( sb );
         newtag.value = pmt::deserialize( sb );
         newtag.srcid = pmt::deserialize( sb );
-        //std::cout << "RX TAG: (" << newtag.offset << ", " << newtag.key << 
", " << newtag.value << ", " << newtag.srcid << ")\n";
         tags_out.push_back(newtag);
-        iss.str(sb.str());
       }
 
-      int ndata = sb.in_avail();
-      return iss.str().substr(iss.str().size() - ndata);
+      return msg.size() - sb.in_avail();
     }
   } /* namespace zeromq */
 } /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/tag_headers.h b/gr-zeromq/lib/tag_headers.h
index 4c7a812..dede5e9 100644
--- a/gr-zeromq/lib/tag_headers.h
+++ b/gr-zeromq/lib/tag_headers.h
@@ -27,12 +27,13 @@
 #include <gnuradio/block.h>
 #include <sstream>
 #include <cstring>
+#include <zmq.hpp>
 
 namespace gr {
   namespace zeromq {
 
-    std::string gen_tag_header(uint64_t &offset, std::vector<gr::tag_t> &tags);
-    std::string parse_tag_header(std::string &buf_in, uint64_t &offset_out, 
std::vector<gr::tag_t> &tags_out);
+    std::string gen_tag_header(uint64_t offset, std::vector<gr::tag_t> &tags);
+    size_t parse_tag_header(zmq::message_t &msg, uint64_t &offset_out, 
std::vector<gr::tag_t> &tags_out);
     
   } /* namespace zeromq */
 } /* namespace gr */



reply via email to

[Prev in Thread] Current Thread [Next in Thread]