Listing 2: Implementing an active data type

// File: ActiveQueue.h

#if !defined(__ActiveThread_H)
#define __ActiveThread_H

#if !defined(_DEQUE_)
#include <deque>
#endif

#if !defined(_QUEUE_)
#include <queue>
#endif

#if !defined(__THREAD_H)
#include "thread.h"
#endif

// Exception class
class FailedToCreateSemaphore
{
};
class FailedToCreateThread
{
};

// We inherit protected from Thread to prevent a user from calling
// create. We do this instead of changing protection on Create from
// public to private, because Borland 5.0 has a problem with
// changing protection on members in base classes.
template <class T>
class ActiveQueue : protected Thread
{
public:
    ActiveQueue(DWORD dwParam = 0,DWORD dwCreationFlags = 0);
    virtual ~ActiveQueue();
    virtual void Add(const T &data);
    // Redefine operator HANDLE so an ActiveQueue can be turned into
    // a HANDLE for syncronization, and other thread related matters
    operator HANDLE();

protected:
    virtual bool Initialize();
    virtual bool HandleData(T &data) = 0;

private:
    HANDLE hQueueEntries;
    CRITICAL_SECTION criticalSection;
    // We specify the deque as default container, because
    // Borland 5.0 has a limited support for default templates
    std::queue<T,std::deque<T> > dataQueue;
    virtual UINT ThreadFunction(DWORD);
    ActiveQueue(ActiveQueue&); //prevent copying - make ctor private
    operator =(ActiveQueue&);  //prevent assignment - ctor private
};

template <class T>
ActiveQueue<T> :: ActiveQueue(DWORD dwParam,DWORD dwCreationFlags)
{
    hQueueEntries = CreateSemaphore(NULL,0,MAXLONG,NULL);
    if (hQueueEntries == NULL)
        throw FailedToCreateSemaphore();
    // Create the thread
    if (Create(dwParam,dwCreationFlags) == FALSE)
    {
        CloseHandle(hQueueEntries);
        throw FailedToCreateThread();
    }
    InitializeCriticalSection(&criticalSection);
}

template <class T>
ActiveQueue<T> :: ~ActiveQueue()
{
    CloseHandle(hQueueEntries);
    DeleteCriticalSection(&criticalSection);
}

template <class T>
void ActiveQueue<T> :: Add(const T &data)
{
    EnterCriticalSection(&criticalSection);
    dataQueue.push(data);
    LeaveCriticalSection(&criticalSection);
    ReleaseSemaphore(hQueueEntries,1,NULL);
}

template <class T>
ActiveQueue<T> :: operator HANDLE()
{
    return Thread::operator HANDLE();
}

template <class T>
bool ActiveQueue<T> :: Initialize()
{
    return true;
}

template <class T>
UINT ActiveQueue<T> :: ThreadFunction(DWORD)
{    
    T data;

    if (Initialize() == false)
        return 0;
    while (true)
    {
        WaitForSingleObject(hQueueEntries,INFINITE);
        EnterCriticalSection(&criticalSection);
        data = dataQueue.front();
        dataQueue.pop();
        LeaveCriticalSection(&criticalSection);                    
        if (HandleData(data) == false)
            break;        
    }
    return 0;
}

#endif
//End of File