commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] r4804 - gnuradio/branches/developers/eb/mb/mblock/src/


From: eb
Subject: [Commit-gnuradio] r4804 - gnuradio/branches/developers/eb/mb/mblock/src/lib
Date: Tue, 27 Mar 2007 17:31:46 -0600 (MDT)

Author: eb
Date: 2007-03-27 17:31:45 -0600 (Tue, 27 Mar 2007)
New Revision: 4804

Modified:
   gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_accepter_smp.cc
   gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.cc
   gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.h
   gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_prims.cc
   gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_send.cc
Log:
Added condition var to mb_msg_queue.  Implemented blocking and
non-blocking message dequeuing.


Modified: 
gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_accepter_smp.cc
===================================================================
--- gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_accepter_smp.cc    
2007-03-27 23:31:02 UTC (rev 4803)
+++ gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_accepter_smp.cc    
2007-03-27 23:31:45 UTC (rev 4804)
@@ -45,6 +45,4 @@
   mb_message_sptr msg = mb_make_message(signal, data, metadata, priority);
   msg->set_port_id(d_port_name);
   d_mb->impl()->msgq().insert(msg);
-
-  // FIXME tell runtime that we're ready to run
 }

Modified: gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.cc
===================================================================
--- gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.cc   
2007-03-27 23:31:02 UTC (rev 4803)
+++ gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.cc   
2007-03-27 23:31:45 UTC (rev 4804)
@@ -25,9 +25,9 @@
 #include <mb_msg_queue.h>
 #include <mb_message.h>
 
-// FIXME turn this into a template so we can use it for the runq of mblocks too
 
 mb_msg_queue::mb_msg_queue()
+  : d_not_empty(&d_mutex)
 {
 }
 
@@ -51,14 +51,21 @@
     d_queue[q].tail = msg;
     msg->d_next.reset();       // msg->d_next = 0;
   }
+
   // FIXME set bit in bitmap
+
+  d_not_empty.signal();
 }
 
+/*
+ * Delete highest pri message from the queue and return it.
+ * Returns equivalent of zero pointer if queue is empty.
+ *
+ * Caller must be holding d_mutex
+ */
 mb_message_sptr
-mb_msg_queue::get_highest_pri_msg()
+mb_msg_queue::get_highest_pri_msg_helper()
 {
-  omni_mutex_lock      l(d_mutex);
-
   // FIXME use bitmap and ffz to find best queue in O(1)
 
   for (mb_pri_t q = 0; q <= MB_PRI_WORST; q++){
@@ -78,3 +85,27 @@
 
   return mb_message_sptr();    // equivalent of a zero pointer
 }
+
+
+mb_message_sptr
+mb_msg_queue::get_highest_pri_msg_nowait()
+{
+  omni_mutex_lock      l(d_mutex);
+
+  return get_highest_pri_msg_helper();
+}
+
+mb_message_sptr
+mb_msg_queue::get_highest_pri_msg()
+{
+  omni_mutex_lock l(d_mutex);
+
+  while (1){
+    mb_message_sptr msg = get_highest_pri_msg_helper();
+    if (msg)                   // Got one; return it
+      return msg;
+
+    d_not_empty.wait();                // Wait for something
+  }
+}
+

Modified: gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.h
===================================================================
--- gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.h    
2007-03-27 23:31:02 UTC (rev 4803)
+++ gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.h    
2007-03-27 23:31:45 UTC (rev 4804)
@@ -37,11 +37,14 @@
     bool empty_p() const { return head == 0; }
   };
 
-  omni_mutex   d_mutex;
+  omni_mutex    d_mutex;
+  omni_condition d_not_empty;  // reader waits on this
 
   // FIXME add bitmap to indicate which queues are non-empty.
-  subq         d_queue[MB_NPRI];
+  subq          d_queue[MB_NPRI];
 
+  mb_message_sptr get_highest_pri_msg_helper();
+
 public:
   mb_msg_queue();
   ~mb_msg_queue();
@@ -53,6 +56,12 @@
    * \brief Delete highest pri message from the queue and return it.
    * Returns equivalent of zero pointer if queue is empty.
    */
+  mb_message_sptr get_highest_pri_msg_nowait();
+
+  /*
+   * \brief Delete highest pri message from the queue and return it.
+   * If the queue is empty, this call blocks until it can return a message.
+   */
   mb_message_sptr get_highest_pri_msg();
 };
 

