
CommManager Class Reference
[Communications Facilities]

#include <CommManager.hh>

Inherited by SerialCommManager.



Detailed Description

Manager class for mailbox and stream based network communications.

The CommManager class is at the heart of communications in RHexLib. All communication channels are created using an instance of the CommManager class. In addition, the creation of the CommManager class initiates a sub-thread which manages the periodic resending of unacknowledged stream messages. The CommManager class also maintains a list of all remote communications managers which the various channels are contacting. Another sub-thread is created which processes incoming UDP messages and sorts them into the proper channels.

CommManager uses two paradigms for communications in RHexLib: the mailbox paradigm and the stream paradigm.

The mailbox paradigm is the simpler of the two. Mailbox messages are very close in spirit to raw UDP packets: users just dispatch mail to a mailbox, and there is no guarantee that the mail will ever arrive, or that it will arrive in the correct order. Mailbox messages are not queued, so only the most recently received mail message is available to be read. In a reliable communications environment it is very unlikely that mail will not be delivered, and extremely unlikely that mail will be received out of order, but there are no guarantees on either.
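
The "most recent message only" behaviour described above can be pictured with a small stand-alone model. This is a sketch, not RHexLib code: LatestSlot and its methods are hypothetical names, networking is left out entirely, and the choice that a read empties the slot is an assumption made for the example.

```cpp
#include <string>

// Hypothetical model of mailbox semantics: no queue, the newest message wins.
class LatestSlot {
public:
    // Deliver a message; any unread message is overwritten.
    void deliver(const std::string& msg) {
        latest_ = msg;
        has_mail_ = true;
    }

    // Copy the newest message into out and empty the slot.
    // Returns false if nothing has arrived since the last read.
    // (Emptying on read is an assumption of this sketch.)
    bool read(std::string& out) {
        if (!has_mail_) return false;
        out = latest_;
        has_mail_ = false;
        return true;
    }

private:
    std::string latest_;
    bool has_mail_ = false;
};
```

If two messages are delivered before the reader looks, only the second is ever seen, which is why the mailbox paradigm suits state that is refreshed often rather than events that must not be missed.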

The stream paradigm provides some measure of guarantees on delivery. Users establish stream sinks as destinations for messages and connect stream sources to those sinks in order to send messages. Every stream message that is received by a stream sink is acknowledged, and stream sources periodically resend messages that have not been acknowledged. The stream sink holds a queue of the received messages for processing by the user. This protocol is analogous to TCP/IP, but is much simpler, and users have much finer access to and control over the list of unacknowledged messages left in the stream sources. Note that while this protocol can guarantee that a message sent is received, its simplicity means that in the case of intermittent communications failures it is actually very likely that multiple copies of a message will be received at a stream sink, and that messages will be received out of order.
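
The acknowledge-and-resend cycle, and the way it produces duplicates, can be sketched with a stand-alone model. None of the names below (Packet, ModelSource, ModelSink) come from RHexLib; they are hypothetical stand-ins, and there is no networking or timing, only the bookkeeping.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins; none of these names come from RHexLib.
struct Packet {
    unsigned id;
    std::string payload;
};

class ModelSource {
public:
    // Record the message as unacknowledged and return the packet to transmit.
    Packet send(const std::string& payload) {
        Packet p{next_id_++, payload};
        unacked_[p.id] = p.payload;
        return p;
    }

    // Called when an acknowledgement for id arrives from the sink.
    void acknowledge(unsigned id) { unacked_.erase(id); }

    // Everything still unacknowledged is due for retransmission.
    std::vector<Packet> pendingResends() const {
        std::vector<Packet> out;
        for (const auto& kv : unacked_) out.push_back(Packet{kv.first, kv.second});
        return out;
    }

private:
    unsigned next_id_ = 1;
    std::map<unsigned, std::string> unacked_;
};

class ModelSink {
public:
    // Queue the message for the user and return the id to acknowledge.
    // No duplicate filtering is done, so a resent message is queued again.
    unsigned receive(const Packet& p) {
        queue_.push_back(p);
        return p.id;
    }

    std::size_t queued() const { return queue_.size(); }

private:
    std::vector<Packet> queue_;
};
```

If a message reaches the sink but its acknowledgement is lost, the source resends it and the sink ends up holding two copies. Readers of a stream sink therefore need to tolerate repeated and reordered messages.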


Public Member Functions

 CommManager (unsigned short port, int resend_millisecs=200, int priority=-2)
virtual ~CommManager ()
com_id_t getID ()
Mailbox * createMailbox (int size, com_id_t mailbox_id, com_id_t exclusive_client=0)
void destroyMailbox (Mailbox *)
Mailer * createMailer (const char *host, unsigned short port, int size, com_id_t mailbox_id, com_id_t exclusive_dest=0)
Mailer * createMailer (RemoteManager *dest, int size, com_id_t mailbox_id, com_id_t exclusive_dest=0)
void destroyMailer (Mailer *)
StreamSink * createStreamSink (int size, com_id_t stream_id, int max_buffer, com_id_t exclusive_client=0)
void destroyStreamSink (StreamSink *)
StreamSource * createStreamSource (RemoteManager *dest, int size, com_id_t stream_id, com_id_t exclusive_dest=0, MessagePool *pool=NULL)
StreamSource * createStreamSource (const char *host, unsigned short port, int size, com_id_t stream_id, com_id_t exclusive_dest=0, MessagePool *pool=NULL)
void destroyStreamSource (StreamSource *)
virtual RemoteManager * lookupRemote (const char *host, unsigned short port)
virtual RemoteManager * openRemote (const char *host, unsigned short port)
com_id_t nextMessagingID ()
void shutdown ()
com_id_t nextMessageID ()
int sendMailMsg (Mailer *mailer, Message *msg)
int sendStreamMsg (StreamSource *source, Message *msg)
virtual int immediateMailMsg (RemoteManager *dest, com_id_t mailbox_id, Message *msg)

Static Public Member Functions

TimeStamp now ()

Protected Member Functions

 CommManager ()
void init (int resend_interval=200)
void startThreads (int priority=-2)
virtual void runReceiveThread ()
virtual void runSendThread ()
void runResendThread ()
void send (Message *msg)
virtual void sendPing (RemoteManager *dest)
virtual void sendProtocolMsg (ProtocolBuffer *buf, sockaddr_in *from)
virtual int immediateStreamMsg (RemoteManager *dest, com_id_t stream_id, com_id_t source_id, Message *msg)
void pack_byte (Byte *&buf, Byte data)
void pack_short (Byte *&buf, short data)
void pack_int (Byte *&buf, int data)
void pack_id (Byte *&buf, int data)
Byte unpack_byte (Byte *&buf, int &len)
short unpack_short (Byte *&buf, int &len)
int unpack_int (Byte *&buf, int &len)
com_id_t unpack_id (Byte *&buf, int &len)
virtual RemoteManager * record_contact (sockaddr_in *dest, com_id_t mgr_id, TimeStamp &receipt_time)
Mailbox * find_mailbox (com_id_t mailbox_id)
Mailer * find_mailer (com_id_t mailbox_id)
StreamSource * find_source (com_id_t stream_id)
StreamSink * find_sink (com_id_t stream_id)
void resend_bucket (int i)
virtual ProtocolBuffer * acquire_protocol ()
virtual void release_protocol (ProtocolBuffer *)
utils::List< StreamSource > & stream_bucket (com_id_t stream_id)

Static Protected Member Functions

void * receive_entry (void *)
void * resend_entry (void *)
void * send_entry (void *)

Protected Attributes

com_id_t _id
int _resend_interval
utils::ManagedDict< Mailbox > _mailboxes
utils::ManagedDict< Mailer > _mailers
utils::ManagedDict< StreamSink > _stream_sinks
utils::ManagedDict< StreamSource > _stream_sources
utils::ManagedList< RemoteManager > _remotes
utils::ManagedList< ProtocolBuffer > _protocol_list
utils::ManagedList< ProtocolBuffer > _protocol_pool
utils::List< Message > _send_list
utils::List< StreamSource > _stream_buckets [NUM_BUCKETS-1]
int _socket_fd
Byte * _incoming_buffer
pthread_rwlock_t _list_lock
pthread_rwlock_t _remote_lock
pthread_mutex_t _send_mutex
pthread_cond_t _send_var
pthread_mutex_t _protocol_pool_mutex
pthread_t _receive_tid
pthread_t _resend_tid
pthread_t _send_tid
com_id_t _next_message_id
com_id_t _next_messaging_id
bool _running

Static Protected Attributes

const int NUM_BUCKETS


Constructor & Destructor Documentation

CommManager::CommManager (unsigned short port, int resend_millisecs = 200, int priority = -2)

This is the CommManager constructor. port is the UDP port number which this communications manager will use to send and receive messages. The constructor initiates a thread which has two basic purposes. The main purpose is to resend stream messages which have not yet been acknowledged. A secondary purpose is to notice when nothing has been heard from a remote communications manager in a while and, if so, to try to "ping" that manager with a dummy stream message which will then be acknowledged. Thus, even if a remote communications manager never explicitly sends any information to this module, we can maintain a "heartbeat" to establish that the remote communications manager is actually alive and healthy. The sub-thread runs roughly every resend_millisecs milliseconds; resend_millisecs defaults to 200. If priority is negative, it indicates a relative POSIX thread priority for the communications manager sub-threads, that many levels below the current one. If priority is not negative, it indicates an absolute POSIX thread priority. priority defaults to -2.
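
The relative/absolute rule for priority can be written as a one-line helper. This is only a sketch: resolvePriority is a hypothetical name, and treating a negative value as a plain offset added to the current priority is an assumption about how the library applies it.

```cpp
// Hypothetical helper: maps the constructor's priority argument to an
// absolute thread priority, given the caller's current priority.
// Assumption: a negative argument is a plain offset below the current value.
int resolvePriority(int current, int priority) {
    // Negative values are relative to the current priority;
    // non-negative values are taken as absolute priorities.
    return priority < 0 ? current + priority : priority;
}
```

Under this reading, a caller running at priority 10 that passes -2 gets sub-threads at priority 8, while passing 5 gets sub-threads at priority 5 regardless of the caller's own priority.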

CommManager::CommManager () [protected]

Special constructor that does not launch threads, for use by SerialCommManager.

virtual CommManager::~CommManager () [virtual]

The destructor cleans up the CommManager memory and terminates the resend thread.


Member Function Documentation

Mailbox* CommManager::createMailbox (int size, com_id_t mailbox_id, com_id_t exclusive_client = 0)

This method creates a mailbox of "size" bytes with ID "mailbox_id". If "exclusive_client" is non-zero, only accept mail from remote managers with that ID. Returns NULL for failure.
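
The exclusive_client rule amounts to a simple filter on the sender's manager ID. The sketch below uses a hypothetical function name and models com_id_t as unsigned, which is an assumption; it only illustrates that zero means "accept anyone".

```cpp
// Hypothetical filter illustrating the exclusive_client rule.
// Assumption: manager IDs are modelled as plain unsigned integers.
bool acceptsSender(unsigned exclusive_client, unsigned sender_id) {
    // Zero disables the restriction; otherwise the IDs must match.
    return exclusive_client == 0 || exclusive_client == sender_id;
}
```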

Mailer* CommManager::createMailer (RemoteManager * dest, int size, com_id_t mailbox_id, com_id_t exclusive_dest = 0)

Overloaded method which takes a RemoteManager structure directly.

Mailer* CommManager::createMailer (const char * host, unsigned short port, int size, com_id_t mailbox_id, com_id_t exclusive_dest = 0)

This method creates a Mailer to send messages to a mailbox managed by a remote communications manager on the machine "host" with UDP port "port". The destination mailbox will have ID "mailbox_id" and must have a capacity of at least "size" bytes. If "exclusive_dest" is non-zero, then the remote manager of the destination mailbox must have ID "exclusive_dest". This covers the case where you are sending mail and the remote manager dies and restarts, and you don't want mail going to the newly created mailbox without restarting some other protocol.

StreamSink* CommManager::createStreamSink (int size, com_id_t stream_id, int max_buffer, com_id_t exclusive_client = 0)

This method creates a stream sink (or destination) of "size" bytes with ID "stream_id". Any message coming to this stream sink must be at most "size" bytes long. If the user ignores this sink, it will buffer up to "max_buffer" messages before dropping the oldest messages. If "exclusive_client" is non-zero, then this sink will only accept messages from a remote communications manager with ID "exclusive_client".
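
The "buffer up to max_buffer messages, then drop the oldest" policy can be illustrated with a small bounded queue. This is a stand-alone sketch with a hypothetical class name, not the StreamSink implementation.

```cpp
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical bounded buffer that discards the oldest entry when full.
class DropOldestQueue {
public:
    explicit DropOldestQueue(std::size_t max_buffer) : max_buffer_(max_buffer) {}

    // Add a message, evicting the oldest one if the buffer is already full.
    void push(const std::string& msg) {
        if (max_buffer_ == 0) return;  // nothing can be retained
        if (queue_.size() == max_buffer_) queue_.pop_front();
        queue_.push_back(msg);
    }

    // Remove the oldest surviving message into out; false if empty.
    bool pop(std::string& out) {
        if (queue_.empty()) return false;
        out = queue_.front();
        queue_.pop_front();
        return true;
    }

    std::size_t size() const { return queue_.size(); }

private:
    std::size_t max_buffer_;
    std::deque<std::string> queue_;
};
```

With max_buffer set to 2, pushing three messages without reading leaves only the second and third, so a slow reader loses the earliest data rather than the latest.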

StreamSource* CommManager::createStreamSource (const char * host, unsigned short port, int size, com_id_t stream_id, com_id_t exclusive_dest = 0, MessagePool * pool = NULL)

This method creates a stream source. The remote stream sink should have ID "stream_id" on the manager running on the machine "host" on UDP port "port". The remote stream sink should have at least "size" bytes of capacity. If "exclusive_dest" is non-zero and the destination manager's ID changes from "exclusive_dest", no more messages will be sent.

StreamSource* CommManager::createStreamSource (RemoteManager * dest, int size, com_id_t stream_id, com_id_t exclusive_dest = 0, MessagePool * pool = NULL)

Overloaded method which takes a RemoteManager structure directly.

void CommManager::destroyMailbox (Mailbox *)

This method destroys the specified mailbox.

void CommManager::destroyMailer (Mailer *)

This method destroys the specified Mailer.

void CommManager::destroyStreamSink (StreamSink *)

This method destroys the specified stream sink.

void CommManager::destroyStreamSource (StreamSource *)

This method destroys the specified stream source.

com_id_t CommManager::getID () [inline]

Every communications manager has a non-zero identifier. It should be unique across all communications managers, but it is simply the lower 32 bits of the time in microseconds at which the CommManager was constructed. If, by some amazing coincidence, two communications managers do have the same ID, it is not a fatal problem; it just means that there is a chance that some operations that are supposed to be "exclusive" to one manager will be open to both.
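
Taking the lower 32 bits of a microsecond clock can be sketched as follows. The names are hypothetical, com_id_t is modelled as a 32-bit unsigned integer (an assumption), and the sketch does not show how the library keeps the result non-zero.

```cpp
#include <chrono>
#include <cstdint>

// Assumption: com_id_t is modelled here as a 32-bit unsigned integer.
using model_id_t = std::uint32_t;

// Keep only the lower 32 bits of a microsecond timestamp.
model_id_t idFromMicros(std::uint64_t micros) {
    return static_cast<model_id_t>(micros & 0xFFFFFFFFu);
}

// Current wall-clock time in microseconds since the clock's epoch.
std::uint64_t nowMicros() {
    using namespace std::chrono;
    return static_cast<std::uint64_t>(
        duration_cast<microseconds>(system_clock::now().time_since_epoch()).count());
}
```

A 32-bit microsecond counter wraps roughly every 71.6 minutes, so two managers collide only if their construction times fall on the same microsecond modulo that period.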

void CommManager::init (int resend_interval = 200) [protected]

Internal initialization routine.

virtual RemoteManager* CommManager::lookupRemote (const char * host, unsigned short port) [virtual]

If any of the mailers or stream sources have contacted a communications manager on the machine "host" monitoring UDP port "port", this method returns a pointer to that remote manager. Otherwise, it returns NULL.

com_id_t CommManager::nextMessagingID () [inline]

This method returns a "unique" ID to be passed to one of the creation functions. It is useful when creating mailboxes or streams on the fly. By general convention, you should only use fixed mailbox IDs less than 16000, which keeps them clear of the generated ones. If you have more than 15999 mailboxes, you really shouldn't be using this package anyway.

TimeStamp CommManager::now () [static]

This method is a static convenience function for getting the time in microseconds.

virtual RemoteManager* CommManager::openRemote (const char * host, unsigned short port) [virtual]

If any of the mailers or stream sources have contacted a communications manager on the machine "host" monitoring UDP port "port", this method returns a pointer to that remote manager. If there has been no contact with a remote manager there, it attempts to make contact and to establish a ping relationship with that remote manager.

int CommManager::sendMailMsg (Mailer * mailer, Message * msg)

Creating a mailer is sometimes overkill. This method allows you to send the message msg directly to the mailbox with ID "mailbox_id" managed by the remote manager "dest" without having to explicitly create a mailer. The return value is -1 for an error, or the number of bytes sent.

void CommManager::shutdown ()

This method shuts down the CommManager threads permanently.


RHexLib Reference Documentation