commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 01/04: controlport: cleanup and thread safety.


From: git
Subject: [Commit-gnuradio] [gnuradio] 01/04: controlport: cleanup and thread safety.
Date: Thu, 16 Apr 2015 15:08:09 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit e9d9cf5aced8ed80bc71d817c114866babef3942
Author: Nate Goergen <address@hidden>
Date:   Thu Apr 16 10:26:49 2015 -0400

    controlport: cleanup and thread safety.
    
    Cleaned up some of the templates for only the necessary stuff.
    
    Adds mutex lock protections around getters and setters.
---
 gnuradio-runtime/include/gnuradio/rpcbufferedget.h |  3 +-
 .../include/gnuradio/rpcserver_booter_thrift.h     |  3 +-
 .../include/gnuradio/rpcserver_thrift.h            |  3 +
 .../include/gnuradio/thrift_application_base.h     |  7 +-
 .../include/gnuradio/thrift_server_template.h      | 87 ++++++++++------------
 .../controlport/thrift/rpcserver_booter_thrift.cc  | 14 ++--
 .../lib/controlport/thrift/rpcserver_thrift.cc     |  8 ++
 7 files changed, 62 insertions(+), 63 deletions(-)

diff --git a/gnuradio-runtime/include/gnuradio/rpcbufferedget.h 
b/gnuradio-runtime/include/gnuradio/rpcbufferedget.h
index ebd740b..ad05551 100644
--- a/gnuradio-runtime/include/gnuradio/rpcbufferedget.h
+++ b/gnuradio-runtime/include/gnuradio/rpcbufferedget.h
@@ -25,7 +25,6 @@
 
 #include <boost/thread/condition_variable.hpp>
 #include <boost/thread/mutex.hpp>
-#include <stdio.h>
 
 template<typename TdataType>
 class rpcbufferedget {
@@ -49,8 +48,8 @@ public:
   }
 
   TdataType get() {
-    d_data_needed = true;
     boost::mutex::scoped_lock lock(d_buffer_lock);
+    d_data_needed = true;
     d_data_ready.wait(lock);
     return d_buffer;
   }
diff --git a/gnuradio-runtime/include/gnuradio/rpcserver_booter_thrift.h 
b/gnuradio-runtime/include/gnuradio/rpcserver_booter_thrift.h
index 28900a4..fd1da09 100644
--- a/gnuradio-runtime/include/gnuradio/rpcserver_booter_thrift.h
+++ b/gnuradio-runtime/include/gnuradio/rpcserver_booter_thrift.h
@@ -34,8 +34,7 @@ class rpcserver_booter_thrift
   : public virtual rpcserver_booter_base,
     public virtual thrift_server_template<rpcserver_base,
                                           rpcserver_thrift,