Modified: gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_prims.cc
===================================================================
--- gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_prims.cc        
2007-03-27 23:31:02 UTC (rev 4803)
+++ gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_prims.cc        
2007-03-27 23:31:45 UTC (rev 4804)
@@ -330,7 +330,7 @@
   mb_msg_queue q;
 
   // check initial state
-  CPPUNIT_ASSERT(q.get_highest_pri_msg() == 0);
+  CPPUNIT_ASSERT(q.get_highest_pri_msg_nowait() == 0);
 
   CPPUNIT_ASSERT(MB_NPRI >= 5);        // sanity check for this test
 
@@ -340,11 +340,11 @@
   q.insert(mb_make_message(PMT_NIL, pmt_from_long(1), PMT_NIL, MB_PRI_BEST + 
2));
   q.insert(mb_make_message(PMT_NIL, pmt_from_long(2), PMT_NIL, MB_PRI_BEST + 
2));
   
-  CPPUNIT_ASSERT_EQUAL(0L, pmt_to_long(q.get_highest_pri_msg()->data()));
-  CPPUNIT_ASSERT_EQUAL(1L, pmt_to_long(q.get_highest_pri_msg()->data()));
-  CPPUNIT_ASSERT_EQUAL(2L, pmt_to_long(q.get_highest_pri_msg()->data()));
+  CPPUNIT_ASSERT_EQUAL(0L, 
pmt_to_long(q.get_highest_pri_msg_nowait()->data()));
+  CPPUNIT_ASSERT_EQUAL(1L, 
pmt_to_long(q.get_highest_pri_msg_nowait()->data()));
+  CPPUNIT_ASSERT_EQUAL(2L, 
pmt_to_long(q.get_highest_pri_msg_nowait()->data()));
 
-  CPPUNIT_ASSERT(q.get_highest_pri_msg() == 0);
+  CPPUNIT_ASSERT(q.get_highest_pri_msg_nowait() == 0);
 
 
   // insert messages of different priorities in pseudo-random order
@@ -361,19 +361,19 @@
   q.insert(mb_make_message(PMT_NIL, PMT_NIL, PMT_NIL, MB_PRI_BEST + 1));
 
   // confirm that they come out in order
-  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 0, q.get_highest_pri_msg()->priority());
-  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 0, q.get_highest_pri_msg()->priority());
-  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 1, q.get_highest_pri_msg()->priority());
-  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 1, q.get_highest_pri_msg()->priority());
-  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 2, q.get_highest_pri_msg()->priority());
-  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 2, q.get_highest_pri_msg()->priority());
-  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 3, q.get_highest_pri_msg()->priority());
-  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 3, q.get_highest_pri_msg()->priority());
-  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 4, q.get_highest_pri_msg()->priority());
-  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 4, q.get_highest_pri_msg()->priority());
+  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 0, 
q.get_highest_pri_msg_nowait()->priority());
+  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 0, 
q.get_highest_pri_msg_nowait()->priority());
+  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 1, 
q.get_highest_pri_msg_nowait()->priority());
+  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 1, 
q.get_highest_pri_msg_nowait()->priority());
+  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 2, 
q.get_highest_pri_msg_nowait()->priority());
+  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 2, 
q.get_highest_pri_msg_nowait()->priority());
+  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 3, 
q.get_highest_pri_msg_nowait()->priority());
+  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 3, 
q.get_highest_pri_msg_nowait()->priority());
+  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 4, 
q.get_highest_pri_msg_nowait()->priority());
+  CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 4, 
q.get_highest_pri_msg_nowait()->priority());
   
   // check final state
-  CPPUNIT_ASSERT(q.get_highest_pri_msg() == 0);
+  CPPUNIT_ASSERT(q.get_highest_pri_msg_nowait() == 0);
 }
 
 ////////////////////////////////////////////////////////////////
@@ -397,10 +397,10 @@
 
   pmt_t cs = pmt_intern("cs");
 
-  mb_message_sptr msg = mb->impl()->msgq().get_highest_pri_msg();
+  mb_message_sptr msg = mb->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(pmt_eq(cs, msg->port_id()));        // confirm that port_id 
is set
   CPPUNIT_ASSERT_EQUAL(0L, pmt_to_long(msg->data())); // and that data is 
correct
 
-  CPPUNIT_ASSERT_EQUAL(1L, 
pmt_to_long(mb->impl()->msgq().get_highest_pri_msg()->data()));
-  CPPUNIT_ASSERT_EQUAL(2L, 
pmt_to_long(mb->impl()->msgq().get_highest_pri_msg()->data()));
+  CPPUNIT_ASSERT_EQUAL(1L, 
pmt_to_long(mb->impl()->msgq().get_highest_pri_msg_nowait()->data()));
+  CPPUNIT_ASSERT_EQUAL(2L, 
pmt_to_long(mb->impl()->msgq().get_highest_pri_msg_nowait()->data()));
 }

