commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 18/29: zmq: rep/req msg blocks now working


From: git
Subject: [Commit-gnuradio] [gnuradio] 18/29: zmq: rep/req msg blocks now working
Date: Tue, 13 Jan 2015 01:04:28 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 18944a9a761eb7c2256e4ad450b943f09664c410
Author: Tim O'Shea <address@hidden>
Date:   Tue Dec 30 18:42:48 2014 +0100

    zmq: rep/req msg blocks now working
---
 gr-zeromq/lib/rep_msg_sink_impl.cc   | 66 +++++++++++++++---------
 gr-zeromq/lib/rep_msg_sink_impl.h    |  6 ++-
 gr-zeromq/lib/req_msg_source_impl.cc | 99 ++++++++++++++++++++----------------
 gr-zeromq/lib/req_msg_source_impl.h  |  5 ++
 4 files changed, 108 insertions(+), 68 deletions(-)

diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc
index 72dd5fb..0a18a8b 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.cc
+++ b/gr-zeromq/lib/rep_msg_sink_impl.cc
@@ -56,8 +56,8 @@ namespace gr {
       d_socket->bind (address);
 
       message_port_register_in(pmt::mp("in"));
-      set_msg_handler( pmt::mp("in"), 
-        boost::bind(&rep_msg_sink_impl::handler, this, _1));
+//      set_msg_handler( pmt::mp("in"), 
+//        boost::bind(&rep_msg_sink_impl::handler, this, _1));
     }
 
     rep_msg_sink_impl::~rep_msg_sink_impl()
@@ -67,6 +67,19 @@ namespace gr {
       delete d_context;
     }
 
