00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ProxyPullConsumer.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029
00030 namespace OmniEvents {
00031
00032
00033
00034
00035
00036 PortableServer::Servant
00037 ProxyPullConsumerManager::incarnate(
00038 const PortableServer::ObjectId& oid,
00039 PortableServer::POA_ptr poa
00040 )
00041 {
00042 DB(20,"ProxyPullConsumerManager::incarnate()")
00043 ProxyPullConsumer_i* result =new ProxyPullConsumer_i(_managedPoa,_queue);
00044 _servants.insert(result);
00045 return result;
00046 }
00047
00048 ProxyPullConsumerManager::ProxyPullConsumerManager(
00049 PortableServer::POA_ptr parentPoa,
00050 list<CORBA::Any*>& q
00051 )
00052 : ProxyManager(parentPoa,"ProxyPullConsumer"),
00053 _queue(q)
00054 {
00055
00056 }
00057
00058 ProxyPullConsumerManager::~ProxyPullConsumerManager()
00059 {
00060 DB(20,"~ProxyPullConsumerManager()")
00061 }
00062
00063 CosEventChannelAdmin::ProxyPullConsumer_ptr
00064 ProxyPullConsumerManager::createObject()
00065 {
00066 return createNarrowedReference<CosEventChannelAdmin::ProxyPullConsumer>(
00067 _managedPoa.in(),
00068 CosEventChannelAdmin::_tc_ProxyPullConsumer->id()
00069 );
00070 }
00071
00072 void ProxyPullConsumerManager::trigger()
00073 {
00074
00075 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00076 {
00077 ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
00078 proxy->trigger();
00079 }
00080 }
00081
00082 void ProxyPullConsumerManager::disconnect()
00083 {
00084 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00085 {
00086 Proxy* p =*i;
00087 ProxyPullConsumer_i* ppc =static_cast<ProxyPullConsumer_i*>(p);
00088 ppc->disconnect_pull_consumer();
00089 }
00090 }
00091
00092
00093
00094
00095
00096
00097
00098
00099 void ProxyPullConsumer_i::connect_pull_supplier(
00100 CosEventComm::PullSupplier_ptr pullSupplier
00101 )
00102 {
00103 if(CORBA::is_nil(pullSupplier))
00104 throw CORBA::BAD_PARAM();
00105 if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00106 throw CosEventChannelAdmin::AlreadyConnected();
00107 _target=CosEventComm::PullSupplier::_duplicate(pullSupplier);
00108
00109 output(WriteLock().os);
00110 }
00111
00112 void ProxyPullConsumer_i::disconnect_pull_consumer()
00113 {
00114 DB(5,"ProxyPullConsumer_i::disconnect_pull_consumer()");
00115 eraseKey("SupplierAdmin/ProxyPullConsumer");
00116 deactivateObject();
00117 if(CORBA::is_nil(_target))
00118 {
00119 throw CORBA::OBJECT_NOT_EXIST(
00120 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00121 CORBA::COMPLETED_NO
00122 );
00123 }
00124 else
00125 {
00126 CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00127 req->send_deferred();
00128 Orb::inst().deferredRequest(req._retn());
00129 _target=CosEventComm::PullSupplier::_nil();
00130 }
00131 }
00132
00133
00134
00135 ProxyPullConsumer_i::ProxyPullConsumer_i(
00136 PortableServer::POA_ptr poa,
00137 list<CORBA::Any*>& q
00138 )
00139 : Proxy(poa),
00140 _target(CosEventComm::PullSupplier::_nil()),
00141 _queue(q)
00142 {
00143 _exceptionCount[Pull]=0;
00144 _exceptionCount[TryPull]=0;
00145 }
00146
00147 ProxyPullConsumer_i::~ProxyPullConsumer_i()
00148 {
00149 DB(20,"~ProxyPullConsumer_i()")
00150 }
00151
00152 void ProxyPullConsumer_i::trigger()
00153 {
00154
00155 CORBA::String_var opname ="pull";
00156
00157 if(!CORBA::is_nil(_req) && _req->poll_response())
00158 {
00159 opname=CORBA::string_dup(_req->operation());
00160
00161 CORBA::Environment_ptr env=_req->env();
00162 if(!CORBA::is_nil(env) && env->exception())
00163 {
00164 CORBA::Exception* ex =env->exception();
00165 DB(10,"ProxyPullConsumer got exception"
00166 IF_OMNIORB4(<<": "<<ex->_name())<<", op:"<<opname);
00167 _req=CORBA::Request::_nil();
00168 if(0==strcmp("pull",opname))
00169 {
00170 ++(_exceptionCount[Pull]);
00171 opname="try_pull";
00172 }
00173 else if(0==strcmp("try_pull",opname))
00174 {
00175 ++(_exceptionCount[TryPull]);
00176 opname="pull";
00177 }
00178 else
00179 DB(2,"Ignoring unrecognised response. operation:"<<opname);
00180 if(_exceptionCount[Pull]>=2 && _exceptionCount[TryPull]>=2)
00181 {
00182 Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
00183
00184
00185 CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00186 req->send_deferred();
00187 Orb::inst().deferredRequest(req._retn());
00188
00189 _target=CosEventComm::PullSupplier::_nil();
00190 eraseKey("SupplierAdmin/ProxyPullConsumer");
00191 deactivateObject();
00192 return;
00193 }
00194 }
00195 else
00196 {
00197
00198 bool hasEvent=false;
00199 if(0==strcmp("pull",opname))
00200 {
00201 _exceptionCount[Pull]=0;
00202 hasEvent=true;
00203 }
00204 else if(0==strcmp("try_pull",opname))
00205 {
00206 _exceptionCount[TryPull]=0;
00207 CORBA::NVList_ptr args=_req->arguments();
00208 if(args->count()==1)
00209 {
00210 CORBA::NamedValue_var hasEventArg=args->item(0);
00211 if(0==strcmp(hasEventArg->name(),"has_event"))
00212 {
00213 CORBA::Any* a =hasEventArg->value();
00214 CORBA::Boolean b;
00215 CORBA::Any::to_boolean tb(b);
00216 hasEvent=(((*a)>>=tb) && b);
00217 }
00218 }
00219 }
00220
00221 if(hasEvent)
00222 {
00223 CORBA::Any* event =new CORBA::Any();
00224 _req->return_value() >>= (*event);
00225 _queue.push_back(event);
00226 }
00227 }
00228 _req=CORBA::Request::_nil();
00229 }
00231 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target))
00232 {
00233 _req=_target->_request(opname);
00234 _req->set_return_type(CORBA::_tc_any);
00235 if(0==strcmp("try_pull",opname))
00236 _req->add_out_arg("has_event")<<=CORBA::Any::from_boolean(1);
00237 _req->send_deferred();
00238 }
00239 }
00240
00241 void ProxyPullConsumer_i::reincarnate(
00242 const string& oid,
00243 const PersistNode& node
00244 )
00245 {
00246 CosEventComm::PullSupplier_var pullSupplier =
00247 string_to_<CosEventComm::PullSupplier>(node.attrString("IOR").c_str());
00248
00249 activateObjectWithId(oid.c_str());
00250 connect_pull_supplier(pullSupplier.in());
00251 }
00252
00253 void ProxyPullConsumer_i::output(ostream& os)
00254 {
00255 basicOutput(os,"SupplierAdmin/ProxyPullConsumer",_target.in());
00256 }
00257
00258 };