Modified: gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_send.cc
===================================================================
--- gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_send.cc 
2007-03-27 23:31:02 UTC (rev 4803)
+++ gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_send.cc 
2007-03-27 23:31:45 UTC (rev 4804)
@@ -156,14 +156,14 @@
   // Reach into the guts and see if the messages ended up where they should 
have
 
   // mb0 should have received two messages sent from mb1 via its p1
-  msg = mb0->impl()->msgq().get_highest_pri_msg();
+  msg = mb0->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   // std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p0, msg->port_id());
   CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/mb1"), s_p1, 
pmt_from_long(0)),
                           msg->data()));
 
-  msg = mb0->impl()->msgq().get_highest_pri_msg();
+  msg = mb0->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   // std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p0, msg->port_id());
@@ -176,28 +176,28 @@
 
   mb_mblock_sptr mb1 = mb0->impl()->component("mb1");
 
-  msg = mb1->impl()->msgq().get_highest_pri_msg();
+  msg = mb1->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   // std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p1, msg->port_id());
   CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top"), s_p0, 
pmt_from_long(0)),
                           msg->data()));
 
-  msg = mb1->impl()->msgq().get_highest_pri_msg();
+  msg = mb1->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   // std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p1, msg->port_id());
   CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top"), s_p0, 
pmt_from_long(1)),
                           msg->data()));
 
-  msg = mb1->impl()->msgq().get_highest_pri_msg();
+  msg = mb1->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   // std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p3, msg->port_id());
   CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/mb2"), s_p2, 
pmt_from_long(0)),
                           msg->data()));
 
-  msg = mb1->impl()->msgq().get_highest_pri_msg();
+  msg = mb1->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   // std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p3, msg->port_id());
@@ -210,14 +210,14 @@
 
   mb_mblock_sptr mb2 = mb0->impl()->component("mb2");
 
-  msg = mb2->impl()->msgq().get_highest_pri_msg();
+  msg = mb2->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   // std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p3, msg->port_id());
   CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/mb1"), s_p2, 
pmt_from_long(0)),
                           msg->data()));
 
-  msg = mb2->impl()->msgq().get_highest_pri_msg();
+  msg = mb2->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   // std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p3, msg->port_id());
@@ -335,14 +335,14 @@
   // c0c0 should have received
   //   two message from c1 via its p2
 
-  msg = c0c0->impl()->msgq().get_highest_pri_msg();
+  msg = c0c0->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   //std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
   CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/c1"), s_p1, 
pmt_from_long(0)),
                           msg->data()));
 
-  msg = c0c0->impl()->msgq().get_highest_pri_msg();
+  msg = c0c0->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   //std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
@@ -352,14 +352,14 @@
   // c1 should have received
   //   two message from c0c0 via its p2
 
-  msg = c1->impl()->msgq().get_highest_pri_msg();
+  msg = c1->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   //std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
   CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/c0/c0"), s_p1, 
pmt_from_long(0)),
                           msg->data()));
 
-  msg = c1->impl()->msgq().get_highest_pri_msg();
+  msg = c1->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   //std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
@@ -417,14 +417,14 @@
   // c0c0 should have received
   //   two message from c1c0 via its p2
 
-  msg = c0c0->impl()->msgq().get_highest_pri_msg();
+  msg = c0c0->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   // std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
   CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/c1/c0"), s_p1, 
pmt_from_long(0)),
                           msg->data()));
 
-  msg = c0c0->impl()->msgq().get_highest_pri_msg();
+  msg = c0c0->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   // std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
@@ -434,14 +434,14 @@
   // c1c0 should have received
   //   two message from c0c0 via its p2
 
-  msg = c1c0->impl()->msgq().get_highest_pri_msg();
+  msg = c1c0->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   // std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
   CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/c0/c0"), s_p1, 
pmt_from_long(0)),
                           msg->data()));
 
-  msg = c1c0->impl()->msgq().get_highest_pri_msg();
+  msg = c1c0->impl()->msgq().get_highest_pri_msg_nowait();
   CPPUNIT_ASSERT(msg);
   // std::cerr << msg->data() << std::endl;
   CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());





reply via email to

[Prev in Thread] Current Thread [Next in Thread]