+    bool rep_msg_sink_impl::start(){
+      d_finished = false;
+      d_thread = new boost::thread( boost::bind( &rep_msg_sink_impl::readloop , this ) );
+      return true;
+    }
+
+    bool rep_msg_sink_impl::stop(){
+      d_finished = true;
+      d_thread->join();
+      return true;
+    }
+
+/*
     void rep_msg_sink_impl::handler(pmt::pmt_t msg){
       std::stringbuf sb("");
       pmt::serialize( msg, sb );
@@ -75,47 +88,54 @@ namespace gr {
       memcpy( zmsg.data(), s.c_str(), s.size() );
       d_socket->send(zmsg);
     }
+*/
 
     int
     rep_msg_sink_impl::work(int noutput_items,
                         gr_vector_const_void_star &input_items,
                         gr_vector_void_star &output_items)
     {
-      return noutput_items;
-/*
-      const char *in = (const char *) input_items[0];
+        return noutput_items;
+    }
 
+    void rep_msg_sink_impl::readloop(){
+
+     while(!d_finished){
+
+      // while we have data, wait for query...
+      while(!empty_p(pmt::mp("in"))){
+
+      //std::cout << "wait for req ...\n";
+      // wait for query...
       zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
       zmq::poll (&items[0], 1, d_timeout);
 
       //  If we got a reply, process
       if (items[0].revents & ZMQ_POLLIN) {
+        //std::cout << "wait for req ... got req\n";
         // receive data request
         zmq::message_t request;
         d_socket->recv(&request);
         int req_output_items = *(static_cast<int*>(request.data()));
-        int nitems_send = std::min(noutput_items, req_output_items);
+        if(req_output_items != 1)
+            throw std::runtime_error("Request was not 1 msg for rep/req request!!");
         
-        // encode the current offset, # tags, and tags into header
-        std::string header("");
-        if(d_pass_tags){
-            uint64_t offset = nitems_read(0);
-            std::vector<gr::tag_t> tags;
-            get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
-            header = gen_tag_header( offset, tags );
-            }
-
         // create message copy and send
-        zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send);
-        if(d_pass_tags)
-            memcpy((void*) msg.data(), header.c_str(), header.length() );
-        memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*nitems_send);
-        d_socket->send(msg);
+        //std::cout << "get pmt in\n";
+        pmt::pmt_t msg = delete_head_nowait(pmt::mp("in"));
+        std::stringbuf sb("");
+        pmt::serialize( msg, sb );
+        std::string s = sb.str();
+        zmq::message_t zmsg(s.size());
+        memcpy( zmsg.data(), s.c_str(), s.size() );
+        //std::cout << "send pmt zmq\n";
+        d_socket->send(zmsg);
+      } // if req
 
-        return nitems_send;
-      }
+     } // while !empty
+
+     } // while !d_finished
 
-      return 0;*/
     }
   } /* namespace zeromq */
 } /* namespace gr */
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.h b/gr-zeromq/lib/rep_msg_sink_impl.h
index 40d7969..25bd0e8 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.h
+++ b/gr-zeromq/lib/rep_msg_sink_impl.h
@@ -35,6 +35,9 @@ namespace gr {
       int             d_timeout;
       zmq::context_t  *d_context;
       zmq::socket_t   *d_socket;
+      boost::thread   *d_thread;
+      bool            d_finished;
+      void            readloop();
 
     public:
       rep_msg_sink_impl(char *address, int timeout);
@@ -43,7 +46,8 @@ namespace gr {
       int work(int noutput_items,
                gr_vector_const_void_star &input_items,
                gr_vector_void_star &output_items);
-      void handler(pmt::pmt_t msg);
+      bool start();
+      bool stop();
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc
index 92e26e9..2dda152 100644
--- a/gr-zeromq/lib/req_msg_source_impl.cc
+++ b/gr-zeromq/lib/req_msg_source_impl.cc
@@ -65,56 +65,67 @@ namespace gr {
       delete d_context;
     }
 
+    bool req_msg_source_impl::start(){
+      d_finished = false;
+      d_thread = new boost::thread( boost::bind( &req_msg_source_impl::readloop , this ) );
+      return true;
+    }
+
+    bool req_msg_source_impl::stop(){
+      d_finished = true;
+      d_thread->join();
+      return true;
+    }
+
+    void req_msg_source_impl::readloop(){
+      while(!d_finished){
+        //std::cout << "readloop\n";
+  
+        zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
+        zmq::poll (&itemsout[0], 1, d_timeout);
+  
+        //  If we got a reply, process
+        if (itemsout[0].revents & ZMQ_POLLOUT) {
+          // Request data, FIXME non portable?
+          int nmsg = 1;
+          zmq::message_t request(sizeof(int));
+          memcpy ((void *) request.data (), &nmsg, sizeof(int));
+          d_socket->send(request);
+          //std::cout << "sent request...\n";
+        }
+
+        zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+        zmq::poll (&items[0], 1, d_timeout);
+        //std::cout << "rx response...\n";
+
+        //  If we got a reply, process
+        if (items[0].revents & ZMQ_POLLIN) {
+            //std::cout << "rx response... got data\n";
+
+            // Receive data
+            zmq::message_t msg;
+            d_socket->recv(&msg);
+
+            //std::cout << "got msg...\n";
+
+            std::string buf(static_cast<char*>(msg.data()), msg.size());
+            std::stringbuf sb(buf);
+            pmt::pmt_t m = pmt::deserialize(sb);
+            //std::cout << m << "\n";
+            message_port_pub(pmt::mp("out"), m);
+
+          } else {
+            usleep(100);
+          }
+        }
+    }
+
     int
     req_msg_source_impl::work(int noutput_items,
                           gr_vector_const_void_star &input_items,
                           gr_vector_void_star &output_items)
     {
       return noutput_items;
-/*
-      char *out = (char*)output_items[0];
-
-      zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
-      zmq::poll (&itemsout[0], 1, d_timeout);
-
-      //  If we got a reply, process
-      if (itemsout[0].revents & ZMQ_POLLOUT) {
-        // Request data, FIXME non portable?
-        zmq::message_t request(sizeof(int));
-        memcpy ((void *) request.data (), &noutput_items, sizeof(int));
-        d_socket->send(request);
-      }
-
-      zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
-      zmq::poll (&itemsin[0], 1, d_timeout);
-
-      //  If we got a reply, process
-      if (itemsin[0].revents & ZMQ_POLLIN) {
-        // Receive data
-        zmq::message_t reply;
-        d_socket->recv(&reply);
-
-        // Deserialize header data / tags
-        std::string buf(static_cast<char*>(reply.data()), reply.size());
-
-        if(d_pass_tags){
-            uint64_t rcv_offset;
-            std::vector<gr::tag_t> tags;
-            buf = parse_tag_header(buf, rcv_offset, tags);
-            for(size_t i=0; i<tags.size(); i++){
-                tags[i].offset -= rcv_offset - nitems_written(0);
-                add_item_tag(0, tags[i]);
-                }
-            }
-
-
-        // Copy to ouput buffer and return
-        memcpy(out, (void *)&buf[0], buf.size());
-        return buf.size()/(d_itemsize*d_vlen);
-      }
-
-      return 0;
-        */
     }
 
   } /* namespace zeromq */
diff --git a/gr-zeromq/lib/req_msg_source_impl.h b/gr-zeromq/lib/req_msg_source_impl.h
index 635fa45..3a69174 100644
--- a/gr-zeromq/lib/req_msg_source_impl.h
+++ b/gr-zeromq/lib/req_msg_source_impl.h
@@ -35,11 +35,16 @@ namespace gr {
       int             d_timeout;
       zmq::context_t  *d_context;
       zmq::socket_t   *d_socket;
+      void readloop();
+      boost::thread     *d_thread;
 
     public:
       req_msg_source_impl(char *address, int timeout);
       ~req_msg_source_impl();
 
+      bool start();
+      bool stop();
+      bool d_finished;
       int work(int noutput_items,
                gr_vector_const_void_star &input_items,
                gr_vector_void_star &output_items);



reply via email to

[Prev in Thread] Current Thread [Next in Thread]