//Editor-Info: -*- C++ -*-
//
//Subject: Scheduler Framework
//
//File: fifoscheduler.cpp
//
//Version: $Revision: 1.10 $
//
//State: $State: Exp $
//
//Date: $Date: 1998/10/06 07:55:50 $
//
//Organisation:
//      Helsinki University of Technology
//      Laboratory of Telecommunications Software and Multimedia
//
//Author:
//      Juhana Räsänen
//
//Description:
//      See corresponding header file
//
//Copyright:
//      Copyright 1999 Helsinki University of Technology
//      ALL RIGHTS RESERVED BETWEEN JANUARY 1996 AND JUNE 1999.
//
//Licence:
//
//
//History: 
//

#include <sys/time.h>
#include "fifoscheduler.h"
#include "schedulerhandle.h"
#include "task.h"

#ifdef ORB_USED
// Static ORB and BOA references of the scheduler class. Both start out
// as 0 and are assigned by initORB().
CORBA_ORB_var sfFIFOScheduler :: _orb = 0;
CORBA_BOA_var sfFIFOScheduler :: _boa = 0;
#endif
// Pointer to the scheduler object handed out by instance(); stays 0
// until instance() has been called for the first time.
sfFIFOScheduler *sfFIFOScheduler :: _only = 0;

// Returns the scheduler stored in _only. The object is created with the
// constructor's default arguments the first time this is called.
sfFIFOScheduler * sfFIFOScheduler::instance()
{
    if (_only != 0)
    {
        return _only;
    }

    _only = new sfFIFOScheduler();
    return _only;
}

//
// Method: constructor
//
// Description:
//     Stores the given task and timeout counts (used by step() as the
//     per-pass limits), calls the base class constructor and creates
//     the run queue. Without the ORB the read/write lists, the select
//     flag and the descriptor sets start out empty; with the ORB the
//     OBReactor instance is looked up instead.
//     The run queue is a circular double-linked list anchored by a
//     marker handle, which initially points to itself in both
//     directions.
//

sfFIFOScheduler :: sfFIFOScheduler(int taskCount_, int timeoutCount_)
    : sfScheduler(),
      _taskCount(taskCount_),
      _timeoutCount(timeoutCount_),
      _localTime(),
      _fifoQueue(0),
      _timeoutList(),
#ifndef ORB_USED
      _readList(),
      _writeList(),
      _selectFlag(0)
#else
      _reactor(OBReactor::instance())
#endif
{
#ifndef ORB_USED
    // No descriptors are being watched yet
    FD_ZERO(&_read_fds);
    FD_ZERO(&_write_fds);
#endif

    // Build the empty run queue: a lone marker handle linked to itself.
    // The handle following the marker is the head of the queue and the
    // handle preceding it is the tail.
    FIFOHandle *marker = new FIFOHandle;
    marker->_nextTask = marker;
    marker->_prevTask = marker;
    _fifoQueue = marker;
}

//
// Method: destructor
//
// Description:
//     Unlinks and deletes every handle that is still in the run queue
//     and finally deletes the marker handle itself.
//

sfFIFOScheduler :: ~sfFIFOScheduler(void)
{
    // Always take the handle right after the marker, i.e. the queue head
    while (!fifoQueueEmpty())
    {
        delete _fifoQueue->_nextTask->unlink();
    }

    delete _fifoQueue;
}


//
// Method: step
//
// Description:
//     The scheduler main loop body (the loop itself is in the base class
//     run() method). Examines the scheduler queues and executes tasks
//     needing service, or blocks to wait for next timeout or I/O event
//     if no tasks are available for running.
//     Throws the scheduler-empty exception (non-ORB build only) when
//     there are no runnable tasks, no timeouts and no file descriptors.
//     NOTE 1: The sfScheduler run()/step() semantics gets slightly
//             broken, since FIFO scheduler may remain arbitrarily
//             long time inside step(), if there are only CPU tasks
//             scheduled and no timeouts or file operations. This is
//             intentional for performance reasons.
//     NOTE 2: FIFO semantics is not complete; a task will be linked
//             only once to the runnable queue, and if it has requested
//             CPU time several times, it will be moved to the end of
//             the runnable queue after one execution of the task, not
//             to the point at which it had re-requested CPU.
//     NOTE 3: File descriptor callbacks are run only when select()
//             reports at least one ready descriptor. If select() fails
//             the descriptor sets are not examined and the step simply
//             ends; the descriptors are rebuilt on the next step.
//

