#include <CommManager.hh>
Inherited by SerialCommManager.
The CommManager class is at the heart of communications in RHexLib. All communication channels are created using an instance of the CommManager class. In addition, creating a CommManager instance starts a sub-thread which manages the periodic resending of unacknowledged stream messages. The CommManager class also maintains a list of all remote communications managers which the various channels are contacting. A second sub-thread processes incoming UDP messages and sorts them into the proper channels.
CommManager uses two paradigms for communications in RHexLib: the mailbox paradigm and the stream paradigm.
The mailbox paradigm is the simpler of the two. Mailbox messages are very close in spirit to raw UDP packets, i.e., users just dispatch mail to a mailbox, and there is no guarantee that the mail will ever arrive, or that it will arrive in the correct order. Mailbox messages are not queued, so only the most recently received mail message is available to be read. In a reliable communications environment, it is very unlikely that mail will not be delivered, and extremely unlikely that mail will be received out of order, but there are no guarantees on either.
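The "only the most recent message" behavior can be pictured with a single-slot buffer. The following is a minimal sketch under stated assumptions: LatestOnlyMailbox and its deliver/read methods are hypothetical names invented for illustration and are not the RHexLib Mailbox interface.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical stand-in for a mailbox (NOT the RHexLib Mailbox class).
// It owns exactly one message slot, so each delivery overwrites the last.
class LatestOnlyMailbox {
public:
    explicit LatestOnlyMailbox(std::size_t size)
        : _slot(size), _len(0), _fresh(false) {
        assert(size > 0);
    }

    // Called when a packet arrives. Oversized mail is rejected; anything
    // that fits replaces whatever was stored before, read or not.
    bool deliver(const void *data, std::size_t len) {
        if (len > _slot.size()) return false;
        std::memcpy(_slot.data(), data, len);
        _len = len;
        _fresh = true;
        return true;
    }

    // Copies out the most recent message. Returns its length, or 0 when
    // nothing new has arrived since the previous read.
    std::size_t read(void *out, std::size_t capacity) {
        if (!_fresh || capacity < _len) return 0;
        std::memcpy(out, _slot.data(), _len);
        _fresh = false;
        return _len;
    }

private:
    std::vector<unsigned char> _slot;  // storage for the single message
    std::size_t _len;                  // length of the stored message
    bool _fresh;                       // true until the message is read
};
```

If two messages are delivered before the reader gets around to reading, the first one is simply gone, which is why mailboxes suit state-like data (the latest sensor value) better than event-like data.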
The stream paradigm provides some measure of guarantees on delivery. Users establish stream sinks as destinations for messages. Users connect stream sources to those stream sinks in order to send messages. Every stream message that is received by a stream sink is acknowledged, and stream sources will periodically resend messages that have not been acknowledged. The stream sink holds a queue of the received messages for processing by the user. This protocol is analogous to TCP/IP, but is much simpler, and users have much finer access to and control over the list of unacknowledged messages left in the stream sources. Note that while this protocol can guarantee that a message sent is received, its simplicity means that when communications fail intermittently, it is actually very likely that multiple copies of a message will be received at a stream sink, and that messages will be received out of order.
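The acknowledge-and-resend cycle, and the way it produces duplicates, can be sketched as follows. This is an illustrative model only: Packet, SketchSource and SketchSink are hypothetical types, not the RHexLib StreamSource and StreamSink classes, and the de-duplication by message ID is an addition for the example, since the text above suggests duplicates may reach the user.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical wire message: an ID plus a payload.
struct Packet {
    std::uint32_t id;
    std::string body;
};

// Hypothetical source: keeps every message until it is acknowledged.
class SketchSource {
public:
    std::uint32_t queue(const std::string &body) {
        std::uint32_t id = _next_id++;
        _unacked[id] = body;
        return id;
    }
    // One pass of the resend thread: everything still unacknowledged
    // goes out again.
    std::vector<Packet> resend_tick() const {
        std::vector<Packet> out;
        for (const auto &kv : _unacked) out.push_back(Packet{kv.first, kv.second});
        return out;
    }
    void acknowledge(std::uint32_t id) { _unacked.erase(id); }
    std::size_t pending() const { return _unacked.size(); }

private:
    std::uint32_t _next_id = 1;
    std::map<std::uint32_t, std::string> _unacked;
};

// Hypothetical sink: acknowledges every receipt, stores each ID once.
class SketchSink {
public:
    // Returns the ID to acknowledge. A lost acknowledgement makes the
    // source resend, so the same ID can arrive here more than once.
    std::uint32_t receive(const Packet &p) {
        ++_raw_receipts;
        if (_seen.insert(p.id).second) _inbox.push_back(p.body);
        return p.id;
    }
    std::size_t raw_receipts() const { return _raw_receipts; }
    const std::vector<std::string> &inbox() const { return _inbox; }

private:
    std::size_t _raw_receipts = 0;
    std::set<std::uint32_t> _seen;
    std::vector<std::string> _inbox;
};
```

Dropping the return value of receive() for one tick models a lost acknowledgement: the source still counts the message as pending and sends it again on the next tick, so the sink sees it twice.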
Public Member Functions | |
CommManager (unsigned short port, int resend_millisecs=200, int priority=-2) | |
virtual | ~CommManager () |
com_id_t | getID () |
Mailbox * | createMailbox (int size, com_id_t mailbox_id, com_id_t exclusive_client=0) |
void | destroyMailbox (Mailbox *) |
Mailer * | createMailer (const char *host, unsigned short port, int size, com_id_t mailbox_id, com_id_t exclusive_dest=0) |
Mailer * | createMailer (RemoteManager *dest, int size, com_id_t mailbox_id, com_id_t exclusive_dest=0) |
void | destroyMailer (Mailer *) |
StreamSink * | createStreamSink (int size, com_id_t stream_id, int max_buffer, com_id_t exclusive_client=0) |
void | destroyStreamSink (StreamSink *) |
StreamSource * | createStreamSource (RemoteManager *dest, int size, com_id_t stream_id, com_id_t exclusive_dest=0, MessagePool *pool=NULL) |
StreamSource * | createStreamSource (const char *host, unsigned short port, int size, com_id_t stream_id, com_id_t exclusive_dest=0, MessagePool *pool=NULL) |
void | destroyStreamSource (StreamSource *) |
virtual RemoteManager * | lookupRemote (const char *host, unsigned short port) |
virtual RemoteManager * | openRemote (const char *host, unsigned short port) |
com_id_t | nextMessagingID () |
void | shutdown () |
com_id_t | nextMessageID () |
int | sendMailMsg (Mailer *mailer, Message *msg) |
int | sendStreamMsg (StreamSource *source, Message *msg) |
virtual int | immediateMailMsg (RemoteManager *dest, com_id_t mailbox_id, Message *msg) |
Static Public Member Functions | |
TimeStamp | now () |
Protected Member Functions | |
CommManager () | |
void | init (int resend_interval=200) |
void | startThreads (int priority=-2) |
virtual void | runReceiveThread () |
virtual void | runSendThread () |
void | runResendThread () |
void | send (Message *msg) |
virtual void | sendPing (RemoteManager *dest) |
virtual void | sendProtocolMsg (ProtocolBuffer *buf, sockaddr_in *from) |
virtual int | immediateStreamMsg (RemoteManager *dest, com_id_t stream_id, com_id_t source_id, Message *msg) |
void | pack_byte (Byte *&buf, Byte data) |
void | pack_short (Byte *&buf, short data) |
void | pack_int (Byte *&buf, int data) |
void | pack_id (Byte *&buf, int data) |
Byte | unpack_byte (Byte *&buf, int &len) |
short | unpack_short (Byte *&buf, int &len) |
int | unpack_int (Byte *&buf, int &len) |
com_id_t | unpack_id (Byte *&buf, int &len) |
virtual RemoteManager * | record_contact (sockaddr_in *dest, com_id_t mgr_id, TimeStamp &receipt_time) |
Mailbox * | find_mailbox (com_id_t mailbox_id) |
Mailer * | find_mailer (com_id_t mailbox_id) |
StreamSource * | find_source (com_id_t stream_id) |
StreamSink * | find_sink (com_id_t stream_id) |
void | resend_bucket (int i) |
virtual ProtocolBuffer * | acquire_protocol () |
virtual void | release_protocol (ProtocolBuffer *) |
utils::List< StreamSource > & | stream_bucket (com_id_t stream_id) |
Static Protected Member Functions | |
void * | receive_entry (void *) |
void * | resend_entry (void *) |
void * | send_entry (void *) |
Protected Attributes | |
com_id_t | _id |
int | _resend_interval |
utils::ManagedDict< Mailbox > | _mailboxes |
utils::ManagedDict< Mailer > | _mailers |
utils::ManagedDict< StreamSink > | _stream_sinks |
utils::ManagedDict< StreamSource > | _stream_sources |
utils::ManagedList< RemoteManager > | _remotes |
utils::ManagedList< ProtocolBuffer > | _protocol_list |
utils::ManagedList< ProtocolBuffer > | _protocol_pool |
utils::List< Message > | _send_list |
utils::List< StreamSource > | _stream_buckets [NUM_BUCKETS-1] |
int | _socket_fd |
Byte * | _incoming_buffer |
pthread_rwlock_t | _list_lock |
pthread_rwlock_t | _remote_lock |
pthread_mutex_t | _send_mutex |
pthread_cond_t | _send_var |
pthread_mutex_t | _protocol_pool_mutex |
pthread_t | _receive_tid |
pthread_t | _resend_tid |
pthread_t | _send_tid |
com_id_t | _next_message_id |
com_id_t | _next_messaging_id |
bool | _running |
Static Protected Attributes | |
const int | NUM_BUCKETS |
|
This is the CommManager constructor. "port" is the actual UDP port number which this communications manager will use to send and receive messages. The constructor starts a thread which has two basic purposes. The main purpose is to resend stream messages which have not yet been acknowledged. A secondary purpose is to notice when a remote communications manager has not been heard from in a while, and in that case to try to "ping" it with a dummy stream message which will then be acknowledged. Thus, even if a remote communications manager never explicitly sends any information to this module, we can maintain a "heartbeat" to establish that the remote communications manager is actually alive and healthy. The sub-thread runs roughly every "resend_millisecs" milliseconds, which defaults to 200. If "priority" is negative, it indicates a POSIX thread priority for the communications manager sub-threads relative to (lower than) the current one. If "priority" is not negative, it indicates an absolute POSIX thread priority. "priority" defaults to -2, a relative priority below the current one. |
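The heartbeat decision described above can be sketched as a single pass over the known remotes. This is only an illustration under assumptions: SketchRemote, heartbeat_tick and the silence threshold are hypothetical, and the source does not state how long "a while" is or how RHexLib tracks the last contact time.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical record of a remote manager (not the RHexLib RemoteManager).
struct SketchRemote {
    std::int64_t last_heard_us;  // time of last receipt, in microseconds
    int pings_sent;              // how many pings this sketch has issued
};

// One pass of a resend-style thread: ping every remote that has been
// silent for at least silence_us microseconds (assumed threshold).
// Returns the number of remotes pinged on this pass.
int heartbeat_tick(std::vector<SketchRemote> &remotes,
                   std::int64_t now_us, std::int64_t silence_us) {
    assert(silence_us > 0);
    int pinged = 0;
    for (SketchRemote &r : remotes) {
        if (now_us - r.last_heard_us >= silence_us) {
            ++r.pings_sent;  // stand-in for sending the dummy stream message
            ++pinged;
        }
    }
    return pinged;
}
```

In a real loop the acknowledgement of the ping would refresh last_heard_us, so a healthy remote is pinged only as often as it stays silent.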
|
Special constructor for SerialCommManager that does not launch threads. |
|
The destructor cleans up the CommManager memory and terminates the resend thread. |
|
This method creates a mailbox of "size" bytes with ID "mailbox_id". If "exclusive_client" is non-zero, only accept mail from remote managers with that ID. Returns NULL for failure. |
|
Overloaded method which takes a RemoteManager structure directly. |
|
This method creates a Mailer to send messages to a mailbox managed by a remote communications manager on the machine "host" with UDP port "port". The destination mailbox will have ID "mailbox_id" and must have a capacity of at least "size" bytes. If "exclusive_dest" is non-zero, then the remote manager of the destination mailbox must have ID "exclusive_dest". This covers the case where you are sending mail, the remote manager dies and restarts, and you don't want mail going to the newly created mailbox without restarting some other protocol. |
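The "exclusive" rule used by the creation methods reduces to one comparison. A minimal sketch, assuming manager IDs are 32-bit unsigned values; passes_exclusive_check is a hypothetical helper, not an RHexLib function.

```cpp
#include <cassert>
#include <cstdint>

// Returns true when traffic with the given remote manager is allowed.
// exclusive_id == 0 means "no restriction"; otherwise the remote
// manager's ID must match exactly.
bool passes_exclusive_check(std::uint32_t exclusive_id,
                            std::uint32_t remote_manager_id) {
    // Manager IDs are documented as non-zero, so 0 is free to act as
    // the "unrestricted" marker.
    assert(remote_manager_id != 0);
    return exclusive_id == 0 || exclusive_id == remote_manager_id;
}
```

A manager that dies and restarts is constructed again and so gets a new ID; a Mailer created with the old ID as "exclusive_dest" then fails this check and stops delivering to the new instance.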
|
This method creates a stream sink (or destination) of "size" bytes with ID "stream_id". Any message coming to this stream sink must be at most "size" bytes long. If the user ignores this sink, it will buffer up to "max_buffer" messages before dropping the oldest messages. If "exclusive_client" is non-zero, then this sink will only accept messages from a remote communications manager with ID "exclusive_client". |
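The "max_buffer" policy (keep the newest messages, drop the oldest once full) can be sketched with a bounded queue. DropOldestBuffer is a hypothetical class for illustration; the actual StreamSink storage is not described in the source.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical bounded receive queue with a drop-oldest overflow policy.
class DropOldestBuffer {
public:
    explicit DropOldestBuffer(std::size_t max_buffer) : _max(max_buffer) {
        assert(max_buffer > 0);
    }

    // Stores a message. Returns true if the oldest queued message had to
    // be discarded to make room.
    bool push(const std::string &msg) {
        bool dropped = false;
        if (_queue.size() == _max) {
            _queue.pop_front();
            dropped = true;
        }
        _queue.push_back(msg);
        return dropped;
    }

    // Removes the oldest remaining message into out. Returns false when
    // the buffer is empty.
    bool pop(std::string &out) {
        if (_queue.empty()) return false;
        out = _queue.front();
        _queue.pop_front();
        return true;
    }

    std::size_t size() const { return _queue.size(); }

private:
    std::size_t _max;
    std::deque<std::string> _queue;
};
```

With a capacity of 2, pushing three messages leaves only the second and third available to the reader.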
|
Overloaded method which takes a RemoteManager structure directly. |
|
This method creates a stream source. The remote stream sink should have ID "stream_id" on the manager running on the machine "host" on UDP port "port". The remote stream sink should have at least "size" bytes of capacity. If "exclusive_dest" is non-zero and the destination manager's ID changes from "exclusive_dest", then no more messages will be sent. |
|
This method destroys a given mailbox |
|
This method destroys the specified Mailer |
|
This method destroys the specified stream sink |
|
This method destroys the specified stream source |
|
Every communications manager has a non-zero identifier. It should be unique across all communications managers, but it is simply the lower 32 bits of the time in microseconds at which the CommManager was constructed. If by some amazing coincidence two communications managers do have the same ID, it is not a fatal problem; it just means that there is a chance that some operations that are supposed to be "exclusive" to one manager will be open to both. |
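Deriving an ID from the low 32 bits of a microsecond clock might look like the sketch below. The names sketch_id_t, id_from_micros and make_manager_id are hypothetical, the use of std::chrono is a substitution for whatever clock RHexLib reads, and the way a zero result is avoided is an assumption, since the source only states that IDs are non-zero.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

typedef std::uint32_t sketch_id_t;  // assumption: stand-in for com_id_t

// Keeps the low 32 bits of a microsecond timestamp.
sketch_id_t id_from_micros(std::uint64_t micros) {
    sketch_id_t id = static_cast<sketch_id_t>(micros & 0xFFFFFFFFu);
    // Assumption: remap 0 to 1 so the ID is never zero; the real
    // library's handling of this case is not documented here.
    return id != 0 ? id : 1;
}

// Builds an ID from the current wall-clock time.
sketch_id_t make_manager_id() {
    using namespace std::chrono;
    std::uint64_t micros = static_cast<std::uint64_t>(
        duration_cast<microseconds>(
            system_clock::now().time_since_epoch()).count());
    sketch_id_t id = id_from_micros(micros);
    assert(id != 0);
    return id;
}
```

The low 32 bits of a microsecond counter wrap roughly every 71.6 minutes (2^32 microseconds), so a collision requires two managers to be constructed at the same microsecond offset within that cycle.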
|
internal initialization routine |
|
If any of the mailers or stream sources have contacted a communications manager on the machine "host" monitoring UDP port "port", this method returns a reference to that remote manager. Otherwise, it returns NULL. |
|
This method returns a "unique" ID to be passed to one of the creation functions. It is useful when creating mailboxes or streams on the fly. By convention, fixed mailbox IDs should be less than 16000 so that they stay clear of the IDs handed out here. If you have more than 15999 mailboxes, you really shouldn't be using this package anyway. |
|
This method is a static convenience function for getting time in microseconds |
|
If any of the mailers or stream sources have contacted a communications manager on the machine "host" monitoring UDP port "port", this method returns a reference to that remote manager. If there has been no contact with that remote manager, it attempts to make contact and establish a ping relationship with it. |
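The difference between looking up a remote and opening one is the usual find versus find-or-create split. A minimal sketch, assuming remotes are keyed by host name and port; SketchPeer and SketchRegistry are hypothetical types and no network contact is attempted here.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Hypothetical record of a known remote manager.
struct SketchPeer {
    std::string host;
    unsigned short port;
};

// Hypothetical registry mirroring the lookup/open contract.
class SketchRegistry {
public:
    // Returns the known peer, or nullptr if there has been no contact.
    SketchPeer *lookup(const std::string &host, unsigned short port) {
        auto it = _peers.find(std::make_pair(host, port));
        return it == _peers.end() ? nullptr : &it->second;
    }

    // Returns the known peer, creating a record when there is none.
    // The real method would also try to contact the remote and start
    // pinging it; that part is omitted in this sketch.
    SketchPeer *open(const std::string &host, unsigned short port) {
        assert(!host.empty());
        if (SketchPeer *known = lookup(host, port)) return known;
        auto inserted = _peers.insert(
            std::make_pair(std::make_pair(host, port), SketchPeer{host, port}));
        return &inserted.first->second;
    }

private:
    std::map<std::pair<std::string, unsigned short>, SketchPeer> _peers;
};
```

Calling open twice for the same host and port yields the same record, while lookup never creates one.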
|
Creating a mailer is sometimes overkill. This method allows you to send the message msg directly to the mailbox with ID "mailbox_id" managed by the remote manager "dest" without having to explicitly create a mailer. The return value is -1 for an error, or the number of bytes sent. |
|
This method shuts down the CommManager threads permanently |