xrootd
|
00001 /*****************************************************************************/ 00002 /* */ 00003 /* XrdMonDecSink.hh */ 00004 /* */ 00005 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */ 00006 /* All Rights Reserved */ 00007 /* Produced by Jacek Becla for Stanford University under contract */ 00008 /* DE-AC02-76SF00515 with the Department of Energy */ 00009 /*****************************************************************************/ 00010 00011 // $Id$ 00012 00013 #ifndef XRDMONDECSINK_HH 00014 #define XRDMONDECSINK_HH 00015 00016 #include "XrdMon/XrdMonDecDictInfo.hh" 00017 #include "XrdMon/XrdMonDecTraceInfo.hh" 00018 #include "XrdMon/XrdMonDecUserInfo.hh" 00019 #include "XrdMon/XrdMonDecStageInfo.hh" 00020 #include "XrdMon/XrdMonBufferedOutput.hh" 00021 #include "XrdSys/XrdSysPthread.hh" 00022 #include <algorithm> 00023 #include <fstream> 00024 #include <map> 00025 #include <vector> 00026 using std::fstream; 00027 using std::map; 00028 using std::pair; 00029 using std::vector; 00030 00031 class XrdMonDecSink { 00032 public: 00033 XrdMonDecSink(const char* baseDir, 00034 const char* rtLogDir, 00035 int rtBufSize, 00036 bool saveTraces, 00037 int maxTraceLogSize); 00038 ~XrdMonDecSink(); 00039 00040 void init(dictid_t min, dictid_t max, const string& senderHP); 00041 sequen_t lastSeq() const { return _lastSeq; } 00042 void registerXrdRestart(kXR_int32 stod, senderid_t senderId); 00043 00044 void setLastSeq(sequen_t seq) { _lastSeq = seq; } 00045 00046 void addDictId(dictid_t xrdId, 00047 const char* theString, 00048 int len, 00049 senderid_t senderId); 00050 void addStageInfo(dictid_t xrdId, 00051 const char* theString, 00052 int len, 00053 senderid_t senderId); 00054 void addUserId(dictid_t xrdId, 00055 const char* theString, 00056 int len, 00057 senderid_t senderId); 00058 void add(dictid_t xrdId, 00059 XrdMonDecTraceInfo& trace, 00060 senderid_t senderId); 00061 void addUserDisconnect(dictid_t xrdId, 00062 kXR_int32 sec, 00063 kXR_int32 timestamp, 00064 senderid_t senderId); 00065 void openFile(dictid_t dictId, 00066 kXR_int32 timestamp, 00067 senderid_t senderId, 00068 kXR_int64 fSize); 00069 void closeFile(dictid_t dictId, 00070 kXR_int64 bytesR, 00071 kXR_int64 bytesW, 00072 kXR_int32 timestamp, 00073 senderid_t senderId); 00074 void flushHistoryData(); 00075 void flushRealTimeData() { if ( 0 != _rtLogger ) _rtLogger->flush(); } 00076 00077 void reset(senderid_t senderId); 00078 00079 private: 00080 typedef map<dictid_t, XrdMonDecDictInfo*> dmap_t; 00081 typedef map<dictid_t, XrdMonDecUserInfo*> umap_t; 00082 typedef map<dictid_t, XrdMonDecDictInfo*>::iterator dmapitr_t; 00083 typedef map<dictid_t, XrdMonDecUserInfo*>::iterator umapitr_t; 00084 00085 void initRT(const char* rtLogDir, int rtBufSize); 00086 void addVersion(); 00087 00088 void loadUniqueIdsAndSeq(); 00089 vector<XrdMonDecDictInfo*> loadActiveDictInfo(); 00090 void flushClosedDicts(); 00091 void flushUserCache(); 00092 void flushTCache(); 00093 void checkpoint(); 00094 void openTraceFile(fstream& f); 00095 void write2TraceFile(fstream& f, const char* buf, int len); 00096 void registerLostPacket(dictid_t id, const char* descr); 00097 void reportLostPackets(); 00098 00099 void flushOneDMap(dmap_t* m, int& curLen, const int BUFSIZE, 00100 string& buf, fstream& fD); 00101 void flushOneUMap(umap_t* m, int& curLen, const int BUFSIZE, 00102 string& buf, fstream& fD); 00103 00104 void resetDMap(senderid_t senderId); 00105 void resetUMap(senderid_t senderId); 00106 00107 private: 00108 // this defines how frequently version information will be 00109 // added to the log file (every ...how many entries in the log file) 00110 static const kXR_unt16 VER_FREQ; 00111 00112 kXR_unt16 _verFreqCount; 00113 00114 00115 vector< dmap_t* > _dCache; 00116 vector< umap_t* > _uCache; 00117 00118 // The mutexes guard access to dCache, uCache respectively. 00119 // _dCache and _uCache can be accessed from different threads 00120 // (periodic data flushing inside dedicated thread) 00121 XrdSysMutex _dMutex; 00122 XrdSysMutex _uMutex; 00123 00124 XrdMonBufferedOutput* _rtLogger; 00125 00126 bool _saveTraces; 00127 typedef vector<XrdMonDecTraceInfo> TraceVector; 00128 TraceVector _tCache; 00129 kXR_unt32 _tCacheSize; 00130 kXR_unt16 _traceLogNumber; // trace.000.ascii, 001, and so on... 00131 kXR_int64 _maxTraceLogSize; // [in MB] 00132 00133 map<dictid_t, long> _lost; //lost dictIds -> number of lost traces 00134 00135 sequen_t _lastSeq; 00136 dictid_t _uniqueDictId; // dictId in mySQL, unique for given xrootd host 00137 dictid_t _uniqueUserId; // userId in mySQL, unique for given xrootd host 00138 00139 string _path; // <basePath>/<date>_seqId_ 00140 string _jnlPath; // <basePath>/jnl 00141 string _dictPath; // <basePath>/<YYYYMMDD_HH:MM:SS.MMM_dict.ascii 00142 string _userPath; // <basePath>/<YYYYMMDD_HH:MM:SS.MMM_user.ascii 00143 string _rtFlagPath; // <rtLogDir>/rtRunning.flag 00144 string _rtMaxIdsPath;// <rtLogDir>/rtMax.jnl 00145 string _xrdRestartLog;// <basePath>/xrdRestarts.ascii 00146 }; 00147 00148 #endif /* XRDMONDECSINK_HH */