void sfFIFOScheduler :: step(void)
{
#ifndef ORB_USED
    // Rebuild the descriptor sets; hfd is -1 when no fds are registered
    int hfd = create_fd_sets();
#endif

    if ((fifoQueueEmpty() != 0) && (_timeoutList.empty() != 0))
    {
#ifndef ORB_USED
        // If there are no tasks in the FIFO queue and no pending timeouts,
        // we can block on the file descriptors registered to the scheduler
        if (hfd == -1)
        {
            // If there were no fds either, there is no way anything can
            // proceed anymore, so an exception is thrown
            sfException::throwSchedulerEmptyException();
        }
        // Block on the fds without timeout. The descriptor sets are only
        // meaningful when select() reports ready descriptors; on failure
        // (for instance an interrupting signal) they are left as built
        // by create_fd_sets(), so every fd would look ready and the
        // callbacks must not be run.
        if (select(hfd + 1, &_read_fds, &_write_fds, 0, 0) > 0)
        {
            handle_fd_sets();
        }
#else
        // Block on the fds without timeout by calling ORB dispatch
        // with negative timeout if there are no pending timeouts or
        // scheduled tasks
        _reactor->dispatchOneEvent(-1);
#endif
    }
#ifndef ORB_USED
    else if ((hfd == -1) && (_timeoutList.empty() != 0))
    {
        // If there are tasks only in the run queue, we can set _selectFlag
        // to 0 and loop the run queue until a timeout or a file descriptor
        // is added.
        _selectFlag = 0;
    }
    else
    {
        // Otherwise FDs and/or timeouts must be handled
        OTime nextTimeout = 0;
        synchronize(); // Sync just before the time comparison

        // If the run queue is empty AND next timeout has not passed
        // yet, we can block until next timeout, otherwise the fds
        // should just be polled (polled when nextTimeout = 0).
        // The timeout list cannot be empty when the run queue is empty
        // here, because that case was handled by the first branch.
        if ((fifoQueueEmpty() != 0) &&
            (_timeoutList.back()->_timeout > _localTime))
        {
            nextTimeout = _timeoutList.back()->_timeout - _localTime;
        }

        // If there were files to examine OR next timeout has not
        // passed yet, select must be called.
        if ((hfd > -1) || (nextTimeout != 0))
        {
            int events =
                select(hfd + 1, &_read_fds, &_write_fds, 0, &nextTimeout);
            if (events > 0)
            {
                handle_fd_sets();
            }
            synchronize(); // In case select had blocked for timeout...
        }

        // Loop through the timeout queue executing the occurred timeouts,
        // at max _timeoutCount timeouts at one pass to prevent tasks
        // with very short timeouts and/or long execution times to starve
        // the FIFO tasks. The earliest timeout is at the back of the list.
        int timeoutCount = _timeoutCount;
        while ((_timeoutList.empty() == 0) &&
               (_timeoutList.back()->_timeout <= _localTime) &&
               (timeoutCount > 0))
        {
            FIFOHandle *expired = _timeoutList.back();
            _timeoutList.pop_back();
            expired->_pendingTimeouts--;
            expired->_task->timeoutCallback();
            synchronize();
            timeoutCount--;
        }
    }
#else
    else
    {
        // Otherwise FIFO tasks and/or timeouts must be handled
        OTime nextTimeout(0);
        synchronize(); // Sync just before the time comparison

        // If the run queue is empty AND next timeout has not passed
        // yet, we can block until next timeout, otherwise the fds
        // should just be polled
        if ((fifoQueueEmpty() != 0) &&
            (_timeoutList.back()->_timeout > _localTime))
        {
            nextTimeout = _timeoutList.back()->_timeout - _localTime;
            // The reactor timeout is computed in milliseconds
            CORBA_Long timeout = nextTimeout.getSeconds() * 1000 +
                                 nextTimeout.getMicros() / 1000;
            _reactor->dispatchOneEvent(timeout);
        }
        else
        {
            // Call ORB reactor with zero timeout to poll the fds
            _reactor->dispatchOneEvent(0);
        }

        // Loop through the timeout queue executing the occurred timeouts,
        // at max _timeoutCount timeouts at one pass to prevent tasks
        // with very short timeouts and/or long execution times to starve
        // the FIFO tasks.
        int timeoutCount = _timeoutCount;
        while ((_timeoutList.empty() == 0) &&
               (_timeoutList.back()->_timeout <= _localTime) &&
               (timeoutCount > 0))
        {
            FIFOHandle *expired = _timeoutList.back();
            _timeoutList.pop_back();
            expired->_pendingTimeouts--;
            expired->_task->timeoutCallback();
            synchronize();
            timeoutCount--;
        }

        // Loop through the run queue, executing at most _taskCount tasks
        int taskCount = _taskCount;
        while ((taskCount > 0) && (fifoQueueEmpty() == 0))
        {
            // Get and unlink the first handle in the run queue
            FIFOHandle *handle = _fifoQueue->_nextTask->unlink();

            // If the task had requested CPU more than once, it must be
            // relinked to the run queue *before* it is executed to keep
            // the queue consistent in case the task reschedules itself
            handle->_pendingCPUReqs--;
            if (handle->_pendingCPUReqs > 0) 
            {
                _fifoQueue->link(handle); 
            }

            // Finally execute the task
            handle->_task->runCallback();
            taskCount--;
        }       
    }
#endif
 
#ifndef ORB_USED
    // Loop through the run queue forever, or executing at most
    // _taskCount tasks if _selectFlag is or becomes nonzero
    // (timeouts and/or file operations pending).
    int taskCount = _taskCount;
    while (((_selectFlag == 0) || (taskCount > 0)) &&
           (fifoQueueEmpty() == 0))
    {
        // Get and unlink the first handle in the run queue
        FIFOHandle *handle = _fifoQueue->_nextTask->unlink();

        // If the task had requested CPU more than once, it must be
        // relinked to the run queue *before* it is executed to keep
        // the queue consistent in case the task reschedules itself
        handle->_pendingCPUReqs--;
        if (handle->_pendingCPUReqs > 0)
        {
           _fifoQueue->link(handle);
        }

        // Finally execute the task
        handle->_task->runCallback();
        
        // Reduce task counter only if there are file operations
        // or timeouts to be checked
        if (_selectFlag > 0)
        {
            taskCount--;
        }
    }
#endif
    return;
}


