libassa 3.5.0
ASSA::Reactor Class Reference

#include <Reactor.h>


Public Member Functions

 Reactor ()
 Constructor.
 ~Reactor ()
 Destructor.
TimerId registerTimerHandler (EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>")
 Register Timer Event handler with Reactor.
bool registerIOHandler (EventHandler *eh_, handler_t fd_, EventType et_=RWE_EVENTS)
 Register I/O Event handler with Reactor.
bool removeHandler (EventHandler *eh_, EventType et_=ALL_EVENTS)
 Remove Event handler from reactor for either all I/O events or timeout event or both.
bool removeTimerHandler (TimerId id_)
 Remove Timer event from the queue.
bool removeIOHandler (handler_t fd_)
 Remove IO Event handler from reactor.
void waitForEvents (void)
 Main waiting loop that blocks indefinitely processing events.
void waitForEvents (TimeVal *tv_)
 Wait for events for time specified.
void stopReactor (void)
 Stop Reactor's activity.
void deactivate (void)
 Deactivate Reactor.

Private Types

typedef std::map< u_int, EventHandler * > Fd2Eh_Map_Type
 no cloning
typedef Fd2Eh_Map_Type::iterator Fd2Eh_Map_Iter

Private Member Functions

 Reactor (const Reactor &)
Reactor & operator= (const Reactor &)
 no cloning
void adjust_maxfdp1 (handler_t fd_)
 Adjust maxfdp1 in a portable way (win32 ignores maxfd altogether).
bool handleError (void)
 Handle error in select(2) loop appropriately.
bool dispatch (int minimum_)
 Notify all EventHandlers registered for the respective events that occurred.
int isAnyReady (void)
 Return number of file descriptors ready across all sets.
bool checkFDs (void)
 Check mask for bad file descriptors.
void dispatchHandler (FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_)
 Call handler's callback and, if callback returns negative value, remove it from the Reactor.
void calculateTimeout (TimeVal *&howlong_, TimeVal *maxwait_)
 Calculate closest timeout.

Private Attributes

int m_fd_setsize
 Max number of open files per process.
handler_t m_maxfd_plus1
 Max file descriptor number (in all sets) plus 1.
bool m_active
 Flag that indicates whether Reactor is active or had been stopped.
Fd2Eh_Map_Type m_readSet
 Event handlers awaiting on READ_EVENT.
Fd2Eh_Map_Type m_writeSet
 Event handlers awaiting on WRITE_EVENT.
Fd2Eh_Map_Type m_exceptSet
 Event handlers awaiting on EXCEPT_EVENT.
MaskSet m_waitSet
 Handlers to wait for event on.
MaskSet m_readySet
 Handlers that are ready for processing.
TimerQueue m_tqueue
 The queue of Timers.

Detailed Description

Definition at line 57 of file Reactor.h.


Member Typedef Documentation

typedef Fd2Eh_Map_Type::iterator ASSA::Reactor::Fd2Eh_Map_Iter [private]

Definition at line 155 of file Reactor.h.

typedef std::map<u_int, EventHandler*> ASSA::Reactor::Fd2Eh_Map_Type [private]

no cloning

Definition at line 154 of file Reactor.h.


Constructor & Destructor Documentation

Reactor::Reactor ( )

Constructor.

Maximum number of sockets supported per process. Win32 defines it as 64 in winsock2.h.

On Win32, the constructor also initializes the winsock2 library.

Definition at line 24 of file Reactor.cpp.

References m_fd_setsize, ASSA::REACTTRACE, and trace_with_mask.

           : 
    m_fd_setsize  (1024), 
    m_maxfd_plus1 (0), 
    m_active      (true)
{
    trace_with_mask("Reactor::Reactor",REACTTRACE);

#if defined(WIN32)
    m_fd_setsize = FD_SETSIZE;

#else  // POSIX
    struct rlimit rlim;
    rlim.rlim_max = 0;

    if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
        m_fd_setsize = rlim.rlim_cur;
    }
#endif

#if defined (WIN32)             
    WSADATA data;
    WSAStartup (MAKEWORD (2, 2), &data);
#endif
}
Reactor::~Reactor ( )

Destructor.

Definition at line 55 of file Reactor.cpp.

References deactivate(), m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, and trace_with_mask.

{   
    trace_with_mask("Reactor::~Reactor",REACTTRACE);

    m_readSet.clear   ();
    m_writeSet.clear  ();
    m_exceptSet.clear ();
    deactivate ();
}
ASSA::Reactor::Reactor ( const Reactor & ) [private]

Member Function Documentation

void Reactor::adjust_maxfdp1 ( handler_t  fd_) [private]

Adjust maxfdp1 in a portable way (win32 ignores maxfd altogether).

If the socket descriptor that has just been removed was the highest one (that is, fd_ + 1 equals maxfd+1), we readjust maxfd+1 to the next highest descriptor plus 1.

Win32 implementation of select() ignores this value altogether.

Definition at line 701 of file Reactor.cpp.

References DL, m_maxfd_plus1, m_waitSet, ASSA::MaskSet::max_fd(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by removeHandler(), and removeIOHandler().

{
#if !defined (WIN32)  /* POSIX */

    trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE);

    if (m_maxfd_plus1 == fd_ + 1) 
    {
        m_maxfd_plus1 = m_waitSet.max_fd () + 1;
        DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
    }
#endif
}
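The readjustment described above can be sketched without libassa. This is a minimal illustration only: `FdTracker` and its members are hypothetical names, and falling back to 0 when no descriptors remain is an assumption about what `MaskSet::max_fd()` would yield for an empty set.

```cpp
#include <cassert>
#include <set>

// Hypothetical stand-in for the Reactor's bookkeeping; not part of libassa.
class FdTracker {
public:
    // Track fd and raise maxfd+1 if needed (mirrors registerIOHandler()).
    void add(int fd) {
        assert(fd >= 0);
        fds_.insert(fd);
        if (maxfd_plus1_ < fd + 1) {
            maxfd_plus1_ = fd + 1;
        }
    }

    // Drop fd; if it was the highest one, fall back to the next highest.
    void remove(int fd) {
        fds_.erase(fd);
        if (maxfd_plus1_ == fd + 1) {
            // Assumption: an empty set yields maxfd+1 == 0.
            maxfd_plus1_ = fds_.empty() ? 0 : *fds_.rbegin() + 1;
        }
    }

    int maxfd_plus1() const { return maxfd_plus1_; }

private:
    std::set<int> fds_;      // ordered, so the last element is the maximum
    int maxfd_plus1_ = 0;
};
```

Removing a descriptor that is not the highest leaves the value untouched, which is why the listing only recomputes when `m_maxfd_plus1 == fd_ + 1`.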
void Reactor::calculateTimeout ( TimeVal *&  howlong_,
TimeVal *  maxwait_ 
) [private]

Calculate closest timeout.

If the TimerQueue is not empty, return the smaller of maxwait_ and the time left until the first timer in the queue expires. Otherwise, return maxwait_.

Parameters:
maxwait_ (in) how long we are expected to wait for event(s).
howlong_ (out) how long we are going to wait.

Definition at line 421 of file Reactor.cpp.

References DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::isEmpty(), m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::top(), trace_with_mask, and ASSA::TimeVal::zeroTime().

Referenced by waitForEvents().

{
    trace_with_mask("Reactor::calculateTimeout",REACTTRACE);

    TimeVal now;
    TimeVal tv;

    if (m_tqueue.isEmpty () ) {
        howlong_ = maxwait_;
        goto done;
    }
    now = TimeVal::gettimeofday ();
    tv = m_tqueue.top ();
    
    if (tv < now) {
        /*--- 
          It took too long to get here (fraction of a millisecond), 
          and top timer had already expired. In this case,
          perform non-blocking select in order to drain the timer queue.
          ---*/
        *howlong_ = 0;
    }
    else {  
        DL((REACT,"--------- Timer Queue ----------\n"));
        m_tqueue.dump();
        DL((REACT,"--------------------------------\n"));

        if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
            *howlong_ = tv - now;
        }
        else {
            *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
        }
    }

 done:
    if (howlong_ != NULL) {
        DL((REACT,"delay (%f)\n", double (*howlong_) ));
    }
    else {
        DL((REACT,"delay (forever)\n"));
    }
}
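The same decision can be written as a pure function, which makes the cases easier to see. This is a sketch under assumptions, not libassa code: `closest_timeout`, `Clock`, and `Duration` are hypothetical names, `std::optional` stands in for the nullable `TimeVal*`, and `std::nullopt` as a result means "block forever".

```cpp
#include <algorithm>
#include <chrono>
#include <optional>

using Clock    = std::chrono::steady_clock;
using Duration = std::chrono::microseconds;

// Returns how long to block in select(); std::nullopt means "block forever".
// next_timer: expiration of the earliest timer, if any (hypothetical input).
// max_wait:   caller's limit; nullopt or zero means "no limit" when a timer
//             is pending, as in the listing above.
std::optional<Duration> closest_timeout(std::optional<Clock::time_point> next_timer,
                                        std::optional<Duration> max_wait,
                                        Clock::time_point now)
{
    if (!next_timer) {
        return max_wait;                 // empty queue: use the caller's value
    }
    if (*next_timer < now) {
        return Duration(0);              // already expired: poll without blocking
    }
    const Duration until =
        std::chrono::duration_cast<Duration>(*next_timer - now);
    if (!max_wait || *max_wait == Duration(0)) {
        return until;                    // only the timer bounds the wait
    }
    return std::min(*max_wait, until);   // whichever comes first
}
```

Taking `now` as a parameter keeps the function deterministic, so each branch can be exercised with fixed time points.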
bool Reactor::checkFDs ( void  ) [private]

Check mask for bad file descriptors.

Returns:
true if any fd(s) were found and removed; false otherwise

Definition at line 317 of file Reactor.cpp.

References ASSA::FdSet::clear(), DL, m_fd_setsize, m_readSet, ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by handleError().

{
    trace_with_mask("Reactor::checkFDs",REACTTRACE);
    
    bool num_removed = false;
    FdSet mask;
    timeval poll = { 0, 0 };

    for (handler_t fd = 0; fd < m_fd_setsize; fd++) {
        if ( m_readSet[fd] != NULL ) {
            mask.setFd (fd);
            if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
                removeIOHandler (fd);
                num_removed = true;
                DL((REACT,"Detected BAD FD: %d\n", fd ));
            }
            mask.clear (fd);
        }
    }
    return (num_removed);
}
void ASSA::Reactor::deactivate ( void  ) [inline]

Deactivate Reactor.

This function sets an internal flag that tells the Reactor's event handling loop to abort its activity. It is mostly used when a *slow* system call is interrupted by a signal handler. The OS restarts the system call after control returns from the signal handler, so the signal handler (GenServer::handle_signal()) should call this method to request a delayed deactivation of the Reactor.

Definition at line 234 of file Reactor.h.

References m_active.

Referenced by ASSA::GenServer::handle_signal(), ASSA::GenServer::stop_service(), and ~Reactor().

{  m_active = false; }
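The flag-from-a-signal-handler pattern can be shown in isolation. This is a sketch with hypothetical names (`g_active`, `on_terminate`, `install_stop_handler`, `is_active`); it uses a plain `volatile std::sig_atomic_t` rather than a class member, because that is the one object type the C++ standard guarantees can be written safely from a signal handler.

```cpp
#include <csignal>

// Hypothetical flag playing the role of m_active.
static volatile std::sig_atomic_t g_active = 1;

// Async-signal-safe: the handler only stores to a sig_atomic_t.
static void on_terminate(int) { g_active = 0; }

// Installs the handler for signo; returns true on success.
bool install_stop_handler(int signo)
{
    return std::signal(signo, on_terminate) != SIG_ERR;
}

// Condition an event loop would test between iterations.
bool is_active() { return g_active != 0; }
```

The handler does no real work. The event loop notices the change the next time it tests the flag, which is the delayed deactivation described above.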
bool Reactor::dispatch ( int  minimum_) [private]

Notify all EventHandlers registered for the respective events that occurred.

Many UNIX systems will count a particular file descriptor in the ready_ only ONCE, even if it was flagged by ::select(2) in, say, both read and write masks.

Parameters:
minimum_ number of file descriptors ready.

Definition at line 626 of file Reactor.cpp.

References ASSA::ASSAERR, dispatchHandler(), DL, ASSA::MaskSet::dump(), EL, ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), ASSA::EventHandler::handle_except(), ASSA::EventHandler::handle_read(), ASSA::EventHandler::handle_write(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

{
    trace_with_mask("Reactor::dispatch", REACTTRACE);

    m_tqueue.expire (TimeVal::gettimeofday ());

    if ( ready_ < 0 ) 
    {
#if !defined (WIN32)
        EL((ASSAERR,"::select(3) error\n"));
#endif
        return (false);
    }
    if ( ready_ == 0 ) {
        return (true);
    }

    DL((REACT,"Dispatching %d FDs.\n",ready_));
    DL((REACT,"m_readySet:\n"));
    m_readySet.dump ();

    /*--- Writes first ---*/
    dispatchHandler (m_readySet.m_wset, 
                     m_writeSet, 
                     &EventHandler::handle_write);

    /*--- Exceptions next ---*/
    dispatchHandler (m_readySet.m_eset, 
                     m_exceptSet, 
                     &EventHandler::handle_except);

    /*--- Finally, the Reads ---*/
    dispatchHandler (m_readySet.m_rset, 
                     m_readSet, 
                     &EventHandler::handle_read);

    return (true);
}
void Reactor::dispatchHandler ( FdSet &  mask_,
Fd2Eh_Map_Type &  fdSet_,
EH_IO_Callback  callback_ 
) [private]

Call handler's callback and, if callback returns negative value, remove it from the Reactor.

This spot needs re-thinking.

When you have several high data-rate connections sending data at the same time, the one that had connected first would get lower FD number and would get data transfer preference over everybody else who has connected later on.

WIN32 HACK: The scan is restarted from the beginning after every callback. Without the restart, the loop crashes because firing an EventHandler's callback might have invalidated the iterator (this happens with Connectors in sync mode).

Definition at line 568 of file Reactor.cpp.

References ASSA::FdSet::clear(), DL, ASSA::EventHandler::get_id(), ASSA::FdSet::isSet(), ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), and trace_with_mask.

Referenced by dispatch().

{
    trace_with_mask("Reactor::dispatchHandler",REACTTRACE);

    int ret = 0;
    handler_t fd;
    EventHandler* ehp = NULL;
    std::string eh_id;

    Fd2Eh_Map_Iter iter = fdSet_.begin ();

    while (iter != fdSet_.end ()) 
    {
        fd  = (*iter).first;
        ehp = (*iter).second;

        if (mask_.isSet (fd) && ehp != NULL) 
        {
            eh_id = ehp->get_id ();
            DL((REACT,"Data detected from \"%s\"(fd=%d)\n",
                eh_id.c_str (), fd));

            ret = (ehp->*callback_) (fd); /* Fire up a callback */

            if (ret == -1) {
                removeIOHandler (fd);
            }
            else if (ret > 0) {
                DL((REACT,"%d bytes pending on fd=%d \"%s\"\n",
                    ret, fd, eh_id.c_str ()));
                //return;   <-- would starve other connections
            }
            else {
                DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n", 
                    eh_id.c_str (), fd));
                mask_.clear (fd);
            }
            iter = fdSet_.begin ();
        }
        else {
            iter++;
        }
    }
}
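The restart-after-callback idea can be sketched with standard containers. This is a simplified illustration, not the library's code: `Callback` and `dispatch_ready` are hypothetical names, a `std::set<int>` stands in for the ready mask, and, unlike the listing, the ready mark is cleared before the callback runs so each descriptor fires at most once per pass.

```cpp
#include <functional>
#include <map>
#include <set>

// Hypothetical callback type: returns -1 to be removed, 0 when all data
// is consumed, and >0 when data is still pending.
using Callback = std::function<int(int fd)>;

void dispatch_ready(std::set<int>& ready, std::map<int, Callback>& handlers)
{
    auto it = handlers.begin();
    while (it != handlers.end()) {
        const int fd = it->first;
        if (ready.erase(fd) == 0) {    // not ready, or already served
            ++it;
            continue;
        }
        Callback cb = it->second;      // copy: cb may erase its own map entry
        if (cb && cb(fd) == -1) {
            handlers.erase(fd);        // the handler asked to be removed
        }
        it = handlers.begin();         // the map may have changed: restart
    }
}
```

Restarting costs extra iterations but never dereferences an iterator that a callback could have invalidated. It also keeps the bias noted above, since lower-numbered descriptors are always visited first.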
bool Reactor::handleError ( void  ) [private]

Handle error in select(2) loop appropriately.

If commanded to stop, do so

Definition at line 341 of file Reactor.cpp.

References ASSA::ASSAERR, checkFDs(), DL, EL, m_active, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

{
    trace_with_mask("Reactor::handleError",REACTTRACE);

    if ( !m_active ) {
        DL((REACT,"Received cmd to stop Reactor\n"));
        return (false);
    }

    /*---
      TODO: If select(2) returns before time expires, with
      a descriptor ready or with EINTR, timeval is not
      going to be updated with number of seconds remaining.
      This is true for all systems except Linux, which will
      do so. Therefore, to restart correctly in case of
      EINTR, we ought to take time measurement before and
      after select, and try to select() for remaining time.
    
      For now, we restart with the initial timing value.
      ---*/
    /*---
      BSD kernel never restarts select(2). SVR4 will restart if
      the SA_RESTART flag is specified when the signal handler
      for the signal delivered is installed. This means that for
      portability, we must handle signal interrupts.
      ---*/

    if ( errno == EINTR ) {
        EL((REACT,"EINTR: interrupted select(2)\n"));
        /*
          If I was sitting in select(2) and received SIGTERM,
          the signal handler would have set m_active to 'false',
          and this function would have returned 'false' as above.
          For any other non-critical signals (USR1,...),
          we retry select.
        */
        return (true);
    }
    /*
      EBADF - bad file number. One of the file descriptors does
      not reference an open file to open(), close(), ioctl().
      This can happen if user closed fd and forgot to remove
      handler from Reactor.
    */
    if ( errno == EBADF ) {
        DL((REACT,"EBADF: bad file descriptor\n"));
        return (checkFDs ());
    }
    /*
      Any other error from select
    */
#if defined (WIN32) 
    DL ((REACT,"select(3) error = %d\n", WSAGetLastError()));
#else
    EL((ASSAERR,"select(3) error\n"));
#endif
    return (false);
}
int Reactor::isAnyReady ( void  ) [private]

Return number of file descriptors ready across all sets.

Definition at line 404 of file Reactor.cpp.

References DL, ASSA::MaskSet::dump(), ASSA::MaskSet::m_eset, m_readySet, ASSA::MaskSet::m_rset, ASSA::MaskSet::m_wset, ASSA::FdSet::numSet(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

{
    trace_with_mask("Reactor::isAnyReady",REACTTRACE);

    int n = m_readySet.m_rset.numSet () +
        m_readySet.m_wset.numSet () +
        m_readySet.m_eset.numSet ();

    if ( n > 0 ) {
        DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
        m_readySet.dump ();
    }
    return (n);
}
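Counting ready descriptors across the three sets can be sketched directly on POSIX `fd_set`s. The names `count_set` and `count_ready` are hypothetical, and the linear scan is an assumption about how something like `FdSet::numSet()` could be implemented, not a description of it.

```cpp
#include <sys/select.h>

// Number of descriptors below maxfd_plus1 that are set in `s`.
// The set is taken by value so FD_ISSET gets a non-const pointer.
int count_set(fd_set s, int maxfd_plus1)
{
    int n = 0;
    for (int fd = 0; fd < maxfd_plus1 && fd < FD_SETSIZE; ++fd) {
        if (FD_ISSET(fd, &s)) {
            ++n;
        }
    }
    return n;
}

// Total across the three sets. A descriptor that is ready for both
// reading and writing is counted twice here.
int count_ready(fd_set r, fd_set w, fd_set e, int maxfd_plus1)
{
    return count_set(r, maxfd_plus1)
         + count_set(w, maxfd_plus1)
         + count_set(e, maxfd_plus1);
}
```

Because of the double counting, this sum can exceed the value returned by `select()` on systems that report each descriptor only once.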
Reactor& ASSA::Reactor::operator= ( const Reactor & ) [private]

no cloning

bool Reactor::registerIOHandler ( EventHandler *  eh_,
handler_t  fd_,
EventType  et_ = RWE_EVENTS 
)

Register I/O Event handler with Reactor.

Reactor will dispatch appropriate callback when event of EventType is received.

Parameters:
eh_ Pointer to the EventHandler
fd_ File descriptor
et_ Event Type
Returns:
true if success, false if error

Definition at line 93 of file Reactor.cpp.

References ASSA::ASSAERR, Assure_return, DL, ASSA::MaskSet::dump(), ASSA::ends(), ASSA::EventHandler::get_id(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isSignalEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_maxfd_plus1, m_readSet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), and ASSA::RemoteLogger::log_open().

{
    trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);

    std::ostringstream msg;
    Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));

    if (isReadEvent (et_)) 
    {
        if (!m_waitSet.m_rset.setFd (fd_)) 
        {
            DL((ASSAERR,"readset: fd %d out of range\n", fd_));
            return (false);
        }
        m_readSet[fd_] = eh_;
        msg << "READ_EVENT";
    }

    if (isWriteEvent (et_)) 
    {
        if (!m_waitSet.m_wset.setFd (fd_)) 
        {
            DL((ASSAERR,"writeset: fd %d out of range\n", fd_));
            return (false);
        }
        m_writeSet[fd_] = eh_;
        msg << " WRITE_EVENT";
    }

    if (isExceptEvent (et_)) 
    {
        if (!m_waitSet.m_eset.setFd (fd_)) 
        {
            DL((ASSAERR,"exceptset: fd %d out of range\n", fd_));
            return (false);
        }
        m_exceptSet[fd_] = eh_;
        msg << " EXCEPT_EVENT";
    }
    msg << std::ends;

    DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n", 
        eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () ));

#if !defined (WIN32)
    if (m_maxfd_plus1 < fd_+1) {
        m_maxfd_plus1 = fd_+1;
        DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
    }
#endif

    DL((REACT,"Modified waitSet:\n"));
    m_waitSet.dump ();

    return (true);
}
TimerId Reactor::registerTimerHandler ( EventHandler *  eh_,
const TimeVal &  tv_,
const std::string &  name_ = "<unknown>" 
)

Register Timer Event handler with Reactor.

Reactor will dispatch appropriate callback when event of EventType is received.

Parameters:
eh_ Pointer to the EventHandler
tv_ Timeout value
name_ Name of the timer
Returns:
Timer ID that can be used to cancel timer and find out its name.

Definition at line 67 of file Reactor.cpp.

References Assure_return, DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::fmtString(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::insert(), m_tqueue, ASSA::TimeVal::msec(), ASSA::REACT, ASSA::REACTTRACE, ASSA::TimeVal::sec(), and trace_with_mask.

{
    trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
    Assure_return (eh_);

    TimeVal now (TimeVal::gettimeofday());
    TimeVal t (now + timeout_);

    DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n",  
        timeout_.sec(),timeout_.msec()));
    DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
    DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));

    TimerId tid =  m_tqueue.insert (eh_, t, timeout_, name_);

    DL((REACT,"---Modified Timer Queue----\n"));
    m_tqueue.dump();
    DL((REACT,"---------------------------\n"));

    return (tid);
}
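The register/cancel/expire life cycle of a timer can be illustrated with a small standalone queue. This is a sketch only: `MiniTimerQueue` and its methods are hypothetical and much simpler than ASSA::TimerQueue (no handlers are called and no rescheduling takes place).

```cpp
#include <chrono>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

using Clock = std::chrono::steady_clock;

// Hypothetical miniature of a timer queue; names are not libassa's.
class MiniTimerQueue {
public:
    using Id = std::uint64_t;

    // Schedule a named timer `delay` after `now`; returns an id for cancel().
    Id insert(Clock::time_point now, Clock::duration delay, std::string name) {
        const Id id = ++last_id_;
        timers_.emplace(now + delay, Entry{id, std::move(name)});
        return id;
    }

    // Remove one timer by id; false if it is unknown or has already fired.
    bool cancel(Id id) {
        for (auto it = timers_.begin(); it != timers_.end(); ++it) {
            if (it->second.id == id) {
                timers_.erase(it);
                return true;
            }
        }
        return false;
    }

    // Drop every timer due at `now`; returns how many expired.
    int expire(Clock::time_point now) {
        int n = 0;
        while (!timers_.empty() && timers_.begin()->first <= now) {
            timers_.erase(timers_.begin());
            ++n;
        }
        return n;
    }

    bool empty() const { return timers_.empty(); }

private:
    struct Entry { Id id; std::string name; };
    std::multimap<Clock::time_point, Entry> timers_;  // ordered by expiration
    Id last_id_ = 0;
};
```

Keeping entries ordered by absolute expiration time means the earliest timer is always at the front, which is what a timeout calculation needs to read.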
bool Reactor::removeHandler ( EventHandler *  eh_,
EventType  et_ = ALL_EVENTS 
)

Remove Event handler from reactor for either all I/O events or timeout event or both.

Removes the handler from all events that match et_.

If et_ is TIMEOUT_EVENT, all timers associated with Event Handler eh_ will be removed.

Parameters:
eh_ Pointer to the EventHandler
et_ Event Type to remove. Default will remove Event Handler for all events.
Returns:
true if success, false if wasn't registered for any events.

Definition at line 173 of file Reactor.cpp.

References adjust_maxfdp1(), ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::EventHandler::get_id(), ASSA::EventHandler::handle_close(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.

Referenced by ASSA::RemoteLogger::log_close(), and stopReactor().

{
    trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);

    bool ret = false;
    handler_t fd;
    Fd2Eh_Map_Iter iter;

    if (eh_ == NULL) {
        return false;
    }

    if (isTimeoutEvent (event_)) {
        ret = m_tqueue.remove (eh_);
        ret = true;
    }

    if (isReadEvent (event_)) {
        iter = m_readSet.begin ();
        while (iter != m_readSet.end ()) {
            if ((*iter).second == eh_) {
                fd = (*iter).first;
                m_readSet.erase (iter);
                m_waitSet.m_rset.clear (fd);
                ret = true;
                break;
            }
            iter++;
        }
    } 
    
    if (isWriteEvent (event_)) {
        iter = m_writeSet.begin ();
        while (iter != m_writeSet.end ()) {
            if ((*iter).second == eh_) {
                fd = (*iter).first;
                m_writeSet.erase (iter);
                m_waitSet.m_wset.clear (fd);
                ret = true;
                break;
            }
            iter++;
        }
    }

    if (isExceptEvent (event_)) {
        iter = m_exceptSet.begin ();
        while (iter != m_exceptSet.end ()) {
            if ((*iter).second == eh_) {
                fd = (*iter).first;
                m_exceptSet.erase (iter);
                m_waitSet.m_eset.clear (fd);
                ret = true;
                break;
            }
            iter++;
        }
    }

    if (ret == true) {
        DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_));
        eh_->handle_close (fd);
    }

    adjust_maxfdp1 (fd);

    DL((REACT,"Modifies waitSet:\n"));
    m_waitSet.dump ();

    return (ret);
}
bool Reactor::removeIOHandler ( handler_t  fd_)

Remove IO Event handler from reactor.

This will remove handler from receiving all I/O events.

Parameters:
fd_ File descriptor
Returns:
true on success, false if fd_ is out of range

We clear m_readySet mask here as well, because if we don't, it will be erroneously used by isAnyReady() before select().

Definition at line 247 of file Reactor.cpp.

References adjust_maxfdp1(), Assure_return, ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::EventHandler::get_id(), ASSA::EventHandler::handle_close(), ASSA::is_valid_handler(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by checkFDs(), and dispatchHandler().

{
    trace_with_mask("Reactor::removeIOHandler",REACTTRACE);

    bool ret = false;
    EventHandler*  ehp = NULL;
    Fd2Eh_Map_Iter iter;

    Assure_return (ASSA::is_valid_handler (fd_));

    DL((REACT,"Removing handler for fd=%d\n",fd_));

    if ((iter = m_readSet.find (fd_)) != m_readSet.end ()) 
    {
        ehp = (*iter).second;
        m_readSet.erase (iter);
        m_waitSet.m_rset.clear (fd_);
        m_readySet.m_rset.clear (fd_);
        if (m_readSet.size () > 0) {
            iter = m_readSet.end ();
            iter--;
        }
        ret = true;
    }

    if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ()) 
    {
        ehp = (*iter).second;
        m_writeSet.erase (iter);
        m_waitSet.m_wset.clear (fd_);
        m_readySet.m_wset.clear (fd_);
        if (m_writeSet.size () > 0) {
            iter = m_writeSet.end ();
            iter--;
        }
        ret = true;
    }

    if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ()) 
    {
        ehp = (*iter).second;
        m_exceptSet.erase (iter);
        m_waitSet.m_eset.clear (fd_);
        m_readySet.m_eset.clear (fd_);
        if (m_exceptSet.size () > 0) {
            iter = m_exceptSet.end ();
            iter--;
        }
        ret = true;
    }

    if (ret == true && ehp != NULL) {
        DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp));
        ehp->handle_close (fd_);
    }

    adjust_maxfdp1 (fd_);

    DL((REACT,"Modifies waitSet:\n"));
    m_waitSet.dump ();

    return (ret);
}
bool Reactor::removeTimerHandler ( TimerId  id_)

Remove Timer event from the queue.

This removes a particular event.

Parameters:
id_ Timer Id returned by registerTimerHandler().
Returns:
true if timer found and removed; false otherwise

Definition at line 152 of file Reactor.cpp.

References ASSA::ASSAERR, DL, ASSA::TimerQueue::dump(), EL, m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.

{
    trace_with_mask("Reactor::removeTimer",REACTTRACE);
    bool ret;

    if ((ret = m_tqueue.remove (tid_))) {
        DL((REACT,"---Modified Timer Queue----\n"));
        m_tqueue.dump();
        DL((REACT,"---------------------------\n"));
    }
    else {
        EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
    }
    return (ret);
}
void Reactor::stopReactor ( void  )

Stop Reactor's activity.

This effectively removes all handlers from under Reactor's supervision. As of now, there is no way to re-activate the Reactor. This method is typically called from a method other than EventHandler::signal_handler(). EventHandler::handle_read () is a good candidate. Calling it from EventHandler::handle_close () will most likely cause an infinite loop of recursive calls.

Definition at line 667 of file Reactor.cpp.

References m_active, m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, removeHandler(), and trace_with_mask.

{ 
    trace_with_mask("Reactor::stopReactor", REACTTRACE);

    m_active = false; 

    Fd2Eh_Map_Iter iter;
    EventHandler* ehp;

    while (m_readSet.size () > 0) {
        iter = m_readSet.begin ();
        ehp = (*iter).second;
        removeHandler (ehp);
    }

    while (m_writeSet.size () > 0) {
        iter = m_writeSet.begin ();
        ehp = (*iter).second;
        removeHandler (ehp);
    }

    while (m_exceptSet.size () > 0) {
        iter = m_exceptSet.begin ();
        ehp = (*iter).second;
        removeHandler (ehp);
    }
}
void Reactor::waitForEvents ( void  )

Main waiting loop that blocks indefinitely processing events.

Block forever version.

Definition at line 470 of file Reactor.cpp.

References m_active.

{
    while ( m_active ) {
        waitForEvents ((TimeVal*) NULL);
    }
}
void Reactor::waitForEvents ( TimeVal *  tv_)

Wait for events for time specified.

+----------+--------+-----------------------+--------------------------+
| select() | errno  | Events                | Behavior                 |
+----------+--------+-----------------------+--------------------------+
| < 0      | EINTR  | Interrupted by signal | Retry                    |
| < 0      | EBADF  | Bad file descriptor   | Remove bad fds and retry |
| < 0      | others | Some other error      | Fall through             |
| == 0     | 0      | Timed out             | Fall through             |
| > 0      | 0      | Got some work to do   | Fall through             |
+----------+--------+-----------------------+--------------------------+

Passing NULL replicates the behavior of waitForEvents(void). Passing tv_ {0, 0} causes non-blocking polling for all events. This method blocks for up to the tv_ time interval while processing events. If an event occurs, it processes the event(s) and returns. tv_ is adjusted by subtracting the time spent in event processing.

Parameters:
tv_ [RW] is time to wait for.

Definition at line 495 of file Reactor.cpp.

References calculateTimeout(), dispatch(), DL, ASSA::MaskSet::dump(), ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), handleError(), isAnyReady(), m_active, ASSA::MaskSet::m_eset, m_maxfd_plus1, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::MaskSet::reset(), ASSA::MaskSet::sync(), and trace_with_mask.

{
    trace_with_mask("Reactor::waitForEvents",REACTTRACE);

    TimerCountdown traceTime (tv_);
    DL((REACT,"======================================\n"));

    /*--- Expire all stale Timers ---*/
    m_tqueue.expire (TimeVal::gettimeofday ());

    /* Test to see if Reactor has been deactivated as a result
     * of processing done by any TimerHandlers.
     */
    if (!m_active) {
        return;
    }

    int      nReady;
    TimeVal  delay;
    TimeVal* dlp = &delay;

    /*---
      In case if not all data have been processed by the EventHandler,
      and EventHandler stated so in its callback's return value
      to dispatcher (), it will be called again. This way 
      underlying file/socket stream can efficiently utilize its
      buffering mechanism.
      ---*/
    if ((nReady = isAnyReady ())) {
        DL((REACT,"isAnyReady returned: %d\n",nReady));
        dispatch (nReady);
        return;
    }

    DL((REACT,"=== m_waitSet ===\n"));
    m_waitSet.dump ();

    do {
        m_readySet.reset ();
        DL ((REACT,"m_readySet after reset():\n"));
        m_readySet.dump ();

        m_readySet = m_waitSet;
        DL ((REACT,"m_readySet after assign:\n"));
        m_readySet.dump ();

        calculateTimeout (dlp, tv_);

        nReady = ::select (m_maxfd_plus1, 
                           &m_readySet.m_rset,
                           &m_readySet.m_wset, 
                           &m_readySet.m_eset, 
                           dlp);
        DL((REACT,"::select() returned: %d\n",nReady));

        m_readySet.sync ();
        DL ((REACT,"m_readySet after select:\n"));
        m_readySet.dump ();

    } 
    while (nReady < 0 && handleError ());

    dispatch (nReady);
}

Member Data Documentation

bool ASSA::Reactor::m_active [private]

Flag that indicates whether Reactor is active or had been stopped.

Definition at line 209 of file Reactor.h.

Referenced by deactivate(), handleError(), stopReactor(), and waitForEvents().

Fd2Eh_Map_Type ASSA::Reactor::m_exceptSet [private]

Event handlers awaiting on EXCEPT_EVENT.

Definition at line 218 of file Reactor.h.

Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().

int ASSA::Reactor::m_fd_setsize [private]

Max number of open files per process.

This is the soft limit enforced by the kernel. It can be obtained/manipulated from the shell with ulimit/limit utilities, but may not exceed the hard limit.

Definition at line 200 of file Reactor.h.

Referenced by checkFDs(), and Reactor().
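Reading the soft limit programmatically takes one POSIX call. The helper below is a sketch: `soft_fd_limit` is a hypothetical name, and returning a caller-supplied fallback for a failed query or an unlimited setting is a choice made for this example.

```cpp
#include <sys/resource.h>

// Soft limit on open descriptors, or `fallback` if the query fails
// or the limit is reported as unlimited.
long soft_fd_limit(long fallback)
{
    struct rlimit rl;
    if (::getrlimit(RLIMIT_NOFILE, &rl) != 0) {
        return fallback;
    }
    if (rl.rlim_cur == RLIM_INFINITY) {
        return fallback;
    }
    return static_cast<long>(rl.rlim_cur);   // rlim_cur is the soft limit
}
```

`rlim_cur` is the value that `ulimit -n` shows in the shell, and `rlim_max` is the hard limit it may not exceed.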

handler_t ASSA::Reactor::m_maxfd_plus1 [private]

Max file descriptor number (in all sets) plus 1.

This value is ignored by WIN32 implementation of select()

Definition at line 206 of file Reactor.h.

Referenced by adjust_maxfdp1(), registerIOHandler(), and waitForEvents().

Fd2Eh_Map_Type ASSA::Reactor::m_readSet [private]

Event handlers awaiting on READ_EVENT.

Definition at line 212 of file Reactor.h.

Referenced by checkFDs(), dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().

MaskSet ASSA::Reactor::m_readySet [private]

Handlers that are ready for processing.

Definition at line 224 of file Reactor.h.

Referenced by dispatch(), isAnyReady(), removeIOHandler(), and waitForEvents().

TimerQueue ASSA::Reactor::m_tqueue [private]

The queue of Timers.

Definition at line 227 of file Reactor.h.

Referenced by calculateTimeout(), dispatch(), registerTimerHandler(), removeHandler(), removeTimerHandler(), and waitForEvents().

MaskSet ASSA::Reactor::m_waitSet [private]

Handlers to wait for event on.

Definition at line 221 of file Reactor.h.

Referenced by adjust_maxfdp1(), registerIOHandler(), removeHandler(), removeIOHandler(), and waitForEvents().

Fd2Eh_Map_Type ASSA::Reactor::m_writeSet [private]

Event handlers awaiting on WRITE_EVENT.

Definition at line 215 of file Reactor.h.

Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().


The documentation for this class was generated from the following files:
Reactor.h
Reactor.cpp