
CommManager Class Reference
[Communications Facilities]

#include <CommManager.hh>

Inherited by SerialCommManager.



Detailed Description

Manager class for mailbox and stream based network communications.

The CommManager class is at the heart of communications in RHexLib. All communication channels are created using an instance of the CommManager class. In addition, the creation of the CommManager class initiates a sub-thread which manages the periodic resending of unacknowledged stream messages. The CommManager class also maintains a list of all remote communications managers which the various channels are contacting. Another sub-thread is created which processes incoming UDP messages and sorts them into the proper channels.

CommManager uses two paradigms for communications in RHexLib: the mailbox paradigm and the stream paradigm.

The mailbox paradigm is the simpler of the two. Mailbox messages are very close in spirit to raw UDP packets: users just dispatch mail to a mailbox, and there is no guarantee that the mail will ever arrive, or that it will arrive in the correct order. Mailbox messages are not queued, so only the most recently received mail message is available to be read. In a reliable communications environment it is very unlikely that mail will not be delivered, and extremely unlikely that mail will be received out of order, but there are no guarantees on either.
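The "only the most recent message is available" behaviour can be pictured with a small stand-alone model. This is a sketch only: LatestValueMailbox, deliver() and read() are hypothetical names rather than part of RHexLib, and the choice to empty the mailbox on read is an assumption the text above does not confirm.

```cpp
#include <mutex>
#include <string>

// Hypothetical stand-in for a mailbox: it keeps only the newest message.
class LatestValueMailbox {
public:
    // Receiving side: a new message silently overwrites any unread one.
    void deliver(const std::string &msg) {
        std::lock_guard<std::mutex> lock(mutex_);
        latest_ = msg;
        has_mail_ = true;
    }

    // Copies the newest message into `out` and returns true, or returns
    // false if nothing has arrived since the last read.
    // Assumption: reading empties the mailbox.
    bool read(std::string &out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!has_mail_) return false;
        out = latest_;
        has_mail_ = false;
        return true;
    }

private:
    std::mutex mutex_;
    std::string latest_;
    bool has_mail_ = false;
};
```

If two messages are delivered before the reader gets around to reading, the first one is simply lost, which is the trade-off the mailbox paradigm accepts.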

The stream paradigm provides some measure of guarantees on delivery. Users establish stream sinks as destinations for messages. Users connect stream sources to those stream sinks in order to send messages. Every stream message that is received by a stream sink is acknowledged, and stream sources will periodically resend messages if they are not acknowledged. The stream sink holds a queue of the received messages for processing by the user. This protocol is analogous to TCP/IP, but is much simpler, and users have much finer access to and control over the list of unacknowledged messages left in the stream sources. Note that while this protocol can guarantee that a message sent is received, its simplicity means that in the case of intermittent communications failures, it is actually very likely that multiple copies of a message will be received at a stream sink, and that the messages will be received out of order.
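The following stand-alone model illustrates why duplicates appear when acknowledgements are lost. It is a sketch under stated assumptions, not the RHexLib implementation: ModelSink, ModelSource, resendCycle() and the ack_lost flag are hypothetical, and the real wire format and bookkeeping are not described in this page.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical sink model: queues every message it receives and returns
// the message ID as the acknowledgement.
struct ModelSink {
    std::vector<std::string> received;  // queue awaiting the user

    std::uint32_t receive(std::uint32_t id, const std::string &payload) {
        received.push_back(payload);
        return id;  // the "ack"
    }
};

// Hypothetical source model: remembers messages until they are acknowledged.
class ModelSource {
public:
    // Queue a message; it stays pending until an ack arrives.
    void send(const std::string &payload) { pending_[next_id_++] = payload; }

    // One resend cycle: transmit every pending message. `ack_lost` simulates
    // a link that delivers the message but drops the acknowledgement.
    void resendCycle(ModelSink &sink, bool ack_lost) {
        std::vector<std::uint32_t> acked;
        for (const auto &entry : pending_) {
            std::uint32_t ack = sink.receive(entry.first, entry.second);
            if (!ack_lost) acked.push_back(ack);
        }
        for (std::uint32_t id : acked) pending_.erase(id);
    }

    std::size_t unacknowledged() const { return pending_.size(); }

private:
    std::uint32_t next_id_ = 1;
    std::map<std::uint32_t, std::string> pending_;
};
```

In this model a single lost acknowledgement leaves the message pending, so the next cycle delivers a second copy to the sink. A user of the stream paradigm therefore has to tolerate, or filter out, duplicate messages.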


Public Member Functions

 CommManager (int resend_millisecs=200, int priority=-2)
 ~CommManager ()
com_id_t getID ()
Mailbox * createMailbox (int size, com_id_t mailbox_id, com_id_t exclusive_client=0)
void destroyMailbox (Mailbox *)
Mailer * createMailer (const char *spec, int size, com_id_t mailbox_id, com_id_t exclusive_dest=0)
Mailer * createMailer (RemoteManager *dest, int size, com_id_t mailbox_id, com_id_t exclusive_dest=0)
void destroyMailer (Mailer *)
StreamSink * createStreamSink (int size, com_id_t stream_id, int max_buffer, com_id_t exclusive_client=0)
void destroyStreamSink (StreamSink *)
StreamSource * createStreamSource (const char *spec, int size, com_id_t stream_id, com_id_t exclusive_dest=0, MessagePool *pool=NULL)
StreamSource * createStreamSource (RemoteManager *dest, int size, com_id_t stream_id, com_id_t exclusive_dest=0, MessagePool *pool=NULL)
void destroyStreamSource (StreamSource *)
RemoteManager * openRemote (const char *spec)
RemoteManager * openRemoteByHost (const char *host, int port)
StreamSource * createStreamSourceByHost (const char *host, int port, int size, com_id_t stream_id, com_id_t exclusive_dest=0, MessagePool *pool=NULL)
Mailer * createMailerByHost (const char *host, int port, int size, com_id_t mailbox_id, com_id_t exclusive_dest=0)
com_id_t nextMessagingID ()
void shutdown ()
com_id_t nextMessageID ()
int sendMailMsg (Mailer *mailer, Message *msg)
int sendStreamMsg (StreamSource *source, Message *msg)
bool immediateMailMsg (RemoteManager *dest, com_id_t mailbox_id, Message *msg)
utils::SymbolTable * getSymbolTable ()
void * getSymbol (const char *name)
bool setSymbol (const char *name, const void *data, utils::SymbolManager *cleaner=(utils::SymbolManager *) NULL, bool overwrite=false)
void lockSymbolTable ()
void unlockSymbolTable ()
bool initPortal (const char *spec)
bool processMessage (Message *msg)
bool launchAuxiliaryThread (pthread_t *tid, void *(*start_routine)(void *), void *arg)

Static Public Member Functions

CommManager * createManager (const char *spec, int resend_millisecs=200, int priority=-2)
TimeStamp now ()
CommManager * instance (utils::SymbolTable *table)

Protected Member Functions

void init (int resend_interval=200)
void startThreads (int priority=-2)
void runSendThread ()
void runResendThread ()
void record_contact (RemoteManager *dest)
void send (Message *msg)
void sendPing (RemoteManager *dest)
void sendProtocolMsg (ProtocolBuffer *buf, RemoteManager *dest)
bool immediateStreamMsg (RemoteManager *dest, com_id_t stream_id, com_id_t source_id, Message *msg)
void pack_byte (Byte *&buf, Byte data)
void pack_short (Byte *&buf, short data)
void pack_int (Byte *&buf, int data)
void pack_id (Byte *&buf, int data)
Byte unpack_byte (Byte *&buf, int &len)
short unpack_short (Byte *&buf, int &len)
int unpack_int (Byte *&buf, int &len)
com_id_t unpack_id (Byte *&buf, int &len)
Mailbox * find_mailbox (com_id_t mailbox_id)
Mailer * find_mailer (com_id_t mailbox_id)
StreamSource * find_source (com_id_t stream_id)
StreamSink * find_sink (com_id_t stream_id)
void resend_bucket (int i)
ProtocolBuffer * acquire_protocol ()
void release_protocol (ProtocolBuffer *)
utils::List< StreamSource > & stream_bucket (com_id_t stream_id)

Static Protected Member Functions

void * resend_entry (void *)
void * send_entry (void *)

Protected Attributes

com_id_t _id
int _resend_interval
utils::ManagedDict< Mailbox > _mailboxes
utils::ManagedDict< Mailer > _mailers
utils::ManagedDict< StreamSink > _stream_sinks
utils::ManagedDict< StreamSource > _stream_sources
utils::ManagedList< RemoteManager > _remotes
utils::ManagedList< ProtocolBuffer > _protocol_list
utils::ManagedList< ProtocolBuffer > _protocol_pool
utils::ManagedList< Message > _send_list
utils::List< StreamSource > _stream_buckets [NUM_BUCKETS-1]
utils::List< CommPortal > _portal_list
pthread_rwlock_t _list_lock
pthread_rwlock_t _remote_lock
pthread_mutex_t _send_mutex
pthread_cond_t _send_var
pthread_mutex_t _protocol_pool_mutex
pthread_t _resend_tid
pthread_t _send_tid
com_id_t _next_message_id
com_id_t _next_messaging_id
bool _running
utils::SymbolTable _table
 Private symbol table.

pthread_mutex_t _table_mutex
 Mutex protection for _table.

int _auxiliary_priority

Static Protected Attributes

const int NUM_BUCKETS


Constructor & Destructor Documentation

CommManager::CommManager (int resend_millisecs = 200, int priority = -2)

This is the CommManager constructor. The constructor initiates a thread which has two basic purposes. The main purpose is to resend stream messages which have not yet been acknowledged. A secondary purpose is to notice when nothing has been heard from a remote communications manager in a while, and in that case to "ping" that module with a dummy stream message which will then be acknowledged. Thus, even if a remote communications manager never explicitly sends any information to this module, we can maintain a "heartbeat" to establish that the remote communications manager is actually alive and healthy. The sub-thread runs roughly every resend_millisecs milliseconds, which defaults to 200. If priority is negative, it indicates a relative POSIX thread priority for the communication manager sub-threads, that many levels below the current priority. If priority is not negative, it indicates an absolute POSIX thread priority. priority defaults to -2, i.e. a relative priority below the current one.
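The relative-versus-absolute rule for the priority argument can be written out as a small function. This is only a sketch of the rule as described above: resolveThreadPriority is a hypothetical helper, not an RHexLib function, and it does not clamp the result to the range allowed by the scheduling policy.

```cpp
// Hypothetical helper: maps the constructor's `priority` argument to an
// absolute thread priority, given the priority of the calling thread.
int resolveThreadPriority(int current_priority, int priority) {
    if (priority < 0) {
        // Negative: relative, |priority| levels below the current priority.
        return current_priority + priority;
    }
    // Zero or positive: taken as an absolute priority.
    return priority;
}
```

With the default of -2, a manager created from a thread running at priority 10 would, under this reading, run its sub-threads at priority 8.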

CommManager::~CommManager ()

The destructor cleans up the CommManager memory and terminates the resend thread.


Member Function Documentation

Mailbox* CommManager::createMailbox (int size, com_id_t mailbox_id, com_id_t exclusive_client = 0)

This method creates a mailbox of "size" bytes with ID "mailbox_id". If "exclusive_client" is non-zero, only accept mail from remote managers with that ID. Returns NULL for failure.

Mailer* CommManager::createMailer (RemoteManager * dest, int size, com_id_t mailbox_id, com_id_t exclusive_dest = 0)

Overloaded method which takes a RemoteManager structure directly.

Mailer* CommManager::createMailer (const char * spec, int size, com_id_t mailbox_id, com_id_t exclusive_dest = 0)

This method creates a Mailer to send messages to a mailbox managed by a remote communications manager specified by "spec". The destination mailbox will have ID "mailbox_id" and must have a capacity of at least "size" bytes. If "exclusive_dest" is non-zero, then the remote manager of the destination mailbox must have ID "exclusive_dest". This covers the case where you are sending mail, the remote manager dies and restarts, and you don't want mail going to the newly created mailbox without restarting some other protocol.

Mailer* CommManager::createMailerByHost (const char * host, int port, int size, com_id_t mailbox_id, com_id_t exclusive_dest = 0)

Convenience method which uses CommManager::openRemoteByHost and CommManager::createMailer to create the mailer.

CommManager* CommManager::createManager (const char * spec, int resend_millisecs = 200, int priority = -2) [static]

Create a communications manager and initialize it with one portal. Using various heuristics, the portal specification string is built up from the given spec string.

Parameters:
[in] spec If the spec is a number, it is assumed to be the port number for the net portal we will create. If it starts with a /, it is assumed to be the device name of a serial device we will use. Otherwise, it is assumed to be the straight specification string.
[in] resend_millisecs How fast to cycle the resends
[in] priority If negative, it represents a delta priority for the comm. sub-threads; if not negative, it is the absolute priority for the comm. sub-threads
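The three cases for spec can be sketched as a classifier. SpecKind and classifySpec are hypothetical names used for illustration; the actual heuristics and the format of the resulting portal specification string are not documented here, so this only mirrors the order of the checks described above.

```cpp
#include <cctype>
#include <string>

// Hypothetical classification of the `spec` argument.
enum class SpecKind { NetPort, SerialDevice, RawSpec };

SpecKind classifySpec(const std::string &spec) {
    if (!spec.empty()) {
        // A spec made only of digits is treated as a net portal port number.
        bool all_digits = true;
        for (unsigned char c : spec) {
            if (!std::isdigit(c)) {
                all_digits = false;
                break;
            }
        }
        if (all_digits) return SpecKind::NetPort;

        // A leading '/' is treated as a serial device name.
        if (spec[0] == '/') return SpecKind::SerialDevice;
    }
    // Anything else is passed through as a straight specification string.
    return SpecKind::RawSpec;
}
```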

StreamSink* CommManager::createStreamSink (int size, com_id_t stream_id, int max_buffer, com_id_t exclusive_client = 0)

This method creates a stream sink (or destination) of "size" bytes with ID "stream_id". Any message coming to this stream sink must have a maximum length of "size" bytes. If the user ignores this sink, it will buffer up to "max_buffer" messages before dropping the oldest messages. If "exclusive_client" is non-zero, then this sink will only accept messages from a remote communications manager with ID "exclusive_client".
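The "buffer up to max_buffer messages, then drop the oldest" policy can be illustrated with a bounded queue. This is a sketch, not the StreamSink implementation: BoundedSinkQueue and its methods are hypothetical, and messages are modelled as plain strings.

```cpp
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical model of a sink buffer that drops the oldest entries once
// more than `max_buffer` messages are waiting.
class BoundedSinkQueue {
public:
    explicit BoundedSinkQueue(std::size_t max_buffer)
        : max_buffer_(max_buffer) {}

    // Appends a message. Returns true if an older message was dropped
    // to make room.
    bool push(const std::string &msg) {
        queue_.push_back(msg);
        bool dropped = false;
        while (queue_.size() > max_buffer_) {
            queue_.pop_front();  // discard the oldest message
            dropped = true;
        }
        return dropped;
    }

    // Removes the oldest remaining message into `out`; false if empty.
    bool pop(std::string &out) {
        if (queue_.empty()) return false;
        out = queue_.front();
        queue_.pop_front();
        return true;
    }

    std::size_t size() const { return queue_.size(); }

private:
    std::size_t max_buffer_;
    std::deque<std::string> queue_;
};
```

Dropping from the front keeps the newest data available to a slow reader, at the cost of losing messages that were in fact delivered and acknowledged.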

StreamSource* CommManager::createStreamSource (RemoteManager * dest, int size, com_id_t stream_id, com_id_t exclusive_dest = 0, MessagePool * pool = NULL)

Overloaded method which takes a RemoteManager structure directly.

StreamSource* CommManager::createStreamSource (const char * spec, int size, com_id_t stream_id, com_id_t exclusive_dest = 0, MessagePool * pool = NULL)

This method creates a stream source. The stream is contacted through the remote manager specified by "spec". The remote stream sink should have ID "stream_id" and at least "size" bytes of capacity. If "exclusive_dest" is non-zero and the destination manager's ID changes from "exclusive_dest", then no more messages will be sent.

StreamSource* CommManager::createStreamSourceByHost (const char * host, int port, int size, com_id_t stream_id, com_id_t exclusive_dest = 0, MessagePool * pool = NULL)

Convenience method which uses CommManager::openRemoteByHost and CommManager::createStreamSource to create the stream source.

void CommManager::destroyMailbox (Mailbox *)

This method destroys the given mailbox.

void CommManager::destroyMailer (Mailer *)

This method destroys the specified Mailer.

void CommManager::destroyStreamSink (StreamSink *)

This method destroys the specified stream sink.

void CommManager::destroyStreamSource (StreamSource *)

This method destroys the specified stream source.

com_id_t CommManager::getID () [inline]

Every communications manager has a non-zero identifier. It should be unique across all communications managers, but it is simply the lower 32 bits of the time in microseconds at which the CommManager was constructed. If, by some amazing coincidence, two communications managers do have the same ID, it is not a fatal problem; it just means that there is a chance that some operations that are supposed to be "exclusive" to one manager will be open to both.
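A minimal sketch of deriving such an identifier follows. It assumes com_id_t is a 32-bit unsigned integer, which this page does not state, and idFromMicroseconds and makeManagerId are hypothetical names. Remapping a zero result to 1 is also an assumption, added only because the identifier is documented as non-zero.

```cpp
#include <chrono>
#include <cstdint>

// Hypothetical: keep the lower 32 bits of a microsecond timestamp.
std::uint32_t idFromMicroseconds(std::uint64_t micros) {
    std::uint32_t id = static_cast<std::uint32_t>(micros & 0xFFFFFFFFu);
    // Assumption: zero is remapped so the identifier is never zero.
    return id != 0 ? id : 1u;
}

// Hypothetical: build an identifier from the current wall-clock time.
std::uint32_t makeManagerId() {
    using namespace std::chrono;
    auto now = duration_cast<microseconds>(
        system_clock::now().time_since_epoch());
    return idFromMicroseconds(static_cast<std::uint64_t>(now.count()));
}
```

Since 2^32 microseconds is roughly 71.6 minutes, the low 32 bits wrap around about that often, so a collision requires two managers to be constructed at the same microsecond offset within that cycle.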

void* CommManager::getSymbol (const char * name)

Get a symbol in a threadsafe manner.

Parameters:
[in] name Symbol name
Returns:
Returns NULL if the symbol is undefined, a pointer to the value if not.

utils::SymbolTable* CommManager::getSymbolTable () [inline]

Access to the symbol table.

Returns:
Returns a pointer to the symbol table

void CommManager::init (int resend_interval = 200) [protected]

Internal initialization routine.

bool CommManager::initPortal (const char * spec)

Initialize a communications portal.

Parameters:
[in] spec The portal specification string
Returns:
Returns true for success, false for failure

CommManager* CommManager::instance (utils::SymbolTable * table) [static]

Get the communications manager instance stored in the symbol table.

Parameters:
[in] table The symbol table
Returns:
Returns NULL if not there, the comm. manager if there.

bool CommManager::launchAuxiliaryThread (pthread_t * tid, void *(* start_routine)(void *), void * arg)

Convenience routine for launching an auxiliary (i.e. low priority) thread the correct way.

void CommManager::lockSymbolTable ()

Lock the symbol table mutex.

com_id_t CommManager::nextMessagingID () [inline]

This method returns a "unique" ID to be passed to one of the creation functions. It is useful when creating mailboxes or streams on the fly. By general convention, you should therefore only use fixed mailbox IDs less than 16000. If you have more than 15999 mailboxes, you really shouldn't be using this package anyway.

TimeStamp CommManager::now () [static]

This method is a static convenience function for getting the time in microseconds.

RemoteManager* CommManager::openRemote (const char * spec)

If any of the mailers or stream sources have contacted a communications manager equivalent to the one specified by "spec", the method returns a reference to that remote manager. If there is no existing contact with that remote manager, it attempts to make contact and establish a ping relationship with the remote manager.

RemoteManager* CommManager::openRemoteByHost (const char * host, int port)

Convenience function for opening a remote by a host and port.

Parameters:
[in] host If host starts with a /, we will use a serial connection with "host" as the device name; otherwise we will use a net connection with host as the hostname and port as the port number
[in] port The port number to use, if a network connection

bool CommManager::processMessage (Message * msg)

Process a message. Route messages to the appropriate mail box, stream, or protocol handler.

Parameters:
[in] msg The message to process, includes a source
Returns:
Returns true for a successful processing

bool CommManager::setSymbol (const char * name, const void * data, utils::SymbolManager * cleaner = (utils::SymbolManager *) NULL, bool overwrite = false)

Set a symbol in a threadsafe manner.

Parameters:
[in] name Symbol name
[in] data Symbol value
[in] cleaner Symbol memory manager for overwriting and destroying
[in] overwrite If false, disable symbol overwriting
Returns:
Returns true if symbol set successfully, false if not.
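The overwrite semantics can be sketched with a mutex-protected map. This is a stand-alone model, not utils::SymbolTable: ModelSymbolTable is a hypothetical class, the cleaner argument is left out entirely, and refusing to replace an existing symbol when overwrite is false is an interpretation of the parameter description above.

```cpp
#include <map>
#include <mutex>
#include <string>

// Hypothetical model of a thread-safe symbol table.
class ModelSymbolTable {
public:
    // Stores `data` under `name`. If the symbol already exists, it is only
    // replaced when `overwrite` is true; otherwise the call fails.
    bool set(const std::string &name, const void *data,
             bool overwrite = false) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = symbols_.find(name);
        if (it != symbols_.end()) {
            if (!overwrite) return false;
            it->second = data;
            return true;
        }
        symbols_[name] = data;
        return true;
    }

    // Returns the stored pointer, or nullptr if the symbol is undefined.
    const void *get(const std::string &name) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = symbols_.find(name);
        return it != symbols_.end() ? it->second : nullptr;
    }

private:
    std::mutex mutex_;
    std::map<std::string, const void *> symbols_;
};
```

Taking the lock inside each call keeps single operations safe. A caller that needs several operations to be atomic as a group would need an external lock, which is presumably the role of lockSymbolTable and unlockSymbolTable.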

void CommManager::shutdown ()

This method shuts down the CommManager threads permanently.

void CommManager::unlockSymbolTable ()

Unlock the symbol table mutex.


RHexLib Reference Documentation