//
// Method: makeHandle
//
// Description:
//     Factory method: allocates a FIFO scheduler handle that binds the
//     given task to this scheduler and returns it through the base
//     handle type. The FIFOHandle constructor raises the invalid handle
//     exception if task_ is 0.
//

sfSchedulerHandle * sfFIFOScheduler :: makeHandle(sfTask *task_)
{
    return new FIFOHandle(this, task_);
}


//
// Method: synchronize
//
// Description:
//     Refreshes the scheduler's local time (_localTime) from the
//     system clock by calling gettimeofday().
//

void sfFIFOScheduler :: synchronize(void)
{
    // The time zone argument is not used
    gettimeofday(&_localTime, 0);
}


#ifndef ORB_USED
//
// Method: create_fd_sets
//
// Description:
//     Creates file descriptor sets for select() by going through the
//     read and write queues.
//

int sfFIFOScheduler :: create_fd_sets(void)
{
    int highest_fd = -1;
    FD_ZERO(&_read_fds);
    FD_ZERO(&_write_fds);

    taskHandleIterator iter = _readList.begin();
    while (iter != _readList.end())
    {
        FD_SET((*iter)->_readFD, &_read_fds);
        if ((*iter)->_readFD > highest_fd)
        {
            highest_fd = (*iter)->_readFD;
        }
        iter++;
    }

    iter = _writeList.begin();
    while (iter != _writeList.end())
    {
        FD_SET((*iter)->_writeFD, &_write_fds);
        if ((*iter)->_writeFD > highest_fd)
        {
            highest_fd = (*iter)->_writeFD;
        }
        iter++;
    }

    return highest_fd;
}


