commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 27/29: zeromq: minor cleanup


From: git
Subject: [Commit-gnuradio] [gnuradio] 27/29: zeromq: minor cleanup
Date: Tue, 13 Jan 2015 01:04:29 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 0b65c1aa92356c9b3d848718b232176b96fda30e
Author: Johnathan Corgan <address@hidden>
Date:   Mon Jan 12 16:51:39 2015 -0800

    zeromq: minor cleanup
---
 gr-zeromq/lib/pull_source_impl.cc | 22 +++++++++++-----------
 gr-zeromq/lib/push_sink_impl.cc   | 36 ++++++++++++++++++++----------------
 gr-zeromq/lib/rep_sink_impl.cc    | 19 +++++++++++--------
 gr-zeromq/lib/req_source_impl.cc  | 21 ++++++++++++---------
 gr-zeromq/lib/sub_source_impl.cc  | 25 +++++++++++++------------
 5 files changed, 67 insertions(+), 56 deletions(-)

diff --git a/gr-zeromq/lib/pull_source_impl.cc b/gr-zeromq/lib/pull_source_impl.cc
index 87b330a..3215096 100644
--- a/gr-zeromq/lib/pull_source_impl.cc
+++ b/gr-zeromq/lib/pull_source_impl.cc
@@ -46,11 +46,14 @@ namespace gr {
     {
       int major, minor, patch;
       zmq::version (&major, &minor, &patch);
+
       if (major < 3) {
         d_timeout = timeout*1000;
       }
+
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_PULL);
+
       int time = 0;
       d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
       d_socket->connect (address);
@@ -86,25 +89,22 @@ namespace gr {
         // check header for tags...
         std::string buf(static_cast<char*>(msg.data()), msg.size());
         if(d_pass_tags){
-            uint64_t rcv_offset;
-            std::vector<gr::tag_t> tags;
-            buf = parse_tag_header(buf, rcv_offset, tags);
-            for(size_t i=0; i<tags.size(); i++){
-                tags[i].offset -= rcv_offset - nitems_written(0);
-                add_item_tag(0, tags[i]);
-                }
-            }
-
+          uint64_t rcv_offset;
+          std::vector<gr::tag_t> tags;
+          buf = parse_tag_header(buf, rcv_offset, tags);
+          for(size_t i=0; i<tags.size(); i++){
+            tags[i].offset -= rcv_offset - nitems_written(0);
+            add_item_tag(0, tags[i]);
+          }
+        }
 
         // Copy to ouput buffer and return
         if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
           memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items);
