// SIGALRM handler: reports that the alarm fired and then abandons whatever
// the thread was doing by jumping back to the environment most recently
// saved in threadEnv with sigsetjmp().
//
// num is the signal number handed to the handler. It is forwarded to
// siglongjmp(), so a non-zero num becomes the value sigsetjmp() appears to
// return at the jump target. Control does not return to the interrupted code.
void sendAlert::alarmHandler(int num)
{
    // iostream operations are not async-signal-safe: if the signal interrupts
    // another stream operation the handler could deadlock or corrupt the
    // stream. write() is async-signal-safe, so the same text is sent to
    // standard output with it instead.
    static const char notice[] = "I caught an SIGALRM!\n";
    ssize_t ignored = write(STDOUT_FILENO, notice, sizeof(notice) - 1);
    (void)ignored; // Nothing useful can be done about a failed write here

    // Jump to the location in the thread set by sigsetjmp
    siglongjmp(threadEnv, num);
}
// Thread entry point that forwards queued problem messages to the server.
//
// Once a second the thread:
//   1. copies every entry of sendAlert::problemMessageList into a list that
//      is local to this thread (while holding queueLock) and then empties
//      the shared list;
//   2. walks the local list, decrementing each message's countdown and
//      discarding any message whose countdown has reached zero;
//   3. sends each remaining message over sendAlert::sockfd and waits up to
//      three seconds, enforced with SIGALRM, for an acknowledgment;
//   4. discards a message once the acknowledgment equals its message id.
//      A message that fails to send, times out, or is not acknowledged
//      stays in the local list and is retried on the next pass.
//
// not_used is ignored. The loop never terminates, so the final return only
// exists because a thread start routine must return a void *.
void *sendAlert::processMessageQueueThread(void *not_used)
{
    // Local queue holding this thread's own heap copies of the messages
    // transferred from problemMessageList
    RWTPtrSlist<problemMessage> localProblemMessageList;
    sigset_t  alarmSignalSet;      // Signal set containing only SIGALRM
    int       size;                // Return value from sendto and recvfrom
    int       savemask = 1;        // Non-zero: sigsetjmp saves the signal mask
    const int broadcastOn = 1;     // Option value that enables SO_BROADCAST
    // NOTE(review): socklen_t is what POSIX recvfrom() expects; a platform
    // that predates socklen_t would need int here - confirm the target.
    socklen_t clilen;              // In/out size of cliAddr for recvfrom

    sigemptyset(&alarmSignalSet);
    sigaddset(&alarmSignalSet, SIGALRM);

    for (;;)
    {
        sleep(1); // Execute the loop every 1 second

        // Transfer any newly queued messages to the local queue while the
        // shared queue is locked
        pthread_mutex_lock(&queueLock);
        if (!sendAlert::problemMessageList.isEmpty())
        {
            RWTPtrSlistIterator<problemMessage>
                problemMessageListIterator(sendAlert::problemMessageList);
            while (problemMessageListIterator())
            {
                problemMessage *aMessage = problemMessageListIterator.key();
                localProblemMessageList.insert(new problemMessage(*aMessage));
            }
            // NOTE(review): clear() only empties the shared list; presumably
            // the objects it pointed to are owned and released by whoever
            // queued them - confirm, otherwise they leak here.
            sendAlert::problemMessageList.clear();
        }
        pthread_mutex_unlock(&queueLock);

        // If the local message queue is empty there is no need to continue
        if (localProblemMessageList.isEmpty()) continue;

        // Unblock the SIGALRM signal so the thread can receive it
        pthread_sigmask(SIG_UNBLOCK, &alarmSignalSet, NULL);

        RWTPtrSlistIterator<problemMessage>
            localProblemMessageListIterator(localProblemMessageList);
        while (localProblemMessageListIterator())
        {
            problemMessage *aMessage = localProblemMessageListIterator.key();
            aMessage->decrementCountDown();

            // If the message has expired remove it from the queue. The
            // element is removed through the iterator so the iterator stays
            // valid, and the copy allocated above is released.
            if (aMessage->getCountDown() == 0)
            {
                delete localProblemMessageListIterator.remove();
                continue;
            }

            // Set the socket to broadcast mode; SO_BROADCAST takes an int
            // flag as its option value
            setsockopt(sendAlert::sockfd, SOL_SOCKET, SO_BROADCAST,
                       (const char *)&broadcastOn, sizeof(broadcastOn));

            // Send the error message over the socket
            size = sendto(sendAlert::sockfd, (char *)aMessage,
                          sizeof(problemMessage), 0,
                          (struct sockaddr *)&servAddr, sizeof(servAddr));
            if (size != (int)sizeof(problemMessage))
            {
                cerr << "Error Sending Error Message to Socket."
                        "Size of Message Sent: " << size
                     << " Correct Size of Message: "
                     << sizeof(problemMessage) << endl;
                continue;
            }

            // recvfrom() overwrites clilen with the size of the address it
            // stored, so it has to be reset before every receive
            clilen = sizeof(cliAddr);

            // Set up an environment to return to if the SIGALRM signal is
            // caught before an acknowledgment is received from the server.
            // The result is tested directly rather than stored, which is
            // one of the forms in which sigsetjmp may portably be used.
            if (sigsetjmp(threadEnv, savemask) != 0)
            {
                cerr << "Alarm went off before acknowlegement was"
                        " received" << endl;
                continue;
            }

            // Setup the signal handler to receive SIGALRM signals
            signal(SIGALRM, sendAlert::alarmHandler);
            alarm(3); // Set an alarm to go off after 3 seconds

            // Receive the acknowledgment from the server.
            // NOTE(review): acknowledgment is declared outside this function
            // and its type is not visible here; the cast below is only
            // correct if it is an array or pointer - confirm. The original
            // spelled this variable two different ways; this spelling is the
            // one used where the value is received.
            size = recvfrom(sendAlert::sockfd, (char *)acknowledgment,
                            sizeof(acknowledgment), 0,
                            (struct sockaddr *)&sendAlert::cliAddr, &clilen);
            alarm(0); // Turn the alarm off if the ack was received

            if (size < 0)
            {
                cout << "Error receiving response from server:" << endl;
                continue;
            }

            // If the acknowledgment matches the message sent then remove the
            // message from the queue and release it
            if (acknowledgment == aMessage->getMsgId())
                delete localProblemMessageListIterator.remove();
        }
    }

    return NULL; // Never reached; present because thread routines return
                 // a value
}
//End of File