//
// Method: handle_fd_sets
//
// Description:
//     Goes through the file descriptor sets after select() returned
//     and executes the tasks whose fd became available for reading
//     or writing.
//

void sfFIFOScheduler :: handle_fd_sets(void)
{
    taskHandleIterator iter = _readList.begin();
    while (iter != _readList.end())
    {
        if (FD_ISSET((*iter)->_readFD, &_read_fds) != 0)
        {
            taskHandleIterator removed(iter);
            iter++;
            (*removed)->_pendingReads--;
            (*removed)->_task->readCallback();
            _readList.erase(removed);
        }
        else
        {
            iter++;
        }
    }
    
    iter = _writeList.begin();
    while (iter != _writeList.end())
    {
        if (FD_ISSET((*iter)->_writeFD, &_write_fds) != 0)
        {
            taskHandleIterator removed(iter);
            iter++;
            (*removed)->_pendingWrites--;
            (*removed)->_task->writeCallback();
            _writeList.erase(removed);
        }
        else
        {
            iter++;
        }
    }
    return;
}
#endif


//
// Method: fifoQueueEmpty
//
// Description:
//     Returns 1 if the run queue holds no tasks, i.e. the marker
//     handle is followed by itself, and 0 otherwise.
//

int sfFIFOScheduler :: fifoQueueEmpty(void) const
{
    return (_fifoQueue->_nextTask == _fifoQueue) ? 1 : 0;
}

#ifdef ORB_USED

//
// Method: initORB
//
// Description:
//    Initializes the ORB and the BOA from the given command line
//    arguments and stores them in the static _orb and _boa members.
//    The scheduler instance must already exist (asserted). If the
//    initialization raises a CORBA exception of the caught type, the
//    exception is printed and the process exits with status 1.
//

void sfFIFOScheduler :: initORB(int argc_, char* argv_[])
{
    // instance() must have been called before the ORB is initialized
    assert(_only != 0);

    try
    {
        // Create ORB and BOA
        _orb = CORBA_ORB_init(argc_, argv_);
        _boa = _orb -> BOA_init(argc_, argv_);
        _boa->init_servers();
    }
    // The exception type caught depends on the compiler: only
    // communication failures with GNU C++, any system exception otherwise
#ifdef __GNUG__
    catch(CORBA_COMM_FAILURE& ex)
#else
    catch(CORBA_SystemException& ex)
#endif
    {
        OBPrintException(ex);
        exit(1);
    }
    return;
}

//
// Method: getORB
//
// Description:
//    Returns the ORB stored in the static _orb member, which is 0
//    until initORB() has assigned it. The scheduler instance must
//    already exist (asserted).
//

CORBA_ORB_ptr sfFIFOScheduler :: getORB()
{
    assert(_only != 0);

    return _orb;
}

//
// Method: getBOA
//
// Description:
//    Returns the BOA stored in the static _boa member, which is 0
//    until initORB() has assigned it. The scheduler instance must
//    already exist (asserted).
//

CORBA_BOA_ptr sfFIFOScheduler :: getBOA()
{
    assert(_only != 0);

    return _boa;
}

#endif

//-------------------------------------------------------------------------


//
// Methods: constructors
//
// Description:
//     Set the scheduler and call base class constructor. All pending
//     request counters start at zero and the handle is not linked to
//     any queue.
//     The default constructor creates a handle with neither a task nor
//     a scheduler; the scheduler constructor uses it for the run queue
//     marker.
//

sfFIFOScheduler::FIFOHandle :: FIFOHandle(void)
    : sfSchedulerHandle(0),
      _scheduler(0),        // no owning scheduler
      _pendingCPUReqs(0),
      _pendingTimeouts(0),
      _pendingReads(0),
      _pendingWrites(0),
      _timeout(0),
      _readFD(0),
      _writeFD(0),
      _prevTask(0),         // not linked to a queue yet
      _nextTask(0)
{
    return;
}

