commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 24/29: zeromq: cleanup and converted rep_msg_sink to derive from gr::block


From: git
Subject: [Commit-gnuradio] [gnuradio] 24/29: zeromq: cleanup and converted rep_msg_sink to derive from gr::block
Date: Tue, 13 Jan 2015 01:04:29 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 7bf8b05bde26bfb1e2df684e2ab55b878b25350c
Author: Johnathan Corgan <address@hidden>
Date:   Mon Jan 12 15:50:00 2015 -0800

    zeromq: cleanup and converted rep_msg_sink to derive from gr::block
---
 gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h |  14 +--
 gr-zeromq/lib/rep_msg_sink_impl.cc               | 111 ++++++++++-------------
 gr-zeromq/lib/rep_msg_sink_impl.h                |   4 +-
 3 files changed, 54 insertions(+), 75 deletions(-)

diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h b/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h
index b0fd23c..97f3d83 100644
--- a/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h
@@ -24,22 +24,22 @@
 #define INCLUDED_ZEROMQ_REP_MSG_SINK_H
 
 #include <gnuradio/zeromq/api.h>
-#include <gnuradio/sync_block.h>
+#include <gnuradio/block.h>
 
 namespace gr {
   namespace zeromq {
 
     /*!
-     * \brief Sink the contents of a stream to a ZMQ REP socket
+     * \brief Sink the contents of a msg port to a ZMQ REP socket
      * \ingroup zeromq
      *
      * \details
-     * This block acts a a streaming sink for a GNU Radio flowgraph
-     * and writes its contents to a ZMQ REP socket.  A REP socket will
-     * only send its contents to an attached REQ socket when it
-     * requests items.
+     * This block acts as a message port receiver and writes individual
+     * messages to a ZMQ REP socket.  The corresponding receiving ZMQ
+     * REQ socket can be either another gr-zeromq source block or a
+     * non-GNU Radio ZMQ socket.
      */
-    class ZEROMQ_API rep_msg_sink : virtual public gr::sync_block
+    class ZEROMQ_API rep_msg_sink : virtual public gr::block
     {
     public:
       typedef boost::shared_ptr<rep_msg_sink> sptr;
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc
index 0a18a8b..be86f83 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.cc
+++ b/gr-zeromq/lib/rep_msg_sink_impl.cc
@@ -39,25 +39,26 @@ namespace gr {
     }
 
     rep_msg_sink_impl::rep_msg_sink_impl(char *address, int timeout)
-      : gr::sync_block("rep_msg_sink",
-                       gr::io_signature::make(0, 0, 0),
-                       gr::io_signature::make(0, 0, 0)),
-       d_timeout(timeout)
+      : gr::block("rep_msg_sink",
+                  gr::io_signature::make(0, 0, 0),
+                  gr::io_signature::make(0, 0, 0)),
+        d_timeout(timeout)
     {
       int major, minor, patch;
-      zmq::version (&major, &minor, &patch);
+      zmq::version(&major, &minor, &patch);
+
       if (major < 3) {
         d_timeout = timeout*1000;
       }
+
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
+
       int time = 0;
       d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
       d_socket->bind (address);
 
       message_port_register_in(pmt::mp("in"));
-//      set_msg_handler( pmt::mp("in"), 
-//        boost::bind(&rep_msg_sink_impl::handler, this, _1));
     }
 
     rep_msg_sink_impl::~rep_msg_sink_impl()
@@ -67,75 +68,55 @@ namespace gr {
       delete d_context;
     }
 
-    bool rep_msg_sink_impl::start(){
+    bool rep_msg_sink_impl::start()
+    {
       d_finished = false;
-      d_thread = new boost::thread( boost::bind( &rep_msg_sink_impl::readloop , this ) );
+      d_thread = new boost::thread(boost::bind(&rep_msg_sink_impl::readloop, this));
       return true;
     }
 
-    bool rep_msg_sink_impl::stop(){
+    bool rep_msg_sink_impl::stop()
+    {
       d_finished = true;
       d_thread->join();
       return true;
     }
 
-/*
-    void rep_msg_sink_impl::handler(pmt::pmt_t msg){
-      std::stringbuf sb("");
-      pmt::serialize( msg, sb );
-      std::string s = sb.str();
-      zmq::message_t zmsg(s.size());
-      memcpy( zmsg.data(), s.c_str(), s.size() );
-      d_socket->send(zmsg);
-    }
-*/
-
-    int
-    rep_msg_sink_impl::work(int noutput_items,
-                        gr_vector_const_void_star &input_items,
-                        gr_vector_void_star &output_items)
+    void rep_msg_sink_impl::readloop()
     {
-        return noutput_items;
+      while(!d_finished) {
+
+        // while we have data, wait for query...
+        while(!empty_p(pmt::mp("in"))) {
+
+          // wait for query...
+          zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+          zmq::poll (&items[0], 1, d_timeout);
+
+          //  If we got a reply, process
+          if (items[0].revents & ZMQ_POLLIN) {
+
+            // receive data request
+            zmq::message_t request;
+            d_socket->recv(&request);
+
+            int req_output_items = *(static_cast<int*>(request.data()));
+            if(req_output_items != 1)
+              throw std::runtime_error("Request was not 1 msg for rep/req request!!");
+
+            // create message copy and send
+            pmt::pmt_t msg = delete_head_nowait(pmt::mp("in"));
+            std::stringbuf sb("");
+            pmt::serialize( msg, sb );
+            std::string s = sb.str();
+            zmq::message_t zmsg(s.size());
+            memcpy( zmsg.data(), s.c_str(), s.size() );
+            d_socket->send(zmsg);
+          } // if req
+        } // while !empty
+
+      } // while !d_finished
     }
 
-    void rep_msg_sink_impl::readloop(){
-
-     while(!d_finished){
-
-      // while we have data, wait for query...
-      while(!empty_p(pmt::mp("in"))){
-
-      //std::cout << "wait for req ...\n";
-      // wait for query...
-      zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
-      zmq::poll (&items[0], 1, d_timeout);
-
-      //  If we got a reply, process
-      if (items[0].revents & ZMQ_POLLIN) {
-        //std::cout << "wait for req ... got req\n";
-        // receive data request
-        zmq::message_t request;
-        d_socket->recv(&request);
-        int req_output_items = *(static_cast<int*>(request.data()));
-        if(req_output_items != 1)
-            throw std::runtime_error("Request was not 1 msg for rep/req request!!");
-        
-        // create message copy and send
-        //std::cout << "get pmt in\n";
-        pmt::pmt_t msg = delete_head_nowait(pmt::mp("in"));
-        std::stringbuf sb("");
-        pmt::serialize( msg, sb );
-        std::string s = sb.str();
-        zmq::message_t zmsg(s.size());
-        memcpy( zmsg.data(), s.c_str(), s.size() );
-        //std::cout << "send pmt zmq\n";
-        d_socket->send(zmsg);
-      } // if req
-
-     } // while !empty
-
-     } // while !d_finished
-
-    }
   } /* namespace zeromq */
 } /* namespace gr */
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.h b/gr-zeromq/lib/rep_msg_sink_impl.h
index 25bd0e8..d37a409 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.h
+++ b/gr-zeromq/lib/rep_msg_sink_impl.h
@@ -37,15 +37,13 @@ namespace gr {
       zmq::socket_t   *d_socket;
       boost::thread   *d_thread;
       bool            d_finished;
+
       void            readloop();
 
     public:
       rep_msg_sink_impl(char *address, int timeout);
       ~rep_msg_sink_impl();
 
-      int work(int noutput_items,
-               gr_vector_const_void_star &input_items,
-               gr_vector_void_star &output_items);
       bool start();
       bool stop();
     };



reply via email to

[Prev in Thread] Current Thread [Next in Thread]