commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 21/29: zeromq: cleanup and convert sub_msg_


From: git
Subject: [Commit-gnuradio] [gnuradio] 21/29: zeromq: cleanup and convert sub_msg_source to derive from gr::block
Date: Tue, 13 Jan 2015 01:04:28 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 9bf7123e477772bcb5fc53d3139e75c4d63a044a
Author: Johnathan Corgan <address@hidden>
Date:   Mon Jan 12 13:46:33 2015 -0800

    zeromq: cleanup and convert sub_msg_source to derive from gr::block
---
 gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h |  8 +--
 gr-zeromq/lib/sub_msg_source_impl.cc               | 65 ++++++++++------------
 gr-zeromq/lib/sub_msg_source_impl.h                |  9 ++-
 3 files changed, 37 insertions(+), 45 deletions(-)

diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h 
b/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h
index 3af8fdb..2e3fc7b 100644
--- a/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h
@@ -30,14 +30,14 @@ namespace gr {
   namespace zeromq {
 
     /*!
-     * \brief Receive messages on ZMQ SUB socket and source stream
+     * \brief Receive messages on ZMQ SUB socket and output async messages
      * \ingroup zeromq
      *
      * \details
-     * This block will connect to a ZMQ PUB socket, then produce all
-     * incoming messages as streaming output.
+     * This block will connect to a ZMQ PUB socket, then convert them
+     * to outgoing async messages
      */
-    class ZEROMQ_API sub_msg_source : virtual public gr::sync_block
+    class ZEROMQ_API sub_msg_source : virtual public gr::block
     {
     public:
       typedef boost::shared_ptr<sub_msg_source> sptr;
diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc 
b/gr-zeromq/lib/sub_msg_source_impl.cc
index f7a9bc9..b016405 100644
--- a/gr-zeromq/lib/sub_msg_source_impl.cc
+++ b/gr-zeromq/lib/sub_msg_source_impl.cc
@@ -39,19 +39,21 @@ namespace gr {
     }
 
     sub_msg_source_impl::sub_msg_source_impl(char *address, int timeout)
-      : gr::sync_block("sub_msg_source",
-                       gr::io_signature::make(0, 0, 0),
-                       gr::io_signature::make(0, 0, 0)),
+      : gr::block("sub_msg_source",
+                  gr::io_signature::make(0, 0, 0),
+                  gr::io_signature::make(0, 0, 0)),
         d_timeout(timeout)
     {
       int major, minor, patch;
-      zmq::version (&major, &minor, &patch);
+      zmq::version(&major, &minor, &patch);
+
       if (major < 3) {
         d_timeout = timeout*1000;
       }
+
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_SUB);
-      //int time = 0;
+
       d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
       d_socket->connect (address);
 
@@ -65,52 +67,43 @@ namespace gr {
       delete d_context;
     }
 
-    bool sub_msg_source_impl::start(){
+    bool sub_msg_source_impl::start()
+    {
       d_finished = false;
-      d_thread = new boost::thread( boost::bind( 
&sub_msg_source_impl::readloop , this ) );
+      d_thread = new boost::thread(boost::bind(&sub_msg_source_impl::readloop, 
this));
       return true;
     }
 
-    bool sub_msg_source_impl::stop(){
+    bool sub_msg_source_impl::stop()
+    {
       d_finished = true;
-      d_thread->join(); 
+      d_thread->join();
       return true;
     }
 
-    void sub_msg_source_impl::readloop(){
+    void sub_msg_source_impl::readloop()
+    {
       while(!d_finished){
-        //std::cout << "readloop\n";
-      
+
         zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
-        zmq::poll (&items[0], 1, d_timeout);
+        zmq::poll(&items[0], 1, d_timeout);
 
         //  If we got a reply, process
         if (items[0].revents & ZMQ_POLLIN) {
 
-            // Receive data
-            zmq::message_t msg;
-            d_socket->recv(&msg);
-        
-            //std::cout << "got msg...\n";
-
-            std::string buf(static_cast<char*>(msg.data()), msg.size());
-            std::stringbuf sb(buf);
-            pmt::pmt_t m = pmt::deserialize(sb);
-            //std::cout << m << "\n";
-            message_port_pub(pmt::mp("out"), m);
-
-          } else {
-            usleep(100);
-          }
-        } 
-    }
+          // Receive data
+          zmq::message_t msg;
+          d_socket->recv(&msg);
 
-    int
-    sub_msg_source_impl::work(int noutput_items,
-                           gr_vector_const_void_star &input_items,
-                           gr_vector_void_star &output_items)
-    {
-        return noutput_items;
+          std::string buf(static_cast<char*>(msg.data()), msg.size());
+          std::stringbuf sb(buf);
+          pmt::pmt_t m = pmt::deserialize(sb);
+
+          message_port_pub(pmt::mp("out"), m);
+        } else {
+          usleep(100);
+        }
+      }
     }
 
   } /* namespace zeromq */
diff --git a/gr-zeromq/lib/sub_msg_source_impl.h 
b/gr-zeromq/lib/sub_msg_source_impl.h
index 424eb47..09bede9 100644
--- a/gr-zeromq/lib/sub_msg_source_impl.h
+++ b/gr-zeromq/lib/sub_msg_source_impl.h
@@ -35,19 +35,18 @@ namespace gr {
       int             d_timeout; // microseconds, -1 is blocking
       zmq::context_t  *d_context;
       zmq::socket_t   *d_socket;
+      boost::thread   *d_thread;
+
       void readloop();
-      boost::thread     *d_thread;
 
      public:
+      bool d_finished;
+
       sub_msg_source_impl(char *address, int timeout);
       ~sub_msg_source_impl();
 
       bool start();
       bool stop();
-      bool d_finished;
-      int work(int noutput_items,
-                  gr_vector_const_void_star &input_items,
-                  gr_vector_void_star &output_items);
     };
 
   } // namespace zeromq



reply via email to

[Prev in Thread] Current Thread [Next in Thread]