// Creates an unlinked handle that ties task_ to scheduler_, with all
// pending request counters at zero. Raises the invalid handle exception
// if either pointer is 0.
sfFIFOScheduler::FIFOHandle ::
FIFOHandle(sfFIFOScheduler *scheduler_, sfTask *task_)
    : sfSchedulerHandle(task_),
      _scheduler(scheduler_),
      _pendingCPUReqs(0),
      _pendingTimeouts(0),
      _pendingReads(0),
      _pendingWrites(0),
      _timeout(0),
      _readFD(0),
      _writeFD(0),
      _prevTask(0),
      _nextTask(0)
{
    // A usable handle needs both its scheduler and its task
    if (!scheduler_ || !task_)
    {
        sfException::throwInvalidHandleException();
    }
}

//
// Method: destructor
//
// Description:
//     Remove this handle from all scheduler queues. A handle without
//     a scheduler (the run queue marker, which is built with the
//     default constructor) is registered nowhere, and the unschedule
//     methods dereference the scheduler pointer, so nothing is done
//     for it.
//

sfFIFOScheduler::FIFOHandle :: ~FIFOHandle(void)
{
    if (_scheduler != 0)
    {
        removeTask();
    }
    return;
}


//
// Method: scheduleTask
//
// Description:
//     Requests CPU time for the task. The first outstanding request
//     appends the handle to the scheduler's run queue; every request
//     is counted in _pendingCPUReqs.
//

void sfFIFOScheduler::FIFOHandle :: scheduleTask(void)
{
    // The queue links live inside the handle, so the handle can sit in
    // the run queue only once. When it is queued already, only the
    // request counter grows and the scheduler relinks the handle after
    // it has been executed.
    if (0 == _pendingCPUReqs)
    {
        // Linking before the marker puts the handle at the queue tail
        _scheduler->_fifoQueue->link(this);
    }
    ++_pendingCPUReqs;
}


//
// Methods: scheduleTimeout
//
// Description:
//     Adds a task to the timeout queue. Timeouts are stored as calendar
//     time, so local time is added to the given timeout value.
//

void sfFIFOScheduler::FIFOHandle :: scheduleTimeout(const OTime &timeout_)
{
    // ++TODO++ can somebody explain me this?
    if (_pendingTimeouts >= MAX_TIMEOUTS)
    {
        sfException::throwInvalidRequestException();
    }
    gettimeofday(&_timeout, 0);
    _timeout = _timeout + timeout_;
    taskHandleIterator iter = _scheduler->_timeoutList.begin();
    while ((iter != _scheduler->_timeoutList.end()) &&
           (_timeout < (*iter)->_timeout))
    {
        iter++;
    }
    _scheduler->_timeoutList.insert(iter, this);
    _scheduler->_selectFlag = 1;
    _pendingTimeouts++;
    return;
}


//
// Method: scheduleAbsoluteTimeout
//
// Description:
//     Adds a task to the timeout queue. Timeout is given as calendar time.
//

void sfFIFOScheduler::FIFOHandle ::
scheduleAbsoluteTimeout(const OTime &timeout_)
{
    if (_pendingTimeouts >= MAX_TIMEOUTS)
    {
        sfException::throwInvalidRequestException();
    }
    _timeout = timeout_;
    taskHandleIterator iter = _scheduler->_timeoutList.begin();
    while ((iter != _scheduler->_timeoutList.end()) &&
           (_timeout < (*iter)->_timeout))
    {
        iter++;
    }
    _scheduler->_timeoutList.insert(iter, this);
    _scheduler->_selectFlag = 1;
    _pendingTimeouts++;
    return;
}


//
// Method: scheduleRead
//
// Description:
//     Requests a read callback for the given file descriptor. Raises
//     the invalid request exception if the handle already has
//     MAX_READS reads pending. The timeout argument is not used.
//
//     Without the ORB the handle is added to the scheduler's read
//     list, so that the file is polled in the following scheduler
//     steps until it becomes available for reading.
//
//     With the ORB the handle is registered to the OB Reactor for read
//     events, together with write events if a write is pending too.
//

