Module connection

Copyright © 2010-2023. Cloud Software Group, Inc. All Rights Reserved. Confidential & Proprietary.

Classes

class Eftl

Programs use Eftl Class to connect to an eFTL server.

Initialize the Eftl instance.

Class variables

var CONNECTED
var CONNECTING
var DISCONNECTED
var DISCONNECTING
var RECONNECTING

Methods

async def connect(self, url, **kwargs)

Connect to an eFTL server.

This call returns immediately; connecting continues asynchronously. When the connection is ready to use, the eFTL library calls your on_connect method, passing a EftlConnection object that you can use to publish and subscribe.

When a pipe-separated list of URLs is specified this call will attempt a connection to each in turn, in a random order, until one is connected. A program that uses more than one server channel must connect separately to each channel.

Parameters

url

The call connects to the eFTL server at this URL. This can be a single URL, or a pipe ('|') separated list of URLs. URLs can be in either of these forms

ws://host:port/channel
wss://host:port/channel

Optionally, the URLs can contain the username, password, and/or client identifier

ws://username:password@host:port/channel?clientId=<identifier>
wss://username:password@host:port/channel?clientId=<identifier

kwargs :

auto_reconnect_attempts (int): Maximum number of reconnect attempts. The default is 256 attempts
auto_reconnect_max_delay (int): Maximum reconnect delay in milliseconds. The default is 30 seconds.
max_pending_acks (int): optional
                  Maximum number of unacknowledged messages allowed for the
                  client.
user : optional
            Login credentials to use if not found in the url.
password : optional
            Login credentials to use if not found in the url.
client_id : optional
           User-specified client identifier.
handshake_timeout : optional
             Seconds to wait for websocket handshake to complete.
login_timeout : optional
             Seconds to halt waiting for a login message reply. If a
             reply is not received in time, raise an EftlClientError.
polling_interval : optional
            Seconds to wait between each message reply check.
trust_all : true/false (optional)
trust_store :  certificate path (optional)
loop : event loop provided by user (optional)

Callbacks :

   'on_connect(connection):'
            A new connection to the eFTL server is ready to use.

   Parameters
   ----------
   connection : The connection that is ready to use.

   'on_disconnect(connection, loop, code, reason):' A connection to the eFTL server has closed.

   Parameters
   ----------
   connection : The connection that closed.
   loop : the event loop used servicing connection events
   code : A code categorizes the error. Your program can use this value in its response logic.
   reason : This string provides more detail. Your program can use this value in error reporting and logging.

   'on_reconnect(connection):' A connection to the eFTL server has re-opened and is ready to use.
                               The eFTL library invokes this method only after your 
                               program calls <code>messaging.eftl.connection.reconnect</code> and not <code>messaging.eftl.connection.connect</code>.

   Parameters
   ----------
   connection: The connection that reconnected.

   'on_error(connection, code, reason):' An error prevented an operation.

   Parameters
   ----------
   connection : connection object. For publish errors, this argument is the message that was not published. 
                                   For subscription errors, this argument is an object that represents 
                                   a subscription identifier.
   code: A code categorizes the error. Your program can use this value in its response logic.
   reason: This string provides more detail. Your program can use this value in error reporting and logging.

   'on_state_change(connection, state):' The connection state has changed
                                         The eFTL library invokes this method whenever the connection state changes.

    Parameters
    ----------
    connection: The connection whose state has changed.
    state: The connection has changed to this state

Returns

The EftlConnection object that can used to publish and subscribe

Raises

ValueError
If any provided urls lack a host name or the correct scheme.
EftlClientError
If the connection is not established in time.
def get_version(self)

Return the eFTL Python library version.

Returns: str: The eFTL Python library version

class EftlAlreadyConnected (*args, **kwargs)

Raise when connect method is called but connection is already connected.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class EftlClientError (*args, **kwargs)

Raise when establishing a connection to the server fails.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class EftlConnection

A websocket wrapper exchanging eftl protocol messages with a server.

Different functions implement each of the types of protocol messages (e.g. publish, subscribe, disconnect). The messages are sent asynchronously, but by default these functions halt waiting for a reponse. The user may set a timeout for this waiting period if they desire.

Users also may specify callbacks to trigger when messages get delivered, fail to get delivered, etc.

Initialize the connection object, which is unconnected.

Methods

async def acknowledge(self, message)

Acknowledge this message.

Messages consumed from subscriptions with a client acknowledgment mode must be explicitly acknowledged. The eFTL server will stop delivering messages to the client once the server's configured maximum number of unacknowledged messages is reached.

Parameters

message : The message being acknowledged
 

Returns

async def acknowledge_all(self, message)

Acknowledge all messages up to and including this message.

