[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] r4804 - gnuradio/branches/developers/eb/mb/mblock/src/lib
From: eb
Subject: [Commit-gnuradio] r4804 - gnuradio/branches/developers/eb/mb/mblock/src/lib
Date: Tue, 27 Mar 2007 17:31:46 -0600 (MDT)
Author: eb
Date: 2007-03-27 17:31:45 -0600 (Tue, 27 Mar 2007)
New Revision: 4804
Modified:
gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_accepter_smp.cc
gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.cc
gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.h
gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_prims.cc
gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_send.cc
Log:
Added condition var to mb_msg_queue. Implemented blocking and
non-blocking message dequeuing.
Modified:
gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_accepter_smp.cc
===================================================================
--- gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_accepter_smp.cc 2007-03-27 23:31:02 UTC (rev 4803)
+++ gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_accepter_smp.cc 2007-03-27 23:31:45 UTC (rev 4804)
@@ -45,6 +45,4 @@
mb_message_sptr msg = mb_make_message(signal, data, metadata, priority);
msg->set_port_id(d_port_name);
d_mb->impl()->msgq().insert(msg);
-
- // FIXME tell runtime that we're ready to run
}
Modified: gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.cc
===================================================================
--- gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.cc 2007-03-27 23:31:02 UTC (rev 4803)
+++ gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.cc 2007-03-27 23:31:45 UTC (rev 4804)
@@ -25,9 +25,9 @@
#include <mb_msg_queue.h>
#include <mb_message.h>
-// FIXME turn this into a template so we can use it for the runq of mblocks too
mb_msg_queue::mb_msg_queue()
+ : d_not_empty(&d_mutex)
{
}
@@ -51,14 +51,21 @@
d_queue[q].tail = msg;
msg->d_next.reset(); // msg->d_next = 0;
}
+
// FIXME set bit in bitmap
+
+ d_not_empty.signal();
}
+/*
+ * Delete highest pri message from the queue and return it.
+ * Returns equivalent of zero pointer if queue is empty.
+ *
+ * Caller must be holding d_mutex
+ */
mb_message_sptr
-mb_msg_queue::get_highest_pri_msg()
+mb_msg_queue::get_highest_pri_msg_helper()
{
- omni_mutex_lock l(d_mutex);
-
// FIXME use bitmap and ffz to find best queue in O(1)
for (mb_pri_t q = 0; q <= MB_PRI_WORST; q++){
@@ -78,3 +85,27 @@
return mb_message_sptr(); // equivalent of a zero pointer
}
+
+
+mb_message_sptr
+mb_msg_queue::get_highest_pri_msg_nowait()
+{
+ omni_mutex_lock l(d_mutex);
+
+ return get_highest_pri_msg_helper();
+}
+
+mb_message_sptr
+mb_msg_queue::get_highest_pri_msg()
+{
+ omni_mutex_lock l(d_mutex);
+
+ while (1){
+ mb_message_sptr msg = get_highest_pri_msg_helper();
+ if (msg) // Got one; return it
+ return msg;
+
+ d_not_empty.wait(); // Wait for something
+ }
+}
+
Modified: gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.h
===================================================================
--- gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.h 2007-03-27 23:31:02 UTC (rev 4803)
+++ gnuradio/branches/developers/eb/mb/mblock/src/lib/mb_msg_queue.h 2007-03-27 23:31:45 UTC (rev 4804)
@@ -37,11 +37,14 @@
bool empty_p() const { return head == 0; }
};
- omni_mutex d_mutex;
+ omni_mutex d_mutex;
+ omni_condition d_not_empty; // reader waits on this
// FIXME add bitmap to indicate which queues are non-empty.
- subq d_queue[MB_NPRI];
+ subq d_queue[MB_NPRI];
+ mb_message_sptr get_highest_pri_msg_helper();
+
public:
mb_msg_queue();
~mb_msg_queue();
@@ -53,6 +56,12 @@
* \brief Delete highest pri message from the queue and return it.
* Returns equivalent of zero pointer if queue is empty.
*/
+ mb_message_sptr get_highest_pri_msg_nowait();
+
+ /*
+ * \brief Delete highest pri message from the queue and return it.
+ * If the queue is empty, this call blocks until it can return a message.
+ */
mb_message_sptr get_highest_pri_msg();
};
Modified: gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_prims.cc
===================================================================
--- gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_prims.cc 2007-03-27 23:31:02 UTC (rev 4803)
+++ gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_prims.cc 2007-03-27 23:31:45 UTC (rev 4804)
@@ -330,7 +330,7 @@
mb_msg_queue q;
// check initial state
- CPPUNIT_ASSERT(q.get_highest_pri_msg() == 0);
+ CPPUNIT_ASSERT(q.get_highest_pri_msg_nowait() == 0);
CPPUNIT_ASSERT(MB_NPRI >= 5); // sanity check for this test
@@ -340,11 +340,11 @@
q.insert(mb_make_message(PMT_NIL, pmt_from_long(1), PMT_NIL, MB_PRI_BEST + 2));
q.insert(mb_make_message(PMT_NIL, pmt_from_long(2), PMT_NIL, MB_PRI_BEST + 2));
- CPPUNIT_ASSERT_EQUAL(0L, pmt_to_long(q.get_highest_pri_msg()->data()));
- CPPUNIT_ASSERT_EQUAL(1L, pmt_to_long(q.get_highest_pri_msg()->data()));
- CPPUNIT_ASSERT_EQUAL(2L, pmt_to_long(q.get_highest_pri_msg()->data()));
+ CPPUNIT_ASSERT_EQUAL(0L, pmt_to_long(q.get_highest_pri_msg_nowait()->data()));
+ CPPUNIT_ASSERT_EQUAL(1L, pmt_to_long(q.get_highest_pri_msg_nowait()->data()));
+ CPPUNIT_ASSERT_EQUAL(2L, pmt_to_long(q.get_highest_pri_msg_nowait()->data()));
- CPPUNIT_ASSERT(q.get_highest_pri_msg() == 0);
+ CPPUNIT_ASSERT(q.get_highest_pri_msg_nowait() == 0);
// insert messages of different priorities in pseudo-random order
@@ -361,19 +361,19 @@
q.insert(mb_make_message(PMT_NIL, PMT_NIL, PMT_NIL, MB_PRI_BEST + 1));
// confirm that they come out in order
- CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 0, q.get_highest_pri_msg()->priority());
- CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 0, q.get_highest_pri_msg()->priority());
- CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 1, q.get_highest_pri_msg()->priority());
- CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 1, q.get_highest_pri_msg()->priority());
- CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 2, q.get_highest_pri_msg()->priority());
- CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 2, q.get_highest_pri_msg()->priority());
- CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 3, q.get_highest_pri_msg()->priority());
- CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 3, q.get_highest_pri_msg()->priority());
- CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 4, q.get_highest_pri_msg()->priority());
- CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 4, q.get_highest_pri_msg()->priority());
+ CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 0, q.get_highest_pri_msg_nowait()->priority());
+ CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 0, q.get_highest_pri_msg_nowait()->priority());
+ CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 1, q.get_highest_pri_msg_nowait()->priority());
+ CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 1, q.get_highest_pri_msg_nowait()->priority());
+ CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 2, q.get_highest_pri_msg_nowait()->priority());
+ CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 2, q.get_highest_pri_msg_nowait()->priority());
+ CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 3, q.get_highest_pri_msg_nowait()->priority());
+ CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 3, q.get_highest_pri_msg_nowait()->priority());
+ CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 4, q.get_highest_pri_msg_nowait()->priority());
+ CPPUNIT_ASSERT_EQUAL(MB_PRI_BEST + 4, q.get_highest_pri_msg_nowait()->priority());
// check final state
- CPPUNIT_ASSERT(q.get_highest_pri_msg() == 0);
+ CPPUNIT_ASSERT(q.get_highest_pri_msg_nowait() == 0);
}
////////////////////////////////////////////////////////////////
@@ -397,10 +397,10 @@
pmt_t cs = pmt_intern("cs");
- mb_message_sptr msg = mb->impl()->msgq().get_highest_pri_msg();
+ mb_message_sptr msg = mb->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(pmt_eq(cs, msg->port_id())); // confirm that port_id is set
CPPUNIT_ASSERT_EQUAL(0L, pmt_to_long(msg->data())); // and that data is correct
- CPPUNIT_ASSERT_EQUAL(1L, pmt_to_long(mb->impl()->msgq().get_highest_pri_msg()->data()));
- CPPUNIT_ASSERT_EQUAL(2L, pmt_to_long(mb->impl()->msgq().get_highest_pri_msg()->data()));
+ CPPUNIT_ASSERT_EQUAL(1L, pmt_to_long(mb->impl()->msgq().get_highest_pri_msg_nowait()->data()));
+ CPPUNIT_ASSERT_EQUAL(2L, pmt_to_long(mb->impl()->msgq().get_highest_pri_msg_nowait()->data()));
}
Modified: gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_send.cc
===================================================================
--- gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_send.cc 2007-03-27 23:31:02 UTC (rev 4803)
+++ gnuradio/branches/developers/eb/mb/mblock/src/lib/qa_mblock_send.cc 2007-03-27 23:31:45 UTC (rev 4804)
@@ -156,14 +156,14 @@
// Reach into the guts and see if the messages ended up where they should have
// mb0 should have received two messages sent from mb1 via its p1
- msg = mb0->impl()->msgq().get_highest_pri_msg();
+ msg = mb0->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
// std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p0, msg->port_id());
CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/mb1"), s_p1,
pmt_from_long(0)),
msg->data()));
- msg = mb0->impl()->msgq().get_highest_pri_msg();
+ msg = mb0->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
// std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p0, msg->port_id());
@@ -176,28 +176,28 @@
mb_mblock_sptr mb1 = mb0->impl()->component("mb1");
- msg = mb1->impl()->msgq().get_highest_pri_msg();
+ msg = mb1->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
// std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p1, msg->port_id());
CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top"), s_p0,
pmt_from_long(0)),
msg->data()));
- msg = mb1->impl()->msgq().get_highest_pri_msg();
+ msg = mb1->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
// std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p1, msg->port_id());
CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top"), s_p0,
pmt_from_long(1)),
msg->data()));
- msg = mb1->impl()->msgq().get_highest_pri_msg();
+ msg = mb1->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
// std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p3, msg->port_id());
CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/mb2"), s_p2,
pmt_from_long(0)),
msg->data()));
- msg = mb1->impl()->msgq().get_highest_pri_msg();
+ msg = mb1->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
// std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p3, msg->port_id());
@@ -210,14 +210,14 @@
mb_mblock_sptr mb2 = mb0->impl()->component("mb2");
- msg = mb2->impl()->msgq().get_highest_pri_msg();
+ msg = mb2->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
// std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p3, msg->port_id());
CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/mb1"), s_p2,
pmt_from_long(0)),
msg->data()));
- msg = mb2->impl()->msgq().get_highest_pri_msg();
+ msg = mb2->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
// std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p3, msg->port_id());
@@ -335,14 +335,14 @@
// c0c0 should have received
// two message from c1 via its p2
- msg = c0c0->impl()->msgq().get_highest_pri_msg();
+ msg = c0c0->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
//std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/c1"), s_p1,
pmt_from_long(0)),
msg->data()));
- msg = c0c0->impl()->msgq().get_highest_pri_msg();
+ msg = c0c0->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
//std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
@@ -352,14 +352,14 @@
// c1 should have received
// two message from c0c0 via its p2
- msg = c1->impl()->msgq().get_highest_pri_msg();
+ msg = c1->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
//std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/c0/c0"), s_p1,
pmt_from_long(0)),
msg->data()));
- msg = c1->impl()->msgq().get_highest_pri_msg();
+ msg = c1->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
//std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
@@ -417,14 +417,14 @@
// c0c0 should have received
// two message from c1c0 via its p2
- msg = c0c0->impl()->msgq().get_highest_pri_msg();
+ msg = c0c0->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
// std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/c1/c0"), s_p1,
pmt_from_long(0)),
msg->data()));
- msg = c0c0->impl()->msgq().get_highest_pri_msg();
+ msg = c0c0->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
// std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
@@ -434,14 +434,14 @@
// c1c0 should have received
// two message from c0c0 via its p2
- msg = c1c0->impl()->msgq().get_highest_pri_msg();
+ msg = c1c0->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
// std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
CPPUNIT_ASSERT(pmt_equal(pmt_list3(pmt_intern("top/c0/c0"), s_p1,
pmt_from_long(0)),
msg->data()));
- msg = c1c0->impl()->msgq().get_highest_pri_msg();
+ msg = c1c0->impl()->msgq().get_highest_pri_msg_nowait();
CPPUNIT_ASSERT(msg);
// std::cerr << msg->data() << std::endl;
CPPUNIT_ASSERT_EQUAL(s_p2, msg->port_id());
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Commit-gnuradio] r4804 - gnuradio/branches/developers/eb/mb/mblock/src/lib,
eb <=