00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140 #include "omniEventsLog.h"
00141
00142 #ifdef HAVE_CONFIG_H
00143 # include "config.h"
00144 #endif
00145
00146 #include <stdio.h>
00147
00148 #ifdef HAVE_STDLIB_H
00149 # include <stdlib.h>
00150 #endif
00151
00152 #ifdef HAVE_SYS_TYPES_H
00153 # include <sys/types.h>
00154 #endif
00155
00156 #ifdef HAVE_SYS_STAT_H
00157 # include <sys/stat.h>
00158 #endif
00159
00160 #ifdef HAVE_FCNTL_H
00161 # include <fcntl.h>
00162 #endif
00163
00164 #if defined(__VMS) && __CRTL_VER < 70000000
00165 # include <omniVMS/unlink.hxx>
00166 #endif
00167
00168 #ifdef __WIN32__
00169 # include <io.h>
00170 # include <winbase.h>
00171 # define stat(x,y) _stat(x,y)
00172 # define unlink(x) _unlink(x)
00173 # define STRUCT_STAT struct _stat
00174 #else
00175 # define STRUCT_STAT struct stat
00176 #endif // __WIN32__
00177
00178 #ifdef HAVE_UNISTD_H
00179 # include <unistd.h>
00180 #endif
00181
00182 #ifdef HAVE_LIBC_H
00183 # include <libc.h>
00184 #endif
00185
00186 #ifdef HAVE_SYS_PARAM_H
00187 # include <sys/param.h>
00188 #endif
00189
00190 #include <errno.h>
00191 #include <time.h>
00192 #include <assert.h>
00193 #include "gethostname.h"
00194
00195 #include "EventChannelFactory.h"
00196 #include "Orb.h"
00197 #include "defaults.h"
00198
00199
00200
00201
00202
00203 #if defined(HAVE_FSTREAM_OPEN)
00204 # define FLAG_TRUNCATE ios::trunc
00205 # define FLAG_APPEND ios::app
00206 # define FLAG_SYNC 0
00207 #elif defined(HAVE_FSTREAM_ATTACH)
00208 # if defined(__WIN32__)
00209 # define FLAG_SYNC 0
00210 # elif defined(O_SYNC)
00211 # define FLAG_SYNC O_SYNC
00212 # else
00213 # define FLAG_SYNC O_FSYNC // FreeBSD 3.2 does not have O_SYNC???
00214 # endif
00215 # define FLAG_TRUNCATE O_CREAT|O_TRUNC
00216 # define FLAG_APPEND O_APPEND
00217 #else
00218 # error "Can't open a file without ofstream::open() or ofstream::attach()"
00219 #endif
00220
00221
00222
00223
00224
00225 #ifdef __VMS
00226 # define VMS_SEMICOLON ";"
00227 #else
00228 # define VMS_SEMICOLON
00229 #endif
00230
00231 extern int yyparse();
00232 extern int yydebug;
00233 extern FILE *yyin;
00234
00235 namespace OmniEvents {
00236
00242 class timestamp
00243 {
00244 char str[29];
00245 public:
00246 timestamp(void)
00247 {
00248 str[0] = '[';
00249 str[1] = str[28] = '\0';
00250 }
00251 const char* t(void)
00252 {
00253 time_t t =time(NULL);
00254 char* p =ctime(&t);
00255 if(strncmp(p, &str[1], 24) == 0)
00256 return "";
00257 strncpy(&str[1], p, 24);
00258 str[25] = ']';
00259 str[26] = ' ';
00260 str[27] = ' ';
00261 return str;
00262 }
00263 };
00264
00265 timestamp ts;
00266
00267
00268
00269
00270
00271 omniEventsLog *omniEventsLog::theLog = NULL;
00272
00273 omniEventsLog::omniEventsLog(const char* logdir) :
00274 _logstream(),
00275 _activeFilename(NULL),
00276 _backupFilename(NULL),
00277 _checkpointFilename(NULL),
00278 _workerThread(NULL),
00279 _factory(NULL),
00280 _checkpointNeeded(true),
00281 _lock()
00282 {
00283 omniEventsLog::theLog = this;
00284 initializeFileNames(logdir);
00285 }
00286
00287
00288 bool omniEventsLog::fileExists(const char* filename) const
00289 {
00290 STRUCT_STAT sb;
00291 return(::stat(filename,&sb) == 0);
00292 }
00293
00294
00295 PersistNode* omniEventsLog::bootstrap(int port, const char* endPointNoListen)
00296 {
00297
00298
00299 PersistNode* initialState=new PersistNode();
00300 PersistNode* ecf =initialState->addnode("ecf");
00301 ecf->addattr("port",port);
00302 if(endPointNoListen && endPointNoListen[0])
00303 ecf->addattr(string("endPointNoListen=")+endPointNoListen);
00304 return initialState;
00305 }
00306
00307
00308 PersistNode* omniEventsLog::parse()
00309 {
00310
00311
00312 ifstream persiststream(_activeFilename);
00313 if(!persiststream)
00314 {
00315 cerr << "Error: cannot read database file '"
00316 << _activeFilename << "'." << endl;
00317 if( fileExists(_backupFilename) )
00318 {
00319 cerr <<
00320 " Backup file '" << _backupFilename << "' exists.\n"
00321 " Either rename it to '" << _activeFilename << "' to\n"
00322 " to recover the server's state, or delete it to create a new\n"
00323 " database file." << endl;
00324 }
00325 exit(1);
00326 }
00327 PersistNode* initialState=new PersistNode(persiststream);
00328 persiststream.close();
00329
00330
00331
00332 const char* errorStr =NULL;
00333 PersistNode* ecf=initialState->child("ecf");
00334 if(!ecf)
00335 errorStr="Can't find EventChannelFactory.";
00336 else if(ecf->attrLong("port",-1)<=0)
00337 errorStr="EventChannelFactory is not assigned a valid port.";
00338
00339 if(errorStr)
00340 {
00341 cerr<<"Error parsing database '"<<_activeFilename<<"'.\n"
00342 <<errorStr<<" Try deleting the file (and any backup)."<<endl;
00343 exit(1);
00344 }
00345
00346 return initialState;
00347 }
00348
00349
00350 void omniEventsLog::incarnateFactory(PersistNode* initialState)
00351 {
00352 assert(initialState!=NULL);
00353
00354
00355
00356 try
00357 {
00358 openOfstream(_logstream,_activeFilename,FLAG_APPEND);
00359 }
00360 catch (IOError& ex)
00361 {
00362 cerr << "Error: cannot "
00363 << (fileExists(_activeFilename)?"write to":"create new")
00364 << " database file '" << _activeFilename
00365 << "': " << strerror(errno) << endl;
00366 cerr << "\nUse option '-l' or set the environment variable "
00367 << OMNIEVENTS_LOGDIR_ENV_VAR
00368 << "\nto specify the directory where the files are kept.\n"
00369 << endl;
00370 _logstream.close();
00371 unlink(_activeFilename);
00372 exit(1);
00373 }
00374
00375
00376
00377 PersistNode* ecf=initialState->child("ecf");
00378 assert(ecf!=NULL);
00379 _factory =new EventChannelFactory_i(*ecf);
00380 assert(!CORBA::is_nil(_factory->_this()));
00381 }
00382
00383
00384 void omniEventsLog::runWorker()
00385 {
00386 assert(_factory!=NULL);
00387
00388 _workerThread=new omniEventsLogWorker(
00389 this,
00390 &omniEventsLog::checkpoint,
00391 omni_thread::PRIORITY_NORMAL
00392 );
00393 }
00394
00395
00396 void omniEventsLog::output(ostream& os)
00397 {
00398 _factory->output(os);
00399 os<<endl;
00400 }
00401
00402
00403 void omniEventsLog::checkpoint(void)
00404 {
00405 int idle_time_btw_chkpt;
00406 static int firstCheckPoint = 1;
00407 char *itbc = getenv("OMNIEVENTS_ITBC");
00408 if (itbc == NULL || sscanf(itbc,"%d",&idle_time_btw_chkpt) != 1)
00409 {
00410 idle_time_btw_chkpt=OMNIEVENTS_LOG_CHECKPOINT_PERIOD;
00411 }
00412
00413 omni_mutex mutex;
00414 omni_condition cond(&mutex);
00415
00416 mutex.lock();
00417 while (1) {
00418
00419
00420
00421
00422
00423 if (! firstCheckPoint)
00424 {
00425 unsigned long s, n;
00426 omni_thread::get_time(&s, &n, idle_time_btw_chkpt);
00427 cond.timedwait(s,n);
00428
00429 _lock.lock();
00430 if(!_checkpointNeeded)
00431 {
00432 _lock.unlock();
00433 continue;
00434 }
00435 }
00436 else
00437 {
00438 _lock.lock();
00439 firstCheckPoint = 0;
00440 }
00441
00442 DB(1,ts.t() << "Checkpointing Phase 1: Prepare.")
00443
00444 ofstream ckpf;
00445 int fd = -1;
00446
00447 try
00448 {
00449 try
00450 {
00451 openOfstream(ckpf,_checkpointFilename,FLAG_TRUNCATE|FLAG_SYNC,&fd);
00452 }
00453 catch(IOError& ex)
00454 {
00455 DB(0,ts.t() << "Error: cannot open checkpoint file '"
00456 << _checkpointFilename << "' for writing.")
00457 throw;
00458 }
00459
00460 output(ckpf);
00461
00462 ckpf.close();
00463 if(!ckpf)
00464 throw IOError();
00465
00466
00467 #if defined(__sunos__) && defined(__SUNPRO_CC) && __SUNPRO_CC < 0x500
00468 if(close(fd) < 0)
00469 throw IOError();
00470 #endif
00471
00472 }
00473 catch(IOError& ex)
00474 {
00475 DB(0,ts.t()<<"I/O error writing checkpoint file: "<<strerror(errno)
00476 <<"\nAbandoning checkpoint")
00477 ckpf.close();
00478
00479 #if defined(__sunos__) && defined(__SUNPRO_CC) && __SUNPRO_CC < 0x500
00480 close(fd);
00481 #endif
00482 unlink(_checkpointFilename);
00483 _lock.unlock();
00484 continue;
00485 }
00486
00487
00488
00489
00490
00491 DB(1,ts.t() << "Checkpointing Phase 2: Commit.")
00492
00493
00494 #if defined(__sunos__) && defined(__SUNPRO_CC) && __SUNPRO_CC < 0x500
00495 close(_logstream.rdbuf()->fd());
00496 #endif
00497
00498 _logstream.close();
00499
00500 unlink(_backupFilename);
00501
00502 #if defined(__WIN32__)
00503 if(rename(_activeFilename, _backupFilename) != 0)
00504 #elif defined(__VMS)
00505 if(rename(_activeFilename, _backupFilename) < 0)
00506 #else
00507 if(link(_activeFilename,_backupFilename) < 0)
00508 #endif
00509 {
00510
00511 DB(0,ts.t() << "Error: failed to link backup file '"
00512 << _backupFilename << "' to old log file '"
00513 << _activeFilename << "'.")
00514 exit(1);
00515 }
00516
00517 #if !defined( __VMS) && !defined(__WIN32__)
00518 if(unlink(_activeFilename) < 0)
00519 {
00520
00521 DB(0,ts.t() << "Error: failed to unlink old log file '"
00522 << _activeFilename << "': " << strerror(errno))
00523 exit(1);
00524 }
00525 #endif
00526
00527 #if defined(__WIN32__)
00528 if(rename(_checkpointFilename,_activeFilename) != 0)
00529 #elif defined(__VMS)
00530 if(rename(_checkpointFilename,_activeFilename) < 0)
00531 #else
00532 if(link(_checkpointFilename,_activeFilename) < 0)
00533 #endif
00534 {
00535
00536 DB(0,ts.t() << "Error: failed to link log file '" << _activeFilename
00537 << "' to checkpoint file '" << _checkpointFilename << "'.")
00538 exit(1);
00539 }
00540
00541 #if !defined( __VMS) && !defined(__WIN32__)
00542 if (unlink(_checkpointFilename) < 0)
00543 {
00544
00545 DB(0,ts.t() << "Error: failed to unlink checkpoint file '"
00546 << _checkpointFilename << "'.")
00547 exit(1);
00548 }
00549 #endif
00550
00551 try
00552 {
00553 openOfstream(_logstream,_activeFilename,FLAG_APPEND|FLAG_SYNC,&fd);
00554 }
00555 catch (IOError& ex)
00556 {
00557 DB(0,ts.t() << "Error: cannot open new log file '" << _activeFilename
00558 << "' for writing.")
00559 exit(1);
00560 }
00561
00562 DB(1,ts.t() << "Checkpointing completed.")
00563
00564 _checkpointNeeded=false;
00565 _lock.unlock();
00566 }
00567 mutex.unlock();
00568 }
00569
00570
00581 void omniEventsLog::initializeFileNames(const char* logdir)
00582 {
00583 if(!logdir)
00584 logdir=getenv(OMNIEVENTS_LOGDIR_ENV_VAR);
00585 if(!logdir)
00586 logdir=OMNIEVENTS_LOG_DEFAULT_LOCATION;
00587
00588 const char* logname ="omnievents-";
00589 char hostname[MAXHOSTNAMELEN];
00590 if (0!=gethostname(hostname,MAXHOSTNAMELEN))
00591 {
00592 cerr << "Error: cannot get the name of this host." << endl;
00593 exit(1);
00594 }
00595 const char* sep ="";
00596
00597 #if defined(__WIN32__)
00598 sep="\\";
00599 #elif defined(__VMS)
00600 char last( logdir[strlen(logdir)-1] );
00601 if (last != ':' && last != ']')
00602 {
00603 cerr << "Error: " << OMNIEVENTS_LOGDIR_ENV_VAR << " (" << logdir
00604 << ") is not a directory name." << endl;
00605 exit(1);
00606 }
00607 #else // Unix
00608 if (logdir[0] != '/')
00609 {
00610 cerr << "Error: " << OMNIEVENTS_LOGDIR_ENV_VAR << " (" << logdir
00611 << ") is not an absolute path name." << endl;
00612 exit(1);
00613 }
00614 if (logdir[strlen(logdir)-1] != '/')
00615 sep="/";
00616 #endif
00617
00618
00619
00620
00621 setFilename(_activeFilename,logdir,sep,logname,hostname,".log" VMS_SEMICOLON);
00622 setFilename(_backupFilename,logdir,sep,logname,hostname,".bak" VMS_SEMICOLON);
00623 setFilename(
00624 _checkpointFilename,logdir,sep,logname,hostname,".ckp" VMS_SEMICOLON);
00625 }
00626
00627
00631 void omniEventsLog::setFilename(
00632 char*& filename, const char* logdir, const char* sep,
00633 const char* logname, const char* hostname, const char* ext)
00634 {
00635 size_t len=1+
00636 strlen(logdir)+strlen(sep)+strlen(logname)+strlen(hostname)+strlen(ext);
00637 filename=new char[len];
00638 sprintf(filename,"%s%s%s%s%s",logdir,sep,logname,hostname,ext);
00639 }
00640
00641
00655 void omniEventsLog::openOfstream(
00656 ofstream& s, const char* filename, int flags, int* fd)
00657 {
00658 #if defined(HAVE_FSTREAM_OPEN)
00659 # ifdef HAVE_STD_IOSTREAM
00660 ios::openmode openmodeflags =ios::out|ios::openmode(flags);
00661 # else
00662 int openmodeflags =ios::out|flags;
00663 # endif
00664
00665 # ifdef FSTREAM_OPEN_PROT
00666 s.open(filename,openmodeflags,0644);
00667 # else
00668 s.open(filename,openmodeflags);
00669 # endif
00670 if (!s)
00671 throw IOError();
00672
00673 #elif defined(HAVE_FSTREAM_ATTACH)
00674 # ifdef __WIN32__
00675 int localFd = _open(filename, O_WRONLY | flags, _S_IWRITE);
00676 # else
00677 int localFd = open(filename, O_WRONLY | flags, 0644);
00678 # endif
00679 if (localFd < 0)
00680 throw IOError();
00681 if(fd)
00682 (*fd)=localFd;
00683 s.attach(localFd);
00684 #endif
00685 }
00686
00687
00688
00689
00690
00691 omniEventsLogWorker::omniEventsLogWorker(
00692 omniEventsLog* object,
00693 Method method,
00694 priority_t priority
00695 ):omni_thread(NULL,priority)
00696 {
00697 DB(15, "omniEventsLogWorker::omniEventsLogWorker()");
00698
00699 _method=method;
00700 _object=object;
00701
00702 start_undetached();
00703 }
00704
00705
00706 void* omniEventsLogWorker::run_undetached(void *)
00707 {
00708 try {
00709 DB(15, "omniEventsLogWorker : run_undetached Start");
00710 (_object->*_method)();
00711 DB(15, "omniEventsLogWorker : run_undetached End");
00712 }
00713 catch (CORBA::SystemException& ex) {
00714 DB(0,"omniEventsLogWorker killed by CORBA system exception"
00715 IF_OMNIORB4(": "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") ".")
00716 }
00717 catch (CORBA::Exception& ex) {
00718 DB(0,"omniEventsLogWorker killed by CORBA exception"
00719 IF_OMNIORB4(": "<<ex._name()<<) ".")
00720 }
00721 catch(...) {
00722 DB(0,"omniEventsLogWorker killed by unknown exception.")
00723 }
00724 return NULL;
00725 }
00726
00727 omniEventsLogWorker::~omniEventsLogWorker()
00728 {
00729 DB(20, "omniEventsLogWorker::~omniEventsLogWorker()");
00730 }
00731
00732
00733 };