Messages consumed from subscriptions with a client acknowledgment mode must be explicitly acknowledged. The eFTL server will stop delivering messages to the client once the server's configured maximum number of unacknowledged messages is reached.

Parameters

message : The message being acknowledged1
 

Returns

async def close_all_subscriptions(self)

Close all the subscriptions on this connection.

For durable subscriptions, this call will cause the persistence service to stop delivering messages while leaving the durable subscriptions to continue accumulating messages. Any unacknowledged messages will be made available for redelivery.

Programs receive subscription identifiers through their on_subscribe method.

async def close_subscription(self, subscription_id)

Close subscription for specified subscription identifier.

For durable subscriptions, this call will cause the persistence service to stop delivering messages while leaving the durable subscriptions to continue accumulating messages. Any unacknowledged messages will be made available for redelivery.

Programs receive subscription identifiers through their on_subscribe method.

async def create_kv_map(self, name)

Return a new EftlKVMap associated with this connection.

Parameters

name: name of the KVMap

Return

The KVmap object
def create_message(self)

Create an EftlMessage that can be used to publish or send request/replies.

Return

The EftlMessage Object
async def disconnect(self)

Disconnect from the eFTL server.

Programs may disconnect to free server resources.

This call returns immediately; disconnecting continues asynchronously.

When the connection has closed, the eFTL library calls your on_disconnect callback.

Return

def get_clientId(self)

Get the client identifier for this connection.

Return

The client's identifier.
def is_connected(self)

Determine whether this connection to the eFTL server is open or closed.

Return

True if this connection is open, False otherwise
async def publish(self, message, **kwargs)

Publish a one-to-many message to all subscribing clients. This call returns immediately; publishing continues asynchronously. When the publish completes successfully, the eFTL library calls your on_completel callback.

When communicating with EMS, to publish a messages on a specific EMS destination include the message field name _dest. To distinguish between topics and queues the destination can be prefixed with either "TOPIC:" or "QUEUE:", for example "TOPIC:MyDest" or "QUEUE:MyDest". A destination with no prefix is a topic.

example To publish a message on a specific EMS destination add a string field to the message; for example: message.set("_dest", "MyDest");

Parameters


message: Publish this message.

kwargs:

Callbacks:

   'on_publish(message):' A publish operation has completed successfully

   Parameters
   __________
   message: This message has been published.

   'on_error(connection, code, reason):' An error prevented an operation.
   Parameters
   ----------
   connection : connection object. For publish errors, this argument is the message that was not published. 
                                   For subscription errors, this argument is an object that represents 
                                   a subscription identifier.
   code: A code categorizes the error. Your program can use this value in its response logic.
   reason: This string provides more detail. Your program can use this value in error reporting and logging.

Return

ValueError: If the message size exceeds the maximum.
TypeError: If the message is not of type EftlMessage
ConnectionError: If the connection is closed.
async def reconnect(self)

Reopen a closed connection.

You may call this method within your on_disconnect method.

This call returns immediately; connecting continues asynchronously. When the connection is ready to use, the eFTL library calls your on_reconnect callback

Reconnecting automatically re-activates all subscriptions on the connection. The eFTL library invokes your on_subscribe callback for each successful resubscription.

Return

async def remove_kv_map(self, name)

Remove a EftlKVMap object.

Parameters

name : the name of the map
 

Return

async def send_reply(self, requestMessage, replyMessage, **kwargs)

Send a reply message in response to a request message.

This call returns immediately. When the send completes successfully the eFTL library calls your on_complete callback.

Parameters

requestMessage: The reply message to send
replyMessage: The request msg
kwargs:
    Callbacks
        'on_complete(message):'
        Parameters
        ----------
            message: The reply message that completed successfully

Return

async def send_request(self, message, timeout, **kwargs)

Publish a request message.

This call returns immediately. When the reply is received the eFTL library calls your 'on_reply' callback.

Parameters

message: The request message to publish.
timeout: timeout seconds to wait for reply
kwargs:
    Callbacks:
        'on_reply(message):' A request operation has received a reply.
        Parameters
        ----------
            message: The reply message.

Returns

async def start_subscription(self, subscription_id)

Resume message delivery to the specified subscription if stopped.

Parameters


subscription_id: Subscription identifier of the subscription to start.
async def stop_subscription(self, subscription_id)

Stop message delivery to the specified subscription.

Parameters


subscription_id: Subscription identifier of the subscription to stop.
async def subscribe(self, **kwargs)

Subscribe to messages. Register a subscription for one-to-many messages. This call returns immediately; subscribing continues asynchronously. When the subscription is ready to receive messages, the eFTL library calls your on_subscribe callback.

