00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "SupplierAdmin.h"
00025
00026 #include "EventChannel.h"
00027 #include "ProxyPushConsumer.h"
00028 #include "ProxyPullConsumer.h"
00029 #include "Orb.h"
00030 #include "PersistNode.h"
00031
00032 namespace OmniEvents {
00033
00034 CosEventChannelAdmin::ProxyPushConsumer_ptr
00035 SupplierAdmin_i::obtain_push_consumer()
00036 {
00037 return _pushConsumer->createObject();
00038 }
00039
00040
00041 CosEventChannelAdmin::ProxyPullConsumer_ptr
00042 SupplierAdmin_i::obtain_pull_consumer()
00043 {
00044 if(!_pullConsumer)
00045 _pullConsumer=new ProxyPullConsumerManager(_poa,_queue);
00046 return _pullConsumer->createObject();
00047 }
00048
00049
00050 SupplierAdmin_i::SupplierAdmin_i(
00051 const EventChannel_i& channel,
00052 PortableServer::POA_ptr poa
00053 )
00054 : Servant(poa),
00055 _channel(channel),
00056 _pushConsumer(NULL),
00057 _pullConsumer(NULL),
00058 _queue(),
00059 _nextPull(0)
00060 {
00061
00062
00063
00064
00065
00066
00067 _pushConsumer=new ProxyPushConsumer_i(_poa,_queue,_channel.consumerAdmin());
00068
00069 activateObjectWithId("SupplierAdmin");
00070 _remove_ref();
00071 }
00072
00073
00074 SupplierAdmin_i::~SupplierAdmin_i()
00075 {
00076 if(_pushConsumer)
00077 {
00078 delete _pushConsumer;
00079 _pushConsumer=NULL;
00080 }
00081 for(list<CORBA::Any*>::iterator i=_queue.begin(); i!=_queue.end(); ++i)
00082 delete *i;
00083 DB(20,"~SupplierAdmin_i()")
00084 }
00085
00086
00087 void SupplierAdmin_i::collect(list<CORBA::Any*>& events)
00088 {
00089 if(_pullConsumer)
00090 {
00091
00092 unsigned long sec,nsec;
00093 omni_thread::get_time(&sec,&nsec);
00094 if(sec>_nextPull)
00095 {
00096 _pullConsumer->trigger();
00097 _nextPull=sec+_channel.pullRetryPeriod();
00098 }
00099 }
00100 _pushConsumer->trigger();
00101
00102 events=_queue;
00103 _queue.clear();
00104 }
00105
00106
00107 void SupplierAdmin_i::disconnect()
00108 {
00109 if(_pushConsumer)
00110 _pushConsumer->disconnect();
00111 if(_pullConsumer)
00112 _pullConsumer->disconnect();
00113 }
00114
00115
00116 void SupplierAdmin_i::reincarnate(const PersistNode& node)
00117 {
00118
00119 PersistNode* pushcNode =node.child("ProxyPushConsumer");
00120 if(pushcNode && !pushcNode->_child.empty())
00121 {
00122 assert(_pushConsumer!=NULL);
00123 _pushConsumer->reincarnate(*pushcNode);
00124 }
00125
00126
00127 PersistNode* pullcNode =node.child("ProxyPullConsumer");
00128 if(pullcNode && !pullcNode->_child.empty())
00129 {
00130 if(!_pullConsumer)
00131 _pullConsumer=new ProxyPullConsumerManager(_poa,_queue);
00132 _pullConsumer->reincarnate(*pullcNode);
00133 }
00134 }
00135
00136
00137 void SupplierAdmin_i::output(ostream& os)
00138 {
00139 if(_pushConsumer)
00140 _pushConsumer->output(os);
00141 if(_pullConsumer)
00142 _pullConsumer->output(os);
00143 }
00144
00145
00146 };