void sfFIFOScheduler::FIFOHandle ::
scheduleRead(int fileDescriptor_, const OTime &)
{
    if (_pendingReads >= MAX_READS)
    {
        sfException::throwInvalidRequestException();
    }

    _readFD = fileDescriptor_;

#ifndef ORB_USED
    // ++DIRTY TRICK++ The handle goes to the *front* of the list.
    // handle_fd_sets walks the list from front to back, so a handler
    // that reschedules itself from inside a read/write callback would
    // erroneously be called again during the same pass if it were
    // appended to the end.
    _scheduler->_readList.push_front(this);
    _scheduler->_selectFlag = 1;
#else
    // Keep an already pending write registered alongside the read
    OBMask mask = OBEventRead;
    if (_pendingWrites > 0)
    {
        mask |= OBEventWrite;
    }
    _scheduler->_reactor->registerHandler(this, mask, fileDescriptor_);
#endif

    // ++TODO++ Handle read/write timeouts
    ++_pendingReads;
}

//
// Method: scheduleWrite
//
// Description:
//     Requests a write callback for the given file descriptor. Raises
//     the invalid request exception if the handle already has
//     MAX_WRITES writes pending. The timeout argument is not used.
//
//     Without the ORB the handle is added to the scheduler's write
//     list, so that the file is polled in the following scheduler
//     steps until it becomes available for writing.
//
//     With the ORB the handle is registered to the OB Reactor for
//     write events, together with read events if a read is pending
//     too.
//

void sfFIFOScheduler::FIFOHandle ::
scheduleWrite(int fileDescriptor_, const OTime &)
{
    if (_pendingWrites >= MAX_WRITES)
    {
        sfException::throwInvalidRequestException();
    }

    _writeFD = fileDescriptor_;

#ifndef ORB_USED
    // ++DIRTY TRICK++ The handle goes to the *front* of the list.
    // handle_fd_sets walks the list from front to back, so a handler
    // that reschedules itself from inside a read/write callback would
    // erroneously be called again during the same pass if it were
    // appended to the end.
    _scheduler->_writeList.push_front(this);
    _scheduler->_selectFlag = 1;
#else
    // Keep an already pending read registered alongside the write
    OBMask mask = OBEventWrite;
    if (_pendingReads > 0)
    {
        mask |= OBEventRead;
    }
    _scheduler->_reactor->registerHandler(this, mask, fileDescriptor_);
#endif

    // ++TODO++ Handle read/write timeouts
    ++_pendingWrites;
}

//
// Method: removeTask
//
// Description:
//     Withdraws every outstanding request of this handle: CPU
//     requests, timeouts, reads and writes, in that order.
//

void sfFIFOScheduler::FIFOHandle :: removeTask(void)
{
    unscheduleTask();
    unscheduleTimeout();
    unscheduleRead();
    unscheduleWrite();
}


//
// Method: unscheduleTask
//
// Description:
//     Removes given task from the run queue.
//

void sfFIFOScheduler::FIFOHandle :: unscheduleTask(void)
{
    if (_pendingCPUReqs > 0)
    {
        FIFOHandle *handle = _scheduler->_fifoQueue->_nextTask;
        while (handle != _scheduler->_fifoQueue)
        {
            if (handle == this)
            {
                handle->unlink();
                break; // There can be only one instance of the handle
                       // in the runnable queue
            }
            handle = handle->_nextTask;
        }
    }
    _pendingCPUReqs = 0;
    return;
}


//
// Method: unscheduleTimeout
//
// Description:
//     Removes every entry of this handle from the scheduler's timeout
//     list and clears the count of pending timeouts.
//

void sfFIFOScheduler::FIFOHandle :: unscheduleTimeout(void)
{
    _scheduler->_timeoutList.remove(this);
    _pendingTimeouts = 0;
}


//
// Method: unscheduleRead
//
// Description:
//     Cancels the pending reads of this handle and clears the count of
//     pending reads.
//
//     Without the ORB the handle is removed from the scheduler's read
//     list.
//
//     With the ORB, if a read was pending, the handle is either
//     re-registered to the OB Reactor for write events only (when a
//     write is still pending) or unregistered altogether.
//