A matcher can narrow subscription interest in the inbound message stream.

An acknowledgment mode for the subscription can be set to automatically acknowledge consumed messages, require explicit client acknowledgment of the consumed messages, or to disable message acknowledgment altogether. The default is to automatically acknowledge consumed messages.

When explicit client acknowledgment is specified the eFTL server will stop delivering messages to the client once the server's configured maximum number of unacknowledged messages is reached.

When communicating with EMS, to subscribe to messages published on a specific EMS destination use a subscription matcher that includes the message field name _dest. To distinguish between topics and queues the destination name can be prefixed with either "TOPIC:" or "QUEUE:", for example "TOPIC:MyDest" or "QUEUE:MyDest". A destination name with no prefix is a topic.

example To subscribe to messages published on a specific EMS destination, create a subscription matcher for that destination; for example: var matcher = '{"_dest":"MyDest"}';

Parameters

kwargs:

timeout : optional
    Number of seconds to halt waiting on acknowledgement from
    the server messaging.eftl.connect().

matcher : str, optional
    JSON content matcher to subscribe to.

type : {"standard", "shared", "last-value"}, optional
    Durable type.

key : optional
    The last-value index key, if <code>type</code> is "last-value".

durable : str, optional
    Name to give the durable.

ack_mode: client, None. Default is auto

Returns

sub_id: The subcription identifier of the new subscription.
async def unsubscribe(self, sub_id)

Unsubscribe from messages on a subscription.

For durable subscriptions, this call will cause the persistence service to remove the durable subscription, along with any persisted messages.

Programs receive subscription identifiers through their methods.

Parameters

sub_id:  Subscription identifier of the subscription to delete.

Return

async def unsubscribe_all(self)

Unsubscribe from messages on all subscriptions. For durable subscriptions, this call will cause the persistence service to remove the durable subscription, along with any persisted messages.

class EftlKVMap (connection, name)

A key-value map object represents a program's connection to an FTL map.

Programs use key-value map objects to set, get, and remove key-value pairs in an FTL map.

Callbacks

'on_success(key, message):' A key-value map operation has completed successfully.

Parameters


key: The key being operated upon.
message: The value of the key.

'on_error(key, message, code, reason):' An error prevented a key-value map operation.

Parameters


key: The key being operated upon.
message: The value of the key.
code: A code categorizes the error. Your program can use this value in its response logic.
reason: This string provides more detail. Your program can use this value in error reporting and logging.

Methods

async def get(self, key, **kwargs)

Get the value of a key from the map, or null if the key is not set.

Parameters


 key: Get the value for this key
 kwargs:
     Callbacks:

     'on_success(key, message):' A key-value map operation has completed successfully.
     Parameters
     __________
     key: The key being operated upon.
     message: The value of the key.

     'on_error(key, message, code, reason):' An error prevented a key-value map operation.
     Parameters
     __________
     key: The key being operated upon.
     message: The value of the key.
     code: A code categorizes the error. Your program can use this value in its response logic.
     reason: This string provides more detail. Your program can use this value in error reporting and logging.

Return

The value as a EftlMessage
async def remove(self, key, **kwargs)

Remove a key-value pair from the map.

This call returns immediately; removing continues asynchronously. When the remove completes successfully, the eFTL library calls your on_success callback.

Parameters

key: Remove the value for this key
kwargs:
    Callbacks:
    'on_success(key, message):' A key-value map operation has completed successfully.
    Parameters
    __________
    key: The key being operated upon.
    message: The value of the key.

    'on_error(key, message, code, reason):' An error prevented a key-value map operation.
    Parameters
    __________
    key: The key being operated upon.
    message: The value of the key.
    code: A code categorizes the error. Your program can use this value in its response logic.
    reason: This string provides more detail. Your program can use this value in error reporting and logging.

Return

async def set(self, key, value, **kwargs)

Set a key-value pair in the map, overwriting any existing value.

This call returns immediately; setting continues asynchronously. When the set completes successfully, the eFTL library calls your on_success callback.

Parameters

key: Set the value for this key.
value: Set this value for the key.
kwargs:
    Callbacks:

    'on_success(key, message):' A key-value map operation has completed successfully.
    Parameters
    __________
    key: The key being operated upon.
    message: The value of the key.

    'on_error(key, message, code, reason):' An error prevented a key-value map operation.
    Parameters
    __________
    key: The key being operated upon.
    message: The value of the key.
    code: A code categorizes the error. Your program can use this value in its response logic.
    reason: This string provides more detail. Your program can use this value in error reporting and logging.

Return

class EftlMessage (obj=None)

Programs use message objects to publish messages on an eFTL connection or receive messages for a given subscription.

