commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 23/29: zeromq: cleanup and made pull_msg_source derive from gr::block


From: git
Subject: [Commit-gnuradio] [gnuradio] 23/29: zeromq: cleanup and made pull_msg_source derive from gr::block
Date: Tue, 13 Jan 2015 01:04:29 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 3a443cfe307c4f2c5f03806146d4ad1664faa288
Author: Johnathan Corgan <address@hidden>
Date:   Mon Jan 12 14:24:48 2015 -0800

    zeromq: cleanup and made pull_msg_source derive from gr::block
---
 .../include/gnuradio/zeromq/pull_msg_source.h      | 10 ++--
 gr-zeromq/lib/pull_msg_source_impl.cc              | 59 ++++++++++------------
 gr-zeromq/lib/pull_msg_source_impl.h               |  9 ++--
 3 files changed, 35 insertions(+), 43 deletions(-)

diff --git a/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h b/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h
index 7d87ba1..1749515 100644
--- a/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h
@@ -24,20 +24,20 @@
 #define INCLUDED_ZEROMQ_PULL_MSG_SOURCE_H
 
 #include <gnuradio/zeromq/api.h>
-#include <gnuradio/sync_block.h>
+#include <gnuradio/block.h>
 
 namespace gr {
   namespace zeromq {
 
     /*!
-     * \brief Receive messages on ZMQ PULL socket and source stream
+     * \brief Receive messages on ZMQ PULL socket and output async messages
      * \ingroup zeromq
      *
      * \details
-     * This block will connect to a ZMQ PUSH socket, then produce all
-     * incoming messages as streaming output.
+     * This block will connect to a ZMQ PUSH socket, then convert
+     * received messages to outgoing async messages.
      */
-    class ZEROMQ_API pull_msg_source : virtual public gr::sync_block
+    class ZEROMQ_API pull_msg_source : virtual public gr::block
     {
     public:
       typedef boost::shared_ptr<pull_msg_source> sptr;
diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc b/gr-zeromq/lib/pull_msg_source_impl.cc
index 0c848fc..ca496ef 100644
--- a/gr-zeromq/lib/pull_msg_source_impl.cc
+++ b/gr-zeromq/lib/pull_msg_source_impl.cc
@@ -39,18 +39,21 @@ namespace gr {
     }
 
     pull_msg_source_impl::pull_msg_source_impl(char *address, int timeout)
-      : gr::sync_block("pull_msg_source",
-                       gr::io_signature::make(0, 0, 0),
-                       gr::io_signature::make(0, 0, 0)),
+      : gr::block("pull_msg_source",
+                  gr::io_signature::make(0, 0, 0),
+                  gr::io_signature::make(0, 0, 0)),
         d_timeout(timeout)
     {
       int major, minor, patch;
       zmq::version (&major, &minor, &patch);
+
       if (major < 3) {
         d_timeout = timeout*1000;
       }
+
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_PULL);
+
       int time = 0;
       d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
       d_socket->connect (address);
@@ -65,53 +68,43 @@ namespace gr {
       delete d_context;
     }
 
-    bool pull_msg_source_impl::start(){
+    bool pull_msg_source_impl::start()
+    {
       d_finished = false;
-      d_thread = new boost::thread( boost::bind( &pull_msg_source_impl::readloop , this ) );
+      d_thread = new boost::thread(boost::bind(&pull_msg_source_impl::readloop, this));
       return true;
     }
 
-    bool pull_msg_source_impl::stop(){
+    bool pull_msg_source_impl::stop()
+    {
       d_finished = true;
-      d_thread->join(); 
+      d_thread->join();
       return true;
     }
 
-    void pull_msg_source_impl::readloop(){
+    void pull_msg_source_impl::readloop()
+    {
       while(!d_finished){
-        //std::cout << "readloop\n";
-      
+
         zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
         zmq::poll (&items[0], 1, d_timeout);
 
         //  If we got a reply, process
         if (items[0].revents & ZMQ_POLLIN) {
 
-            // Receive data
-            zmq::message_t msg;
-            d_socket->recv(&msg);
-        
-            //std::cout << "got msg...\n";
-
-            std::string buf(static_cast<char*>(msg.data()), msg.size());
-            std::stringbuf sb(buf);
-            pmt::pmt_t m = pmt::deserialize(sb);
-            //std::cout << m << "\n";
-            message_port_pub(pmt::mp("out"), m);
-
-          } else {
-            usleep(100);
-          }
-        } 
-    }
+          // Receive data
+          zmq::message_t msg;
+          d_socket->recv(&msg);
 
+          std::string buf(static_cast<char*>(msg.data()), msg.size());
+          std::stringbuf sb(buf);
+          pmt::pmt_t m = pmt::deserialize(sb);
+          message_port_pub(pmt::mp("out"), m);
 
-    int
-    pull_msg_source_impl::work(int noutput_items,
-                           gr_vector_const_void_star &input_items,
-                           gr_vector_void_star &output_items)
-    {
-    return noutput_items;
+        } else {
+          usleep(100);
+        }
+      }
     }
 
   } /* namespace zeromq */
diff --git a/gr-zeromq/lib/pull_msg_source_impl.h b/gr-zeromq/lib/pull_msg_source_impl.h
index 9ff89ef..6d8791d 100644
--- a/gr-zeromq/lib/pull_msg_source_impl.h
+++ b/gr-zeromq/lib/pull_msg_source_impl.h
@@ -35,19 +35,18 @@ namespace gr {
       int             d_timeout; // microseconds, -1 is blocking
       zmq::context_t  *d_context;
       zmq::socket_t   *d_socket;
+      boost::thread   *d_thread;
+
       void readloop();
-      boost::thread     *d_thread;
 
     public:
+      bool d_finished;
+
       pull_msg_source_impl(char *address, int timeout);
       ~pull_msg_source_impl();
 
       bool start();
       bool stop();
-      bool d_finished;
-      int work(int noutput_items,
-               gr_vector_const_void_star &input_items,
-               gr_vector_void_star &output_items);
     };
 
   } // namespace zeromq



reply via email to

[Prev in Thread] Current Thread [Next in Thread]