In this example, a subset of the Event Service capabilities is demonstrated. Here we build a push supplier and a push consumer. The push supplier produces events and supplies them to the event channel at its own pace. When the event channel receives an event from the supplier, it supplies (pushes) the event to the push consumer.
The PushSupplier program is responsible for supplying events to the event channel. In this example, the "events" are simple numbers that have been entered by the user. In general "events" can be any data chosen by application designer. To aid in exposition, a few simplifications are made. The PushSupplier program is implemented as a client of the Event Service. It does not implement the push supplier interface, so the event channel will not be able to invoke the disconnect_push_supplier( ) method. Also, minimal error checking is performed. The NonStop DOM sample program from which this example is drawn is more conscientious about error checking. Your own applications should follow that style because undetected errors can lead to further trouble. The push supplier program starts off in the usual way for a client program. The first step is to initialize the ORB and obtain an object reference for the root-naming context:
int main(int argc, char *argv[])
{
// Initialize the ORB
CORBA::ORB_ptr orb = CORBA::ORB_init(argc, argv);
// Get root context for Naming Service
CORBAObject = orb->
resolve_initial_references("NameService");
RootNameContext =
CosNaming::NamingContext::_narrow(CORBAObject);
|
Next, the event channel is found in the Naming Service. We assume that another program
(RegisterChannel) has previously created the channel and placed it into the Naming Service.
name.length(2); name[0].id = "NSDOM-Samples"; name[0].kind = ""; name[1].id = "SampleEventChannel"; name[1].kind = "EventChannel"; CORBAObject = RootNameContext->resolve(name); // Narrow object reference to an Event Channel object EventChannel = CosEventChannelAdmin::EventChannel:: _narrow( CORBAObject ); |
With the event channel, we obtain a SupplierAdmin object:
CosEventChannelAdmin::SupplierAdmin_var SupplierAdminObj; SupplierAdminObj = EventChannel->for_suppliers(); |
The SupplierAdmin object is then used to obtain a ProxyPushConsumer:
CosEventChannelAdmin::ProxyPushConsumer_var
ProxyPushConsumer;
ProxyPushConsumer = SupplierAdminObj->
obtain_push_consumer();
|
Using the ProxyPushConsumer object, we register as a supplier of events:
CosEventComm::PushSupplier_ptr PushSupplier=NULL; ProxyPushConsumer->connect_push_supplier(PushSupplier); |
Now we are ready to supply events. The following loop will prompt for user input in the form of numbers. When the user enters a number, it is package in an any variable. The any variable is then pushed as an event. This continues until a blank line is entered.
CORBA::Long aLong;
CORBA::Any anyLong;
const unsigned int lineSize = 1024;
char buffer[lineSize];
cout << "Enter number to be pushed to event channel"
<< endl;
cout << "> " << flush;
while (cin.getline(buffer, lineSize))
{
if (strlen(buffer) == 0) break;
aLong = strtol(buffer, NULL, 10);
anyLong <<= aLong;
ProxyPushConsumer->push(anyLong);
cout << "> " << flush;
}
|
Finally, we inform the ProxyPushConsumer that we are disconnecting.
ProxyPushConsumer->disconnect_push_consumer(); return 0; } |
PushConsumer program is responsible for consuming events produced by the PushSupplier via the event channel. In this example, the "events" are numbers that have been entered by the user. The PushConsumer program simply displays the events.
The PushConsumer implements the PushConsumer interface defined in CosEventComm.idl.
interface PushConsumer {
void push (in any data) raises(Disconnected);
void disconnect_push_consumer();
};
|
The IDL compiler produces the skeleton server-side header files and implementation. The PushConsumer program must provide a concrete implementation of the methods. The first step in doing this is to provide an implementation header file.
#include <CosEventComm_server.h>
class PushConsumer_impl: public
POA_CosEventComm_PushConsumer
{
public:
PushConsumer_impl() {}
~PushConsumer_impl() {}
void push(const CORBA::Any& data,
CORBA::Environment &pr_env);
void disconnect_push_consumer(
CORBA::Environment &pr_env);
};
|
Here we start by including the implementation header file produced by the IDL compiler. Then the PushConsumer_impl class is defined. This class inherits from POA_CosEventComm_PushConsumer, which contains the base class definition for the server implementation. Inside the PushConsumer_impl class, we declare the two methods that must be implemented: push( ) and disconnect_push_consumer( ).
Next we turn our attention to the C++ implementation of the PushConsumer. The implementation for the push( ) method is shown in Example 10.
void PushConsumer_impl::push(const CORBA::Any& data,
CORBA::Environment &pr_env)
{
cout << "Invoked PushConsumer_impl::push, ";
// Display the "any" data
if ((data.type())->kind() == CORBA::tk_long)
{
CORBA::Long aLong;
if (data >>= aLong); else aLong = 0;
cout << "Long value=" << aLong << endl;
}
else
cout << "Unknown any data received" << endl;
}
|
When a “push” request arrives, the push( ) method simply displays information on standard output. The value, which is encapsulated in the any data variable, is extracted and displayed. For simplicity, the implementation of the disconnect_push_consumer( ) method simply indicates that it was invoked. A real-life implementation could take actions to ensure an orderly shutdown of the PushConsumer.
void PushConsumer_impl::disconnect_push_consumer
(CORBA::Environment &pr_env)
{
cout << "PushConsumer_impl::disconnect_push_consumer "
"was called." << endl;
}
|
The main program for the PushConsumer implementation sets up the environment for processing the requests. The first step is to initialize the ORB and obtain an object reference for the root POA:
int main(int argc, char *argv[])
{
// Initialize the ORB
CORBA::ORB_ptr orb = CORBA::ORB_init(argc, argv);
// Get object reference for POA Manager
CORBAObject = orb->resolve_initial_references("RootPOA");
RootPOA = PortableServer::POA::_narrow(CORBAObject);
|
An instance of the PushConsumer_impl object is created. This is the object that will be servicing the requests arriving from the event channel. The POA is told to activate this object. Once activated, the object can service requests.
PushConsumer_impl *PushConsumer = new PushConsumer_impl;
PortableServer::ObjectId_ptr oid =
RootPOA->activate_object(PushConsumer);
delete oid;
|
Next, the event channel is found in the Naming Service. We assume that another program (RegisterChannel) has previously created the channel and placed it into the Naming Service.
// Get root context for Naming Service
CORBAObject = orb->
resolve_initial_references("NameService");
RootNameContext =
CosNaming::NamingContext::_narrow(CORBAObject);
// Find the Event Channel
name.length(2);
name[0].id = "NSDOM-Samples";
name[0].kind = "";
name[1].id = "SampleEventChannel";
name[1].kind = "EventChannel";
CORBAObject = RootNameContext->resolve(name);
// Narrow object reference to an Event Channel object
EventChannel = CosEventChannelAdmin::EventChannel::
_narrow( CORBAObject );
|
With the event channel, we obtain a ConsumerAdmin object:
CosEventChannelAdmin::ConsumerAdmin_var ConsumerAdminObj; ConsumerAdminObj = EventChannel->for_consumers(); |
The ConsumerAdmin object is then used to obtain a ProxyPushSupplier:
CosEventChannelAdmin::ProxyPushSupplier_var
ProxyPushSupplier;
ProxyPushSupplier = ConsumerAdminObj->
obtain_push_supplier();
|
Using the ProxyPushSupplier object, we register as a consumer of events:
ProxyPushSupplier->connect_push_consumer(PushConsumer); |
Notice that the PushConsumer object created previously is supplied as an argument here. By supplying the PushConsumer object references to the ProxyPushSupplier, we enable it to invoke methods on the PushConsumer object.
The final steps are to activate the POA and allow it to begin servicing requests.
RootPOA->the_POAManager()->activate(); orb->run(); |
The NonStop DOM Event Service provides some additional interfaces for administering the event channel. The following declarations are found in CosEventChannelAdmin.idl:
interface EventChannel
{
struct EventChannelStats
{
unsigned long PushConsumers;
unsigned long PushSuppliers;
unsigned long PullConsumers;
unsigned long PullSuppliers;
unsigned long IdlePullSuppliers;
unsigned long StoredEvents;
unsigned long TotalEvents;
};
struct EventChannelPolicies
{
unsigned long PullDelay;
unsigned long OldEventThreshold;
};
void get_statistics(out EventChannelStats
Statistics);
void set_trace(in boolean OnOff);
void get_policies(out EventChannelPolicies Policies);
void set_policies(in EventChannelPolicies Policies);
};
|
By invoking the various methods on the event channel object, an administrative program can influence the processing policies and obtain statistics on the current state of the event channel.