Message objects contain typed fields that map names to values.

Initialize the EftlMessage instance properties.

Methods

def clear_all_fields(self)

Remove all the fields in this message.

def clear_field(self, field_name)

Remove the given field from this message.

Parameters

field_name: field_name to clear
def get_datetime(self, field_name)

Get the value of a long field from a message.

Parameters

field_name: The name of the field

Return

The value of the field if the field is present
and has type datetime
def get_datetime_array(self, field_name)

Get the value of a long field from a message.

Parameters

field_name: The name of the field

Return

The value of the field if the field is present
and has type datetime array
def get_delivery_count(self)

Message's delivery count assigned by the persistence service.

Return

  The message delivery count.
def get_double(self, field_name)

Get the value of a long field from a message.

Parameters

field_name: The name of the field

Return

The value of the field if the field is present
and has type double
def get_double_array(self, field_name)

Get the value of a long field from a message.

Parameters

field_name: The name of the field

Return

The value of the field if the field is present
and has type double array
def get_field_names(self)

Return the list of field names.

Parameters

The field names of this message as a list object.

Return

def get_field_type(self, field_name)

Return the type of value of this field.

Parameters

field_name: The name of the field

Return

Type of value of this field.

Possible return values : 
        'int' represents Integer
        'str' represents String
        'float' represents Float
        'datetime.datetime' represents datetime.datetime 
        'message' represents EftlMessage object type
        'opaque' represents opaque type (bytes)
        'unknown'
def get_long(self, field_name)

Get the value of a long field from a message.

Parameters

field_name: The name of the field

Return

The value of the field if the field is present
and has type long
def get_long_array(self, field_name)

Get the value of a long array field from a message.

Parameters

field_name: The name of the field

Return

The value of the field if the field is present
and has type long array
def get_message(self, field_name)

Get the value of a long field from a message.

Parameters

field_name: The name of the field

Return

The value of the field if the field is present
and has type message
def get_message_array(self, field_name)

Get the value of a message array field from a message.

Parameters

field_name: The name of the field

Return

The value of the field if the field is present
and has type message array (list of messages)
def get_opaque(self, field_name)

Get the value of opaque field from a message.

Parameters

field_name: The name of the field

Return

The value of the field if the field is present
def get_store_message_id(self)

Message's unique store identifier assigned by the persistence service.

Return

A monotonically increasing long value that represents
message's unique store identifier
def get_string(self, field_name)

Get the value of a string field from a message.

Parameters

field_name: The name of the field

Return

The value of the field if the field is present
and has type string
def get_string_array(self, field_name)

Get the value of a string array field from a message.

Parameters

field_name: The name of the field

Return

The value of the field if the field is present
and has type string array
def is_field_set(self, field_name)

Return True if the field is set, False otherwise.

Parameters

field_name: The name of the field

Return

True if the field is set, False otherwise
def set_datetime(self, field_name, value)

Set a datetime field in a message.

Parameters

field_name: The call sets this field
value: The call sets this value.

Return

def set_datetime_array(self, field_name, values)

Set a datetime array field in a message.

Parameters

field_name: The call sets this field
value: The call sets this value.

Return

def set_double(self, field_name, value)

Set a double field in a message.

Parameters

field_name: The call sets this field
value: The call sets this value.

Return

def set_double_array(self, field_name, values)

Set a double array field in a message.

Parameters

field_name: The call sets this field
value: The call sets this value.

Return

def set_long(self, field_name, value)

Set a long field in a message.

Parameters

field_name: The call sets this field
value: The call sets this value.

Return

def set_long_array(self, field_name, values)

Set a long array field in a message.

Parameters

field_name: The call sets this field
value: The call sets this value.

Return

def set_message(self, field_name, value)

Set a message field in a message.

Parameters

field_name: The call sets this field
value: The call sets this value.

Return

def set_message_array(self, field_name, values)

Set a message array field in a message.

Parameters

field_name: The call sets this field
value: The call sets this value.

Return

def set_opaque(self, field_name, value)

Set an opaque field in a message.

Parameters

field_name: The call sets this field
value: The call sets this value.

Return

def set_string(self, field_name, value)

Set a string field in a message.

Parameters

field_name : The call sets this field
value : The call sets this value.

Return

def set_string_array(self, field_name, values)

Set a string array field in a message.

Parameters

field_name: The call sets this field
values: The call sets this value.

Return

class EftlMessageSizeTooLarge (*args, **kwargs)

Raise when EftlMesssage size is larger than the maximum allowed size.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class EftlUnsupportedError (*args, **kwargs)

Raise when unsupported behavior is attempted.

Ancestors

  • builtins.Exception
  • builtins.BaseException