-                                          rpcserver_booter_thrift,
-                                          
boost::shared_ptr<GNURadio::ControlPortIf> >
+                                          rpcserver_booter_thrift>
 {
  public:
   rpcserver_booter_thrift();
diff --git a/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h 
b/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h
index 027a9ea..203be66 100644
--- a/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h
+++ b/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h
@@ -32,6 +32,7 @@
 #include "thrift/ControlPort.h"
 #include "thrift/gnuradio_types.h"
 #include <boost/format.hpp>
+#include <boost/thread/mutex.hpp>
 
 #define S(x) #x
 #define S_(x) S(x)
@@ -61,6 +62,8 @@ public:
   virtual void shutdown();
 
  private:
+  boost::mutex d_callback_map_lock;
+
   typedef std::map<std::string, configureCallback_t> ConfigureCallbackMap_t;
   ConfigureCallbackMap_t d_setcallbackmap;
 
diff --git a/gnuradio-runtime/include/gnuradio/thrift_application_base.h 
b/gnuradio-runtime/include/gnuradio/thrift_application_base.h
index aa50c55..4af5e88 100644
--- a/gnuradio-runtime/include/gnuradio/thrift_application_base.h
+++ b/gnuradio-runtime/include/gnuradio/thrift_application_base.h
@@ -28,6 +28,7 @@
 #include <gnuradio/prefs.h>
 #include <gnuradio/thread/thread.h>
 #include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/scoped_ptr.hpp>
 
 namespace {
   // Time, in milliseconds, to wait between checks to the Thrift runtime to 
see if
@@ -83,7 +84,7 @@ public:
   /*!
    * Destructor for the application. Since shutdown and cleanup of the
    * runtime is typically custom to a particular booter
-   * implementation, this must be implemented as a specalized function
+   * implementation, this must be implemented as a specialized function
    * for a particular booter. Thus a template implementation is not
    * provided here.
    */
@@ -118,7 +119,7 @@ protected:
   /*!
    * Reference to the Thrift runtime.
    */
-  std::auto_ptr<apache::thrift::server::TServer> d_thriftserver;
+  boost::scoped_ptr<apache::thrift::server::TServer> d_thriftserver;
 
   /*!
    * Max number of attempts when checking the Thrift runtime for
@@ -175,7 +176,7 @@ private:
 
   // Pointer to the structure containing staticly allocated
   // state information for the applicaiton_base singleton.
-  static std::auto_ptr<thrift_application_base_impl > p_impl;
+  static boost::scoped_ptr<thrift_application_base_impl > p_impl;
 
   // Mutex to protect the endpoint string.
   gr::thread::mutex d_lock;
diff --git a/gnuradio-runtime/include/gnuradio/thrift_server_template.h 
b/gnuradio-runtime/include/gnuradio/thrift_server_template.h
index 1e9059d..b8f5448 100644
--- a/gnuradio-runtime/include/gnuradio/thrift_server_template.h
+++ b/gnuradio-runtime/include/gnuradio/thrift_server_template.h
@@ -39,7 +39,7 @@
 
 using namespace apache;
 
-template<typename TserverBase, typename TserverClass, typename TImplClass, 
typename TThriftClass>
+template<typename TserverBase, typename TserverClass, typename TImplClass>
 class thrift_server_template : public thrift_application_base<TserverBase, 
TImplClass>
 {
 public:
@@ -50,9 +50,12 @@ protected:
   TserverBase* i_impl();
   friend class thrift_application_base<TserverBase, TImplClass>;
 
-  TserverBase* d_server;
-
 private:
+  boost::shared_ptr<TserverClass> d_handler;
+  boost::shared_ptr<thrift::TProcessor> d_processor;
+  boost::shared_ptr<thrift::transport::TServerTransport> d_serverTransport;
+  boost::shared_ptr<thrift::transport::TTransportFactory> d_transportFactory;
+  boost::shared_ptr<thrift::protocol::TProtocolFactory> d_protocolFactory;
   /**
    * Custom TransportFactory that allows you to override the default Thrift 
buffer size
    * of 512 bytes.
@@ -76,9 +79,14 @@ private:
   };
 };
 
-template<typename TserverBase, typename TserverClass, typename TImplClass, 
typename TThriftClass>
-thrift_server_template<TserverBase, TserverClass, TImplClass, 
TThriftClass>::thrift_server_template
-(TImplClass* _this) : thrift_application_base<TserverBase, TImplClass>(_this)
+template<typename TserverBase, typename TserverClass, typename TImplClass>
+thrift_server_template<TserverBase, TserverClass, 
TImplClass>::thrift_server_template
+(TImplClass* _this) : thrift_application_base<TserverBase, TImplClass>(_this),
+d_handler(new TserverClass()),
+d_processor(new GNURadio::ControlPortProcessor(d_handler)),
+d_serverTransport(),
+d_transportFactory(),
+d_protocolFactory(new thrift::protocol::TBinaryProtocolFactory())
 {
   gr::logger_ptr logger, debug_logger;
   gr::configure_default_loggers(logger, debug_logger, "controlport");
@@ -87,74 +95,59 @@ thrift_server_template<TserverBase, TserverClass, 
TImplClass, TThriftClass>::thr
   std::string thrift_config_file = 
gr::prefs::singleton()->get_string("ControlPort", "config", "");
 
   if(thrift_config_file.length() > 0) {
-    gr::prefs::singleton()->add_config_file(thrift_config_file);
+      gr::prefs::singleton()->add_config_file(thrift_config_file);
   }
 
   // Collect configuration options from the Thrift config file;
   // defaults if the config file doesn't exist or list the specific
   // options.
   port = static_cast<unsigned int>(gr::prefs::singleton()->get_long("thrift", 
"port",
-    thrift_application_base<TserverBase, TImplClass>::d_default_thrift_port));
+                                                                    
thrift_application_base<TserverBase, TImplClass>::d_default_thrift_port));
   nthreads = static_cast<unsigned 
int>(gr::prefs::singleton()->get_long("thrift", "nthreads",
-    thrift_application_base<TserverBase, 
TImplClass>::d_default_num_thrift_threads));
+                                                                        
thrift_application_base<TserverBase, 
TImplClass>::d_default_num_thrift_threads));
   buffersize = static_cast<unsigned 
int>(gr::prefs::singleton()->get_long("thrift", "buffersize",
-    thrift_application_base<TserverBase, 
TImplClass>::d_default_thrift_buffer_size));
-
-  boost::shared_ptr<TserverClass> handler(new TserverClass());
-
-  boost::shared_ptr<thrift::TProcessor>
-    processor(new GNURadio::ControlPortProcessor(handler));
+                                                                          
thrift_application_base<TserverBase, 
TImplClass>::d_default_thrift_buffer_size));
 
-  boost::shared_ptr<thrift::transport::TServerTransport>
-    serverTransport(new thrift::transport::TServerSocket(port));
+  d_serverTransport.reset(new thrift::transport::TServerSocket(port));
 
-  boost::shared_ptr<thrift::transport::TTransportFactory>
-    transportFactory(new 
thrift_server_template::TBufferedTransportFactory(buffersize));
-
-  boost::shared_ptr<thrift::protocol::TProtocolFactory>
-    protocolFactory(new thrift::protocol::TBinaryProtocolFactory());
+  d_transportFactory.reset(new 
thrift_server_template::TBufferedTransportFactory(buffersize));
 
 
   if(nthreads <= 1) {
-    // "Thrift: Single-threaded server"
-    //std::cout << "Thrift Single-threaded server" << std::endl;
-    thrift_application_base<TserverBase, TImplClass>::d_thriftserver.reset(
-      new thrift::server::TSimpleServer(processor, serverTransport,
-                                        transportFactory, protocolFactory));
+      // "Thrift: Single-threaded server"
+      //std::cout << "Thrift Single-threaded server" << std::endl;
+      thrift_application_base<TserverBase, TImplClass>::d_thriftserver.reset(
+          new thrift::server::TSimpleServer(d_processor, d_serverTransport, 
d_transportFactory, d_protocolFactory));
   }
   else {
-    //std::cout << "Thrift Multi-threaded server : " << nthreads << std::endl;
-    boost::shared_ptr<thrift::concurrency::ThreadManager> threadManager
-      (thrift::concurrency::ThreadManager::newSimpleThreadManager(nthreads));
-
-    boost::shared_ptr<thrift::concurrency::PlatformThreadFactory> threadFactory
-      (boost::shared_ptr<thrift::concurrency::PlatformThreadFactory>
-       (new thrift::concurrency::PlatformThreadFactory()));
+      //std::cout << "Thrift Multi-threaded server : " << d_nthreads << 
std::endl;
+      boost::shared_ptr<thrift::concurrency::ThreadManager> threadManager(
+          
thrift::concurrency::ThreadManager::newSimpleThreadManager(nthreads));
 
-    threadManager->threadFactory(threadFactory);
+      threadManager->threadFactory(
+          boost::shared_ptr<thrift::concurrency::PlatformThreadFactory>(
+              new thrift::concurrency::PlatformThreadFactory()));
 
-    threadManager->start();
+      threadManager->start();
 
-    thrift_application_base<TserverBase, TImplClass>::d_thriftserver.reset(
-      new thrift::server::TThreadPoolServer(processor, serverTransport,
-                                            transportFactory, protocolFactory,
-                                            threadManager));
+      thrift_application_base<TserverBase, TImplClass>::d_thriftserver.reset(
+          new thrift::server::TThreadPoolServer(d_processor, d_serverTransport,
+                                                d_transportFactory, 
d_protocolFactory,
+                                                threadManager));
   }
-
-  d_server = handler.get();
 }
 
-template<typename TserverBase, typename TserverClass, typename TImplClass, 
typename TThriftClass>
-thrift_server_template<TserverBase, TserverClass,TImplClass, 
TThriftClass>::~thrift_server_template()
+template<typename TserverBase, typename TserverClass, typename TImplClass>
+thrift_server_template<TserverBase, 
TserverClass,TImplClass>::~thrift_server_template()
 {
 }
 
-template<typename TserverBase, typename TserverClass, typename TImplClass, 
typename TThriftClass>
-TserverBase* thrift_server_template<TserverBase, TserverClass, TImplClass, 
TThriftClass>::i_impl()
+template<typename TserverBase, typename TserverClass, typename TImplClass>
+TserverBase* thrift_server_template<TserverBase, TserverClass, 
TImplClass>::i_impl()
 {
   //std::cerr << "thrift_server_template: i_impl" << std::endl;
 
-  return d_server;
+  return d_handler.get();
 }
 
 #endif /* THRIFT_SERVER_TEMPLATE_H */
diff --git a/gnuradio-runtime/lib/controlport/thrift/rpcserver_booter_thrift.cc 
b/gnuradio-runtime/lib/controlport/thrift/rpcserver_booter_thrift.cc
index 40cfe1a..1d6cafe 100644
--- a/gnuradio-runtime/lib/controlport/thrift/rpcserver_booter_thrift.cc
+++ b/gnuradio-runtime/lib/controlport/thrift/rpcserver_booter_thrift.cc
@@ -42,8 +42,7 @@ namespace {
 rpcserver_booter_thrift::rpcserver_booter_thrift() :
   thrift_server_template<rpcserver_base,
                          rpcserver_thrift,
-                         rpcserver_booter_thrift,
-                         boost::shared_ptr<GNURadio::ControlPortIf> >(this),
+                         rpcserver_booter_thrift>(this),
   d_type(std::string(CONTROL_PORT_CLASS))
 {;}
 
@@ -54,8 +53,7 @@ rpcserver_base*
 rpcserver_booter_thrift::i()
 {
   return thrift_server_template<rpcserver_base, rpcserver_thrift,
-                                rpcserver_booter_thrift,
-                                GNURadio::ControlPortIf>::i();
+                                rpcserver_booter_thrift>::i();
 }
 
 /*!
@@ -66,8 +64,7 @@ const std::vector<std::string>
 rpcserver_booter_thrift::endpoints()
 {
   return thrift_server_template<rpcserver_base, rpcserver_thrift,
-                                rpcserver_booter_thrift,
-                                GNURadio::ControlPortIf>::endpoints();
+                                rpcserver_booter_thrift>::endpoints();
 }
 
 // Specialized thrift_application_base attributes and functions
@@ -87,14 +84,13 @@ const unsigned int thrift_application_base<rpcserver_base, 
rpcserver_booter_thri
     ALRIGHT_DEFAULT_BUFFER_SIZE);
 
 template<class rpcserver_base,  class rpcserver_booter_thrift>
-std::auto_ptr<thrift_application_base_impl>
+boost::scoped_ptr<thrift_application_base_impl>
   thrift_application_base<rpcserver_base,  rpcserver_booter_thrift>::p_impl(
       new thrift_application_base_impl());
 
 template<class rpcserver_base, class rpcserver_booter_thrift>
 thrift_application_base<rpcserver_base, 
rpcserver_booter_thrift>::~thrift_application_base()
 {
-  GR_LOG_DEBUG(d_debug_logger, "thrift_application_base: shutdown");
   if(d_thirft_is_running) {
     d_thriftserver->stop();
     d_thirft_is_running = false;
@@ -125,7 +121,7 @@ bool thrift_application_base<rpcserver_base, 
rpcserver_booter_thrift>::applicati
     const std::string boost_hostname(boost::asio::ip::host_name());
 
     std::string endpoint = boost::str(boost::format("-h %1% -p %2%") % 
boost_hostname % used_port);
-    //std::cout << "Thrift endpoint: " << endpoint << " boost hostname: " << 
boost_hostname << std::endl;
+
     set_endpoint(endpoint);
 
     GR_LOG_INFO(d_logger, "Apache Thrift: " + endpoint);
diff --git a/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc 
b/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc
index c4655d3..3e6eabc 100644
--- a/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc
+++ b/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc
@@ -49,6 +49,7 @@ void
 rpcserver_thrift::registerConfigureCallback(const std::string &id,
                                             const configureCallback_t callback)
 {
+  boost::mutex::scoped_lock lock(d_callback_map_lock);
   {
     ConfigureCallbackMap_t::const_iterator iter(d_setcallbackmap.find(id));
     if(iter != d_setcallbackmap.end()) {
@@ -68,6 +69,7 @@ rpcserver_thrift::registerConfigureCallback(const std::string 
&id,
 void
 rpcserver_thrift::unregisterConfigureCallback(const std::string &id)
 {
+  boost::mutex::scoped_lock lock(d_callback_map_lock);
   ConfigureCallbackMap_t::iterator iter(d_setcallbackmap.find(id));
   if(iter == d_setcallbackmap.end()) {
     std::stringstream s;
@@ -86,6 +88,7 @@ void
 rpcserver_thrift::registerQueryCallback(const std::string &id,
                                         const queryCallback_t callback)
 {
+  boost::mutex::scoped_lock lock(d_callback_map_lock);
   {
     QueryCallbackMap_t::const_iterator iter(d_getcallbackmap.find(id));
     if(iter != d_getcallbackmap.end()) {
@@ -105,6 +108,7 @@ rpcserver_thrift::registerQueryCallback(const std::string 
&id,
 void
 rpcserver_thrift::unregisterQueryCallback(const std::string &id)
 {
+  boost::mutex::scoped_lock lock(d_callback_map_lock);
   QueryCallbackMap_t::iterator iter(d_getcallbackmap.find(id));
   if(iter == d_getcallbackmap.end()) {
     std::stringstream s;
@@ -123,6 +127,7 @@ rpcserver_thrift::unregisterQueryCallback(const std::string 
&id)
 void
 rpcserver_thrift::setKnobs(const GNURadio::KnobMap& knobs)
 {
+  boost::mutex::scoped_lock lock(d_callback_map_lock);
   std::for_each(knobs.begin(), knobs.end(),
                 set_f<GNURadio::KnobMap::value_type,ConfigureCallbackMap_t>
                 (d_setcallbackmap, cur_priv));
@@ -133,6 +138,7 @@ void
 rpcserver_thrift::getKnobs(GNURadio::KnobMap& _return,
                            const GNURadio::KnobIDList& knobs)
 {
+  boost::mutex::scoped_lock lock(d_callback_map_lock);
   if(knobs.size() == 0) {
     std::for_each(d_getcallbackmap.begin(), d_getcallbackmap.end(),
                   get_all_f<QueryCallbackMap_t::value_type, 
QueryCallbackMap_t, GNURadio::KnobMap>
@@ -148,6 +154,7 @@ rpcserver_thrift::getKnobs(GNURadio::KnobMap& _return,
 void
 rpcserver_thrift::getRe(GNURadio::KnobMap& _return, const 
GNURadio::KnobIDList& knobs)
 {
+  boost::mutex::scoped_lock lock(d_callback_map_lock);
   if(knobs.size() == 0) {
     std::for_each(d_getcallbackmap.begin(), d_getcallbackmap.end(),
                   get_all_f<QueryCallbackMap_t::value_type, 
QueryCallbackMap_t, GNURadio::KnobMap>
@@ -172,6 +179,7 @@ void
 rpcserver_thrift::properties(GNURadio::KnobPropMap& _return,
                              const GNURadio::KnobIDList& knobs)
 {
+  boost::mutex::scoped_lock lock(d_callback_map_lock);
   if(knobs.size() == 0) {
     std::for_each(d_getcallbackmap.begin(), d_getcallbackmap.end(),
                   properties_all_f<QueryCallbackMap_t::value_type,



reply via email to

[Prev in Thread] Current Thread [Next in Thread]