[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] r8779 - gnuradio/branches/developers/eb/sched-wip/gnur
From: |
eb |
Subject: |
[Commit-gnuradio] r8779 - gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime |
Date: |
Thu, 3 Jul 2008 20:12:21 -0600 (MDT) |
Author: eb
Date: 2008-07-03 20:12:18 -0600 (Thu, 03 Jul 2008)
New Revision: 8779
Added:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
Modified:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/Makefile.am
Log:
work-in-progress: gr_block_executor is the generic thread and/or task
per block manager. We should be able to use it with either a
thread-per-block or a TBB-style task implementation.
Modified:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/Makefile.am
===================================================================
---
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/Makefile.am
2008-07-04 01:26:58 UTC (rev 8778)
+++
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/Makefile.am
2008-07-04 02:12:18 UTC (rev 8779)
@@ -35,6 +35,7 @@
gr_flat_flowgraph.cc \
gr_block.cc \
gr_block_detail.cc \
+ gr_block_executor.cc \
gr_hier_block2.cc \
gr_hier_block2_detail.cc \
gr_buffer.cc \
@@ -81,6 +82,7 @@
gr_flat_flowgraph.h \
gr_block.h \
gr_block_detail.h \
+ gr_block_executor.h \
gr_hier_block2.h \
gr_hier_block2_detail.h \
gr_buffer.h \
Copied:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
(from rev 8762,
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc)
===================================================================
---
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
(rev 0)
+++
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
2008-07-04 02:12:18 UTC (rev 8779)
@@ -0,0 +1,314 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_block_executor.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+#include <gr_buffer.h>
+#include <boost/thread.hpp>
+#include <iostream>
+#include <limits>
+#include <assert.h>
+#include <stdio.h>
+
+// must be defined to either 0 or 1
+#define ENABLE_LOGGING 0
+
+#if (ENABLE_LOGGING)
+#define LOG(x) do { x; } while(0)
+#else
+#define LOG(x) do {;} while(0)
+#endif
+
+static int which_scheduler = 0;
+
+
+std::ostream&
+operator << (std::ostream& os, const gr_block *m)
+{
+ os << "<gr_block " << m->name() << " (" << m->unique_id() << ")>";
+ return os;
+}
+
+
+inline static unsigned int
+round_up (unsigned int n, unsigned int multiple)
+{
+ return ((n + multiple - 1) / multiple) * multiple;
+}
+
+inline static unsigned int
+round_down (unsigned int n, unsigned int multiple)
+{
+ return (n / multiple) * multiple;
+}
+
+//
+// Return minimum available write space in all our downstream buffers
+// or -1 if we're output blocked and the output we're blocked
+// on is done.
+//
+static int
+min_available_space (gr_block_detail *d, int output_multiple)
+{
+ int min_space = std::numeric_limits<int>::max();
+
+ for (int i = 0; i < d->noutputs (); i++){
+ int n = round_down (d->output(i)->space_available (), output_multiple);
+ if (n == 0){ // We're blocked on output.
+ if (d->output(i)->done()){ // Downstream is done, therefore we're
done.
+ return -1;
+ }
+ return 0;
+ }
+ min_space = std::min (min_space, n);
+ }
+ return min_space;
+}
+
+
+
+gr_block_executor::gr_block_executor (gr_block_sptr block)
+ : d_block(block), d_log(0)
+{
+ if (ENABLE_LOGGING){
+ char name[100];
+ snprintf(name, sizeof(name), "sst-%d.log", which_scheduler++);
+ d_log = new std::ofstream(name);
+ *d_log << "gr_block_executor: "
+ << d_block << std::endl;
+ }
+
+ d_block->detail()->set_done(false); // reset done flag
+ d_block->start(); // enable any drivers, etc.
+}
+
+gr_block_executor::~gr_block_executor ()
+{
+ if (ENABLE_LOGGING)
+ delete d_log;
+
+ d_block->stop(); // stop any drivers, etc.
+}
+
+
+bool
+gr_block_executor::run_one_iteration()
+{
+ int noutput_items;
+ int max_items_avail;
+ bool making_progress;
+
+ making_progress = false;
+
+ gr_block *m = d_block.get();
+ gr_block_detail *d = m->detail().get();
+
+ LOG(*d_log << std::endl << m);
+
+ if (d->done())
+ goto next_block;
+
+ if (d->source_p ()){
+ d_ninput_items_required.resize (0);
+ d_ninput_items.resize (0);
+ d_input_items.resize (0);
+ d_output_items.resize (d->noutputs ());
+
+ // determine the minimum available output space
+ noutput_items = min_available_space (d, m->output_multiple ());
+ LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
+ if (noutput_items == -1) // we're done
+ goto were_done;
+
+ if (noutput_items == 0){ // we're output blocked
+ LOG(*d_log << " BLKD_OUT\n");
+ goto next_block;
+ }
+
+ goto setup_call_to_work; // jump to common code
+ }
+
+ else if (d->sink_p ()){
+ d_ninput_items_required.resize (d->ninputs ());
+ d_ninput_items.resize (d->ninputs ());
+ d_input_items.resize (d->ninputs ());
+ d_output_items.resize (0);
+ LOG(*d_log << " sink\n");
+
+ max_items_avail = 0;
+ for (int i = 0; i < d->ninputs (); i++){
+ d_ninput_items[i] = d->input(i)->items_available();
+ //if (d_ninput_items[i] == 0 && d->input(i)->done())
+ if (d_ninput_items[i] < m->output_multiple() && d->input(i)->done())
+ goto were_done;
+
+ max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+ }
+
+ // take a swag at how much output we can sink
+ noutput_items = (int) (max_items_avail * m->relative_rate ());
+ noutput_items = round_down (noutput_items, m->output_multiple ());
+ LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
+ LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
+
+ if (noutput_items == 0){ // we're blocked on input
+ LOG(*d_log << " BLKD_IN\n");
+ goto next_block;
+ }
+
+ goto try_again; // Jump to code shared with regular case.
+ }
+
+ else {
+ // do the regular thing
+ d_ninput_items_required.resize (d->ninputs ());
+ d_ninput_items.resize (d->ninputs ());
+ d_input_items.resize (d->ninputs ());
+ d_output_items.resize (d->noutputs ());
+
+ max_items_avail = 0;
+ for (int i = 0; i < d->ninputs (); i++){
+ d_ninput_items[i] = d->input(i)->items_available ();
+ max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+ }
+
+ // determine the minimum available output space
+ noutput_items = min_available_space (d, m->output_multiple ());
+ if (ENABLE_LOGGING){
+ *d_log << " regular ";
+ if (m->relative_rate() >= 1.0)
+ *d_log << "1:" << m->relative_rate() << std::endl;
+ else
+ *d_log << 1.0/m->relative_rate() << ":1\n";
+ *d_log << " max_items_avail = " << max_items_avail << std::endl;
+ *d_log << " noutput_items = " << noutput_items << std::endl;
+ }
+ if (noutput_items == -1) // we're done
+ goto were_done;
+
+ if (noutput_items == 0){ // we're output blocked
+ LOG(*d_log << " BLKD_OUT\n");
+ goto next_block;
+ }
+
+#if 0
+ // Compute best estimate of noutput_items that we can really use.
+ noutput_items =
+ std::min ((unsigned) noutput_items,
+ std::max ((unsigned) m->output_multiple(),
+ round_up ((unsigned) (max_items_avail *
m->relative_rate()),
+ m->output_multiple ())));
+
+ LOG(*d_log << " revised noutput_items = " << noutput_items << std::endl);
+#endif
+
+ try_again:
+ if (m->fixed_rate()){
+ // try to work it forward starting with max_items_avail.
+ // We want to try to consume all the input we've got.
+ int reqd_noutput_items =
m->fixed_rate_ninput_to_noutput(max_items_avail);
+ reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
+ if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
+ noutput_items = reqd_noutput_items;
+ }
+
+ // ask the block how much input they need to produce noutput_items
+ m->forecast (noutput_items, d_ninput_items_required);
+
+ // See if we've got sufficient input available
+
+ int i;
+ for (i = 0; i < d->ninputs (); i++)
+ if (d_ninput_items_required[i] > d_ninput_items[i]) // not enough
+ break;
+
+ if (i < d->ninputs ()){ // not enough input on input[i]
+ // if we can, try reducing the size of our output request
+ if (noutput_items > m->output_multiple ()){
+ noutput_items /= 2;
+ noutput_items = round_up (noutput_items, m->output_multiple ());
+ goto try_again;
+ }
+
+ // We're blocked on input
+ LOG(*d_log << " BLKD_IN\n");
+ if (d->input(i)->done()) // If the upstream block is done, we're done
+ goto were_done;
+
+ // Is it possible to ever fulfill this request?
+ if (d_ninput_items_required[i] >
d->input(i)->max_possible_items_available ()){
+ // Nope, never going to happen...
+ std::cerr << "\nsched: <gr_block " << m->name()
+ << " (" << m->unique_id() << ")>"
+ << " is requesting more input data\n"
+ << " than we can provide.\n"
+ << " ninput_items_required = "
+ << d_ninput_items_required[i] << "\n"
+ << " max_possible_items_available = "
+ << d->input(i)->max_possible_items_available() << "\n"
+ << " If this is a filter, consider reducing the number of
taps.\n";
+ goto were_done;
+ }
+
+ goto next_block;
+ }
+
+ // We've got enough data on each input to produce noutput_items.
+ // Finish setting up the call to work.
+
+ for (int i = 0; i < d->ninputs (); i++)
+ d_input_items[i] = d->input(i)->read_pointer();
+
+ setup_call_to_work:
+
+ for (int i = 0; i < d->noutputs (); i++)
+ d_output_items[i] = d->output(i)->write_pointer();
+
+ // Do the actual work of the block
+ int n = m->general_work (noutput_items, d_ninput_items,
+ d_input_items, d_output_items);
+ LOG(*d_log << " general_work: noutput_items = " << noutput_items
+ << " result = " << n << std::endl);
+
+ if (n == -1) // block is done
+ goto were_done;
+
+ d->produce_each (n); // advance write pointers
+ if (n > 0)
+ making_progress = true;
+
+ goto next_block;
+ }
+ assert (0);
+
+ were_done:
+ LOG(*d_log << " were_done\n");
+ d->set_done (true);
+
+ next_block:
+ return making_progress;
+}
Copied:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
(from rev 8747,
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.h)
===================================================================
---
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
(rev 0)
+++
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
2008-07-04 02:12:18 UTC (rev 8779)
@@ -0,0 +1,61 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2008 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_BLOCK_EXECUTOR_H
+#define INCLUDED_GR_BLOCK_EXECUTOR_H
+
+#include <gr_runtime_types.h>
+#include <fstream>
+
+//class gr_block_executor;
+//typedef boost::shared_ptr<gr_block_executor> gr_block_executor_sptr;
+
+
+/*!
+ * \brief Manage the execution of a single block.
+ * \ingroup internal
+ */
+
+class gr_block_executor {
+protected:
+ gr_block_sptr d_block; // The block we're
trying to run
+ std::ofstream *d_log;
+
+ // These are allocated here so we don't have to on each iteration
+
+ gr_vector_int d_ninput_items_required;
+ gr_vector_int d_ninput_items;
+ gr_vector_const_void_star d_input_items;
+ gr_vector_void_star d_output_items;
+
+ public:
+ gr_block_executor(gr_block_sptr block);
+ ~gr_block_executor ();
+
+ /*
+ * \brief Run one iteration.
+ * \returns true if progress was made, else false.
+ */
+ bool run_one_iteration();
+};
+
+#endif /* INCLUDED_GR_BLOCK_EXECUTOR_H */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Commit-gnuradio] r8779 - gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime,
eb <=