commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 22/29: zeromq: cleanup and converted push_m


From: git
Subject: [Commit-gnuradio] [gnuradio] 22/29: zeromq: cleanup and converted push_msg_sink to derive from gr::block
Date: Tue, 13 Jan 2015 01:04:29 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 7b77431d3a4971ac62c6e71a813151404c1ae5e3
Author: Johnathan Corgan <address@hidden>
Date:   Mon Jan 12 14:03:03 2015 -0800

    zeromq: cleanup and converted push_msg_sink to derive from gr::block
---
 gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h   |  2 +-
 gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h  | 14 +++--
 gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h |  2 +-
 gr-zeromq/lib/push_msg_sink_impl.cc                | 60 +++++-----------------
 gr-zeromq/lib/push_msg_sink_impl.h                 |  3 --
 5 files changed, 21 insertions(+), 60 deletions(-)

diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h 
b/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h
index ffcb475..8cf4bcf 100644
--- a/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h
@@ -24,7 +24,7 @@
 #define INCLUDED_ZEROMQ_PUB_MSG_SINK_H
 
 #include <gnuradio/zeromq/api.h>
-#include <gnuradio/sync_block.h>
+#include <gnuradio/block.h>
 
 namespace gr {
   namespace zeromq {
diff --git a/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h 
b/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h
index 27a19b2..3ce6ebb 100644
--- a/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h
@@ -24,24 +24,22 @@
 #define INCLUDED_ZEROMQ_PUSH_MSG_SINK_H
 
 #include <gnuradio/zeromq/api.h>
-#include <gnuradio/sync_block.h>
+#include <gnuradio/block.h>
 
 namespace gr {
   namespace zeromq {
 
     /*!
-     * \brief Sink the contents of a stream to a ZMQ PUSH socket
+     * \brief Sink the contents of a msg port to a ZMQ PUSH socket
      * \ingroup zeromq
      *
      * \details
-     * This block acts a a streaming sink for a GNU Radio flowgraph
-     * and writes its contents to a ZMQ PUSH socket.  A PUSH socket
-     * will round-robin send its messages to each connected ZMQ PULL
-     * socket, either another gr-zeromq source block or a regular,
+     * This block acts a message port receiver and writes individual
+     * messages to a ZMQ PUSH socket.  The corresponding receiving ZMQ
+     * PULL socket can be either another gr-zeromq source block or a
      * non-GNU Radio ZMQ socket.
-     *
      */
-    class ZEROMQ_API push_msg_sink : virtual public gr::sync_block
+    class ZEROMQ_API push_msg_sink : virtual public gr::block
     {
     public:
       typedef boost::shared_ptr<push_msg_sink> sptr;
diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h 
b/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h
index 2e3fc7b..d06a83c 100644
--- a/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h
@@ -24,7 +24,7 @@
 #define INCLUDED_ZEROMQ_SUB_MSG_SOURCE_H
 
 #include <gnuradio/zeromq/api.h>
-#include <gnuradio/sync_block.h>
+#include <gnuradio/block.h>
 
 namespace gr {
   namespace zeromq {
diff --git a/gr-zeromq/lib/push_msg_sink_impl.cc 
b/gr-zeromq/lib/push_msg_sink_impl.cc
index 6266cd6..e9cc5bc 100644
--- a/gr-zeromq/lib/push_msg_sink_impl.cc
+++ b/gr-zeromq/lib/push_msg_sink_impl.cc
@@ -39,25 +39,28 @@ namespace gr {
     }
 
     push_msg_sink_impl::push_msg_sink_impl(char *address, int timeout)
-      : gr::sync_block("push_msg_sink",
-                       gr::io_signature::make(0, 0, 0),
-                       gr::io_signature::make(0, 0, 0)),
+      : gr::block("push_msg_sink",
+                  gr::io_signature::make(0, 0, 0),
+                  gr::io_signature::make(0, 0, 0)),
         d_timeout(timeout)
     {
       int major, minor, patch;
       zmq::version (&major, &minor, &patch);
+
       if (major < 3) {
         d_timeout = timeout*1000;
       }
+
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
+
       int time = 0;
       d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
-      d_socket->bind (address);
+      d_socket->bind(address);
 
       message_port_register_in(pmt::mp("in"));
-      set_msg_handler( pmt::mp("in"), 
-        boost::bind(&push_msg_sink_impl::handler, this, _1));
+      set_msg_handler(pmt::mp("in"),
+                      boost::bind(&push_msg_sink_impl::handler, this, _1));
     }
 
     push_msg_sink_impl::~push_msg_sink_impl()
@@ -67,52 +70,15 @@ namespace gr {
       delete d_context;
     }
 
-    void push_msg_sink_impl::handler(pmt::pmt_t msg){
+    void push_msg_sink_impl::handler(pmt::pmt_t msg)
+    {
       std::stringbuf sb("");
       pmt::serialize( msg, sb );
       std::string s = sb.str();
       zmq::message_t zmsg(s.size());
-      memcpy( zmsg.data(), s.c_str(), s.size() );
-      d_socket->send(zmsg);
-    }
-
-    int
-    push_msg_sink_impl::work(int noutput_items,
-                         gr_vector_const_void_star &input_items,
-                         gr_vector_void_star &output_items)
-    {
-      return noutput_items;
 
-/*      const char *in = (const char *) input_items[0];
-
-      zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
-      zmq::poll (&itemsout[0], 1, d_timeout);
-
-      //  If we got a reply, process
-      if (itemsout[0].revents & ZMQ_POLLOUT) {
-
-      // encode the current offset, # tags, and tags into header
-      std::string header("");
-    
-      if(d_pass_tags){
-        uint64_t offset = nitems_read(0);
-        std::vector<gr::tag_t> tags;
-        get_tags_in_range(tags, 0, nitems_read(0), 
nitems_read(0)+noutput_items);
-        header = gen_tag_header( offset, tags );
-        }
-
-      // create message copy and send
-      zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items);
-      if(d_pass_tags)
-        memcpy((void*) msg.data(), header.c_str(), header.length() );
-      memcpy((uint8_t *)msg.data() + header.length(), in, 
d_itemsize*d_vlen*noutput_items);
-      d_socket->send(msg);
-
-        return noutput_items;
-      }
-      else {
-        return 0;
-      }*/
+      memcpy(zmsg.data(), s.c_str(), s.size());
+      d_socket->send(zmsg);
     }
 
   } /* namespace zeromq */
diff --git a/gr-zeromq/lib/push_msg_sink_impl.h 
b/gr-zeromq/lib/push_msg_sink_impl.h
index b77c998..d669d32 100644
--- a/gr-zeromq/lib/push_msg_sink_impl.h
+++ b/gr-zeromq/lib/push_msg_sink_impl.h
@@ -40,9 +40,6 @@ namespace gr {
       push_msg_sink_impl(char *address, int timeout);
       ~push_msg_sink_impl();
 
-      int work(int noutput_items,
-               gr_vector_const_void_star &input_items,
-               gr_vector_void_star &output_items);
       void handler(pmt::pmt_t msg);
     };
 



reply via email to

[Prev in Thread] Current Thread [Next in Thread]