commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 25/29: zeromq: cleanup and made req_msg_source derive from gr::block


From: git
Subject: [Commit-gnuradio] [gnuradio] 25/29: zeromq: cleanup and made req_msg_source derive from gr::block
Date: Tue, 13 Jan 2015 01:04:29 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 5de5b1a87f71bdbed9925f80648756c566c3a91b
Author: Johnathan Corgan <address@hidden>
Date:   Mon Jan 12 16:04:45 2015 -0800

    zeromq: cleanup and made req_msg_source derive from gr::block
---
 gr-zeromq/include/gnuradio/zeromq/req_msg_source.h | 13 ++---
 gr-zeromq/lib/req_msg_source_impl.cc               | 67 ++++++++++------------
 gr-zeromq/lib/req_msg_source_impl.h                |  9 ++-
 3 files changed, 39 insertions(+), 50 deletions(-)

diff --git a/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h b/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h
index cf91833..05d80b8 100644
--- a/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2013 Free Software Foundation, Inc.
+ * Copyright 2013-2015 Free Software Foundation, Inc.
  *
  * This file is part of GNU Radio.
  *
@@ -24,20 +24,20 @@
 #define INCLUDED_ZEROMQ_REQ_MSG_SOURCE_H
 
 #include <gnuradio/zeromq/api.h>
-#include <gnuradio/sync_block.h>
+#include <gnuradio/block.h>
 
 namespace gr {
   namespace zeromq {
 
     /*!
-     * \brief Receive messages on ZMQ REQ socket and source stream
+     * \brief Receive messages on ZMQ REQ socket output async messages
      * \ingroup zeromq
      *
      * \details
-     * This block will connect to a ZMQ REP socket, then produce all
-     * incoming messages as streaming output.
+     * This block will connect to a ZMQ REP socket, then resend all
+     * incoming messages as asynchronous messages.
      */
-    class ZEROMQ_API req_msg_source : virtual public gr::sync_block
+    class ZEROMQ_API req_msg_source : virtual public gr::block
     {
     public:
       typedef boost::shared_ptr<req_msg_source> sptr;
@@ -45,7 +45,6 @@ namespace gr {
       /*!
        * \brief Return a shared_ptr to a new instance of zeromq::req_msg_source.
        *
-       *
        * \param address  ZMQ socket address specifier
        * \param timeout  Receive timeout in seconds, default is 100ms, 1us increments
        *
diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc
index 2dda152..b30ef26 100644
--- a/gr-zeromq/lib/req_msg_source_impl.cc
+++ b/gr-zeromq/lib/req_msg_source_impl.cc
@@ -39,18 +39,21 @@ namespace gr {
     }
 
     req_msg_source_impl::req_msg_source_impl(char *address, int timeout)
-      : gr::sync_block("req_msg_source",
-                       gr::io_signature::make(0, 0, 0),
-                       gr::io_signature::make(0, 0, 0)),
-       d_timeout(timeout)
+      : gr::block("req_msg_source",
+                  gr::io_signature::make(0, 0, 0),
+                  gr::io_signature::make(0, 0, 0)),
+        d_timeout(timeout)
     {
       int major, minor, patch;
-      zmq::version (&major, &minor, &patch);
+      zmq::version(&major, &minor, &patch);
+
       if (major < 3) {
         d_timeout = timeout*1000;
       }
+
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
+
       int time = 0;
       d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
       d_socket->connect (address);
@@ -65,67 +68,55 @@ namespace gr {
       delete d_context;
     }
 
-    bool req_msg_source_impl::start(){
+    bool req_msg_source_impl::start()
+    {
       d_finished = false;
-      d_thread = new boost::thread( boost::bind( &req_msg_source_impl::readloop , this ) );
+      d_thread = new boost::thread(boost::bind(&req_msg_source_impl::readloop, this));
       return true;
     }
 
-    bool req_msg_source_impl::stop(){
+    bool req_msg_source_impl::stop()
+    {
       d_finished = true;
       d_thread->join();
       return true;
     }
 
-    void req_msg_source_impl::readloop(){
+    void req_msg_source_impl::readloop()
+    {
       while(!d_finished){
         //std::cout << "readloop\n";
-  
+
         zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
-        zmq::poll (&itemsout[0], 1, d_timeout);
-  
+        zmq::poll(&itemsout[0], 1, d_timeout);
+
         //  If we got a reply, process
         if (itemsout[0].revents & ZMQ_POLLOUT) {
           // Request data, FIXME non portable?
           int nmsg = 1;
           zmq::message_t request(sizeof(int));
-          memcpy ((void *) request.data (), &nmsg, sizeof(int));
+          memcpy((void *) request.data (), &nmsg, sizeof(int));
           d_socket->send(request);
-          //std::cout << "sent request...\n";
         }
 
         zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
         zmq::poll (&items[0], 1, d_timeout);
-        //std::cout << "rx response...\n";
 
         //  If we got a reply, process
         if (items[0].revents & ZMQ_POLLIN) {
-            //std::cout << "rx response... got data\n";
-
-            // Receive data
-            zmq::message_t msg;
-            d_socket->recv(&msg);
+          // Receive data
+          zmq::message_t msg;
+          d_socket->recv(&msg);
 
-            //std::cout << "got msg...\n";
+          std::string buf(static_cast<char*>(msg.data()), msg.size());
+          std::stringbuf sb(buf);
+          pmt::pmt_t m = pmt::deserialize(sb);
+          message_port_pub(pmt::mp("out"), m);
 
-            std::string buf(static_cast<char*>(msg.data()), msg.size());
-            std::stringbuf sb(buf);
-            pmt::pmt_t m = pmt::deserialize(sb);
-            //std::cout << m << "\n";
-            message_port_pub(pmt::mp("out"), m);
-
-          } else {
-            usleep(100);
-          }
+        } else {
+          usleep(100);
         }
-    }
-
-    int
-    req_msg_source_impl::work(int noutput_items,
-                          gr_vector_const_void_star &input_items,
-                          gr_vector_void_star &output_items)
-    {
-      return noutput_items;
+      }
     }
 
   } /* namespace zeromq */
diff --git a/gr-zeromq/lib/req_msg_source_impl.h b/gr-zeromq/lib/req_msg_source_impl.h
index 3a69174..5835dd4 100644
--- a/gr-zeromq/lib/req_msg_source_impl.h
+++ b/gr-zeromq/lib/req_msg_source_impl.h
@@ -35,19 +35,18 @@ namespace gr {
       int             d_timeout;
       zmq::context_t  *d_context;
       zmq::socket_t   *d_socket;
+      boost::thread   *d_thread;
+
       void readloop();
-      boost::thread     *d_thread;
 
     public:
+      bool d_finished;
+
       req_msg_source_impl(char *address, int timeout);
       ~req_msg_source_impl();
 
       bool start();
       bool stop();
-      bool d_finished;
-      int work(int noutput_items,
-               gr_vector_const_void_star &input_items,
-               gr_vector_void_star &output_items);
     };
 
   } // namespace zeromq



reply via email to

[Prev in Thread] Current Thread [Next in Thread]