-
           return noutput_items;
         }
         else {
           memcpy(out, (void *)&buf[0], buf.size());
-
           return buf.size()/(d_itemsize*d_vlen);
         }
       }
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index 4cc9ab9..677de10 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -46,11 +46,14 @@ namespace gr {
     {
       int major, minor, patch;
       zmq::version (&major, &minor, &patch);
+
       if (major < 3) {
         d_timeout = timeout*1000;
       }
+
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
+
       int time = 0;
       d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
       d_socket->bind (address);
@@ -71,32 +74,33 @@ namespace gr {
       const char *in = (const char *) input_items[0];
 
       zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
-      zmq::poll (&itemsout[0], 1, d_timeout);
+      zmq::poll(&itemsout[0], 1, d_timeout);
 
       //  If we got a reply, process
       if (itemsout[0].revents & ZMQ_POLLOUT) {
 
-      // encode the current offset, # tags, and tags into header
-      std::string header("");
-    
-      if(d_pass_tags){
-        uint64_t offset = nitems_read(0);
-        std::vector<gr::tag_t> tags;
-        get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
-        header = gen_tag_header( offset, tags );
+        // encode the current offset, # tags, and tags into header
+        std::string header("");
+
+        if(d_pass_tags){
+          uint64_t offset = nitems_read(0);
+          std::vector<gr::tag_t> tags;
+          get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
+          header = gen_tag_header(offset, tags);
         }
 
-      // create message copy and send
-      zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items);
-      if(d_pass_tags)
-        memcpy((void*) msg.data(), header.c_str(), header.length() );
-      memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items);
-      d_socket->send(msg);
+        // create message copy and send
+        zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items);
+
+        if(d_pass_tags)
+          memcpy((void*) msg.data(), header.c_str(), header.length());
+        memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items);
 
+        d_socket->send(msg);
         return noutput_items;
       }
       else {
-        return 0;
+        return 0; // FIXME: when scheduler supports return blocking
       }
     }
 
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index 88ed6c1..85f9a78 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -46,11 +46,14 @@ namespace gr {
     {
       int major, minor, patch;
       zmq::version (&major, &minor, &patch);
+
       if (major < 3) {
         d_timeout = timeout*1000;
       }
+
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
+
       int time = 0;
       d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
       d_socket->bind (address);
@@ -71,7 +74,7 @@ namespace gr {
       const char *in = (const char *) input_items[0];
 
       zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
-      zmq::poll (&items[0], 1, d_timeout);
+      zmq::poll(&items[0], 1, d_timeout);
 
       //  If we got a reply, process
       if (items[0].revents & ZMQ_POLLIN) {
@@ -80,20 +83,20 @@ namespace gr {
         d_socket->recv(&request);
         int req_output_items = *(static_cast<int*>(request.data()));
         int nitems_send = std::min(noutput_items, req_output_items);
-        
+
         // encode the current offset, # tags, and tags into header
         std::string header("");
         if(d_pass_tags){
-            uint64_t offset = nitems_read(0);
-            std::vector<gr::tag_t> tags;
-            get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
-            header = gen_tag_header( offset, tags );
-            }
+          uint64_t offset = nitems_read(0);
+          std::vector<gr::tag_t> tags;
+          get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
+          header = gen_tag_header( offset, tags );
+        }
 
         // create message copy and send
         zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send);
         if(d_pass_tags)
-            memcpy((void*) msg.data(), header.c_str(), header.length() );
+          memcpy((void*) msg.data(), header.c_str(), header.length() );
         memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*nitems_send);
         d_socket->send(msg);
 
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc
index 5c5071e..f69d447 100644
--- a/gr-zeromq/lib/req_source_impl.cc
+++ b/gr-zeromq/lib/req_source_impl.cc
@@ -46,11 +46,14 @@ namespace gr {
     {
       int major, minor, patch;
       zmq::version (&major, &minor, &patch);
+
       if (major < 3) {
         d_timeout = timeout*1000;
       }
+
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
+
       int time = 0;
       d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
       d_socket->connect (address);
@@ -82,7 +85,7 @@ namespace gr {
       }
 
       zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
-      zmq::poll (&itemsin[0], 1, d_timeout);
+      zmq::poll(&itemsin[0], 1, d_timeout);
 
       //  If we got a reply, process
       if (itemsin[0].revents & ZMQ_POLLIN) {
@@ -94,14 +97,14 @@ namespace gr {
         std::string buf(static_cast<char*>(reply.data()), reply.size());
 
         if(d_pass_tags){
-            uint64_t rcv_offset;
-            std::vector<gr::tag_t> tags;
-            buf = parse_tag_header(buf, rcv_offset, tags);
-            for(size_t i=0; i<tags.size(); i++){
-                tags[i].offset -= rcv_offset - nitems_written(0);
-                add_item_tag(0, tags[i]);
-                }
-            }
+          uint64_t rcv_offset;
+          std::vector<gr::tag_t> tags;
+          buf = parse_tag_header(buf, rcv_offset, tags);
+          for(size_t i=0; i<tags.size(); i++){
+            tags[i].offset -= rcv_offset - nitems_written(0);
+            add_item_tag(0, tags[i]);
+          }
+        }
 
 
         // Copy to ouput buffer and return
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 813ff5a..1242688 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -46,12 +46,14 @@ namespace gr {
     {
       int major, minor, patch;
       zmq::version (&major, &minor, &patch);
+
       if (major < 3) {
         d_timeout = timeout*1000;
       }
+
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_SUB);
-      //int time = 0;
+
       d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
       d_socket->connect (address);
     }
@@ -87,17 +89,16 @@ namespace gr {
         std::string buf(static_cast<char*>(msg.data()), msg.size());
 
         if(d_pass_tags){
-            uint64_t rcv_offset;
-            std::vector<gr::tag_t> tags;
-            //int olen = buf.size();
-            buf = parse_tag_header(buf, rcv_offset, tags);
-            //std::cout << "SUB: Header Len = " << olen - buf.size() << ", data len = " << buf.size() << "\n";
-            for(size_t i=0; i<tags.size(); i++){
-                //std::cout << "add item tag ... (offset = " << tags[i].offset << " rcv_offset = " << rcv_offset << " nitems_read(0) = " << nitems_written(0) << "\n";
-                tags[i].offset -= rcv_offset - nitems_written(0);
-                add_item_tag(0, tags[i]);
-                }
-            }
+          uint64_t rcv_offset;
+          std::vector<gr::tag_t> tags;
+
+          buf = parse_tag_header(buf, rcv_offset, tags);
+
+          for(size_t i=0; i<tags.size(); i++) {
+            tags[i].offset -= rcv_offset - nitems_written(0);
+            add_item_tag(0, tags[i]);
+          }
+        }
 
         // Copy to ouput buffer and return
         if (buf.size() >= d_itemsize*d_vlen*noutput_items) {



reply via email to

[Prev in Thread] Current Thread [Next in Thread]