ChaskiRemote: Transparent Python Framework for Remote Method Invocation
ChaskiRemote is a framework of transparent Python object proxies for remote method invocation: code can interact with objects that live on other network nodes as though they were local. The key classes are ChaskiProxy and ChaskiRemote, which build on the foundation provided by the ChaskiNode class. Together they create and manage the proxies through which remote methods are invoked.
Classes
ChaskiObjectProxying: Provides the ability to create proxy objects for remote method invocation transparently.
ChaskiProxy: Wraps an object allowing remote method invocation and attribute access as though the object were local.
ChaskiRemote: Extends ChaskiNode to create and manage proxies, enabling remote interactions and method invocations.
- class chaski.remote.ChaskiObjectProxying(obj: Any, *args: Any, **kwargs: Any)[source]¶
Bases: object
This class provides proxying capabilities for enabling remote method invocation transparently. It acts as an intermediary to forward method calls and attribute access to the real object being proxied.
Notes
The _special_names list contains names of special methods to be intercepted.
Custom __getattr__, __delattr__, and __setattr__ methods ensure delegation of attribute access and modification to the proxied object.
- __enter__() Any[source]¶
Enter the context manager.
This method is called when entering a context managed by ChaskiObjectProxying. It returns the proxied object, allowing it to be used within the context.
- Returns:
The proxied object instance.
- Return type:
Any
- __exit__(*args: Any, **kwargs: Any) None[source]¶
Exit the context manager.
This method is called when exiting a context managed by ChaskiObjectProxying. It performs any necessary cleanup, but in this implementation, it does nothing.
- __init__(name: str, obj: Any, instance: Any)[source]¶
Initialize a ChaskiObjectProxying instance.
This constructor sets up the necessary attributes for the ChaskiObjectProxying instance, allowing it to delegate method calls and attribute access to the proxied object.
- Parameters:
name (str) – The name of the proxy.
obj (Any) – The object that is being proxied.
instance (Any) – The instance of the class that contains the ChaskiProxy as a descriptor.
Notes
The attributes are set using object.__setattr__ to avoid triggering custom __setattr__ implementations of the proxied object.
- static __new__(cls, obj: Any, *args: Any, **kwargs: Any) Any[source]¶
Create a new instance of the class, potentially a class proxy.
This method attempts to use a class proxy cache to find or create a proxy class for the given object. If a proxy class already exists in the cache for the object’s class, it is used; otherwise, a new class proxy is created and cached.
- Parameters:
cls (type) – The class being instantiated.
obj (Any) – The object that needs to be proxied.
*args (Any) – Additional positional arguments.
**kwargs (Any) – Additional keyword arguments.
- Returns:
A new instance of the class proxy for the given object.
- Return type:
Any
- classmethod _create_class_proxy(theclass: type) type[source]¶
Create a proxy class for the given class type.
This method generates a proxy class that wraps the methods of the specified class type (theclass). This allows for method calls on the class to be intercepted and processed through the proxy mechanism.
- Parameters:
cls (type) – The proxy class that is being constructed.
theclass (type) – The original class type for which the proxy is to be created.
- Returns:
A new proxy class that wraps the specified class type.
- Return type:
type
Notes
This method defines a make_method inner function that handles the invocation of methods, ensuring that the chain of proxied method calls is properly managed. If an exception occurs during method invocation, the processor_method of the parent proxy is called instead.
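The proxying mechanism described above (a per-class proxy cache, forwarded special methods, and delegated attribute access) can be sketched in a few lines. This is a minimal illustration, not the chaski implementation: the class name ObjectProxy, the contents of _special_names, and the cache attribute are assumptions made for the example, and the fallback to processor_method on error is left out.

```python
from typing import Any


class ObjectProxy:
    """Hypothetical, simplified stand-in for an object-proxying class."""

    # Special methods are looked up on the type, so they are forwarded explicitly.
    _special_names = ["__len__", "__getitem__", "__iter__", "__contains__"]
    _class_proxy_cache: dict = {}

    def __new__(cls, obj: Any, *args: Any, **kwargs: Any) -> Any:
        # Reuse the proxy class already generated for this object's type.
        theclass = cls._class_proxy_cache.get(type(obj))
        if theclass is None:
            theclass = cls._create_class_proxy(type(obj))
            cls._class_proxy_cache[type(obj)] = theclass
        return object.__new__(theclass)

    def __init__(self, obj: Any) -> None:
        # Bypass our own __setattr__, which would write to the wrapped object.
        object.__setattr__(self, "_obj", obj)

    @classmethod
    def _create_class_proxy(cls, theclass: type) -> type:
        def make_method(name: str):
            def method(self, *args: Any, **kwargs: Any) -> Any:
                target = object.__getattribute__(self, "_obj")
                return getattr(target, name)(*args, **kwargs)
            return method

        namespace = {
            name: make_method(name)
            for name in cls._special_names
            if hasattr(theclass, name)
        }
        return type(f"{cls.__name__}({theclass.__name__})", (cls,), namespace)

    def __getattr__(self, name: str) -> Any:
        return getattr(object.__getattribute__(self, "_obj"), name)

    def __setattr__(self, name: str, value: Any) -> None:
        setattr(object.__getattribute__(self, "_obj"), name, value)

    def __delattr__(self, name: str) -> None:
        delattr(object.__getattribute__(self, "_obj"), name)


proxied_list = ObjectProxy([1, 2, 3])
proxied_list.append(4)  # resolved through __getattr__ on the wrapped list
```

Storing the wrapped object with object.__setattr__ mirrors the note on __init__ above: the proxy's own __setattr__ would otherwise redirect the write to the proxied object.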
- class chaski.remote.ChaskiProxy(name: str, node: Any | None = None, obj: Any | None = None, edge: Any | None = None, chain: list[str] | None = None, root: ChaskiProxy | None = None)[source]¶
Bases: object
A class that represents a proxy for remote method invocation.
The ChaskiProxy class facilitates interaction with a remote object as if it were local. It supports dynamic attribute access and method invocation, enabling seamless distributed computations.
- __get__(instance: Any, owner: Any) ChaskiObjectProxying[source]¶
Retrieve the proxied attribute for the instance.
This method is called when an attribute is accessed on an instance of a class that contains a ChaskiProxy as a descriptor. It returns a ChaskiObjectProxying instance that acts as an intermediary, allowing dynamic retrieval of the proxied object’s attribute.
- Parameters:
instance (Any) – The instance of the class from which the ChaskiProxy is being accessed.
owner (Any) – The owner class of the instance.
- Returns:
A ChaskiObjectProxying instance that will delegate attribute access to the underlying proxied object.
- Return type:
ChaskiObjectProxying
- __getattr__(attr: str) Any[source]¶
Retrieve the attribute of the proxy object.
This method appends the requested attribute to the chain of attributes being accessed on the proxied object. If the attribute starts with an underscore, it will be ignored (commonly used for internal variables).
- Parameters:
attr (str) – The name of the attribute to retrieve.
- Returns:
The proxy object itself, allowing for chained attribute access.
- Return type:
Any
- __init__(name: str, node: Any | None = None, obj: Any | None = None, edge: Any | None = None, chain: list[str] | None = None, root: ChaskiProxy | None = None)[source]¶
Initialize a ChaskiProxy instance.
- Parameters:
name (str) – The name of the proxy.
node (Any, optional) – The node associated with the proxied object. Default is None.
obj (Any, optional) – The object that is being proxied. Default is None.
edge (Any, optional) – The edge associated with the proxied object. Default is None.
chain (list[str], optional) – The chain of attribute names being accessed on the proxied object. Default is None.
root (ChaskiProxy, optional) – The root proxy reference, if this is a nested proxy. Default is None.
Notes
The chain parameter is used to keep track of the sequence of attribute names accessed on the proxied object. If no chain is provided, the proxy’s name will be used as the initial chain.
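The chain bookkeeping described for __getattr__ and __init__ can be illustrated with a small stand-alone class. ChainRecorder is a hypothetical name used only for this sketch; in particular, raising AttributeError for underscore-prefixed names is one possible reading of "ignored" and may differ from what ChaskiProxy actually does.

```python
from typing import Optional


class ChainRecorder:
    """Hypothetical stand-in that records attribute access as a chain of names."""

    def __init__(self, name: str, chain: Optional[list] = None) -> None:
        self._name = name
        # With no explicit chain, start from the proxy's own name.
        self._chain = list(chain) if chain is not None else [name]

    def __getattr__(self, attr: str) -> "ChainRecorder":
        # __getattr__ only runs when normal lookup fails. Underscore names are
        # not recorded; raising AttributeError is this sketch's choice.
        if attr.startswith("_"):
            raise AttributeError(attr)
        self._chain.append(attr)
        return self  # returning the proxy itself allows chained access


numpy_like = ChainRecorder("numpy")
same_proxy = numpy_like.linalg.norm
# numpy_like._chain is now ["numpy", "linalg", "norm"]
```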
- __repr__() str[source]¶
Provide a string representation of the ChaskiProxy instance.
This method returns a string that describes the ChaskiProxy instance, including its name, whether it is a remote or local proxy, and the address of the associated node.
- Returns:
A string representation of the ChaskiProxy instance.
- Return type:
str
Notes
This string representation is useful for debugging and logging purposes.
- _cleanup_dynamic_attribute(attr) None[source]¶
Clean up the attributes of the proxy class.
This method attempts to remove the specified attribute from the proxy class, effectively cleaning up dynamically added attributes.
- Parameters:
attr (str) – The name of the attribute to be cleaned up from the proxy class.
Notes
This method uses delattr to remove the attribute. If the attribute does not exist, the method simply passes without raising an error.
- _object(obj_chain: list[str]) Any[source]¶
Retrieve the object specified by the chain of attribute names.
This method traverses the chain of attribute names provided as obj_chain on the proxied object and returns the final object obtained after the traversal.
- Parameters:
obj_chain (list of str) – A list of attribute names to be accessed sequentially on the proxied object.
- Returns:
The final object obtained after traversing the attribute chain on the proxied object.
- Return type:
Any
Notes
This method is primarily used internally by the proxy mechanism to dynamically access attributes of the proxied object.
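Traversing a chain of attribute names amounts to a repeated getattr. The helper below is a sketch with a hypothetical name (resolve_chain); it assumes the chain holds only the attributes to follow and does not include the name of the root object.

```python
import os
from functools import reduce
from typing import Any


def resolve_chain(root: Any, obj_chain: list) -> Any:
    """Follow each attribute name in obj_chain, starting from root."""
    return reduce(getattr, obj_chain, root)


join = resolve_chain(os, ["path", "join"])  # same object as os.path.join
```

An empty chain returns the root object unchanged, and a missing attribute raises AttributeError as a plain getattr would.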
- _reset() None[source]¶
Reset the attribute chain to its initial state.
This method reinitializes the self._chain attribute, which tracks the sequence of attribute accesses on the proxied object. It sets the chain back to its initial state, containing only the name of the root attribute. This is typically used after a method invocation to ensure that subsequent attribute accesses start from the root.
- processor_method(instance=None, args: tuple | None = None, kwargs: dict | None = None) Any[source]¶
Process the method invocation on the proxied object.
This method constructs a data dictionary containing details of the method invocation, such as the name of the proxy service, the chain of object attributes, the positional and keyword arguments, and the timestamp of the request. It then performs the asynchronous request to invoke the method on the remote object.
- Parameters:
args (tuple, optional) – Positional arguments to be passed to the remote method. Default is None.
kwargs (dict, optional) – Keyword arguments to be passed to the remote method. Default is None.
- Returns:
The result of the method invocation on the remote object. The return value will be deserialized from the response obtained from the remote call.
- Return type:
Any
- Raises:
Exception – If an exception is encountered during the execution of the remote method, it will be raised.
Notes
This method uses asyncio’s event loop to synchronously wait for the completion of the asynchronous remote request. The execution is blocked until the remote method call completes and the result is returned or an exception is raised.
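The pattern of packaging a call into a dictionary and then blocking on an asynchronous request can be sketched as follows. Everything here is illustrative: send_request stands in for the network round trip, the services table replaces the remote node, and asyncio.run is just one way to wait synchronously (the real method may drive an existing event loop instead).

```python
import asyncio
from datetime import datetime, timezone
from typing import Any, Optional


async def send_request(request: dict) -> Any:
    """Hypothetical stand-in for the round trip to the serving node."""
    await asyncio.sleep(0)  # pretend to wait for the peer
    services = {("math", "sqrt"): lambda x: x ** 0.5}
    target = services[(request["name"], *request["obj"])]
    return target(*request["args"], **request["kwargs"])


def processor_method(name: str, chain: list, args: Optional[tuple] = None,
                     kwargs: Optional[dict] = None) -> Any:
    request = {
        "name": name,
        "obj": chain,
        "args": args or (),
        "kwargs": kwargs or {},
        "timestamp": datetime.now(timezone.utc),
    }
    # Block the caller until the coroutine finishes. asyncio.run() starts a
    # fresh event loop, so it must not be called from inside a running loop.
    return asyncio.run(send_request(request))


root = processor_method("math", ["sqrt"], args=(9,))
```

An exception raised inside the coroutine propagates out of asyncio.run, which matches the documented behavior that remote errors are re-raised to the caller.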
- class chaski.remote.ChaskiRemote(available: str | None = None, *args: tuple[Any, ...], **kwargs: dict[str, Any])[source]¶
Bases: ChaskiNode
Represents a remote Chaski node.
The ChaskiRemote class extends the ChaskiNode class to enable the creation of proxies that facilitate remote method invocations. It maintains a dictionary of proxy objects associated with the services to be accessed remotely.
- __init__(available: str | None = None, *args: tuple[Any, ...], **kwargs: dict[str, Any])[source]¶
Initialize a ChaskiRemote instance.
This constructor initializes a ChaskiRemote node, inheriting from the ChaskiNode base class. It also sets up a dictionary to hold proxy objects associated with services to be remotely accessed.
- Parameters:
available (str, optional) – A string indicating available services for the remote node.
*args (tuple of Any) – Positional arguments to be passed to the parent ChaskiNode class.
**kwargs (dict of {str: Any}) – Keyword arguments to be passed to the parent ChaskiNode class.
- __repr__() str[source]¶
Represent the ChaskiRemote node as a string.
This method returns a string representation of the ChaskiRemote node, indicating its address. If the node is paired, the address is prefixed with an asterisk (*).
- Returns:
The representation of the ChaskiRemote node, optionally prefixed with an asterisk if paired.
- Return type:
str
- async _call_obj_by_proxy(**kwargs: dict[str, Any]) Any[source]¶
Asynchronously call a method on a proxied object with provided arguments.
This method performs an asynchronous remote method invocation using the proxy. It logs the call and retrieves the result from the proxied object.
- Parameters:
kwargs (dict) –
- A dictionary containing the following keys:
- ’name’: str
The name of the proxy service.
- ’obj’: list of str
A chain of object attributes to traverse for the method call.
- ’args’: tuple
Positional arguments to pass to the method.
- ’kwargs’: dict
Keyword arguments to pass to the method.
- ’timestamp’: datetime
The timestamp representing when the request was initiated.
- Returns:
The result of the remote method call based on the proxied service.
- Return type:
Any
Notes
This method uses async calls and expects the proxied methods to be asynchronous. The call is logged with the service name, method, and arguments.
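On the serving side, a request dictionary with the keys listed above can be resolved and executed roughly as shown here. The SERVICES registry and the function name are assumptions for the sketch, as is the choice to await only when the call returns an awaitable.

```python
import asyncio
import inspect
import math
from datetime import datetime, timezone
from functools import reduce
from typing import Any

SERVICES = {"math": math}  # hypothetical registry of served objects


async def call_obj_by_proxy(**kwargs: Any) -> Any:
    root = SERVICES[kwargs["name"]]
    # Walk the attribute chain to reach the callable.
    target = reduce(getattr, kwargs["obj"], root)
    result = target(*kwargs["args"], **kwargs["kwargs"])
    if inspect.isawaitable(result):  # tolerate both sync and async targets
        result = await result
    return result


request = {
    "name": "math",
    "obj": ["hypot"],
    "args": (3, 4),
    "kwargs": {},
    "timestamp": datetime.now(timezone.utc),
}
result = asyncio.run(call_obj_by_proxy(**request))
```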
- async _connect_to_peer(node: ChaskiNode, peer_port: int | None = None, paired: bool = False, data: dict = {}) None¶
Asynchronously establish a TCP connection to a peer node.
Initiate a TCP connection to the specified peer node. If a connection is already established, or if the target node is the same as the current one, the function will produce a warning and not proceed further. This function also supports marking a connection as ‘paired’, updating corresponding state information about the peer node.
- Parameters:
node ('ChaskiNode') – The target node instance or the ip string to connect to.
peer_port (Optional[int]) – The port number of the target node if the node parameter is not a ChaskiNode instance.
paired (bool) – Flag indicating whether the connection should be marked as ‘paired’.
data (dict) – Additional data to include in the report_paired command if the connection is paired.
- async _connected(reader: StreamReader, writer: StreamWriter) None¶
Handle an incoming TCP connection.
This coroutine is called when a new TCP connection is established. It will create a new Edge instance representing this connection and start listening to incoming messages from the peer.
- Parameters:
reader (asyncio.StreamReader) – The StreamReader object to read data from the connection.
writer (asyncio.StreamWriter) – The StreamWriter object to write data to the connection.
- async _generic_request_udp(callback: str, kwargs: dict[str, Any] = {}, edge: Edge | None = None) Any¶
Make a generic UDP request to a peer node and await the response.
This method sends a UDP request with a specified callback function and additional keyword arguments. It generates a unique identifier for the request, sends the message, and waits for a response from the peer node. The response is then retrieved and returned as the result of the method call.
- Parameters:
callback (str) – The name of the method to call on the peer node.
kwargs (dict[str, Any], optional) – A dictionary of keyword arguments to pass to the callback method on the peer node.
- Returns:
The response data received from the peer node.
- Return type:
Any
- Raises:
asyncio.TimeoutError – If the response is not received within a specified timeout period.
Notes
This method is typically used for internal communication between nodes in the network. It helps in sending requests and receiving responses asynchronously over UDP.
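The request/response correlation described here (a unique identifier per request, a wait with a timeout, and a matching response) can be sketched without any networking. PendingRequests and the message field names are hypothetical; the send callable stands in for the actual UDP transmission.

```python
import asyncio
import uuid
from typing import Any, Optional


class PendingRequests:
    """Hypothetical helper that matches responses to requests by id."""

    def __init__(self) -> None:
        self._events: dict = {}
        self._responses: dict = {}

    async def request(self, send, callback: str,
                      kwargs: Optional[dict] = None, timeout: float = 1.0) -> Any:
        request_id = str(uuid.uuid4())
        self._events[request_id] = asyncio.Event()
        try:
            await send({"id": request_id, "callback": callback, "kwargs": kwargs or {}})
            # Raises asyncio.TimeoutError if no response arrives in time.
            await asyncio.wait_for(self._events[request_id].wait(), timeout)
            return self._responses.pop(request_id)
        finally:
            self._events.pop(request_id, None)
            self._responses.pop(request_id, None)

    def resolve(self, request_id: str, response: Any) -> None:
        event = self._events.get(request_id)
        if event is not None:  # ignore late or unknown responses
            self._responses[request_id] = response
            event.set()


async def demo():
    pending = PendingRequests()

    async def answering_peer(message):
        pending.resolve(message["id"], {"echo": message["kwargs"]})

    async def silent_peer(message):
        pass  # never answers, so the request times out

    answer = await pending.request(answering_peer, "echo", {"x": 1})
    try:
        await pending.request(silent_peer, "echo", timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return answer, timed_out
```

The resolve method plays the role that a response handler would play when the matching datagram arrives.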
- _get_status(**kwargs) dict¶
Retrieve the status of the node.
This method compiles and returns a dictionary containing the current status details of the node. The status includes information about the node’s paired events for each subscription, whether the server is closing, and whether the node is in the process of attempting to reconnect.
- Parameters:
**kwargs (dict, optional) – Additional status information that can be passed as key-value pairs and will be included in the returned status dictionary.
- Returns:
A dictionary containing the status details of the node. The keys include:
- ‘paired’: A dictionary where keys are subscription topics and values are booleans indicating whether the node is paired for that subscription.
- ‘serving’: Boolean value indicating whether the server is closing (False) or not (True).
- ‘reconnecting’: Boolean value indicating whether the node is currently attempting to reconnect to a peer (True) or not (False).
- Return type:
dict
- async _keep_alive(interval: int = 7) None¶
Keep the node alive by sending periodic heartbeat messages.
This coroutine sends a heartbeat message at defined intervals to ensure that the connection remains active. It is useful for preventing timeouts and maintaining the session state.
- Parameters:
interval (int) – The number of seconds between each heartbeat message.
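A periodic heartbeat loop of this kind might look like the sketch below. The send_heartbeat callable and the stop event are assumptions added so the loop can be exercised and shut down cleanly; the documented method only takes an interval.

```python
import asyncio
from typing import Optional


async def keep_alive(send_heartbeat, interval: float = 7.0,
                     stop: Optional[asyncio.Event] = None) -> None:
    if stop is None:
        stop = asyncio.Event()
    while not stop.is_set():
        await send_heartbeat()
        try:
            # Sleep for one interval, but wake early if asked to stop.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass


async def demo():
    beats = []
    stop = asyncio.Event()

    async def send_heartbeat():
        beats.append(len(beats))
        if len(beats) == 3:
            stop.set()

    await keep_alive(send_heartbeat, interval=0.01, stop=stop)
    return beats
```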
- async _loop_message(message: Message, edge: Edge) None¶
Asynchronously process a received message and invoke the appropriate handler.
This method acts as a dispatcher, delegating the received message to a specific method based on the command of the message. It utilizes dynamic method resolution to determine the handler for each command. If no specific handler is found for the command, a warning is logged indicating the missing processor.
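Dynamic method resolution for a dispatcher is commonly written with getattr. The sketch below assumes handlers are named _process_<command>, which matches the method names on this page but is not confirmed as the lookup rule; SketchMessage and Dispatcher are hypothetical stand-ins.

```python
import asyncio
import logging
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchMessage:
    """Hypothetical message carrying a command name and a payload."""
    command: str
    data: Any = None


class Dispatcher:
    def __init__(self) -> None:
        self.handled: list = []

    async def _process_ping(self, message: SketchMessage, edge: Any) -> None:
        self.handled.append(("ping", message.data))

    async def _loop_message(self, message: SketchMessage, edge: Any) -> None:
        # Look up the handler by name; warn when none is defined.
        handler = getattr(self, f"_process_{message.command}", None)
        if handler is None:
            logging.warning("No processor for command %r", message.command)
            return
        await handler(message, edge)


dispatcher = Dispatcher()
asyncio.run(dispatcher._loop_message(SketchMessage("ping", 1), edge=None))
asyncio.run(dispatcher._loop_message(SketchMessage("unknown"), edge=None))
```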
- async _ping(server_edge: Edge, delay: float = 0, latency_update: bool = True, size: int = 0) None¶
Send a ping message to measure latency and connectivity.
This method sends a single ping message to a specified edge or to all server pairs if no edge is specified. It also allows for setting a size for the payload in bytes and a delay before sending the ping. If the response option is true, a pong message will be sent back immediately after receiving a ping.
- Parameters:
server_edge (Edge, optional) – The edge (network connection) to ping. If provided, the ping will be sent only to this edge. If None, pings will be sent to all server_pairs.
delay (float, optional) – The delay in seconds before sending the ping message. Defaults to 0 seconds.
latency_update (bool, optional) – If True, the latency information for the edge will be updated based on the ping response. Defaults to True.
size (int, optional) – The size of the dummy payload data in bytes to be included in the ping message. Defaults to 0 bytes, meaning no additional data is sent.
- async _process_discovery(message: Message, edge: Edge | None = None) None¶
Processes a network discovery message and propagates it if necessary.
This method is responsible for processing discovery messages as part of a network-wide search for ChaskiNodes with matching subscriptions. The method checks if the message should be propagated based on the TTL and visited nodes. If the current node’s subscriptions match the origin’s, a connection is attempted. Otherwise, the discovery message is forwarded to other ChaskiNodes, avoiding nodes that have already been visited.
- Parameters:
message (Message) – The discovery message containing details about the discovery process, including the sender’s information, visited nodes, and TTL.
edge (Optional[Edge], optional) – The edge where the discovery message was received from. It may be used to avoid sending the discovery message back to the sender. Defaults to None.
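The decision logic described for discovery (drop on expired TTL or repeat visit, connect on matching subscriptions, otherwise forward to unvisited nodes) can be expressed as a pure function. The message keys and the function name are assumptions for illustration; the real Message layout is not shown on this page.

```python
def handle_discovery(message: dict, self_id: str, subscriptions: set, neighbours: list):
    """Return an (action, detail) pair for a discovery message."""
    if message["ttl"] <= 0 or self_id in message["visited"]:
        return ("drop", None)
    if subscriptions & message["origin_subscriptions"]:
        return ("connect", message["origin"])
    # Forward a copy with one less hop and this node marked as visited.
    forwarded = dict(
        message,
        ttl=message["ttl"] - 1,
        visited=message["visited"] | {self_id},
    )
    targets = [n for n in neighbours if n not in forwarded["visited"]]
    return ("forward", (forwarded, targets))


msg = {"origin": "A", "origin_subscriptions": {"news"}, "ttl": 2, "visited": {"A"}}
matched = handle_discovery(msg, "B", {"news"}, ["A", "C"])
relayed = handle_discovery(msg, "B", {"sports"}, ["A", "C", "D"])
```

Building a new dictionary instead of mutating the incoming one keeps the original message intact for logging or retries.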
- async _process_handshake(message: Message, edge: Edge) None¶
Process a handshake command received from a peer node.
This coroutine is triggered upon receiving a handshake message, indicating an initiation of communication protocol by another ChaskiNode. It prepares and sends a handshake response back to the origin node to acknowledge the handshake and completes the two-way communication setup.
- Parameters:
message (Message) – The handshake message received, containing the timestamp and data that includes the peer’s name, ip, port, and subscription information.
edge (Edge) – The edge associated with the peer node from which the handshake message was received, representing the communication connection to the peer.
- async _process_handshake_back(message: Message, edge: Edge) None¶
Handle a handshake response (back) from a peer node after an initial handshake.
This coroutine is invoked upon receiving a handshake response from a peer node in the network. It updates the edge information with the name, ip, port, and subscriptions of the responding node and adds the edge to the server’s active connections list.
- async _process_ping(message: Message, edge: Edge) None¶
Handle incoming ping messages and optionally send a pong response.
When a ping message is received, this method processes the message and sends a pong response back to the sender if requested. The method updates the edge’s latency measurements based on the round trip time of the ping-pong exchange if the latency_update flag in the message is set to True. It also sets the edge’s name, ip, port, and subscriptions based on the information received in the pong message.
- async _process_pong(message: Message, edge: Edge) None¶
Process a pong message and update edge latency measurements.
This coroutine is triggered when a pong message is received in response to a ping request. It uses the time difference between the pong message’s timestamp and the current time to calculate the round-trip latency. If the ‘latency_update’ flag in the message data is True, this latency value will be used to update the edge’s latency statistics. Additionally, the edge’s identifying information such as name, ip, and port is updated based on the pong message data.
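The latency calculation reduces to a difference between two timestamps. The helper name is hypothetical, and the sketch assumes timezone-aware datetime values; the actual timestamp type carried by the message is not specified here.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def round_trip_ms(sent_at: datetime, received_at: Optional[datetime] = None) -> float:
    """Milliseconds elapsed between the message timestamp and reception."""
    if received_at is None:
        received_at = datetime.now(timezone.utc)
    return (received_at - sent_at) / timedelta(milliseconds=1)


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
latency = round_trip_ms(t0, t0 + timedelta(milliseconds=25))
```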
- async _process_report_paired(message: Message, edge: Edge) None¶
Process a ‘report_paired’ network message.
This method gets executed when a ‘report_paired’ command is received, indicating that a pairing action has occurred. Depending on the ‘on_pair’ behavior specified in the message, the node may disconnect after pairing or take no action.
- Parameters:
message (Message) – The message instance containing the ‘report_paired’ command and associated data, such as pairing status and actions to take upon pairing.
edge (Edge) – The edge from which the ‘report_paired’ message was received. It provides context for where to apply the action specified in the message data.
- async _process_request_udp(message: Message, edge: Edge) None¶
Process an incoming UDP request and dispatch a response.
This method handles the reception of a UDP request message. It uses the callback function specified in the message to generate a response and then sends this response back to the requester.
- async _process_response_udp(message: Message, edge: Edge) None¶
Process a response to a UDP request.
This method is invoked when a response to a UDP request is received. It extracts the corresponding request ID from the message, retrieves the response data, and sets the corresponding event to indicate that the response has been processed.
- async _process_udp_message(data: bytes, addr: Tuple[str, int]) None¶
Process incoming UDP messages routed to this node’s UDP server.
This asynchronous handler is called when the UDP server receives a new message. It deserializes the received bytes back into a message object and processes it according to the command it contains. The method handles ‘status’ and ‘response’ commands used for node status checks and responses.
- Parameters:
data (bytes) – The raw bytes received from the UDP client.
addr (Tuple[str, int]) – A tuple containing the sender’s IP address as a string and the port number as an integer.
- Raises:
ValueError – If the received message cannot be processed or contains an invalid command not supported by the node.
- async _reader_loop(edge: Edge) None¶
Listen and process messages from a given edge in the network.
This asynchronous method is the main loop for managing incoming messages from the connected peer node represented by the provided ‘edge’. It constantly reads data from the StreamReader of the edge until the connection is closed, or an error is encountered. It handles framing, deserialization, and dispatching of the messages using the ‘_process_message’ coroutine for further handling.
- Parameters:
edge (Edge) – The network edge (connection) object from which the messages are read and processed. It contains the StreamReader and StreamWriter for network I/O.
- async _remove_closing_connection() None¶
Identify and remove server pairs that have closed connections.
This coroutine iterates through the server pairs of the ChaskiNode instance and filters out any edges where the StreamWriter’s associated connection is determined to be closed. This serves to maintain an accurate list of active connections on the server and ensures that operations are not attempted on closed connections.
- async _request_status(dest_ip: str, dest_port: int) Message¶
Request the status of a node via UDP and wait for a response.
This asynchronous method sends a UDP message to the target ip and port, requesting its status. It generates a unique identifier for the request, sends the message, and then waits for a response that matches the identifier. Once the response is received, it is returned as a Message object.
- Parameters:
dest_ip (str) – The IP address of the destination node to query for status.
dest_port (int) – The port number of the destination node to communicate the status request.
- Returns:
The status response message from the destination node, containing information such as whether it is paired and actively serving.
- Return type:
Message
- async _send_udp_message(command: str, message: Any, dest_ip: str, dest_port: int) None¶
Send a UDP message to the specified destination ip and port.
This coroutine sends a pre-formatted message over UDP to a given destination ip and port. It serializes the message, which includes a command and its associated data, before transmission. This method is utilized for communication protocols that require UDP for message passing, like status checks or discovery procedures.
- Parameters:
command (str) – The command type that dictates the kind of operation to perform, included in the message.
message (Any) – The payload associated with the command that contains data necessary for carrying out the operation.
dest_ip (str) – The destination IP address to which the message will be sent.
dest_port (int) – The port number on the destination host to which the message should be directed.
- async _start_tcp_server() None¶
Configure and start the asyncio TCP server.
A coroutine that sets up and starts the asyncio TCP server on the ip and port attributes of the ChaskiNode instance. The server will handle incoming client connections using the ‘connected’ coroutine as the protocol factory. In addition, a background keep-alive task is started to manage node heartbeat and connectivity. The server will run until explicitly stopped or an unhandled exception occurs.
- async _start_udp_server() None¶
Start an asyncio UDP server to handle incoming datagrams.
This coroutine is responsible for creating and binding a UDP socket to listen for incoming datagram messages. It then creates a UDP protocol endpoint, providing mechanics for handling UDP communications. The protocol handler, defined by the UDPProtocol class, specifies how incoming datagrams and error events are processed.
- Raises:
ValueError – If the address provided for the UDP socket can’t be resolved.
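With asyncio, a UDP endpoint is typically created from a DatagramProtocol subclass passed to loop.create_datagram_endpoint. The class and function below are a sketch with hypothetical names and do not reproduce the library's UDPProtocol; the on_message callback is an assumption.

```python
import asyncio


class SketchUDPProtocol(asyncio.DatagramProtocol):
    """Hypothetical protocol that hands each datagram to a callback."""

    def __init__(self, on_message) -> None:
        self.on_message = on_message
        self.transport = None
        self.errors: list = []

    def connection_made(self, transport) -> None:
        self.transport = transport

    def datagram_received(self, data: bytes, addr) -> None:
        self.on_message(data, addr)

    def error_received(self, exc) -> None:
        self.errors.append(exc)


async def start_udp_server(host: str, port: int, on_message):
    loop = asyncio.get_running_loop()
    # Returns a (transport, protocol) pair bound to host:port.
    return await loop.create_datagram_endpoint(
        lambda: SketchUDPProtocol(on_message), local_addr=(host, port)
    )
```

The transport returned by start_udp_server should be closed with transport.close() when the node shuts down.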
- async _test_generic_request_udp(echo_data: dict[str, Any] = {}) Any¶
Send a test UDP request and await the response.
This method constructs and sends a generic UDP request with the provided echo_data to a peer node and waits for the response. It is useful for testing the UDP communication mechanism between nodes.
- Parameters:
echo_data (dict, optional) – A dictionary of data to be included in the UDP request. The default is an empty dictionary.
- Returns:
The response data received from the peer node.
- Return type:
Any
- async _test_generic_response_udp(**echo_data: Any) Any¶
Respond to a test UDP request by echoing the received data.
This coroutine processes a generic UDP request for testing purposes. It simply echoes back the provided echo_data to the requester. This method can be used to verify the correct handling of UDP requests and responses between nodes.
- Parameters:
**echo_data (dict) – A dictionary of data received in the UDP request. This data will be echoed back in the response.
- Returns:
The echo_data dictionary received in the request, echoed back to the requester.
- Return type:
dict
- async _verify_availability(module: str, edge=None) Any[source]¶
Verify the availability of a specified module across connected nodes.
This asynchronous method checks if a specified module is available on any of the connected edges (nodes) in the network. It sends a generic UDP request to each edge to verify if the module is available for remote interaction.
- Parameters:
module (str) – The name of the module to check for availability.
edge (Optional[str]) – The specific edge to check for the module’s availability. If not provided, checks all edges.
- Returns:
The edge where the module is available if found, otherwise False.
- Return type:
Any
Notes
This method iterates through all connected edges and sends a UDP request to verify the module’s availability. It returns the first edge that confirms the module’s presence or False if no such edge is found.
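The "first edge that confirms" behavior can be sketched as a simple loop over edges. The ask coroutine is a hypothetical replacement for the UDP request, and edges are represented by plain strings.

```python
import asyncio


async def first_available(module: str, edges: list, ask):
    """Return the first edge reporting the module, or False if none does."""
    for edge in edges:
        if await ask(edge, module):
            return edge
    return False


async def demo():
    installed = {"edge-a": {"json"}, "edge-b": {"json", "numpy"}}

    async def ask(edge, module):
        return module in installed[edge]

    edges = list(installed)
    return (
        await first_available("numpy", edges, ask),
        await first_available("scipy", edges, ask),
    )
```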
- async _verify_module(**kwargs: dict[str, Any]) Any[source]¶
Verify the availability of a specified module and register it if available.
This method checks whether a given module is available for import on the remote node. If the module can be successfully imported, it registers the module as a service with the node. It logs the registration process and returns True if successful, or False otherwise.
- Parameters:
kwargs (dict) –
- A dictionary containing the following key:
- ’module’: str
The name of the module to verify and potentially register.
- Returns:
True if the module is successfully imported and registered, False otherwise.
- Return type:
bool
Notes
This method uses the importlib to dynamically load modules and asyncio to manage asynchronous operations.
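Checking whether a module can be imported and registering it on success is straightforward with importlib.import_module. The function name and the registry dictionary are assumptions for this sketch; how ChaskiRemote stores its services is not shown here.

```python
import importlib


def verify_module(module: str, registry: dict) -> bool:
    """Import the module by name and register it; report success."""
    try:
        registry[module] = importlib.import_module(module)
    except ImportError:
        return False
    return True


services: dict = {}
found = verify_module("json", services)
missing = verify_module("no_such_module_xyz_123", services)
```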
- async _write(command: str, data: Any, edge: Edge | None = None, topic: str = 'All') None¶
Write data to the specified writer or all connected peers.
Sends a packaged message with a particular command and associated data to either a single specified writer or broadcast it to all connected server peers. The message includes the command name and data, which gets serialized before being sent. This method ensures the data is properly framed with its length for transmission over TCP.
- Parameters:
command (str) – The name of the command or type of the message to be sent.
data (Any) – The payload of the message, which may consist of any type of data compatible with the serializer.
edge (Optional[Edge], optional) – The edge to which the message should be sent. If None, the message will be sent to all server pairs. Defaults to None.
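Length-prefixed framing over a byte stream can be sketched as follows. The 4-byte big-endian header and the JSON encoding are assumptions chosen for the example; the actual header layout and serializer used by the node are not documented on this page.

```python
import json
import struct

HEADER = struct.Struct("!I")  # assumed: 4-byte big-endian payload length


def frame(command: str, data) -> bytes:
    payload = json.dumps({"command": command, "data": data}).encode()
    return HEADER.pack(len(payload)) + payload


def unframe(buffer: bytes):
    """Split a buffer into complete messages plus any leftover bytes."""
    messages = []
    while len(buffer) >= HEADER.size:
        (length,) = HEADER.unpack_from(buffer)
        end = HEADER.size + length
        if len(buffer) < end:
            break  # the rest of this frame has not arrived yet
        messages.append(json.loads(buffer[HEADER.size:end]))
        buffer = buffer[end:]
    return messages, buffer


stream = frame("ping", 1) + frame("pong", [2]) + b"\x00\x00"
messages, leftover = unframe(stream)
```

Keeping the leftover bytes lets a reader loop prepend them to the next chunk it receives, since TCP does not preserve message boundaries.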
- async _write_data(data: bytes, edges: List[Edge] | None = None, excluded_edges: List[Edge] = []) None¶
Send serialized data to specified edges or all connected edges, excluding optional edges.
This method writes serialized data to the provided list of edges or all connected edges. It ensures that data is only sent to edges that are not closing their connections and have not been explicitly excluded.
- Parameters:
data (bytes) – The serialized data to be sent to the edges.
edges (Optional[List[Edge]], optional) – A list of Edge instances to which the data should be sent. If None, the data is sent to all edges. Defaults to None.
excluded_edges (List[Edge], optional) – A list of Edge instances to exclude from data transmission. Defaults to an empty list.
- Raises:
ConnectionResetError – If an edge connection is lost while writing data, an attempt to reconnect is made.
- add_propagation_command(command: str) None¶
Add a command to the list of commands that should be propagated.
This method appends a given command to the internal list of commands that are allowed to be propagated to other nodes. It ensures that each command is unique within the list by converting it to a set and back to a list.
- Parameters:
command (str) – The command to add to the propagation list. This command should be a string representing a specific type of message that can be propagated to other nodes.
- property address: str¶
Construct and retrieve the address string for the ChaskiRemote node.
This property method returns a formatted string representing the address of the ChaskiRemote node, showing its IP and port.
- Returns:
A formatted string in the form “ChaskiRemote@<IP>:<Port>” indicating the node’s address.
- Return type:
str
- async close_connection(edge: Edge, port: int | None = None) None¶
Close the connection associated with a given edge, optionally specifying a port.
This coroutine handles the termination of a network connection that corresponds to the provided edge. If a port number is specified, the connection to that port will be closed. All resources associated with the connection, such as stream writers, are properly finalized. If the current node ends up without any connections, a warning is logged, and an attempt to reconnect is made.
- Parameters:
edge (Edge) – The edge object representing the network connection to be closed. If a port is specified, only the connection to that port is closed.
port (Optional[int]) – An optional port number to specifically close the connection to. If None, all connections associated with the edge are closed.
- async connect(address_or_ip_or_node: str | ChaskiNode, port: int | None = None, discovery=False, discovery_timeout=0.5) Edge | None¶
Establish a connection to the specified node or address.
This method initiates a TCP connection to a specified node or an IP address and port. It leverages the _connect_to_peer method to create the connection. The input can be an instance of ChaskiNode, a string representing the IP address, or an address string in the format “ip:port” or “[ipv6]:port”.
- Parameters:
address_or_ip_or_node (Union[str, ChaskiNode]) – The target node instance or IP string to connect to. Acceptable formats include:
- ChaskiNode instance
- IP address string (e.g., “192.168.1.1”)
- Address string with port (e.g., “192.168.1.1:65432” or “[2001:db8::1]:65432”)
port (Optional[int]) – The port number of the target node if an IP address string is provided. Ignored if address_or_ip_or_node includes port information or is a ChaskiNode instance.
- Raises:
ValueError – If the address cannot be resolved.
- Returns:
The matching Edge instance if found and discovery is False; otherwise, None.
- Return type:
Optional[Edge]
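The accepted address formats can be illustrated with a small parser. This is a sketch under assumptions, not the library's own parsing code: `split_address` is a hypothetical helper, and it mirrors only the documented rule that an explicit port inside the address string takes precedence over the `port` argument.

```python
from typing import Optional, Tuple


def split_address(address: str, port: Optional[int] = None) -> Tuple[str, Optional[int]]:
    """Split 'ip', 'ip:port' or '[ipv6]:port' into (host, port)."""
    if address.startswith("["):
        # Bracketed IPv6 literal, optionally followed by ':port'.
        host, _, rest = address[1:].partition("]")
        if rest.startswith(":"):
            return host, int(rest[1:])
        return host, port
    if address.count(":") == 1:
        # Exactly one colon: IPv4 address or hostname with a port.
        host, _, port_text = address.partition(":")
        return host, int(port_text)
    # Bare IPv4 address, hostname, or unbracketed IPv6: use the port argument.
    return address, port
```

For example, `split_address("[2001:db8::1]:65432")` yields the host `2001:db8::1` and the port `65432`, while `split_address("192.168.1.1", 65432)` falls back to the separate port argument.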
- deserializer(data: bytes) Any¶
Deserialize the provided data using the configured deserializer.
This method attempts to convert a bytes object back into its original form (such as an object, string, or other data structures) using the deserializer function specified at the node’s initialization.
- Parameters:
data (bytes) – The serialized data as a bytes object that needs to be deserialized.
- Returns:
The deserialized data, converted back to its original form. This can be any data type that was originally serialized.
- Return type:
Any
Notes
If the deserialization process fails, this method returns the raw data as a fallback.
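The fallback behaviour in the note above can be sketched as follows. The use of `pickle` is an assumption made for illustration; the actual deserializer is whichever function was configured at node initialization, and `deserialize_or_raw` is a hypothetical name.

```python
import pickle
from typing import Any


def deserialize_or_raw(data: bytes) -> Any:
    """Try to unpickle; on any failure return the bytes unchanged."""
    try:
        # pickle is assumed here; only unpickle data from trusted peers.
        return pickle.loads(data)
    except Exception:
        # Fallback described in the Notes: hand back the raw data.
        return data


payload = pickle.dumps({"topic": "t1", "value": 3})
```

A caller therefore has to be prepared to receive either the reconstructed object or the original bytes.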
- disable_message_propagation() None¶
Disable the message propagation feature for the node.
This method turns off the node’s ability to forward received messages to other connected nodes. Once disabled, the node will stop relaying messages but will continue to receive and process incoming messages.
- async discovery(node: ChaskiNode | None = None, on_pair: str | Literal['none', 'disconnect'] = 'none', timeout: int | float = 10) None¶
Conducts a network-wide discovery process.
Executes a discovery process across the network to find and potentially connect with other ChaskiNodes. This function is used to find nodes with overlapping subscriptions to establish a peer-to-peer connection. It allows the node to expand its network by connecting to more nodes, which may be of interest based on the subscriptions. Depending on the ‘on_pair’ setting, nodes may connect permanently or just acknowledge the presence of each other.
- Parameters:
node (Optional['ChaskiNode'], optional) – A reference to a ChaskiNode instance to start the discovery process from. If None, discovery will be attempted using the current node’s server pairs. Defaults to None.
on_pair (Union[str, Literal['none', 'disconnect']], optional) – The action to take when a peer is discovered. ‘none’ means no action is taken, while ‘disconnect’ causes the node to disconnect after pairing. Defaults to ‘none’.
timeout (Union[int, float], optional) – The maximum time in seconds to wait for the discovery process to complete before considering the node as paired. Defaults to 10 seconds.
- enable_message_propagation() None¶
Enable the message propagation feature for the node.
When this method is called, it sets the propagate attribute to True. This allows the node to forward received messages to other connected nodes. By enabling message propagation, the node helps in disseminating messages across the network, except for the edge from which the message was received.
- get_edge(ip: str, port: int) Edge | None¶
Retrieve an existing edge by its ip and port.
This method looks up and returns an Edge instance that matches the given ip and port. If no such edge is found, it returns None.
- Parameters:
ip (str) – The IP address of the edge to find.
port (int) – The port number of the edge to find.
- Returns:
The Edge instance with the specified ip and port if found, else None.
- Return type:
Optional[Edge]
- get_edge_by_name(node_name: str) Edge | None¶
Retrieve an existing edge by its node name.
This method looks up and returns an Edge instance that matches the given node name. If no such edge is found, it returns None.
- Parameters:
node_name (str) – The name of the node to find.
- Returns:
The Edge instance with the specified node name if found, else None.
- Return type:
Optional[Edge]
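Both lookups (by ip and port, and by node name) amount to a linear search that returns the first match or None. The sketch below assumes a minimal `EdgeStub` with only the fields needed; it is a hypothetical stand-in for the real Edge class, and the helper names are illustrative.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class EdgeStub:
    """Hypothetical minimal edge: only the fields the lookups need."""
    ip: str
    port: int
    name: str


def find_by_address(edges: List[EdgeStub], ip: str, port: int) -> Optional[EdgeStub]:
    # Return the first edge whose ip and port both match, else None.
    return next((e for e in edges if e.ip == ip and e.port == port), None)


def find_by_name(edges: List[EdgeStub], node_name: str) -> Optional[EdgeStub]:
    # Return the first edge whose name matches, else None.
    return next((e for e in edges if e.name == node_name), None)


edges = [EdgeStub("10.0.0.1", 65432, "alpha"), EdgeStub("10.0.0.2", 65433, "beta")]
```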
- get_free_port() int¶
Get a free port for the node to use.
This method creates a temporary socket to bind to a port with value 0, which causes the operating system to allocate an available port. Once the socket is bound, the port number assigned by the operating system is retrieved and the socket is closed. This port number can be used for subsequent network operations requiring a free and available port.
- Returns:
The port number assigned by the operating system that is free and available for use.
- Return type:
int
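The port-0 technique described above looks roughly like this with the standard `socket` module. `pick_free_port` is a hypothetical name, and binding to the loopback address is an assumption made to keep the sketch self-contained.

```python
import socket


def pick_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        # getsockname() returns (host, port) for AF_INET sockets.
        return sock.getsockname()[1]


port = pick_free_port()
```

Since the temporary socket is closed before the port is used, another process could claim the port in the meantime; the returned number is a good candidate rather than a reservation.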
- async handshake(server_edge: Edge, delay: float = 0, response: bool = False)¶
Initiate or respond to a handshake with the given edge.
This method sends a handshake message to the specified edge and optionally awaits a handshake response. It is used to initiate or confirm a connection establishment between two ChaskiNodes.
- Parameters:
server_edge (Edge) – The edge instance to which the handshake message is to be sent.
delay (float, optional) – The amount of time (in seconds) to wait before sending the handshake message.
back_delay (TODO) –
response (bool, optional) – Indicates whether a response is expected. Set to True if waiting for a handshake back.
- is_connected_to(node: ChaskiNode) bool¶
Check if this node is connected to another specified node.
Determines whether the current ChaskiNode instance has an established TCP connection with the given node. It checks the server pairs list for a matching ip and port pair to confirm connectivity.
- Parameters:
node (ChaskiNode) – The node to check for connectivity with the current node.
- Returns:
True if the current node is connected to the specified node; otherwise, False.
- Return type:
bool
- is_port_available(port: int) bool¶
Check if a specific port is available for use.
This method attempts to bind to a given port to determine if it is available for use. It creates a temporary socket and tries to bind it to the specified port on the current node’s ip. If the binding is successful, the port is considered available. Otherwise, it is in use.
- Parameters:
port (int) – The port number to check for availability.
- Returns:
True if the port is available; False if the port is already in use.
- Return type:
bool
- Raises:
OSError – If the port binding operation encounters an error other than the port being in use.
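A bind-based availability check, including the distinction between "address in use" and other errors, might be sketched like this. `port_is_free` is a hypothetical name, and the loopback default stands in for the node's own ip.

```python
import errno
import socket


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a TCP socket can bind to (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError as exc:
            if exc.errno == errno.EADDRINUSE:
                return False
            # Any other failure (e.g. permission denied) is re-raised.
            raise
    return True


# Occupy a port, check it, release it, and check again.
holder = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
holder.bind(("127.0.0.1", 0))
busy_port = holder.getsockname()[1]
while_held = port_is_free(busy_port)
holder.close()
after_release = port_is_free(busy_port)
```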
- property paired: bool¶
Check if the node is paired for all its subscriptions.
This property returns a boolean value indicating whether the node is paired for all the subscriptions it subscribes to.
- Returns:
True if the node is paired for all subscriptions, otherwise False.
- Return type:
bool
- paired_for(subscription) bool¶
- async ping(server_edge: Edge | None = None, size: int = 0) None¶
Send ping messages to one or all connected edges.
This method sends a ping message either to a specified edge or broadcasts it to all connected edges in the server_pairs list. It is used to measure network latency and can be used to ensure connectivity. The size parameter sets the amount of dummy data attached to each ping message.
- Parameters:
server_edge (Optional[Edge], optional) – The specific edge to which the ping message should be sent. If None, the ping message is sent to all edges in the server_pairs list. Defaults to None.
size (int, optional) – The size of the dummy data to be sent with the ping message in bytes. This allows simulating payload sizes and their effect on latency. Defaults to 0.
- async propagate(message: Message, source_edge: Edge) None¶
Propagate a message to other connected edges, excluding the source edge.
This method is used to propagate messages received by a node to other connected edges in the network, excluding the edge from which the message was received. This helps in disseminating the message across the network while preventing immediate looping of the message back to its origin.
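- Parameters:
message (Message) – The message to propagate to the other connected edges.
source_edge (Edge) – The edge from which the message was received; it is excluded from propagation.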
Notes
The message’s TTL (Time-to-Live) will be decremented by one. If the TTL reaches zero or less, the message will not be propagated further.
The message’s UUID will be appended to the message pool to track its propagation and avoid duplicate processing.
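The two notes above (TTL decrement and UUID tracking) can be combined into a small decision function. This is a sketch under assumptions: `MessageStub` is a hypothetical stand-in for Message, and checking for duplicates before decrementing the TTL is an ordering chosen for illustration, not one stated by the source.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class MessageStub:
    """Hypothetical message carrying only the fields used here."""
    uuid: str
    ttl: int


def should_forward(message: MessageStub, pool: List[str]) -> bool:
    """Decrement the TTL, record the UUID, and decide whether to forward."""
    if message.uuid in pool:
        return False  # already seen: avoid duplicate processing
    pool.append(message.uuid)
    message.ttl -= 1
    # A TTL of zero or less stops further propagation.
    return message.ttl > 0


pool: List[str] = []
first = should_forward(MessageStub("m-1", ttl=2), pool)
expired = should_forward(MessageStub("m-2", ttl=1), pool)
duplicate = should_forward(MessageStub("m-1", ttl=5), pool)
```

Here the first message is forwarded, the second is dropped because its TTL reaches zero, and the third is dropped because its UUID is already in the pool.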
- proxy(module: str, edge=None) ChaskiProxy[source]¶
Retrieve a proxy object for the specified service name.
This method obtains a proxy associated with a given service name. The proxy can be used to remotely invoke methods on the registered service.
- Parameters:
module (str) – The name of the service/module to retrieve a proxy for.
edge (Optional[str]) – The specific edge to connect with for the module’s availability. Default is None.
- Returns:
The proxy object associated with the specified service name.
- Return type:
ChaskiProxy
- register_module(module: str, service: Any) None[source]¶
Register a service with a proxy.
This method registers a service with the node by associating it with a proxy. The proxy can then be used to remotely invoke methods on the registered service.
- Parameters:
module (str) – The name to associate with the service.
service (Any) – The service object to register. This object can have methods that will be accessible remotely via the proxy.
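The relationship between registering a service and obtaining a proxy for it can be illustrated with an in-process analogue. This sketch involves no networking and does not use ChaskiRemote; `ServiceRegistry` and `ForwardingProxy` are hypothetical classes that only mimic the idea of looking a service up by name and forwarding attribute access to it.

```python
import math
from typing import Any, Dict


class ForwardingProxy:
    """Hypothetical in-process proxy: forwards attribute access to a target."""

    def __init__(self, target: Any) -> None:
        self._target = target

    def __getattr__(self, name: str) -> Any:
        # Called only when normal lookup fails, so _target itself resolves normally.
        return getattr(self._target, name)


class ServiceRegistry:
    """Hypothetical registry mapping names to service objects."""

    def __init__(self) -> None:
        self._services: Dict[str, Any] = {}

    def register_module(self, module: str, service: Any) -> None:
        self._services[module] = service

    def proxy(self, module: str) -> ForwardingProxy:
        return ForwardingProxy(self._services[module])


registry = ServiceRegistry()
registry.register_module("math", math)
result = registry.proxy("math").sqrt(16)
```

In the real framework the forwarding step crosses the network to the node that registered the service; the sketch keeps everything local to show only the name-to-object mapping.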
- async remove_duplicated_connections() None¶
Remove duplicate connections from the server pairs.
Iterates over the list of server pairs and closes connections that have the same ip and port as an already seen connection. This ensures that each peer is connected to the node only once, avoiding redundant connections.
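The "already seen" bookkeeping can be sketched with plain (ip, port) tuples. `split_duplicates` is a hypothetical helper that only separates duplicates from first occurrences; closing the duplicate connections is left out.

```python
from typing import List, Set, Tuple

Address = Tuple[str, int]


def split_duplicates(addresses: List[Address]) -> Tuple[List[Address], List[Address]]:
    """Return (kept, duplicates), keeping the first occurrence of each (ip, port)."""
    seen: Set[Address] = set()
    kept: List[Address] = []
    duplicates: List[Address] = []
    for address in addresses:
        if address in seen:
            duplicates.append(address)  # a real node would close this connection
        else:
            seen.add(address)
            kept.append(address)
    return kept, duplicates


kept, duplicates = split_duplicates(
    [("10.0.0.1", 65432), ("10.0.0.2", 65432), ("10.0.0.1", 65432)]
)
```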
- remove_propagation_command(command: str) None¶
Remove a command from the list of commands that should be propagated.
This method removes a given command from the internal list of commands that are allowed to be propagated to other nodes. It ensures that the command is removed if it exists in the list. If the command does not exist in the list, the method completes silently without any changes.
- Parameters:
command (str) – The command to remove from the propagation list. This command should be a string representing a specific type of message that was previously added for propagation to other nodes.
- async request_ssl_certificate(ca_address: str) None¶
Request an SSL certificate from a Certificate Authority (CA).
This coroutine initiates a request for an SSL certificate from the specified Certificate Authority (CA) address. It generates a Certificate Signing Request (CSR), sends it to the CA, and waits for the CA to sign the CSR and return the signed certificate. Additionally, it retrieves the CA’s certificate and updates the node’s SSL context.
- Parameters:
ca_address (str) – The address of the Certificate Authority (CA) node to request the SSL certificate from.
- Raises:
Exception – If there is an error during the SSL certificate request or signing process.
Notes
The method performs the following steps:
1. Generates a CSR locally.
2. Establishes a connection with the CA node.
3. Sends the CSR to the CA for signing.
4. Receives the signed certificate and CA’s certificate from the CA.
5. Writes the signed certificate and CA’s certificate to disk.
6. Updates the SSL context of the node with the new certificate.
7. Closes the connection with the CA node.
8. Restarts the node with the newly updated SSL context.
- async run() None¶
Launch TCP and UDP servers for the node.
This coroutine starts the TCP and UDP server tasks to listen for incoming connections and handle UDP datagrams. It is an essential part of the node’s operation, enabling it to accept connections from other nodes and exchange messages over the network.
- serialize_message(message: Message) bytes¶
Serialize a Message instance into a bytes object.
This method takes a Message instance and converts it into a bytes object that can be transmitted over the network. It first serializes the message and its topic, and then prepends the lengths of these serialized components in bytes for proper framing.
- Parameters:
message (Message) – The message object to be serialized. It contains the command, topic, data, timestamp, TTL, and UUID associated with the message.
- Returns:
A bytes object that represents the serialized message. The format ensures both the message and its topic can be accurately reconstructed on the receiving end.
- Return type:
bytes
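Length-prefixed framing of two serialized components can be sketched with `struct`. The header layout used here (two big-endian unsigned 32-bit integers, topic length first) is an assumption for illustration; the actual byte layout used by serialize_message is not specified in this section.

```python
import struct
from typing import Tuple

HEADER = struct.Struct(">II")  # assumed layout: topic length, body length


def frame(topic: bytes, body: bytes) -> bytes:
    """Prefix the two payloads with their lengths so they can be split again."""
    return HEADER.pack(len(topic), len(body)) + topic + body


def unframe(data: bytes) -> Tuple[bytes, bytes]:
    """Inverse of frame(): read the lengths, then slice out each payload."""
    topic_len, body_len = HEADER.unpack_from(data)
    start = HEADER.size
    topic = data[start:start + topic_len]
    body = data[start + topic_len:start + topic_len + body_len]
    return topic, body


framed = frame(b"topic1", b"hello")
```

Knowing both lengths up front lets a receiver read exactly the right number of bytes from a stream before attempting to deserialize either part.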
- serializer(obj: Any) bytes¶
Serialize the provided object using the configured serializer.
This method converts a Python object into a bytes representation using the serializer function specified at node initialization. This is typically used for preparing data to be sent over the network.
- Parameters:
obj (Any) – The Python object to be serialized. This can be any serializable object, such as dictionaries, lists, or custom objects.
- Returns:
The serialized representation of the input object as bytes.
- Return type:
bytes
Notes
If the serialization process fails, this method returns the string representation of the object as a fallback.
- property status: dict¶
Retrieve the current status of the ChaskiNode.
This property compiles and returns a dictionary containing the current status details of the node. The status includes information about the node’s paired events for each subscription, whether the server is closing, and whether the node is in the process of attempting to reconnect.
- Returns:
A dictionary containing the status details of the node. The keys include:
- ‘paired’: A dictionary where keys are subscription topics and values are booleans indicating whether the node is paired for that subscription.
- ‘serving’: Boolean value indicating whether the server is closing (False) or not (True).
- ‘reconnecting’: Boolean value indicating whether the node is currently attempting to reconnect to a peer (True) or not (False).
- Return type:
dict
- async stop() None¶
Stop all activities of the node, ensuring proper cleanup.
This coroutine is responsible for gracefully stopping all network services of the node. It closes both TCP and UDP connections, cancels background tasks such as keep-alive checks, and finalizes any pending operations. After invoking this function, the node will no longer serve as part of the network until restarted.
- subscribe(subscriptions: List[str], paired: bool = False) None¶
Subscribe the node to specified topics.
This method allows the node to subscribe to additional topics for receiving messages. If the node is already paired for the given subscriptions, it does not duplicate the subscriptions.
- Parameters:
subscriptions (List[str]) – A list of subscription topics to which the node should subscribe.
paired (bool) – A flag indicating whether the subscriptions should be marked as paired.
Notes
Subscriptions are tracked within the node, allowing it to receive relevant messages from other nodes that share similar interests or topics.
Examples
>>> node = ChaskiNode()
>>> node.subscribe(['topic1', 'topic2'], paired=True)
>>> assert 'topic1' in node.subscriptions
>>> assert 'topic2' in node.subscriptions
- track_task(coro)¶
- async try_to_reconnect(edge: Edge) None¶
Continuously attempt to reconnect to a given edge.
This coroutine repeatedly tries to re-establish a TCP connection with the specified edge after the connection has been lost. Attempts are made at 1-second intervals until a connection is established or the coroutine is explicitly cancelled. This method is useful for maintaining persistent connections in a network of ChaskiNodes.
- Parameters:
edge (Edge) – The edge to which the reconnection attempts will be made. This represents the lost connection that needs to be restored.
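A fixed-interval retry loop of this kind can be sketched with `asyncio`. Both `retry_until_connected` and `make_flaky_connect` are hypothetical helpers; the real coroutine works on an Edge and opens a TCP connection, whereas this sketch takes any awaitable `connect` callable and uses a short interval so it finishes quickly.

```python
import asyncio
from typing import Awaitable, Callable


async def retry_until_connected(
    connect: Callable[[], Awaitable[None]], interval: float = 1.0
) -> int:
    """Call connect() until it succeeds; return the number of attempts."""
    attempts = 0
    while True:
        attempts += 1
        try:
            await connect()
            return attempts
        except OSError:
            # Connection failed: wait, then try again. Cancelling the task
            # interrupts this sleep and ends the loop.
            await asyncio.sleep(interval)


def make_flaky_connect(failures: int) -> Callable[[], Awaitable[None]]:
    """Hypothetical connect() that fails a fixed number of times."""
    remaining = failures

    async def connect() -> None:
        nonlocal remaining
        if remaining > 0:
            remaining -= 1
            raise ConnectionRefusedError("peer not reachable yet")

    return connect


attempts = asyncio.run(retry_until_connected(make_flaky_connect(2), interval=0.01))
```

With two simulated failures the loop succeeds on the third attempt.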
- uuid() str¶
Generate a unique identifier for the node.
This method generates and returns a universally unique identifier (UUID) as a string. The UUID is used to uniquely identify objects and events within the network communication context, ensuring that each identifier is distinct across the distributed node system.
- Returns:
A string representation of a UUID, which can be used to uniquely identify an object, event, or message within the node network.
- Return type:
str
Notes
The UUID generation is based on Python’s uuid module, which provides a way to create universally unique identifiers compliant with RFC 4122.
See also
uuid.uuid4: Generates a random UUID.
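Generating such an identifier with the standard library takes one call. `new_id` is a hypothetical wrapper name; that the node uses `uuid.uuid4` specifically is inferred from the cross-reference above.

```python
import uuid


def new_id() -> str:
    """Return a random (version 4) UUID as a string."""
    return str(uuid.uuid4())


first_id, second_id = new_id(), new_id()
```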
References