void sfFIFOScheduler::FIFOHandle :: unscheduleRead(void)
{
#ifndef ORB_USED
    _scheduler->_readList.remove(this);
#else
    if ((_pendingReads > 0) && (_pendingWrites > 0))
    {
        // Keep listening for the write that is still outstanding
        _scheduler->_reactor->
            registerHandler(this, OBEventWrite, _writeFD);
    }
    else if (_pendingReads > 0)
    {
        _scheduler->_reactor->unregisterHandler(this);
    }
#endif
    _pendingReads = 0;
}


//
// Method: unscheduleWriteHandler
//
// Description:
//     Removes given task from the write queue.
//
//     Unregisters given task from the OB Reactor.
//

void sfFIFOScheduler::FIFOHandle :: unscheduleWrite(void)
{
#ifndef ORB_USED
    _scheduler->_writeList.remove(this);
#else
    if (_pendingWrites > 0)
    {
        if (_pendingReads > 0)
        {
            _scheduler->_reactor->registerHandler(this, OBEventRead, _readFD);
        }
        else
        {
            _scheduler->_reactor->unregisterHandler(this);
        }
    }
    _pendingWrites = 0;
#endif
    return;
}

#ifdef ORB_USED
//
// Method: handleEvent
//
// Description:
//     OmniBroker OBEventHandler callback method. Maps the event
//     to SF event callback: a read event results in readCallback()
//     and a write event in writeCallback() of the task. The pending
//     request counter of the event is decremented before the callback.
//

void sfFIFOScheduler::FIFOHandle :: handleEvent(OBMask mask_)
{
    // Handler must be unregistered from the reactor, because SF
    // handlers are scheduled once per event (by calling requestRead
    // or requestWrite) but OBReactor registers persistent handlers.
    // The unregistration must be done before callbacks are called
    // in case the task reschedules a read or a write.

    if (mask_ & OBEventRead)
    {
        _pendingReads--;
        if (_pendingWrites == 0)
        {
            _scheduler->_reactor->unregisterHandler(this);
        }
        else
        {
            // A write is still pending: keep only the write
            // registration, on the write descriptor
            _scheduler->_reactor->
                registerHandler(this, OBEventWrite, _writeFD);
        }
        _task->readCallback();
    }
    if (mask_ & OBEventWrite)
    {
        _pendingWrites--;
        if (_pendingReads == 0)
        {
            _scheduler->_reactor->unregisterHandler(this);
        }
        else
        {
            // A read is still pending: keep only the read
            // registration, on the *read* descriptor
            _scheduler->_reactor->registerHandler(this, OBEventRead, _readFD);
        }
        _task->writeCallback();
    }
    return;
}

//
// Method: handleStop
//
// Description:
//     OmniBroker OBEventHandler stop method. Does nothing.
//

void sfFIFOScheduler::FIFOHandle :: handleStop(void)
{
}
#endif

//
// Method: link
//
// Description:
//     Inserts the given handle into the queue immediately before this
//     handle. When called on the queue marker the handle thus becomes
//     the last one in the queue.
//

void sfFIFOScheduler::FIFOHandle ::
link(sfFIFOScheduler::FIFOHandle *handle_)
{
    FIFOHandle *const predecessor = _prevTask;

    // Hook the new handle after the current predecessor, if any...
    if (predecessor != 0)
    {
        predecessor->_nextTask = handle_;
    }

    // ...and between that predecessor and this handle
    handle_->_prevTask = predecessor;
    handle_->_nextTask = this;
    _prevTask = handle_;
}


//
// Method: unlink
//
// Description:
//     Detaches this handle from the queue it is linked to by joining
//     its neighbours to each other, clears its own link pointers and
//     returns the handle itself.
//

sfFIFOScheduler::FIFOHandle * sfFIFOScheduler::FIFOHandle :: unlink(void)
{
    FIFOHandle *const before = _prevTask;
    FIFOHandle *const after = _nextTask;

    // Close the gap left by this handle; either neighbour may be absent
    if (before != 0)
    {
        before->_nextTask = after;
    }
    if (after != 0)
    {
        after->_prevTask = before;
    }

    _prevTask = 0;
    _nextTask = 0;
    return this;
}

