commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 02/03: gr-blocks: stream_mux add tag_propag


From: git
Subject: [Commit-gnuradio] [gnuradio] 02/03: gr-blocks: stream_mux add tag_propagation qa
Date: Thu, 28 Apr 2016 00:13:12 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch maint
in repository gnuradio.

commit d7773175342aba3f559fb69801d085737c25463b
Author: Andrej Rode <address@hidden>
Date:   Tue Apr 26 11:31:48 2016 +0200

    gr-blocks: stream_mux add tag_propagation qa
---
 gr-blocks/python/blocks/qa_stream_mux.py | 60 +++++++++++++++++++++++++++++++-
 1 file changed, 59 insertions(+), 1 deletion(-)

diff --git a/gr-blocks/python/blocks/qa_stream_mux.py 
b/gr-blocks/python/blocks/qa_stream_mux.py
index 00e32e9..3b470af 100755
--- a/gr-blocks/python/blocks/qa_stream_mux.py
+++ b/gr-blocks/python/blocks/qa_stream_mux.py
@@ -21,7 +21,7 @@
 #
 
 from gnuradio import gr, gr_unittest, blocks
-
+import pmt
 import os
 
 class test_stream_mux (gr_unittest.TestCase):
@@ -67,6 +67,36 @@ class test_stream_mux (gr_unittest.TestCase):
 
         return dst.data ()
 
+    def help_stream_tag_propagation(self, N, stream_sizes):
+        src_data1           = stream_sizes[0]*N*[1,]
+        src_data2           = stream_sizes[1]*N*[2,]
+        src_data3           = stream_sizes[2]*N*[3,]
+    # stream_mux scheme (3,2,4)
+        src1 = blocks.vector_source_f(src_data1)
+        src2 = blocks.vector_source_f(src_data2)
+        src3 = blocks.vector_source_f(src_data3)
+        tag_stream1 = blocks.stream_to_tagged_stream(gr.sizeof_float, 1,
+                                                     stream_sizes[0], 'src1')
+        tag_stream2 = blocks.stream_to_tagged_stream(gr.sizeof_float, 1,
+                                                     stream_sizes[1], 'src2')
+        tag_stream3 = blocks.stream_to_tagged_stream(gr.sizeof_float, 1,
+                                                     stream_sizes[2], 'src3')
+
+        mux = blocks.stream_mux(gr.sizeof_float, stream_sizes)
+        dst = blocks.vector_sink_f()
+
+        self.tb.connect(src1, tag_stream1)
+        self.tb.connect(src2, tag_stream2)
+        self.tb.connect(src3, tag_stream3)
+        self.tb.connect(tag_stream1, (mux,0))
+        self.tb.connect(tag_stream2, (mux,1))
+        self.tb.connect(tag_stream3, (mux,2))
+        self.tb.connect(mux, dst)
+        self.tb.run()
+
+        return (dst.data (), dst.tags ())
+
+
     def test_stream_2NN_ff(self):
         N = 40
         stream_sizes = [10, 10]
@@ -181,5 +211,33 @@ class test_stream_mux (gr_unittest.TestCase):
         self.tb.run ()
         self.assertEqual (r1 + r2, dst.data())
 
+    def test_tag_propagation(self):
+        N = 10 # Block length
+        stream_sizes = [1,2,3]
+
+        expected_result     = N*(stream_sizes[0]*[1,]
+                                 +stream_sizes[1]*[2,]
+                                 +stream_sizes[2]*[3,])
+        # check the data
+        (result, tags) = self.help_stream_tag_propagation(N, stream_sizes)
+        self.assertFloatTuplesAlmostEqual(expected_result, result, places=6)
+
+        # check the tags
+        expected_tag_offsets_src1 = [sum(stream_sizes)*i for i in range(N)]
+        expected_tag_offsets_src2 = [stream_sizes[0]
+                                     +sum(stream_sizes)*i for i in range(N)]
+        expected_tag_offsets_src3 = [stream_sizes[0]+stream_sizes[1]
+                                     +sum(stream_sizes)*i for i in range(N)]
+        tags_src1 = [tag for tag in tags if pmt.eq(tag.key, 
pmt.intern('src1'))]
+        tags_src2 = [tag for tag in tags if pmt.eq(tag.key, 
pmt.intern('src2'))]
+        tags_src3 = [tag for tag in tags if pmt.eq(tag.key, 
pmt.intern('src3'))]
+
+        for i in range(len(expected_tag_offsets_src1)):
+            self.assertTrue(expected_tag_offsets_src1[i] == 
tags_src1[i].offset)
+        for i in range(len(expected_tag_offsets_src2)):
+            self.assertTrue(expected_tag_offsets_src2[i] == 
tags_src2[i].offset)
+        for i in range(len(expected_tag_offsets_src3)):
+            self.assertTrue(expected_tag_offsets_src3[i] == 
tags_src3[i].offset)
+
 if __name__ == '__main__':
     gr_unittest.run(test_stream_mux, "test_stream_mux.xml")



reply via email to

[Prev in Thread] Current Thread [Next in Thread]