#include <CommManager.hh>
Inherited by SerialCommManager.
The CommManager class is at the heart of communications in RHexLib. All communication channels are created using an instance of the CommManager class. In addition, the creation of the CommManager class initiates a sub-thread which manages the periodic resending of unacknowledged stream messages. The CommManager class also maintains a list of all remote communications managers which the various channels are contacting. Another sub-thread is created which processes incoming UDP messages and sorts them into the proper channels.
CommManager uses two paradigms for communications in RHexLib: the mailbox paradigm and the stream paradigm.
The mailbox paradigm is the simpler of the two. Mailbox messages are very close in spirit to raw UDP packets, i.e., users just dispatch mail to a mailbox, and there is no guarantee that the mail will ever arrive, or even arrive in the correct order. Mailbox messages are not queued, so only the most recently received mail message is available to be read. Now, in a reliable communications environment, it is very unlikely that mail will not be delievered, and it is extremely unlikely that mail will be received out of order, but there are no guarantees on either.
The stream paradigm provides some measure of guarantees on delivery. Users establish stream sinks as destinations for messages. Users connect stream sources to those stream sinks in order to send messages. Every stream message that is received by a stream sink is acknowledged, and stream sources will periodically resend messages if they are not acknowledged. The stream source holds a queue of the received messages for processing by the user. This protocol is analogous to TCP/IP, but is much simpler, and users have much finer access to and control over the list of unacknowledged messages left in the stream sources. Note that while this protocol can guarantee that a message sent is received, its simplicity means that in the case of intermittent communications failures, it is actually very likely that there will be multiple copies of a message received at a stream sink, and that the messages will be received out of order.
Public Member Functions | |
CommManager (int resend_millisecs=200, int priority=-2) | |
~CommManager () | |
com_id_t | getID () |
Mailbox * | createMailbox (int size, com_id_t mailbox_id, com_id_t exclusive_client=0) |
void | destroyMailbox (Mailbox *) |
Mailer * | createMailer (const char *spec, int size, com_id_t mailbox_id, com_id_t exclusive_dest=0) |
Mailer * | createMailer (RemoteManager *dest, int size, com_id_t mailbox_id, com_id_t exclusive_dest=0) |
void | destroyMailer (Mailer *) |
StreamSink * | createStreamSink (int size, com_id_t stream_id, int max_buffer, com_id_t exclusive_client=0) |
void | destroyStreamSink (StreamSink *) |
StreamSource * | createStreamSource (const char *spec, int size, com_id_t stream_id, com_id_t exclusive_dest=0, MessagePool *pool=NULL) |
StreamSource * | createStreamSource (RemoteManager *dest, int size, com_id_t stream_id, com_id_t exclusive_dest=0, MessagePool *pool=NULL) |
void | destroyStreamSource (StreamSource *) |
RemoteManager * | openRemote (const char *spec) |
RemoteManager * | openRemoteByHost (const char *host, int port) |
StreamSource * | createStreamSourceByHost (const char *host, int port, int size, com_id_t stream_id, com_id_t exclusive_dest=0, MessagePool *pool=NULL) |
Mailer * | createMailerByHost (const char *host, int port, int size, com_id_t mailbox_id, com_id_t exclusive_dest=0) |
com_id_t | nextMessagingID () |
void | shutdown () |
com_id_t | nextMessageID () |
int | sendMailMsg (Mailer *mailer, Message *msg) |
int | sendStreamMsg (StreamSource *source, Message *msg) |
bool | immediateMailMsg (RemoteManager *dest, com_id_t mailbox_id, Message *msg) |
utils::SymbolTable * | getSymbolTable () |
void * | getSymbol (const char *name) |
bool | setSymbol (const char *name, const void *data, utils::SymbolManager *cleaner=(utils::SymbolManager *) NULL, bool overwrite=false) |
void | lockSymbolTable () |
void | unlockSymbolTable () |
bool | initPortal (const char *spec) |
bool | processMessage (Message *msg) |
bool | launchAuxiliaryThread (pthread_t *tid, void *(*start_routine)(void *), void *arg) |
Static Public Member Functions | |
CommManager * | createManager (const char *spec, int resend_millisecs=200, int priority=-2) |
TimeStamp | now () |
CommManager * | instance (utils::SymbolTable *table) |
Protected Member Functions | |
void | init (int resend_interval=200) |
void | startThreads (int priority=-2) |
void | runSendThread () |
void | runResendThread () |
void | record_contact (RemoteManager *dest) |
void | send (Message *msg) |
void | sendPing (RemoteManager *dest) |
void | sendProtocolMsg (ProtocolBuffer *buf, RemoteManager *dest) |
bool | immediateStreamMsg (RemoteManager *dest, com_id_t stream_id, com_id_t source_id, Message *msg) |
void | pack_byte (Byte *&buf, Byte data) |
void | pack_short (Byte *&buf, short data) |
void | pack_int (Byte *&buf, int data) |
void | pack_id (Byte *&buf, int data) |
Byte | unpack_byte (Byte *&buf, int &len) |
short | unpack_short (Byte *&buf, int &len) |
int | unpack_int (Byte *&buf, int &len) |
com_id_t | unpack_id (Byte *&buf, int &len) |
Mailbox * | find_mailbox (com_id_t mailbox_id) |
Mailer * | find_mailer (com_id_t mailbox_id) |
StreamSource * | find_source (com_id_t stream_id) |
StreamSink * | find_sink (com_id_t stream_id) |
void | resend_bucket (int i) |
ProtocolBuffer * | acquire_protocol () |
void | release_protocol (ProtocolBuffer *) |
utils::List< StreamSource > & | stream_bucket (com_id_t stream_id) |
Static Protected Member Functions | |
void * | resend_entry (void *) |
void * | send_entry (void *) |
Protected Attributes | |
com_id_t | _id |
int | _resend_interval |
utils::ManagedDict< Mailbox > | _mailboxes |
utils::ManagedDict< Mailer > | _mailers |
utils::ManagedDict< StreamSink > | _stream_sinks |
utils::ManagedDict< StreamSource > | _stream_sources |
utils::ManagedList< RemoteManager > | _remotes |
utils::ManagedList< ProtocolBuffer > | _protocol_list |
utils::ManagedList< ProtocolBuffer > | _protocol_pool |
utils::ManagedList< Message > | _send_list |
utils::List< StreamSource > | _stream_buckets [NUM_BUCKETS-1] |
utils::List< CommPortal > | _portal_list |
pthread_rwlock_t | _list_lock |
pthread_rwlock_t | _remote_lock |
pthread_mutex_t | _send_mutex |
pthread_cond_t | _send_var |
pthread_mutex_t | _protocol_pool_mutex |
pthread_t | _resend_tid |
pthread_t | _send_tid |
com_id_t | _next_message_id |
com_id_t | _next_messaging_id |
bool | _running |
utils::SymbolTable | _table |
Private symbol table. | |
pthread_mutex_t | _table_mutex |
Mutex protection for _table. | |
int | _auxiliary_priority |
Static Protected Attributes | |
const int | NUM_BUCKETS |
|
This is the CommManager constructor. The constructor initiates a thread which has two basic purposes. The main purpose is to resend stream messages which have not yet been acknowledged. A secondary purpose is to notice that it has not heard from a remote communications manager in a while, and if this is the case try and ``ping'' that module with a dummy stream message which will then be acknowledged. Thus, even if a remote communications manager never explicity sends any information to this module, we can maintain a ``heartbeat'' to establish that the remote communications manager is actually alive and healthy. The sub-thread runs roughly every resend_millisecs, and it defaults to 200ms. If priority is negative, then it indicates a relative POSIX thread priority of priority less than the current for the communication manager sub-threads. If priority is not negative, it indicates an absolute POSIX thread priority. priority defaults to one priority less than the current. |
|
The destructor cleans up the CommManager memory and terminates the resend thread. |
|
This method creates a mailbox of "size" bytes with ID "mailbox_id". If "exclusive_client" is non-zero, only accept mail from remote managers with that ID. Returns NULL for failure. |
|
Overloaded method which takes a RemoteManager structure directory |
|
This method creates a Mailer to send messages to a mailbox managed by a remote communications manager specified by "spec". The destination mailbox will have ID "mailbox_id" and must be have a capacity of at least size bytes. If "exclusive_dest" is non-zero, then the remote manager of the destination mailbox must have ID "exclusive_dest". This covers the case where you are sending mail, and the remote manager dies and restarts and you don't want mail going to the newly created mailbox without restarting some other protocol. |
|
Convenience method which uses CommManager::openRemoteByHost and CommManager::createMailer to create the mailer |
|
Create a communications manager and initialize with one portal By various heuristin we will build up a specification string from the specification string.
|
|
This method creates a stream sink (or destination) of "size" bytes with ID "stream_id". Any message coming to this stream sink must have a maximum length of "size" bytes. If the user ignores this sink, it will buffer up to "max_buffer" messages before dropping the oldest messages. If "exclusive_client" is non-negative then this sink will only accept messages from a remote communications manager with ID "exclusive_client". |
|
Overloaded method which takes a RemoteManager structure directory |
|
This method creates a stream source. The stream is contacted through the remote manager with the spec, "spec". The remote stream sink should have ID "stream_id". The remote stream sink should have at least "size" bytes of capacity. If "exclusive_dest" is non-zero, then if the destination manager changes ID's from "exclusive_dest", then no more messages will be sent. |
|
Convenience method which uses CommManager::openRemoteByHost and CommManager createStreamSource to create the stream source |
|
This method destroys a given mailbox |
|
This method destroys the specified Mailer |
|
This method destroys the specified stream sink |
|
This methods destroys the specified stream source |
|
Every communications has a non-zero identifier. It should be unique across all communications managers, but it does simply use the lower 32 bits of the time in microseconds at the time the CommManager was constructed. If by some amazing coincidence, two communications managers do have the same ID, it is not a fatal problem, it just means that there is a chance that some operations that are supposed to be ``exclusive'' to one manager will be open to both. |
|
Get a symbol in a threadsafe manner
|
|
Access to the symbol table
|
|
internal initialization routine |
|
Initialize a communications portal.
|
|
Get the communicaitons manager instance stored in the symbol table [in] table The symbol table
|
|
Convenience routine for launching an auxiliary thread (i.e. low priority) thread the correct way |
|
Lock the symbol table mutex |
|
This method returns a "unique" ID to be passed to one of the creation functions. Useful when creating mailboxes or streams on the fly. Thus, by general conventions you should only use fixed mailbox ID's less than 16000. If you have more than 15999 mailboxes, you really shouldn't be using this package anyway. |
|
This method is a static convenience function for getting time in microseconds |
|
If any of the mailer's or stream sources have contacted a communications manager equivalent to the one specified by "spec," the method returns a reference to that remote manager. If there is no contact with that remote manager there, it attempts to make contact, and establish a ping relationship with the remote manager. |
|
Convenience function for opening a remote by a host and port.
|
|
Process a message. Route messages to the appropriate mail box, stream, or protocol handler.
|
|
Set a symbol in a threadsafe manner.
|
|
This method shuts down the CommManager threads permanently |
|
Unlock the symbol table mutex |