commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 17/29: zmq: pull_msg_source should be working


From: git
Subject: [Commit-gnuradio] [gnuradio] 17/29: zmq: pull_msg_source should be working
Date: Tue, 13 Jan 2015 01:04:28 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 29bd7ae09383372afddcbab22fcd99b2333e4c1e
Author: Tim O'Shea <address@hidden>
Date:   Tue Dec 30 18:07:52 2014 +0100

    zmq: pull_msg_source should be working
---
 gr-zeromq/lib/pull_msg_source_impl.cc | 84 +++++++++++++++++------------------
 gr-zeromq/lib/pull_msg_source_impl.h  |  5 +++
 2 files changed, 46 insertions(+), 43 deletions(-)

diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc b/gr-zeromq/lib/pull_msg_source_impl.cc
index 5b207ce..0c848fc 100644
--- a/gr-zeromq/lib/pull_msg_source_impl.cc
+++ b/gr-zeromq/lib/pull_msg_source_impl.cc
@@ -65,55 +65,53 @@ namespace gr {
       delete d_context;
     }
 
+    bool pull_msg_source_impl::start(){
+      d_finished = false;
+      d_thread = new boost::thread( boost::bind( &pull_msg_source_impl::readloop , this ) );
+      return true;
+    }
+
+    bool pull_msg_source_impl::stop(){
+      d_finished = true;
+      d_thread->join(); 
+      return true;
+    }
+
+    void pull_msg_source_impl::readloop(){
+      while(!d_finished){
+        //std::cout << "readloop\n";
+      
+        zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+        zmq::poll (&items[0], 1, d_timeout);
+
+        //  If we got a reply, process
+        if (items[0].revents & ZMQ_POLLIN) {
+
+            // Receive data
+            zmq::message_t msg;
+            d_socket->recv(&msg);
+        
+            //std::cout << "got msg...\n";
+
+            std::string buf(static_cast<char*>(msg.data()), msg.size());
+            std::stringbuf sb(buf);
+            pmt::pmt_t m = pmt::deserialize(sb);
+            //std::cout << m << "\n";
+            message_port_pub(pmt::mp("out"), m);
+
+          } else {
+            usleep(100);
+          }
+        } 
+    }
+
+
     int
     pull_msg_source_impl::work(int noutput_items,
                            gr_vector_const_void_star &input_items,
                            gr_vector_void_star &output_items)
     {
     return noutput_items;
-    /*
-      char *out = (char*)output_items[0];
-
-      zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
-      zmq::poll (&items[0], 1, d_timeout);
-
-      //  If we got a reply, process
-      if (items[0].revents & ZMQ_POLLIN) {
-
-        // Receive data
-        zmq::message_t msg;
-        d_socket->recv(&msg);
-
-        // check header for tags...
-        std::string buf(static_cast<char*>(msg.data()), msg.size());
-        if(d_pass_tags){
-            uint64_t rcv_offset;
-            std::vector<gr::tag_t> tags;
-            buf = parse_tag_header(buf, rcv_offset, tags);
-            for(size_t i=0; i<tags.size(); i++){
-                tags[i].offset -= rcv_offset - nitems_written(0);
-                add_item_tag(0, tags[i]);
-                }
-            }
-
-
-        // Copy to ouput buffer and return
-        if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
-          memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items);
-
-          return noutput_items;
-        }
-        else {
-          memcpy(out, (void *)&buf[0], buf.size());
-
-          return buf.size()/(d_itemsize*d_vlen);
-        }
-      }
-      else {
-        return 0; // FIXME: someday when the scheduler does all the poll/selects
-      }
-
-    */
     }
 
   } /* namespace zeromq */
diff --git a/gr-zeromq/lib/pull_msg_source_impl.h b/gr-zeromq/lib/pull_msg_source_impl.h
index fb9237c..9ff89ef 100644
--- a/gr-zeromq/lib/pull_msg_source_impl.h
+++ b/gr-zeromq/lib/pull_msg_source_impl.h
@@ -35,11 +35,16 @@ namespace gr {
       int             d_timeout; // microseconds, -1 is blocking
       zmq::context_t  *d_context;
       zmq::socket_t   *d_socket;
+      void readloop();
+      boost::thread     *d_thread;
 
     public:
       pull_msg_source_impl(char *address, int timeout);
       ~pull_msg_source_impl();
 
+      bool start();
+      bool stop();
+      bool d_finished;
       int work(int noutput_items,
                gr_vector_const_void_star &input_items,
                gr_vector_void_star &output_items);



reply via email to

[Prev in Thread] Current Thread [Next in Thread]