[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] r4911 - in gnuradio/branches/developers/eb/ibu: mblock
From: |
eb |
Subject: |
[Commit-gnuradio] r4911 - in gnuradio/branches/developers/eb/ibu: mblock/src/lib omnithread |
Date: |
Fri, 6 Apr 2007 23:49:56 -0600 (MDT) |
Author: eb
Date: 2007-04-06 23:49:56 -0600 (Fri, 06 Apr 2007)
New Revision: 4911
Modified:
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.cc
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.h
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.h
gnuradio/branches/developers/eb/ibu/omnithread/omnithread.h
Log:
work-in-progress on mblocks. Currently broken. Need to
rethink/rework calls of initial_transitions.
Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.cc
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.cc
2007-04-07 03:06:38 UTC (rev 4910)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.cc
2007-04-07 05:49:56 UTC (rev 4911)
@@ -91,6 +91,12 @@
{
}
+mbe_mblock_failed::mbe_mblock_failed(mb_mblock *mb,
+ const std::string &msg)
+ : mbe_base(mb, "Message block failed: " + msg)
+{
+}
+
mbe_terminate::mbe_terminate()
{
}
Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.h
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.h
2007-04-07 03:06:38 UTC (rev 4910)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.h
2007-04-07 05:49:56 UTC (rev 4911)
@@ -93,6 +93,14 @@
const std::string &port_name);
};
+class mbe_mblock_failed : public mbe_base
+{
+public:
+ mbe_mblock_failed(mb_mblock *, const std::string &msg);
+};
+
+
+
// not derived from mbe_base to simplify try/catch
class mbe_terminate
{
Modified:
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
===================================================================
---
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
2007-04-07 03:06:38 UTC (rev 4910)
+++
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
2007-04-07 05:49:56 UTC (rev 4911)
@@ -27,11 +27,10 @@
#include <mb_mblock_impl.h>
#include <mb_class_registry.h>
#include <mb_exception.h>
+#include <mb_worker.h>
#include <omnithread.h>
-
-
mb_runtime_thread_per_block::mb_runtime_thread_per_block()
: d_runtime_cond(&d_mutex)
{
@@ -40,45 +39,89 @@
mb_runtime_thread_per_block::~mb_runtime_thread_per_block()
{
- // nop for now
+ // FIXME iterate over workers and ensure that they are dead.
}
-
bool
mb_runtime_thread_per_block::run(const std::string &instance_name,
- const std::string &class_name,
- pmt_t user_arg)
+ const std::string &class_name,
+ pmt_t user_arg)
{
- class initial_visitor : public mb_visitor
+ // Create the top-level component, and recursively all of its
+ // subcomponents.
+ mb_mblock_sptr top = create_component(instance_name, class_name, user_arg);
+
+ // FIXME! Rethink when to run initial_transitions.
+
+ // Now tell them all to run their initial_transitions
{
- public:
- bool operator()(mb_mblock *mblock)
- {
- mblock->initial_transition();
- return true;
- }
- };
+ omni_mutex_lock l1(d_mutex); // lock runtime first...
- initial_visitor visitor;
+ for (worker_iter_t w = d_workers.begin(); w != d_workers.end(); ++w){
+ omni_mutex_lock l2((*w)->d_mutex); // ...then worker.
+ switch ((*w)->d_state){
- mb_mblock_sptr top = create_component(instance_name, class_name, user_arg);
+ case mb_worker::TS_CONSTRUCTED: // expected case
+ (*w)->d_state = mb_worker::TS_RUN_INITIAL;
+ (*w)->d_state_cond.broadcast();
+ break;
- // FIXME wait for barrier, then ask each mblock to run its initial_transition
+ default:
+ // FIXME...
+ break;
+ }
+ }
+ }
return true;
}
//
-// FIXME create the thread, then create the component in the thread
+// Create the thread, then create the component in the thread.
+// Return a pointer to the created mblock.
//
mb_mblock_sptr
mb_runtime_thread_per_block::create_component(const std::string &instance_name,
- const std::string &class_name,
- pmt_t user_arg)
+ const std::string &class_name,
+ pmt_t user_arg)
{
mb_mblock_maker_t maker;
if (!mb_class_registry::lookup_maker(class_name, &maker))
throw mbe_no_such_class(0, class_name + " (in " + instance_name + ")");
- return maker(this, instance_name, user_arg);
+ // FIXME here's where we'd lookup NUMA placement requests & mblock
+ // priorities and communicate them to the worker we're creating...
+
+ // Create the worker thread
+ mb_worker_sptr w =
+ mb_worker_sptr(new mb_worker(this, maker, instance_name, user_arg));
+
+ w->start_undetached(); // start it
+
+ // Wait for it to reach TS_CONSTRUCTED or TS_DEAD
+ {
+ omni_mutex_lock l(w->d_mutex);
+ while (!(w->d_state != mb_worker::TS_CONSTRUCTED
+ || w->d_state != mb_worker::TS_DEAD))
+ w->d_state_cond.wait();
+ }
+
+ if (w->d_state == mb_worker::TS_DEAD){ // failed to init
+ void *ignore;
+ w->join(&ignore); // reap it now
+
+ // FIXME with some work we ought to be able to propagate the
+ // exception from the worker.
+ throw mbe_mblock_failed(0, instance_name);
+ }
+
+ // The worker has successfully constructed the mblock and
+ // is blocked waiting to be told to run its initial_transition.
+ // Add w to the vector of workers, and return the mblock.
+ {
+ omni_mutex_lock l(d_mutex);
+ d_workers.push_back(w);
+ }
+
+ return w->d_mblock;
}
Modified:
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
===================================================================
---
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
2007-04-07 03:06:38 UTC (rev 4910)
+++
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
2007-04-07 05:49:56 UTC (rev 4911)
@@ -18,23 +18,27 @@
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef INCLUDED_MB_RUNTIME_THREAD_PER_MBLOCK_H
-#define INCLUDED_MB_RUNTIME_THREAD_PER_MBLOCK_H
+#ifndef INCLUDED_MB_RUNTIME_THREAD_PER_BLOCK_H
+#define INCLUDED_MB_RUNTIME_THREAD_PER_BLOCK_H
#include <mb_runtime.h>
+#include <mb_worker.h>
/*!
* \brief Concrete runtime that uses a thread per mblock
* \implementation
*
- * This is all implementation details.
+ * These are all implementation details.
*/
class mb_runtime_thread_per_block : public mb_runtime
{
public:
- omni_mutex d_mutex;
- omni_condition d_runtime_cond; // runtime waits here when it's got nothing to do
+ omni_mutex d_mutex;
+ omni_condition d_runtime_cond; // runtime waits here
+ std::vector<mb_worker_sptr> d_workers;
+ typedef std::vector<mb_worker_sptr>::iterator worker_iter_t;
+
mb_runtime_thread_per_block();
~mb_runtime_thread_per_block();
@@ -49,4 +53,4 @@
pmt_t user_arg);
};
-#endif /* INCLUDED_MB_RUNTIME_THREAD_PER_MBLOCK_H */
+#endif /* INCLUDED_MB_RUNTIME_THREAD_PER_BLOCK_H */
Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc
2007-04-07 03:06:38 UTC (rev 4910)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc
2007-04-07 05:49:56 UTC (rev 4911)
@@ -25,9 +25,25 @@
#include <mb_worker.h>
#include <mb_runtime_thread_per_block.h>
#include <mb_exception.h>
+#include <mb_mblock.h>
+#include <iostream>
+#include <sys/syscall.h> // Move this somewhere else and autoconf
+#include <unistd.h>
+#ifdef SYS_gettid
+int mb_gettid()
+{
+ return syscall(SYS_gettid);
+}
+#else
+int mb_gettid()
+{
+ return -1;
+}
+#endif
+
mb_worker::mb_worker(mb_runtime_thread_per_block *runtime,
mb_mblock_maker_t maker,
const std::string &instance_name,
@@ -35,7 +51,7 @@
: omni_thread((void *) 0, PRIORITY_NORMAL),
d_runtime(runtime), d_maker(maker),
d_instance_name(instance_name), d_user_arg(user_arg),
- d_state_cond(&d_mutex), d_state(TS_UNITIALIZED),
+ d_state_cond(&d_mutex), d_state(TS_UNINITIALIZED),
d_why_dead(RIP_NOT_DEAD_YET)
{
}
@@ -44,11 +60,26 @@
{
}
+void
+mb_worker::set_state(worker_state_t state)
+{
+ omni_mutex_lock l1(d_runtime->d_mutex); // lock runtime first, then worker
+ omni_mutex_lock l2(d_mutex);
+
+ d_state = state; // update our state
+ d_state_cond.broadcast(); // Notify everybody who cares...
+ d_runtime->d_runtime_cond.broadcast();
+}
+
+
void *
mb_worker::run_undetached(void *ignored)
{
+ // FIXME add pthread_sigmask stuff
+
try {
worker_thread_top_level();
+ d_why_dead = RIP_EXIT;
}
catch (mbe_terminate){
d_why_dead = RIP_TERMINATE;
@@ -61,17 +92,61 @@
d_why_dead = RIP_UNHANDLED_EXCEPTION;
}
- omni_mutex_lock l1(d_runtime->d_mutex); // lock runtime first, then worker
- omni_mutex_lock l2(d_mutex);
-
- d_state = TS_DEAD; // update our state
- d_state_cond.broadcast(); // Notify anybody who cares...
- d_runtime->d_runtime_cond.broadcast();
-
+ set_state(TS_DEAD);
return 0;
}
void
mb_worker::worker_thread_top_level()
{
+ std::cerr << "worker_thread_top_level (enter):" << std::endl
+ << " instance_name: " << d_instance_name << std::endl
+ << " omnithread id: " << id() << std::endl
+ << " gettid: " << mb_gettid() << std::endl
+ << " getpid: " << getpid() << std::endl;
+
+ try {
+ d_mblock = d_maker(d_runtime, d_instance_name, d_user_arg);
+ }
+ catch (...){
+ d_why_dead = RIP_CTOR_EXCEPTION;
+ throw;
+ }
+
+ // We've got an mblock. Let runtime know we're good so far.
+ set_state(TS_CONSTRUCTED);
+
+
+ std::cerr << "worker_thread_top_level (post-construction):" << std::endl
+ << " instance_name: " << d_instance_name << std::endl;
+
+ // Wait for runtime to change our state to TS_RUN_INITIAL.
+ {
+ omni_mutex_lock l(d_mutex);
+ while (d_state != TS_RUN_INITIAL)
+ d_state_cond.wait();
+ }
+
+ std::cerr << "worker_thread_top_level (got RUN_INITIAL):" << std::endl
+ << " instance_name: " << d_instance_name << std::endl;
+
+ try {
+ d_mblock->initial_transition();
+ }
+ catch (...){
+ d_why_dead = RIP_INIT_EXCEPTION;
+ throw;
+ }
+
+ // initial_transition was OK, set state to TS_RUNNING
+ set_state(TS_RUNNING);
+
+ std::cerr << "worker_thread_top_level (post-initial-transition):" << std::endl
+ << " instance_name: " << d_instance_name << std::endl;
+
+ // FIXME run the main_loop here
+ sleep(5);
+
+ std::cerr << "worker_thread_top_level (exit):" << std::endl
+ << " instance_name: " << d_instance_name << std::endl;
}
Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.h
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.h
2007-04-07 03:06:38 UTC (rev 4910)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.h
2007-04-07 05:49:56 UTC (rev 4911)
@@ -27,6 +27,9 @@
#include <mb_class_registry.h>
+class mb_worker;
+typedef boost::shared_ptr<mb_worker> mb_worker_sptr;
+
class mb_runtime_thread_per_block;
/*!
@@ -38,7 +41,7 @@
public:
//! worker thread states
enum worker_state_t {
- TS_UNITIALIZED, // new, uninitialized
+ TS_UNINITIALIZED, // new, uninitialized
TS_CONSTRUCTED, // mblock was successfully constructed by thread
TS_RUN_INITIAL, // thread should run initial_transition
TS_RUNNING, // normal steady-state condition.
@@ -58,7 +61,7 @@
/*
* Args used by new thread to create mb_mblock
*/
- mb_runtime_thread_per_block *d_runtime;
+ mb_runtime_thread_per_block *d_runtime;
mb_mblock_maker_t d_maker;
std::string d_instance_name;
pmt_t d_user_arg;
@@ -93,6 +96,11 @@
* \brief Invokes the top-level of the new thread (name kind of sucks)
*/
void *run_undetached(void *arg);
+
+private:
+ // Neither d_mutex nor runtime->d_mutex may be held while calling this.
+ // It locks and unlocks them itself.
+ void set_state(worker_state_t state);
};
Modified: gnuradio/branches/developers/eb/ibu/omnithread/omnithread.h
===================================================================
--- gnuradio/branches/developers/eb/ibu/omnithread/omnithread.h 2007-04-07
03:06:38 UTC (rev 4910)
+++ gnuradio/branches/developers/eb/ibu/omnithread/omnithread.h 2007-04-07
05:49:56 UTC (rev 4911)
@@ -391,11 +391,15 @@
// execute the run() or run_undetached() member functions depending on
// whether start() or start_undetached() is called respectively.
+public:
+
void start_undetached(void);
// can be used with the above constructor in a derived class to cause
// the thread to be undetached. In this case the thread executes the
// run_undetached member function.
+protected:
+
virtual ~omni_thread(void);
// destructor cannot be called by user (except via a derived class).
// Use exit() or cancel() instead. This also means a thread object must
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Commit-gnuradio] r4911 - in gnuradio/branches/developers/eb/ibu: mblock/src/lib omnithread,
eb <=