Main Page   Namespace List   Class Hierarchy   Compound List   File List   Namespace Members   Compound Members   File Members  

ProxyPullConsumer.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ProxyPullConsumer.cc       Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ProxyPullConsumer.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029 
00030 namespace OmniEvents {
00031 
00032 //
00033 //  ProxyPullConsumerManager
00034 //
00035 
00036 PortableServer::Servant
00037 ProxyPullConsumerManager::incarnate(
00038   const PortableServer::ObjectId& oid,
00039   PortableServer::POA_ptr         poa
00040 )
00041 {
00042   DB(20,"ProxyPullConsumerManager::incarnate()")
00043   ProxyPullConsumer_i* result =new ProxyPullConsumer_i(_managedPoa,_queue);
00044   _servants.insert(result);
00045   return result;
00046 }
00047 
00048 ProxyPullConsumerManager::ProxyPullConsumerManager(
00049   PortableServer::POA_ptr parentPoa,
00050   list<CORBA::Any*>&      q
00051 )
00052 : ProxyManager(parentPoa,"ProxyPullConsumer"),
00053   _queue(q)
00054 {
00055   // pass
00056 }
00057 
00058 ProxyPullConsumerManager::~ProxyPullConsumerManager()
00059 {
00060   DB(20,"~ProxyPullConsumerManager()")
00061 }
00062 
00063 CosEventChannelAdmin::ProxyPullConsumer_ptr
00064 ProxyPullConsumerManager::createObject()
00065 {
00066   return createNarrowedReference<CosEventChannelAdmin::ProxyPullConsumer>(
00067            _managedPoa.in(),
00068            CosEventChannelAdmin::_tc_ProxyPullConsumer->id()
00069          );
00070 }
00071 
00072 void ProxyPullConsumerManager::trigger()
00073 {
00074   // Trigger each servant in turn.
00075   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00076   {
00077     ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
00078     proxy->trigger();
00079   }
00080 }
00081 
00082 void ProxyPullConsumerManager::disconnect()
00083 {
00084   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00085   {
00086     Proxy* p =*i; // Sun's CC requires this temporary.
00087     ProxyPullConsumer_i* ppc =static_cast<ProxyPullConsumer_i*>(p);
00088     ppc->disconnect_pull_consumer();
00089   }
00090 }
00091 
00092 
00093 //
00094 //  ProxyPullConsumer_i
00095 //
00096 
00097 // CORBA interface methods
00098 
00099 void ProxyPullConsumer_i::connect_pull_supplier(
00100   CosEventComm::PullSupplier_ptr pullSupplier
00101 )
00102 {
00103   if(CORBA::is_nil(pullSupplier))
00104       throw CORBA::BAD_PARAM();
00105   if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00106       throw CosEventChannelAdmin::AlreadyConnected();
00107   _target=CosEventComm::PullSupplier::_duplicate(pullSupplier);
00108 
00109   output(WriteLock().os);
00110 }
00111 
00112 void ProxyPullConsumer_i::disconnect_pull_consumer()
00113 {
00114   DB(5,"ProxyPullConsumer_i::disconnect_pull_consumer()");
00115   eraseKey("SupplierAdmin/ProxyPullConsumer");
00116   deactivateObject();
00117   if(CORBA::is_nil(_target))
00118   {
00119     throw CORBA::OBJECT_NOT_EXIST(
00120       IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00121       CORBA::COMPLETED_NO
00122     );
00123   }
00124   else
00125   {
00126     CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00127     req->send_deferred();
00128     Orb::inst().deferredRequest(req._retn());
00129     _target=CosEventComm::PullSupplier::_nil();
00130   }
00131 }
00132 
00133 //
00134 
00135 ProxyPullConsumer_i::ProxyPullConsumer_i(
00136   PortableServer::POA_ptr poa,
00137   list<CORBA::Any*>&      q
00138 )
00139 : Proxy(poa),
00140   _target(CosEventComm::PullSupplier::_nil()),
00141   _queue(q)
00142 {
00143   _exceptionCount[Pull]=0;
00144   _exceptionCount[TryPull]=0;
00145 }
00146 
00147 ProxyPullConsumer_i::~ProxyPullConsumer_i()
00148 {
00149   DB(20,"~ProxyPullConsumer_i()")
00150 }
00151 
00152 void ProxyPullConsumer_i::trigger()
00153 {
00154   // Prefer 'pull' method calls.
00155   CORBA::String_var opname ="pull";
00156 
00157   if(!CORBA::is_nil(_req) && _req->poll_response()) 
00158   {
00159     opname=CORBA::string_dup(_req->operation());
00160 
00161     CORBA::Environment_ptr env=_req->env(); // No need to release environment.
00162     if(!CORBA::is_nil(env) && env->exception()) 
00163     {
00164       CORBA::Exception* ex =env->exception(); // No need to free exception.
00165       DB(10,"ProxyPullConsumer got exception"
00166            IF_OMNIORB4(<<": "<<ex->_name())<<", op:"<<opname);
00167       _req=CORBA::Request::_nil();
00168       if(0==strcmp("pull",opname))
00169       {
00170         ++(_exceptionCount[Pull]);
00171         opname="try_pull"; // Try something else next time.
00172       }
00173       else if(0==strcmp("try_pull",opname))
00174       {
00175         ++(_exceptionCount[TryPull]);
00176         opname="pull"; // Try something else next time.
00177       }
00178       else
00179           DB(2,"Ignoring unrecognised response. operation:"<<opname);
00180       if(_exceptionCount[Pull]>=2 && _exceptionCount[TryPull]>=2)
00181       {
00182         Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
00183 
00184         // Try to notify the Supplier that the connection is closing.
00185         CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00186         req->send_deferred();
00187         Orb::inst().deferredRequest(req._retn());
00188 
00189         _target=CosEventComm::PullSupplier::_nil(); // disconnected
00190         eraseKey("SupplierAdmin/ProxyPullConsumer");
00191         deactivateObject();
00192         return;
00193       }
00194     }
00195     else  
00196     {
00197       // Do we have an event?
00198       bool hasEvent=false;
00199       if(0==strcmp("pull",opname))
00200       {
00201         _exceptionCount[Pull]=0;
00202         hasEvent=true;
00203       }
00204       else if(0==strcmp("try_pull",opname))
00205       {
00206         _exceptionCount[TryPull]=0;
00207         CORBA::NVList_ptr args=_req->arguments(); // No need to release args.
00208         if(args->count()==1)
00209         {
00210           CORBA::NamedValue_var hasEventArg=args->item(0);
00211           if(0==strcmp(hasEventArg->name(),"has_event"))
00212           {
00213             CORBA::Any* a =hasEventArg->value();
00214             CORBA::Boolean b;
00215             CORBA::Any::to_boolean tb(b); //MS VC++6 is on drugs!
00216             hasEvent=(((*a)>>=tb) && b);
00217           }
00218         }
00219       }
00220       // Pick up an event, if we have one.
00221       if(hasEvent)
00222       {
00223         CORBA::Any* event =new CORBA::Any();
00224         _req->return_value() >>= (*event);
00225         _queue.push_back(event);
00226       }
00227     }
00228     _req=CORBA::Request::_nil();
00229   }
00231   if(CORBA::is_nil(_req) && !CORBA::is_nil(_target))
00232   {
00233     _req=_target->_request(opname);
00234     _req->set_return_type(CORBA::_tc_any);
00235     if(0==strcmp("try_pull",opname))
00236         _req->add_out_arg("has_event")<<=CORBA::Any::from_boolean(1);
00237     _req->send_deferred();
00238   }
00239 }
00240 
00241 void ProxyPullConsumer_i::reincarnate(
00242   const string&      oid,
00243   const PersistNode& node
00244 )
00245 {
00246   CosEventComm::PullSupplier_var pullSupplier =
00247     string_to_<CosEventComm::PullSupplier>(node.attrString("IOR").c_str());
00248   // Do not activate until we know that we have read a valid target.
00249   activateObjectWithId(oid.c_str());
00250   connect_pull_supplier(pullSupplier.in());
00251 }
00252 
00253 void ProxyPullConsumer_i::output(ostream& os)
00254 {
00255   basicOutput(os,"SupplierAdmin/ProxyPullConsumer",_target.in());
00256 }
00257 
00258 }; // end namespace OmniEvents

Generated on Fri Oct 8 15:52:51 2004 for OmniEvents by doxygen1.2.15