Module connection
Copyright © 2010-2023. Cloud Software Group, Inc. All Rights Reserved. Confidential & Proprietary.
Classes
class Eftl
-
Programs use the Eftl class to connect to an eFTL server.
Initialize the Eftl instance.
Class variables
var CONNECTED
var CONNECTING
var DISCONNECTED
var DISCONNECTING
var RECONNECTING
Methods
async def connect(self, url, **kwargs)
-
Connect to an eFTL server.
This call returns immediately; connecting continues asynchronously. When the connection is ready to use, the eFTL library calls your on_connect callback, passing an EftlConnection object that you can use to publish and subscribe.
When a pipe-separated list of URLs is specified, this call attempts a connection to each in turn, in random order, until one connects. A program that uses more than one server channel must connect separately to each channel.
Parameters
url
-
The call connects to the eFTL server at this URL. This can be a single URL, or a pipe ('|') separated list of URLs. URLs can be in either of these forms:
ws://host:port/channel
wss://host:port/channel
Optionally, the URLs can contain the username, password, and/or client identifier:
ws://username:password@host:port/channel?clientId=<identifier>
wss://username:password@host:port/channel?clientId=<identifier>
kwargs:
auto_reconnect_attempts (int): Maximum number of reconnect attempts. The default is 256 attempts.
auto_reconnect_max_delay (int): Maximum reconnect delay in milliseconds. The default is 30 seconds.
max_pending_acks (int): optional Maximum number of unacknowledged messages allowed for the client.
user: optional Login credentials to use if not found in the URL.
password: optional Login credentials to use if not found in the URL.
client_id: optional User-specified client identifier.
handshake_timeout: optional Seconds to wait for the websocket handshake to complete.
login_timeout: optional Seconds to wait for a login message reply. If a reply is not received in time, raise an EftlClientError.
polling_interval: optional Seconds to wait between each message reply check.
trust_all: optional true/false.
trust_store: optional Certificate path.
loop: optional Event loop provided by the user.
Callbacks:
'on_connect(connection):' A new connection to the eFTL server is ready to use.
Parameters
connection: The connection that is ready to use.
'on_disconnect(connection, loop, code, reason):' A connection to the eFTL server has closed.
Parameters
connection: The connection that closed.
loop: The event loop used for servicing connection events.
code: A code that categorizes the error. Your program can use this value in its response logic.
reason: This string provides more detail. Your program can use this value in error reporting and logging.
'on_reconnect(connection):' A connection to the eFTL server has re-opened and is ready to use. The eFTL library invokes this method only after your program calls messaging.eftl.connection.reconnect, and not after messaging.eftl.connection.connect.
Parameters
connection: The connection that reconnected.
'on_error(connection, code, reason):' An error prevented an operation.
Parameters
connection: The connection object. For publish errors, this argument is the message that was not published. For subscription errors, this argument is an object that represents a subscription identifier.
code: A code that categorizes the error. Your program can use this value in its response logic.
reason: This string provides more detail. Your program can use this value in error reporting and logging.
'on_state_change(connection, state):' The connection state has changed. The eFTL library invokes this method whenever the connection state changes.
Parameters
connection: The connection whose state has changed.
state: The connection has changed to this state.
Returns
The EftlConnection object that can be used to publish and subscribe.
Raises
ValueError
- If any of the provided URLs lacks a host name or the correct scheme.
EftlClientError
- If the connection is not established in time.
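The URL rules above (a pipe-separated list, a ws or wss scheme, and a host name) can be checked before calling connect. The following is a minimal sketch of such a pre-check using only the standard library. The helper name check_eftl_urls is hypothetical and not part of the eFTL library, whose own validation may differ in detail.

```python
from urllib.parse import urlsplit


def check_eftl_urls(url):
    """Split a pipe-separated URL string and check each entry.

    Hypothetical pre-check: it only mirrors the documented rule that
    every URL needs a ws/wss scheme and a host name.
    """
    urls = [u.strip() for u in url.split("|") if u.strip()]
    if not urls:
        raise ValueError("no URL given")
    for u in urls:
        parts = urlsplit(u)
        if parts.scheme not in ("ws", "wss"):
            raise ValueError(f"unsupported scheme in {u!r}")
        if not parts.hostname:
            raise ValueError(f"missing host name in {u!r}")
    return urls
```

The function returns the individual URLs in the order given; the library itself tries them in random order, so the order in the string carries no priority.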
def get_version(self)
-
Return the eFTL Python library version.
Returns
str: The eFTL Python library version.
class EftlAlreadyConnected (*args, **kwargs)
-
Raised when the connect method is called on a connection that is already connected.
Ancestors
- builtins.Exception
- builtins.BaseException
class EftlClientError (*args, **kwargs)
-
Raised when establishing a connection to the server fails.
Ancestors
- builtins.Exception
- builtins.BaseException
class EftlConnection
-
A websocket wrapper that exchanges eFTL protocol messages with a server.
Different functions implement each type of protocol message (e.g. publish, subscribe, disconnect). The messages are sent asynchronously, but by default these functions block while waiting for a response. The user may set a timeout for this waiting period.
Users also may specify callbacks to trigger when messages get delivered, fail to get delivered, etc.
Initialize the connection object, which is unconnected.
Methods
async def acknowledge(self, message)
-
Acknowledge this message.
Messages consumed from subscriptions with a client acknowledgment mode must be explicitly acknowledged. The eFTL server will stop delivering messages to the client once the server's configured maximum number of unacknowledged messages is reached.
Parameters
message: The message being acknowledged.
Returns
async def acknowledge_all(self, message)
-
Acknowledge all messages up to and including this message.
Messages consumed from subscriptions with a client acknowledgment mode must be explicitly acknowledged. The eFTL server will stop delivering messages to the client once the server's configured maximum number of unacknowledged messages is reached.
Parameters
message: The message being acknowledged.
Returns
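The difference between acknowledge and acknowledge_all, and the effect of the unacknowledged-message limit, can be pictured with a small local model. This is not library code: the PendingAcks class and its integer sequence numbers are assumptions made purely for illustration, since the real client and server track messages internally.

```python
class PendingAcks:
    """Toy model of the server's view of unacknowledged messages."""

    def __init__(self, max_pending):
        self.max_pending = max_pending
        self.pending = []  # sequence numbers, in delivery order

    def can_deliver(self):
        # Delivery pauses once the unacknowledged limit is reached.
        return len(self.pending) < self.max_pending

    def deliver(self, seq):
        if not self.can_deliver():
            return False
        self.pending.append(seq)
        return True

    def acknowledge(self, seq):
        # Acknowledge exactly one message.
        self.pending.remove(seq)

    def acknowledge_all(self, seq):
        # Acknowledge every message up to and including seq.
        self.pending = [s for s in self.pending if s > seq]
```

In this model a single acknowledge_all call on the newest message clears the backlog, while acknowledge leaves older messages pending.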
async def close_all_subscriptions(self)
-
Close all the subscriptions on this connection.
For durable subscriptions, this call will cause the persistence service to stop delivering messages while leaving the durable subscriptions to continue accumulating messages. Any unacknowledged messages will be made available for redelivery.
Programs receive subscription identifiers through their on_subscribe method.
async def close_subscription(self, subscription_id)
-
Close the subscription with the specified subscription identifier.
For durable subscriptions, this call will cause the persistence service to stop delivering messages while leaving the durable subscriptions to continue accumulating messages. Any unacknowledged messages will be made available for redelivery.
Programs receive subscription identifiers through their on_subscribe method.
async def create_kv_map(self, name)
-
Return a new EftlKVMap associated with this connection.
Parameters
name: name of the KVMap
Return
The KVmap object
def create_message(self)
-
Create an EftlMessage that can be used to publish or send request/replies.
Return
The EftlMessage Object
async def disconnect(self)
-
Disconnect from the eFTL server.
Programs may disconnect to free server resources.
This call returns immediately; disconnecting continues asynchronously.
When the connection has closed, the eFTL library calls your on_disconnect callback.
Return
def get_clientId(self)
-
Get the client identifier for this connection.
Return
The client's identifier.
def is_connected(self)
-
Determine whether this connection to the eFTL server is open or closed.
Return
True if this connection is open, False otherwise
async def publish(self, message, **kwargs)
-
Publish a one-to-many message to all subscribing clients. This call returns immediately; publishing continues asynchronously. When the publish completes successfully, the eFTL library calls your on_complete callback.
When communicating with EMS, to publish a message on a specific EMS destination, include the message field name _dest. To distinguish between topics and queues, the destination can be prefixed with either "TOPIC:" or "QUEUE:", for example "TOPIC:MyDest" or "QUEUE:MyDest". A destination with no prefix is a topic.
To publish a message on a specific EMS destination, add a string field to the message; for example: message.set_string("_dest", "MyDest")
Parameters
message: Publish this message.
kwargs:
Callbacks:
'on_publish(message):' A publish operation has completed successfully.
Parameters
message: This message has been published.
'on_error(connection, code, reason):' An error prevented an operation.
Parameters
connection: The connection object. For publish errors, this argument is the message that was not published. For subscription errors, this argument is an object that represents a subscription identifier.
code: A code that categorizes the error. Your program can use this value in its response logic.
reason: This string provides more detail. Your program can use this value in error reporting and logging.
Raises
ValueError
- If the message size exceeds the maximum.
TypeError
- If the message is not of type EftlMessage.
ConnectionError
- If the connection is closed.
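The "TOPIC:" and "QUEUE:" prefixes described above are plain string prefixes on the value of the _dest field. A minimal sketch of a helper that builds such a value follows. The name ems_destination and its kind argument are hypothetical and not part of the eFTL library.

```python
def ems_destination(name, kind="topic"):
    """Return a value suitable for the "_dest" message field.

    kind is "topic" or "queue". An unprefixed name is also treated as a
    topic by EMS, so the explicit "TOPIC:" prefix is optional.
    """
    prefixes = {"topic": "TOPIC:", "queue": "QUEUE:"}
    if kind not in prefixes:
        raise ValueError(f"kind must be 'topic' or 'queue', got {kind!r}")
    return prefixes[kind] + name


# With the library, the value would be stored in a string field, e.g.:
#   message.set_string("_dest", ems_destination("MyDest", "queue"))
```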
async def reconnect(self)
-
Reopen a closed connection.
You may call this method within your on_disconnect method.
This call returns immediately; connecting continues asynchronously. When the connection is ready to use, the eFTL library calls your on_reconnect callback.
Reconnecting automatically re-activates all subscriptions on the connection. The eFTL library invokes your on_subscribe callback for each successful resubscription.
Return
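Calling reconnect from on_disconnect, as suggested above, might look like the following sketch. It runs against a stand-in class rather than the real library: FakeConnection, make_on_disconnect, the retry cap, and the close code 1006 are all hypothetical. The documentation does not say whether callbacks may be coroutines, so the callback here stays synchronous and schedules reconnect on the event loop it receives.

```python
import asyncio


class FakeConnection:
    """Stand-in for EftlConnection; only reconnect() is modelled."""

    def __init__(self):
        self.reconnect_calls = 0

    async def reconnect(self):
        self.reconnect_calls += 1


def make_on_disconnect(max_attempts):
    """Build an on_disconnect callback that retries a bounded number of times."""
    attempts = 0

    def on_disconnect(connection, loop, code, reason):
        nonlocal attempts
        if attempts >= max_attempts:
            return None  # give up; a real program might log code and reason
        attempts += 1
        # Schedule the coroutine on the loop that services the connection.
        return loop.create_task(connection.reconnect())

    return on_disconnect


async def demo():
    conn = FakeConnection()
    callback = make_on_disconnect(max_attempts=2)
    loop = asyncio.get_running_loop()
    for _ in range(3):  # three disconnects, only two retries allowed
        task = callback(conn, loop, 1006, "connection lost")  # 1006: example code
        if task is not None:
            await task
    return conn.reconnect_calls
```

Bounding the number of manual retries avoids an endless reconnect loop when the server is permanently unreachable.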
async def remove_kv_map(self, name)
-
Remove an EftlKVMap object.
Parameters
name: The name of the map.
Return
async def send_reply(self, requestMessage, replyMessage, **kwargs)
-
Send a reply message in response to a request message.
This call returns immediately. When the send completes successfully the eFTL library calls your on_complete callback.
Parameters
requestMessage: The request message being replied to.
replyMessage: The reply message to send.
kwargs:
Callbacks:
'on_complete(message):' The reply has been sent successfully.
Parameters
message: The reply message that completed successfully.
Return
async def send_request(self, message, timeout, **kwargs)
-
Publish a request message.
This call returns immediately. When the reply is received the eFTL library calls your 'on_reply' callback.
Parameters
message: The request message to publish.
timeout: Seconds to wait for the reply.
kwargs:
Callbacks:
'on_reply(message):' A request operation has received a reply.
Parameters
message: The reply message.
Returns
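Because the reply arrives through the on_reply callback, a program that prefers to await the reply can bridge the callback to an asyncio future. The sketch below shows that pattern against a stand-in connection. FakeConnection and the request helper are hypothetical, and the sketch assumes the callback runs on the same event loop as the caller.

```python
import asyncio


class FakeConnection:
    """Stand-in for EftlConnection that replies immediately with an echo."""

    async def send_request(self, message, timeout, **kwargs):
        on_reply = kwargs.get("on_reply")
        if on_reply is not None:
            on_reply({"echo": message})


async def request(connection, message, timeout):
    """Send a request and return the reply, or raise asyncio.TimeoutError."""
    loop = asyncio.get_running_loop()
    reply = loop.create_future()

    def on_reply(msg):
        # Guard against a late or duplicate reply.
        if not reply.done():
            reply.set_result(msg)

    await connection.send_request(message, timeout, on_reply=on_reply)
    return await asyncio.wait_for(reply, timeout)
```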
async def start_subscription(self, subscription_id)
-
Resume message delivery to the specified subscription if stopped.
Parameters
subscription_id: Subscription identifier of the subscription to start.
async def stop_subscription(self, subscription_id)
-
Stop message delivery to the specified subscription.
Parameters
subscription_id: Subscription identifier of the subscription to stop.
async def subscribe(self, **kwargs)
-
Subscribe to messages. Register a subscription for one-to-many messages. This call returns immediately; subscribing continues asynchronously. When the subscription is ready to receive messages, the eFTL library calls your on_subscribe callback.
A matcher can narrow subscription interest in the inbound message stream.
An acknowledgment mode for the subscription can be set to automatically acknowledge consumed messages, require explicit client acknowledgment of the consumed messages, or to disable message acknowledgment altogether. The default is to automatically acknowledge consumed messages.
When explicit client acknowledgment is specified the eFTL server will stop delivering messages to the client once the server's configured maximum number of unacknowledged messages is reached.
When communicating with EMS, to subscribe to messages published on a specific EMS destination, use a subscription matcher that includes the message field name _dest. To distinguish between topics and queues, the destination name can be prefixed with either "TOPIC:" or "QUEUE:", for example "TOPIC:MyDest" or "QUEUE:MyDest". A destination name with no prefix is a topic.
To subscribe to messages published on a specific EMS destination, create a subscription matcher for that destination; for example: matcher = '{"_dest":"MyDest"}'
Parameters
kwargs:
timeout: optional Number of seconds to wait for an acknowledgement from the server.
matcher: str, optional JSON content matcher to subscribe to.
type: {"standard", "shared", "last-value"}, optional Durable type.
key: optional The last-value index key, if type is "last-value".
durable: str, optional Name to give the durable.
ack_mode: optional Acknowledgment mode, client or None. The default is auto.
Returns
sub_id: The subscription identifier of the new subscription.
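Since the matcher is a JSON string, it is usually safer to build it with the json module than by hand. The following sketch assembles a matcher and a set of keyword arguments for subscribe. The helper name dest_matcher, the durable name, and passing ack_mode as the string "client" are assumptions for illustration.

```python
import json


def dest_matcher(destination, **fields):
    """Build a JSON content matcher for an EMS destination.

    Extra keyword arguments are added as further field matches.
    """
    matcher = {"_dest": destination}
    matcher.update(fields)
    return json.dumps(matcher)


subscribe_kwargs = {
    "matcher": dest_matcher("QUEUE:MyDest"),
    "durable": "my-durable",  # hypothetical durable name
    "type": "standard",
    "ack_mode": "client",     # assumed spelling of the client mode
}

# With the library, these would be passed as:
#   sub_id = await connection.subscribe(**subscribe_kwargs)
```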
async def unsubscribe(self, sub_id)
-
Unsubscribe from messages on a subscription.
For durable subscriptions, this call will cause the persistence service to remove the durable subscription, along with any persisted messages.
Programs receive subscription identifiers through their on_subscribe method.
Parameters
sub_id: Subscription identifier of the subscription to delete.
Return
async def unsubscribe_all(self)
-
Unsubscribe from messages on all subscriptions. For durable subscriptions, this call will cause the persistence service to remove the durable subscription, along with any persisted messages.
class EftlKVMap (connection, name)
-
A key-value map object represents a program's connection to an FTL map.
Programs use key-value map objects to set, get, and remove key-value pairs in an FTL map.
Callbacks
'on_success(key, message):' A key-value map operation has completed successfully.
Parameters
key: The key being operated upon.
message: The value of the key.
'on_error(key, message, code, reason):' An error prevented a key-value map operation.
Parameters
key: The key being operated upon.
message: The value of the key.
code: A code that categorizes the error. Your program can use this value in its response logic.
reason: This string provides more detail. Your program can use this value in error reporting and logging.
Methods
async def get(self, key, **kwargs)
-
Get the value of a key from the map, or None if the key is not set.
Parameters
key: Get the value for this key.
kwargs:
Callbacks:
'on_success(key, message):' A key-value map operation has completed successfully.
Parameters
key: The key being operated upon.
message: The value of the key.
'on_error(key, message, code, reason):' An error prevented a key-value map operation.
Parameters
key: The key being operated upon.
message: The value of the key.
code: A code that categorizes the error. Your program can use this value in its response logic.
reason: This string provides more detail. Your program can use this value in error reporting and logging.
Return
The value as an EftlMessage.
async def remove(self, key, **kwargs)
-
Remove a key-value pair from the map.
This call returns immediately; removing continues asynchronously. When the remove completes successfully, the eFTL library calls your on_success callback.
Parameters
key: Remove the value for this key.
kwargs:
Callbacks:
'on_success(key, message):' A key-value map operation has completed successfully.
Parameters
key: The key being operated upon.
message: The value of the key.
'on_error(key, message, code, reason):' An error prevented a key-value map operation.
Parameters
key: The key being operated upon.
message: The value of the key.
code: A code that categorizes the error. Your program can use this value in its response logic.
reason: This string provides more detail. Your program can use this value in error reporting and logging.
Return
async def set(self, key, value, **kwargs)
-
Set a key-value pair in the map, overwriting any existing value.
This call returns immediately; setting continues asynchronously. When the set completes successfully, the eFTL library calls your on_success callback.
Parameters
key: Set the value for this key.
value: Set this value for the key.
kwargs:
Callbacks:
'on_success(key, message):' A key-value map operation has completed successfully.
Parameters
key: The key being operated upon.
message: The value of the key.
'on_error(key, message, code, reason):' An error prevented a key-value map operation.
Parameters
key: The key being operated upon.
message: The value of the key.
code: A code that categorizes the error. Your program can use this value in its response logic.
reason: This string provides more detail. Your program can use this value in error reporting and logging.
Return
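For unit tests that should not depend on a running eFTL server, the get, set, and remove semantics described above can be imitated with an in-memory stand-in. The class below is a sketch, not the library's implementation. InMemoryKVMap is a hypothetical name, plain dictionaries stand in for EftlMessage values, and only the on_success callback is modelled.

```python
import asyncio


class InMemoryKVMap:
    """In-memory imitation of the EftlKVMap method signatures."""

    def __init__(self, name):
        self.name = name
        self._data = {}

    async def set(self, key, value, **kwargs):
        self._data[key] = value  # overwrites any existing value
        self._notify(kwargs, key, value)

    async def get(self, key, **kwargs):
        value = self._data.get(key)  # None if the key is not set
        self._notify(kwargs, key, value)
        return value

    async def remove(self, key, **kwargs):
        value = self._data.pop(key, None)
        self._notify(kwargs, key, value)

    @staticmethod
    def _notify(kwargs, key, value):
        on_success = kwargs.get("on_success")
        if on_success is not None:
            on_success(key, value)


async def demo():
    seen = []
    kv = InMemoryKVMap("sample_map")
    await kv.set("k1", {"text": "hello"}, on_success=lambda k, m: seen.append(k))
    first = await kv.get("k1")
    await kv.remove("k1")
    second = await kv.get("k1")
    return first, second, seen
```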
class EftlMessage (obj=None)
-
Programs use message objects to publish messages on an eFTL connection or receive messages for a given subscription.
Message objects contain typed fields that map names to values.
Initialize the EftlMessage instance properties.
Methods
def clear_all_fields(self)
-
Remove all the fields in this message.
def clear_field(self, field_name)
-
Remove the given field from this message.
Parameters
field_name: field_name to clear
def get_datetime(self, field_name)
-
Get the value of a datetime field from a message.
Parameters
field_name: The name of the field
Return
The value of the field if the field is present and has type datetime
def get_datetime_array(self, field_name)
-
Get the value of a datetime array field from a message.
Parameters
field_name: The name of the field
Return
The value of the field if the field is present and has type datetime array
def get_delivery_count(self)
-
Message's delivery count assigned by the persistence service.
Return
The message delivery count.
def get_double(self, field_name)
-
Get the value of a double field from a message.
Parameters
field_name: The name of the field
Return
The value of the field if the field is present and has type double
def get_double_array(self, field_name)
-
Get the value of a double array field from a message.
Parameters
field_name: The name of the field
Return
The value of the field if the field is present and has type double array
def get_field_names(self)
-
Return the list of field names.
Return
The field names of this message as a list object.
def get_field_type(self, field_name)
-
Return the type of value of this field.
Parameters
field_name: The name of the field
Return
Type of value of this field. Possible return values:
'int' represents Integer
'str' represents String
'float' represents Float
'datetime.datetime' represents datetime.datetime
'message' represents EftlMessage object type
'opaque' represents opaque type (bytes)
'unknown'
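One use of get_field_type is to pick the matching typed getter at run time. The sketch below assumes that get_field_type returns the strings listed above and that each type corresponds to the getter shown in the table; both points are inferences from this reference rather than confirmed behaviour. FakeMessage is a hypothetical stand-in that implements just enough of the interface to run the example.

```python
# Assumed mapping from get_field_type() results to getter method names.
GETTER_FOR_TYPE = {
    "int": "get_long",
    "str": "get_string",
    "float": "get_double",
    "datetime.datetime": "get_datetime",
    "message": "get_message",
    "opaque": "get_opaque",
}


def read_field(message, field_name):
    """Read a field with the getter that matches its reported type."""
    field_type = message.get_field_type(field_name)
    getter = GETTER_FOR_TYPE.get(field_type)
    if getter is None:
        raise TypeError(f"no getter for field type {field_type!r}")
    return getattr(message, getter)(field_name)


class FakeMessage:
    """Stand-in for EftlMessage holding name -> (type string, value)."""

    def __init__(self, fields):
        self._fields = fields

    def get_field_type(self, field_name):
        return self._fields[field_name][0]

    def get_long(self, field_name):
        return self._fields[field_name][1]

    def get_string(self, field_name):
        return self._fields[field_name][1]
```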
def get_long(self, field_name)
-
Get the value of a long field from a message.
Parameters
field_name: The name of the field
Return
The value of the field if the field is present and has type long
def get_long_array(self, field_name)
-
Get the value of a long array field from a message.
Parameters
field_name: The name of the field
Return
The value of the field if the field is present and has type long array
def get_message(self, field_name)
-
Get the value of a message field from a message.
Parameters
field_name: The name of the field
Return
The value of the field if the field is present and has type message
def get_message_array(self, field_name)
-
Get the value of a message array field from a message.
Parameters
field_name: The name of the field
Return
The value of the field if the field is present and has type message array (list of messages)
def get_opaque(self, field_name)
-
Get the value of an opaque field from a message.
Parameters
field_name: The name of the field
Return
The value of the field if the field is present
def get_store_message_id(self)
-
Message's unique store identifier assigned by the persistence service.
Return
A monotonically increasing long value that represents the message's unique store identifier.
def get_string(self, field_name)
-
Get the value of a string field from a message.
Parameters
field_name: The name of the field
Return
The value of the field if the field is present and has type string
def get_string_array(self, field_name)
-
Get the value of a string array field from a message.
Parameters
field_name: The name of the field
Return
The value of the field if the field is present and has type string array
def is_field_set(self, field_name)
-
Return True if the field is set, False otherwise.
Parameters
field_name: The name of the field
Return
True if the field is set, False otherwise
def set_datetime(self, field_name, value)
-
Set a datetime field in a message.
Parameters
field_name: The call sets this field.
value: The call sets this value.
Return
def set_datetime_array(self, field_name, values)
-
Set a datetime array field in a message.
Parameters
field_name: The call sets this field.
values: The call sets these values.
Return
def set_double(self, field_name, value)
-
Set a double field in a message.
Parameters
field_name: The call sets this field.
value: The call sets this value.
Return
def set_double_array(self, field_name, values)
-
Set a double array field in a message.
Parameters
field_name: The call sets this field.
values: The call sets these values.
Return
def set_long(self, field_name, value)
-
Set a long field in a message.
Parameters
field_name: The call sets this field.
value: The call sets this value.
Return
def set_long_array(self, field_name, values)
-
Set a long array field in a message.
Parameters
field_name: The call sets this field.
values: The call sets these values.
Return
def set_message(self, field_name, value)
-
Set a message field in a message.
Parameters
field_name: The call sets this field.
value: The call sets this value.
Return
def set_message_array(self, field_name, values)
-
Set a message array field in a message.
Parameters
field_name: The call sets this field.
values: The call sets these values.
Return
def set_opaque(self, field_name, value)
-
Set an opaque field in a message.
Parameters
field_name: The call sets this field.
value: The call sets this value.
Return
def set_string(self, field_name, value)
-
Set a string field in a message.
Parameters
field_name: The call sets this field.
value: The call sets this value.
Return
def set_string_array(self, field_name, values)
-
Set a string array field in a message.
Parameters
field_name: The call sets this field.
values: The call sets these values.
Return
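Because each setter is tied to one field type, a program that starts from an ordinary dictionary has to choose a setter per value. The sketch below shows one way to do that by Python type. The mapping from Python types to setters is an assumption based on the setter names in this reference, and both fill_message and RecordingMessage are hypothetical. RecordingMessage only records calls, so the example runs without the library.

```python
import datetime


def fill_message(message, values):
    """Copy scalar values from a dict into a message using typed setters."""
    for name, value in values.items():
        if isinstance(value, bool):
            # bool is a subclass of int; no boolean setter is documented.
            raise TypeError(f"unsupported value for field {name!r}")
        elif isinstance(value, int):
            message.set_long(name, value)
        elif isinstance(value, float):
            message.set_double(name, value)
        elif isinstance(value, str):
            message.set_string(name, value)
        elif isinstance(value, datetime.datetime):
            message.set_datetime(name, value)
        elif isinstance(value, (bytes, bytearray)):
            message.set_opaque(name, value)
        else:
            raise TypeError(f"unsupported value for field {name!r}")
    return message


class RecordingMessage:
    """Stand-in for EftlMessage that records every set_* call."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, attr):
        if attr.startswith("set_"):
            return lambda field_name, value: self.calls.append((attr, field_name, value))
        raise AttributeError(attr)
```

With the real library, the message argument would come from connection.create_message().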
class EftlMessageSizeTooLarge (*args, **kwargs)
-
Raised when the EftlMessage size is larger than the maximum allowed size.
Ancestors
- builtins.Exception
- builtins.BaseException
class EftlUnsupportedError (*args, **kwargs)
-
Raised when unsupported behavior is attempted.
Ancestors
- builtins.Exception
- builtins.BaseException