ChaskiNode: Distributed Node Communication and Management

This module defines the ChaskiNode class and its associated classes for managing network communication between distributed nodes. It provides a framework for creating nodes that can connect to each other over TCP/IP, handle messaging and data serialization, and perform network-wide functions such as discovery and pairing of nodes based on shared subscriptions.

Classes

  • MessagesPool: Manages a collection of messages with a fixed maximum size, ensuring thread safety.

  • Edge: Represents a connection to a peer in the network, managing the input/output streams and storing metadata such as latency, jitter, and subscription topics.

  • Message: A container class for messages, packing together the command, data, and timestamp information.

  • UDPProtocol: An asyncio protocol class for handling UDP packets, interfacing with the asyncio Datagram Protocol.

  • ChaskiNode: The main class representing a node in the network, which can initiate connections, handle incoming requests, and orchestrate network-wide actions.

class chaski.node.ChaskiNode(ip: str = '127.0.0.1', port: int = 0, serializer: ~typing.Callable[[~typing.Any], bytes] = <built-in function dumps>, deserializer: ~typing.Callable[[bytes], ~typing.Any] = <built-in function loads>, name: str | None = None, subscriptions: str | ~typing.List[str] = [], run: bool = True, ttl: int = 64, root: bool = False, paired: bool = False, max_connections: int = 5, reconnections: int = 32, messages_pool_maxzise: int = 128, message_propagation: bool = False, keep_alive: bool = True, ssl_context_client: ~ssl.SSLContext | None = None, ssl_context_server: ~ssl.SSLContext | None = None, ssl_certificate_attributes: dict | None = {}, ssl_certificates_location: str | None = None, request_ssl_certificate: str | None = None)

Bases: object

Represents a ChaskiNode for distributed network communication.

The ChaskiNode class orchestrates the management of network communication between distributed nodes. It can initiate outgoing connections, handle incoming requests, and manage existing connections. The node is capable of:

  • Creating TCP and UDP endpoints.

  • Performing message serialization and deserialization.

  • Implementing automatic network discovery and pairing based on subscriptions.

__init__(ip: str = '127.0.0.1', port: int = 0, serializer: ~typing.Callable[[~typing.Any], bytes] = <built-in function dumps>, deserializer: ~typing.Callable[[bytes], ~typing.Any] = <built-in function loads>, name: str | None = None, subscriptions: str | ~typing.List[str] = [], run: bool = True, ttl: int = 64, root: bool = False, paired: bool = False, max_connections: int = 5, reconnections: int = 32, messages_pool_maxzise: int = 128, message_propagation: bool = False, keep_alive: bool = True, ssl_context_client: ~ssl.SSLContext | None = None, ssl_context_server: ~ssl.SSLContext | None = None, ssl_certificate_attributes: dict | None = {}, ssl_certificates_location: str | None = None, request_ssl_certificate: str | None = None) → None

Represent a ChaskiNode, which handles various network operations and manages connections.

ChaskiNode is responsible for creating TCP and UDP endpoints, handling incoming connections, and executing network commands. It manages a list of edges, which are connections to other nodes, and performs message serialization and deserialization for network communication. The node can also participate in network-wide actions like discovery, to find and connect with nodes sharing similar subscriptions.

Parameters:
  • ip (str) – The IP address to listen on or bind to.

  • port (int) – The port number to listen on or bind to.

  • serializer (Callable[[Any], bytes], optional) – The function to serialize data before sending it over the network. Defaults to pickle.dumps.

  • deserializer (Callable[[bytes], Any], optional) – The function to deserialize received data. Defaults to pickle.loads.

  • name (Optional[str], optional) – The name of the node, used for identification and logging purposes. Defaults to None.

  • subscriptions (Union[str, List[str]], optional) – A string or list of subscription topic strings this node is interested in. Defaults to an empty list.

  • run (bool, optional) – A flag determining whether the TCP and UDP servers should start immediately upon the node’s initialization. Defaults to True.

  • ttl (int, optional) – Time-to-live value for discovery messages. Defaults to 64.

  • root (bool, optional) – Flag to indicate whether this node is the root in the network topology. Defaults to False.

  • reconnections (int, optional) – The number of reconnection attempts to make if a node loses connection. Defaults to 32.

  • messages_pool_maxzise (int, optional) – The maximum size allowed for the message pool, determining how many messages can be stored in the pool before it starts removing the oldest ones. Defaults to 128.

  • message_propagation (bool, optional) – Flag to indicate whether the node should propagate received messages to other nodes. If set to True, messages received by this node will be forwarded to other connected nodes except for the edge it received the message from, helping in message dissemination across the network. Defaults to False.

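  • ssl_context_client (Optional[ssl.SSLContext], optional) – The SSL context used for connections that this node opens as a client. Defaults to None.

  • ssl_context_server (Optional[ssl.SSLContext], optional) – The SSL context used by this node’s server for incoming connections. Defaults to None.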

  • ssl_certificate_attributes (dict, optional) – A dictionary containing attributes to use when generating an SSL certificate, such as ‘Country Name’, ‘State or Province Name’, ‘Locality Name’, and others. These attributes provide metadata for the SSL certificate, ensuring that it is correctly identified and validated within the network. Defaults to an empty dictionary.

  • ssl_certificates_location (Optional[str], optional) – Location of the directory where SSL/TLS certificates are stored. This directory should include the necessary certificate files for establishing secure connections using the configured SSL context. If not specified (None), the current directory (‘.’) is used.

Notes

The combination of the root and port parameters in the configuration of a ChaskiNode determines how and on which port the node attempts to connect or listen.

  • root=True, port specified:
    • The node is initialized as a root node and uses the specified port to establish connections.

  • root=True, port not specified (None or 0):
    • An available port is chosen from the FAVORITE_PORTS list. If none of those ports is available, a free port is dynamically assigned.

  • root=False, port specified:
    • The node is initialized as a non-root node and listens on the specified port.

  • root=False, port not specified (None or 0):
    • A free port is dynamically assigned using self.port = self.get_free_port().
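The four root/port combinations above can be sketched as a small decision function. This is a minimal illustration under assumptions, not chaski's code: the FAVORITE_PORTS values below are placeholders (the real list is not shown in this section), and `choose_port` is a hypothetical helper that combines the bind-to-port-0 technique described for get_free_port with the bind test described for is_port_available.

```python
import socket
from typing import Optional, Sequence

# Placeholder values: the real FAVORITE_PORTS list lives in chaski and is not shown here.
FAVORITE_PORTS: Sequence[int] = (65432, 65433, 65434)


def is_port_available(ip: str, port: int) -> bool:
    """Return True if a TCP socket can bind to (ip, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((ip, port))
        except OSError:
            return False
    return True


def get_free_port(ip: str = "127.0.0.1") -> int:
    """Bind to port 0 so the operating system picks a free port, then report it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((ip, 0))
        return sock.getsockname()[1]


def choose_port(root: bool, port: Optional[int], ip: str = "127.0.0.1") -> int:
    """Hypothetical helper mirroring the root/port combinations listed above."""
    if port:  # an explicit, non-zero port is used as-is for root and non-root nodes
        return port
    if root:  # root nodes try the favorite ports first
        for candidate in FAVORITE_PORTS:
            if is_port_available(ip, candidate):
                return candidate
    return get_free_port(ip)  # fallback: let the OS assign a port


print(choose_port(root=False, port=8000))  # 8000
```

Because the probing socket is closed before the node binds for real, another process could in principle take the port in between; the check is a best-effort hint rather than a reservation.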

__repr__() → str

Return a string representation of the node.

async _connect_to_peer(node: ChaskiNode, peer_port: int | None = None, paired: bool = False, data: dict = {}) → None

Asynchronously establish a TCP connection to a peer node.

Initiate a TCP connection to the specified peer node. If a connection to that node is already established, or if the target is the current node itself, a warning is emitted and no connection is made. The connection can optionally be marked as ‘paired’, which updates the corresponding state information about the peer node.

Parameters:
  • node (Union[ChaskiNode, str]) – The target node instance or the IP string to connect to.

  • peer_port (Optional[int]) – The port number of the target node if the node parameter is not a ChaskiNode instance.

  • paired (bool) – Flag indicating whether the connection should be marked as ‘paired’.

  • data (dict) – Additional data to include in the report_paired command if the connection is paired.

async _connected(reader: StreamReader, writer: StreamWriter) → None

Handle an incoming TCP connection.

This coroutine is called when a new TCP connection is established. It will create a new Edge instance representing this connection and start listening to incoming messages from the peer.

Parameters:
  • reader (asyncio.StreamReader) – The StreamReader object to read data from the connection.

  • writer (asyncio.StreamWriter) – The StreamWriter object to write data to the connection.
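A coroutine with this (reader, writer) signature is the kind of client-connected callback that asyncio.start_server expects. The sketch below uses a hypothetical `echo_connected` handler in place of `_connected`; instead of creating an Edge and starting a reader loop, it simply echoes one line back and closes the connection.

```python
import asyncio


async def echo_connected(reader: asyncio.StreamReader,
                         writer: asyncio.StreamWriter) -> None:
    """Hypothetical stand-in for _connected: echo one line back, then close."""
    line = await reader.readline()
    writer.write(line)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def main() -> bytes:
    # asyncio calls echo_connected(reader, writer) for every accepted connection.
    server = await asyncio.start_server(echo_connected, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"hello\n")
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply


print(asyncio.run(main()))  # b'hello\n'
```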

async _generic_request_udp(callback: str, kwargs: dict[str, Any] = {}, edge: Edge | None = None) → Any

Make a generic UDP request to a peer node and await the response.

This method sends a UDP request with a specified callback function and additional keyword arguments. It generates a unique identifier for the request, sends the message, and waits for a response from the peer node. The response is then retrieved and returned as the result of the method call.

Parameters:
  • callback (str) – The name of the method to call on the peer node.

  • kwargs (dict[str, Any], optional) – A dictionary of keyword arguments to pass to the callback method on the peer node.

Returns:

The response data received from the peer node.

Return type:

Any

Raises:

asyncio.TimeoutError – If the response is not received within a specified timeout period.

Notes

This method is typically used for internal communication between nodes in the network. It helps in sending requests and receiving responses asynchronously over UDP.
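The request/response correlation described above can be sketched with a dictionary of futures keyed by a unique id. This is an illustration under assumptions, not chaski's implementation: `PendingRequests` is hypothetical, it uses asyncio futures where the _process_response_udp description mentions events, and the peer's reply is simulated in-process rather than sent over UDP.

```python
import asyncio
import uuid
from typing import Any, Dict


class PendingRequests:
    """Hypothetical helper: match responses to requests by a unique id."""

    def __init__(self) -> None:
        self._futures: Dict[str, "asyncio.Future[Any]"] = {}

    def create(self) -> str:
        request_id = uuid.uuid4().hex
        self._futures[request_id] = asyncio.get_running_loop().create_future()
        return request_id

    def resolve(self, request_id: str, data: Any) -> None:
        # Called by the response handler; unknown or late ids are ignored.
        future = self._futures.get(request_id)
        if future is not None and not future.done():
            future.set_result(data)

    async def wait(self, request_id: str, timeout: float) -> Any:
        # asyncio.wait_for raises asyncio.TimeoutError if no response arrives in time.
        try:
            return await asyncio.wait_for(self._futures[request_id], timeout)
        finally:
            self._futures.pop(request_id, None)


async def demo() -> Any:
    pending = PendingRequests()
    request_id = pending.create()
    # Simulate the peer's response arriving a little later.
    asyncio.get_running_loop().call_later(0.01, pending.resolve, request_id, {"echo": 1})
    return await pending.wait(request_id, timeout=1.0)


print(asyncio.run(demo()))  # {'echo': 1}
```

Removing the entry in `finally` keeps the table from growing when a peer never answers.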

_get_status(**kwargs) → dict

Retrieve the status of the node.

This method compiles and returns a dictionary containing the current status details of the node. The status includes information about the node’s paired events for each subscription, whether the server is closing, and whether the node is in the process of attempting to reconnect.

Parameters:

**kwargs (dict, optional) – Additional status information that can be passed as key-value pairs and will be included in the returned status dictionary.

Returns:

A dictionary containing the status details of the node. The keys include:

  • ‘paired’: A dictionary where keys are subscription topics and values are booleans indicating whether the node is paired for that subscription.

  • ‘serving’: Boolean value indicating whether the server is closing (False) or not (True).

  • ‘reconnecting’: Boolean value indicating whether the node is currently attempting to reconnect to a peer (True) or not (False).

Return type:

dict
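For illustration, a dictionary with the keys listed above could be assembled as follows. `build_status` is a hypothetical helper; it assumes that one asyncio.Event per subscription records whether that subscription is paired, which this section does not spell out.

```python
import asyncio
from typing import Any, Dict


def build_status(paired_events: Dict[str, asyncio.Event], serving: bool,
                 reconnecting: bool, **kwargs: Any) -> Dict[str, Any]:
    """Hypothetical sketch of the dictionary returned by _get_status."""
    status: Dict[str, Any] = {
        # One boolean per subscription topic: True once that topic is paired.
        "paired": {topic: event.is_set() for topic, event in paired_events.items()},
        "serving": serving,            # False while the server is closing
        "reconnecting": reconnecting,  # True while a reconnection attempt is running
    }
    status.update(kwargs)  # extra key-value pairs supplied by the caller
    return status


async def demo() -> Dict[str, Any]:
    sensors, video = asyncio.Event(), asyncio.Event()
    sensors.set()
    return build_status({"sensors": sensors, "video": video},
                        serving=True, reconnecting=False)


print(asyncio.run(demo()))
# {'paired': {'sensors': True, 'video': False}, 'serving': True, 'reconnecting': False}
```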

async _keep_alive(interval: int = 7) → None

Keep the node alive by sending periodic heartbeat messages.

This coroutine sends a heartbeat message at defined intervals to ensure that the connection remains active. It is useful for preventing timeouts and maintaining the session state.

Parameters:

interval (int) – The number of seconds between heartbeat messages. Defaults to 7.
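A periodic heartbeat loop of this kind can be sketched as follows. The `send_heartbeat` callback and the `stop` event are hypothetical: this section does not say what the heartbeat message contains or how the loop is stopped.

```python
import asyncio
from typing import Awaitable, Callable


async def keep_alive(send_heartbeat: Callable[[], Awaitable[None]],
                     stop: asyncio.Event, interval: float = 7) -> None:
    """Call send_heartbeat every `interval` seconds until `stop` is set."""
    while not stop.is_set():
        try:
            # Sleep for `interval` seconds, but wake up early if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            await send_heartbeat()


async def demo() -> int:
    beats = 0

    async def heartbeat() -> None:
        nonlocal beats
        beats += 1

    stop = asyncio.Event()
    task = asyncio.create_task(keep_alive(heartbeat, stop, interval=0.01))
    await asyncio.sleep(0.055)
    stop.set()
    await task
    return beats


print(asyncio.run(demo()))  # a small positive count; the exact value depends on timing
```

Waiting on the stop event with a timeout, rather than a plain asyncio.sleep, lets shutdown interrupt the loop immediately instead of after a full interval.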

async _loop_message(message: Message, edge: Edge) → None

Asynchronously process a received message and invoke the appropriate handler.

This method acts as a dispatcher, delegating the received message to a specific method based on the command of the message. It utilizes dynamic method resolution to determine the handler for each command. If no specific handler is found for the command, a warning is logged indicating the missing processor.

Parameters:
  • message (Message) – The received message containing a command, associated data, and a timestamp.

  • edge (Edge) – The network edge (connection) associated with the message source.
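The dynamic method resolution described above can be sketched with getattr. The `_process_<command>` naming is an assumption inferred from the handler names documented on this page (_process_ping, _process_discovery, and so on). `Dispatcher` and the simplified `Message` are hypothetical, and unlike _loop_message the sketch returns the handler's result so that it is easy to inspect.

```python
import asyncio
import logging
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Message:
    """Simplified stand-in for chaski's Message (command, data, timestamp)."""
    command: str
    data: Any = None
    timestamp: float = 0.0


class Dispatcher:
    async def loop_message(self, message: Message, edge: Optional[Any] = None) -> Any:
        # Resolve the handler by name, e.g. "ping" -> self._process_ping.
        handler = getattr(self, f"_process_{message.command}", None)
        if handler is None:
            logging.warning("No processor for command %r", message.command)
            return None
        return await handler(message, edge)

    async def _process_ping(self, message: Message, edge: Optional[Any]) -> str:
        return "pong"


print(asyncio.run(Dispatcher().loop_message(Message("ping"))))  # pong
```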

async _ping(server_edge: Edge, delay: float = 0, latency_update: bool = True, size: int = 0) → None

Send a ping message to measure latency and connectivity.

This method sends a single ping message to the specified edge, or to all server pairs if no edge is specified. It also allows setting the size of a dummy payload in bytes and a delay before the ping is sent. The receiving node may answer with a pong message (see _process_ping), which is then used by _process_pong to measure the round-trip latency.

Parameters:
  • server_edge (Edge, optional) – The edge (network connection) to ping. If provided, the ping will be sent only to this edge. If None, pings will be sent to all server_pairs.

  • delay (float, optional) – The delay in seconds before sending the ping message. Defaults to 0 seconds.

  • latency_update (bool, optional) – If True, the latency information for the edge will be updated based on the ping response. Defaults to True.

  • size (int, optional) – The size of the dummy payload data in bytes to be included in the ping message. Defaults to 0 bytes, meaning no additional data is sent.

async _process_discovery(message: Message, edge: Edge | None = None) → None

Process a network discovery message and propagate it if necessary.

This method is responsible for processing discovery messages as part of a network-wide search for ChaskiNodes with matching subscriptions. The method checks if the message should be propagated based on the TTL and visited nodes. If the current node’s subscriptions match the origin’s, a connection is attempted. Otherwise, the discovery message is forwarded to other ChaskiNodes, avoiding nodes that have already been visited.

Parameters:
  • message (Message) – The discovery message containing details about the discovery process, including the sender’s information, visited nodes, and TTL.

  • edge (Optional[Edge], optional) – The edge where the discovery message was received from. It may be used to avoid sending the discovery message back to the sender. Defaults to None.
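The propagation decision described above can be sketched as a pure function. This is not chaski's implementation: the `Discovery` fields, the use of (ip, port) tuples to record visited nodes, and treating any overlap between subscription sets as a match are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple

Address = Tuple[str, int]  # (ip, port)


@dataclass
class Discovery:
    origin: Address
    subscriptions: Set[str]
    ttl: int
    visited: Set[Address] = field(default_factory=set)


def process_discovery(msg: Discovery, me: Address, my_subscriptions: Set[str],
                      neighbours: List[Address]) -> Tuple[str, List[Address]]:
    """Return ("drop", []), ("connect", [origin]) or ("forward", targets)."""
    if msg.ttl <= 0 or me in msg.visited:
        return "drop", []  # expired, or this node already handled the message
    msg.visited.add(me)
    if me != msg.origin and my_subscriptions & msg.subscriptions:
        return "connect", [msg.origin]  # shared topic: connect back to the origin
    msg.ttl -= 1
    # Forward only to neighbours that have not seen this message yet.
    return "forward", [n for n in neighbours if n not in msg.visited]


origin, me, other = ("10.0.0.1", 65432), ("10.0.0.2", 65432), ("10.0.0.3", 65432)
msg = Discovery(origin, {"sensors"}, ttl=3, visited={origin})
print(process_discovery(msg, me, {"video"}, [origin, other]))
# ('forward', [('10.0.0.3', 65432)])
```

The TTL bounds how far a message can travel, while the visited set prevents it from looping back through nodes that already saw it.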

async _process_handshake(message: Message, edge: Edge) → None

Process a handshake command received from a peer node.

This coroutine is triggered upon receiving a handshake message, which indicates that another ChaskiNode is initiating communication. It prepares and sends a handshake response back to the originating node to acknowledge the handshake, completing the two-way communication setup.

Parameters:
  • message (Message) – The handshake message received, containing the timestamp and data that includes the peer’s name, ip, port, and subscription information.

  • edge (Edge) – The edge associated with the peer node from which the handshake message was received, representing the communication connection to the peer.

async _process_handshake_back(message: Message, edge: Edge) → None

Handle a handshake response (back) from a peer node after an initial handshake.

This coroutine is invoked upon receiving a handshake response from a peer node in the network. It updates the edge information with the name, ip, port, and subscriptions of the responding node and adds the edge to the server’s active connections list.

Parameters:
  • message (Message) – The incoming handshake message containing peer information and a unique handshake identifier.

  • edge (Edge) – The edge associated with the peer node that responded to the handshake, representing the communication link with the peer.

async _process_ping(message: Message, edge: Edge) → None

Handle incoming ping messages and optionally send a pong response.

When a ping message is received, this method processes the message and sends a pong response back to the sender if requested. The method updates the edge’s latency measurements based on the round-trip time of the ping-pong exchange if the latency_update flag in the message is set to True. It also sets the edge’s name, ip, port, and subscriptions based on the information received in the ping message.

Parameters:
  • message (Message) – The incoming ping message containing the timestamp and data needed to send a pong response.

  • edge (Edge) – The edge associated with the incoming ping message.

async _process_pong(message: Message, edge: Edge) → None

Process a pong message and update edge latency measurements.

This coroutine is triggered when a pong message is received in response to a ping request. It uses the time difference between the pong message’s timestamp and the current time to calculate the round-trip latency. If the ‘latency_update’ flag in the message data is True, this latency value will be used to update the edge’s latency statistics. Additionally, the edge’s identifying information such as name, ip, and port is updated based on the pong message data.

Parameters:
  • message (Message) – The incoming pong message containing the original timestamp, sender’s name, ip, port, and subscription information, as well as a unique identifier for the ping event.

  • edge (Edge) – The edge object representing the connection to the sender of the pong message.
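As an illustration of the latency bookkeeping, the sketch below derives the round-trip time from the original ping timestamp and keeps simple statistics. `LatencyStats` is hypothetical, and defining latency as the mean RTT and jitter as the mean absolute difference between consecutive RTTs is an assumption; the actual Edge may compute these figures differently.

```python
import time
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class LatencyStats:
    """Hypothetical per-edge statistics, in milliseconds."""
    samples: List[float] = field(default_factory=list)

    def update(self, ping_timestamp: float, now: Optional[float] = None) -> float:
        # Round-trip time: from when the ping was sent until the pong arrived.
        now = time.time() if now is None else now
        rtt_ms = (now - ping_timestamp) * 1000
        self.samples.append(rtt_ms)
        return rtt_ms

    @property
    def latency(self) -> float:
        return sum(self.samples) / len(self.samples) if self.samples else 0.0

    @property
    def jitter(self) -> float:
        diffs = [abs(b - a) for a, b in zip(self.samples, self.samples[1:])]
        return sum(diffs) / len(diffs) if diffs else 0.0

    def reset(self) -> None:
        self.samples.clear()


stats = LatencyStats()
stats.update(ping_timestamp=100.0, now=100.010)
stats.update(ping_timestamp=200.0, now=200.030)
print(round(stats.latency, 3), round(stats.jitter, 3))  # 20.0 20.0
```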

async _process_report_paired(message: Message, edge: Edge) → None

Process a ‘report_paired’ network message.

This method gets executed when a ‘report_paired’ command is received, indicating that a pairing action has occurred. Depending on the ‘on_pair’ behavior specified in the message, the node may disconnect after pairing or take no action.

Parameters:
  • message (Message) – The message instance containing the ‘report_paired’ command and associated data, such as pairing status and actions to take upon pairing.

  • edge (Edge) – The edge from which the ‘report_paired’ message was received. It provides context for where to apply the action specified in the message data.

async _process_request_udp(message: Message, edge: Edge) → None

Process an incoming UDP request and dispatch a response.

This method handles the reception of a UDP request message. It uses the callback function specified in the message to generate a response and then sends this response back to the requester.

Parameters:
  • message (Message) – The received UDP request message containing the required callback and arguments.

  • edge (Edge) – The edge representing the connection from which the request was received.

async _process_response_udp(message: Message, edge: Edge) → None

Process a response to a UDP request.

This method is invoked when a response to a UDP request is received. It extracts the corresponding request ID from the message, retrieves the response data, and sets the corresponding event to indicate that the response has been processed.

Parameters:
  • message (Message) – The received message containing the response data.

  • edge (Edge) – The edge from which the response was received.

async _process_udp_message(data: bytes, addr: Tuple[str, int]) → None

Process incoming UDP messages routed to this node’s UDP server.

This asynchronous handler is called when the UDP server receives a new message. It deserializes the received bytes back into a message object and processes it according to the command it contains. The method handles ‘status’ and ‘response’ commands used for node status checks and responses.

Parameters:
  • data (bytes) – The raw bytes received from the UDP client.

  • addr (Tuple[str, int]) – A tuple containing the sender’s IP address as a string and the port number as an integer.

Raises:

ValueError – If the received message cannot be processed or contains an invalid command not supported by the node.

async _reader_loop(edge: Edge) → None

Listen and process messages from a given edge in the network.

This asynchronous method is the main loop for managing incoming messages from the connected peer node represented by the provided edge. It continuously reads data from the edge’s StreamReader until the connection is closed or an error is encountered. It handles framing and deserialization, and dispatches each message to the _loop_message coroutine for further handling.

Parameters:

edge (Edge) – The network edge (connection) object from which the messages are read and processed. It contains the StreamReader and StreamWriter for network I/O.
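A length-prefixed reader loop of this kind can be sketched with StreamReader.readexactly. The 4-byte big-endian length header and the pickle payload are assumptions (this section only says the loop handles framing and deserialization), and `handle` is a hypothetical stand-in for the dispatch step.

```python
import asyncio
import pickle
import struct
from typing import Any, Awaitable, Callable, List

HEADER = struct.Struct("!I")  # assumed: 4-byte unsigned length, network byte order


async def reader_loop(reader: asyncio.StreamReader,
                      handle: Callable[[Any], Awaitable[None]]) -> None:
    """Read length-prefixed frames until the peer closes the stream."""
    while True:
        try:
            header = await reader.readexactly(HEADER.size)
            (length,) = HEADER.unpack(header)
            payload = await reader.readexactly(length)
        except asyncio.IncompleteReadError:
            break  # EOF (possibly mid-frame): the connection is closed
        await handle(pickle.loads(payload))


async def demo() -> List[Any]:
    reader = asyncio.StreamReader()
    for obj in ({"command": "ping"}, [1, 2, 3]):
        payload = pickle.dumps(obj)
        reader.feed_data(HEADER.pack(len(payload)) + payload)
    reader.feed_eof()

    received: List[Any] = []

    async def handle(message: Any) -> None:
        received.append(message)

    await reader_loop(reader, handle)
    return received


print(asyncio.run(demo()))  # [{'command': 'ping'}, [1, 2, 3]]
```

readexactly never returns a partial frame, so message boundaries survive however TCP happens to split the byte stream.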

async _remove_closing_connection() → None

Identify and remove server pairs that have closed connections.

This coroutine iterates through the server pairs of the ChaskiNode instance and filters out any edges where the StreamWriter’s associated connection is determined to be closed. This serves to maintain an accurate list of active connections on the server and ensures that operations are not attempted on closed connections.

async _request_status(dest_ip: str, dest_port: int) → Message

Request the status of a node via UDP and wait for a response.

This asynchronous method sends a UDP message to the target ip and port, requesting its status. It generates a unique identifier for the request, sends the message, and then waits for a response that matches the identifier. Once the response is received, it is returned as a Message object.

Parameters:
  • dest_ip (str) – The IP address of the destination node to query for status.

  • dest_port (int) – The port number of the destination node to communicate the status request.

Returns:

The status response message from the destination node, containing information such as whether it is paired and actively serving.

Return type:

Message

async _send_udp_message(command: str, message: Any, dest_ip: str, dest_port: int) → None

Send a UDP message to the specified destination ip and port.

This coroutine sends a pre-formatted message over UDP to a given destination ip and port. It serializes the message, which includes a command and its associated data, before transmission. This method is utilized for communication protocols that require UDP for message passing, like status checks or discovery procedures.

Parameters:
  • command (str) – The command type that dictates the kind of operation to perform, included in the message.

  • message (Any) – The payload associated with the command that contains data necessary for carrying out the operation.

  • dest_ip (str) – The destination IP address to which the message will be sent.

  • dest_port (int) – The port number on the destination host to which the message should be directed.

async _start_tcp_server() → None

Configure and start the asyncio TCP server.

A coroutine that sets up and starts the asyncio TCP server on the ip and port attributes of the ChaskiNode instance. The server handles incoming client connections using the _connected coroutine as its client-connected callback. In addition, a background keep-alive task is started to manage node heartbeat and connectivity. The server runs until it is explicitly stopped or an unhandled exception occurs.

async _start_udp_server() → None

Start an asyncio UDP server to handle incoming datagrams.

This coroutine creates and binds a UDP socket to listen for incoming datagram messages and attaches a UDP protocol endpoint to it. The protocol handler, defined by the UDPProtocol class, specifies how incoming datagrams and error events are processed.

Raises:

ValueError – If the address provided for the UDP socket can’t be resolved.
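A UDP endpoint of this kind can be created with loop.create_datagram_endpoint and an asyncio.DatagramProtocol subclass. The sketch below is hypothetical: `SketchUDPProtocol` stands in for chaski's UDPProtocol and a pickled dict stands in for a Message. It binds to an OS-assigned port on 127.0.0.1 and sends itself one serialized datagram, which is the same send-over-UDP pattern that _send_udp_message is described as using.

```python
import asyncio
import pickle
from typing import Any, Callable, Tuple


class SketchUDPProtocol(asyncio.DatagramProtocol):
    """Deserialize each datagram and pass it to a callback."""

    def __init__(self, on_message: Callable[[Any, Tuple[str, int]], None]) -> None:
        self.on_message = on_message

    def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
        self.on_message(pickle.loads(data), addr)

    def error_received(self, exc: Exception) -> None:
        print("UDP error:", exc)


async def demo() -> Any:
    loop = asyncio.get_running_loop()
    received: "asyncio.Future[Any]" = loop.create_future()

    def on_message(message: Any, addr: Tuple[str, int]) -> None:
        if not received.done():
            received.set_result(message)

    # Server side: bind a UDP socket to a port chosen by the OS.
    server, _ = await loop.create_datagram_endpoint(
        lambda: SketchUDPProtocol(on_message), local_addr=("127.0.0.1", 0))
    port = server.get_extra_info("sockname")[1]

    # Client side: serialize a message and send it as a single datagram.
    client, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, remote_addr=("127.0.0.1", port))
    client.sendto(pickle.dumps({"command": "status", "data": {}}))
    try:
        return await asyncio.wait_for(received, timeout=1.0)
    finally:
        client.close()
        server.close()


print(asyncio.run(demo()))  # {'command': 'status', 'data': {}}
```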

async _test_generic_request_udp(echo_data: dict[str, Any] = {}) → Any

Send a test UDP request and await the response.

This method constructs and sends a generic UDP request with the provided echo_data to a peer node and waits for the response. It is useful for testing the UDP communication mechanism between nodes.

Parameters:

echo_data (dict, optional) – A dictionary of data to be included in the UDP request. The default is an empty dictionary.

Returns:

The response data received from the peer node.

Return type:

Any

async _test_generic_response_udp(**echo_data: Any) → Any

Respond to a test UDP request by echoing the received data.

This coroutine processes a generic UDP request for testing purposes. It simply echoes back the provided echo_data to the requester. This method can be used to verify the correct handling of UDP requests and responses between nodes.

Parameters:

**echo_data (dict) – A dictionary of data received in the UDP request. This data will be echoed back in the response.

Returns:

The echo_data dictionary received in the request, echoed back to the requester.

Return type:

dict

async _write(command: str, data: Any, edge: Edge | None = None, topic: str = 'All') → None

Write data to the specified edge or to all connected peers.

Sends a packaged message with a particular command and associated data either to a single specified edge or as a broadcast to all connected server peers. The message includes the command name and data, and is serialized before being sent. This method ensures the serialized data is properly framed with its length for transmission over TCP.

Parameters:
  • command (str) – The name of the command or type of the message to be sent.

  • data (Any) – The payload of the message, which may consist of any type of data compatible with the serializer.

  • edge (Optional[Edge], optional) – The edge to which the message should be sent. If None, the message will be sent to all server pairs. Defaults to None.

  • topic (str, optional) – The topic associated with the message. Defaults to ‘All’.
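The length framing can be sketched as below. The dictionary layout, the 4-byte big-endian header, and pickle as the serializer are assumptions for illustration; `frame_message` is a hypothetical helper, not part of chaski's API.

```python
import pickle
import struct
import time
from typing import Any, Callable


def frame_message(command: str, data: Any, topic: str = "All",
                  serializer: Callable[[Any], bytes] = pickle.dumps) -> bytes:
    """Serialize a message and prefix it with its length in bytes."""
    payload = serializer({"command": command, "data": data,
                          "topic": topic, "timestamp": time.time()})
    return struct.pack("!I", len(payload)) + payload  # 4-byte length header


frame = frame_message("ping", {"size": 0})
(length,) = struct.unpack("!I", frame[:4])
print(length == len(frame) - 4)  # True
```

A sender would then pass the frame to an asyncio.StreamWriter with writer.write(frame) followed by await writer.drain(); the receiver reads the header first and then exactly that many payload bytes.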

async _write_data(data: bytes, edges: List[Edge] | None = None, excluded_edges: List[Edge] = []) → None

Send serialized data to specified edges or all connected edges, excluding optional edges.

This method writes serialized data to the provided list of edges or all connected edges. It ensures that data is only sent to edges that are not closing their connections and have not been explicitly excluded.

Parameters:
  • data (bytes) – The serialized data to be sent to the edges.

  • edges (Optional[List[Edge]], optional) – A list of Edge instances to which the data should be sent. If None, the data is sent to all edges. Defaults to None.

  • excluded_edges (List[Edge], optional) – A list of Edge instances to exclude from data transmission. Defaults to an empty list.
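The target selection described above can be sketched as follows. For brevity the sketch works on writers directly rather than on Edge objects; `FakeWriter` is a minimal hypothetical stand-in for asyncio.StreamWriter, and `write_data` is an illustration, not chaski's method. Where the description above says a reconnection is attempted after a ConnectionResetError, the sketch only skips the writer.

```python
import asyncio
from typing import Iterable, List, Optional


class FakeWriter:
    """Minimal stand-in for asyncio.StreamWriter, used only for this demo."""

    def __init__(self, closing: bool = False) -> None:
        self.buffer = b""
        self.closing = closing

    def is_closing(self) -> bool:
        return self.closing

    def write(self, data: bytes) -> None:
        self.buffer += data

    async def drain(self) -> None:
        pass


async def write_data(data: bytes, all_writers: List[FakeWriter],
                     writers: Optional[List[FakeWriter]] = None,
                     excluded: Iterable[FakeWriter] = ()) -> int:
    """Send data to `writers` (or to all), skipping excluded and closing ones."""
    excluded = list(excluded)
    sent = 0
    for writer in all_writers if writers is None else writers:
        if writer in excluded or writer.is_closing():
            continue
        try:
            writer.write(data)
            await writer.drain()
            sent += 1
        except ConnectionResetError:
            pass  # a reconnection attempt would go here
    return sent


a, b, c = FakeWriter(), FakeWriter(), FakeWriter(closing=True)
print(asyncio.run(write_data(b"x", [a, b, c], excluded=[a])))  # 1
```

Excluding the edge a message arrived on is what lets message propagation forward to every other peer without echoing back to the sender.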

Raises:

ConnectionResetError – If an edge connection is lost while writing data, an attempt to reconnect is made.

add_propagation_command(command: str) → None

Add a command to the list of commands that should be propagated.

This method appends a given command to the internal list of commands that are allowed to be propagated to other nodes. It ensures that each command is unique within the list by converting it to a set and back to a list.

Parameters:

command (str) – The command to add to the propagation list. This command should be a string representing a specific type of message that can be propagated to other nodes.

async close_connection(edge: Edge, port: int | None = None) → None

Close the connection associated with a given edge, optionally specifying a port.

This coroutine handles the termination of a network connection that corresponds to the provided edge. If a port number is specified, the connection to that port will be closed. All resources associated with the connection, such as stream writers, are properly finalized. If the current node ends up without any connections, a warning is logged, and an attempt to reconnect is made.

Parameters:
  • edge (Edge) – The edge object representing the network connection to be closed. If a port is specified, only the connection to that port is closed.

  • port (Optional[int]) – An optional port number to specifically close the connection to. If None, all connections associated with the edge are closed.

async connect(address_or_ip_or_node: str | ChaskiNode, port: int | None = None, discovery=False, discovery_timeout=0.5) → Edge | None

Establish a connection to the specified node or address.

This method initiates a TCP connection to a specified node or an IP address and port. It leverages the _connect_to_peer method to create the connection. The input can be an instance of ChaskiNode, a string representing the IP address, or an address string in the format “ip:port” or “[ipv6]:port”.

Parameters:
  • address_or_ip_or_node (Union[str, ChaskiNode]) – The target node instance or IP string to connect to. Acceptable formats include:
    • ChaskiNode instance
    • IP address string (e.g., “192.168.1.1”)
    • Address string with port (e.g., “192.168.1.1:65432” or “[2001:db8::1]:65432”)

  • port (Optional[int]) – The port number of the target node if an IP address string is provided. Ignored if address_or_ip_or_node includes port information or is a ChaskiNode instance.

Raises:

ValueError – If the address cannot be resolved.

Returns:

The matching Edge instance if found and discovery is False; otherwise, None.

Return type:

Optional[Edge]
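The accepted address formats can be parsed along these lines. `parse_address` is a hypothetical helper written for illustration; chaski's own parsing may differ, for example in how it treats hostnames or malformed input.

```python
from typing import Optional, Tuple


def parse_address(address: str, port: Optional[int] = None) -> Tuple[str, Optional[int]]:
    """Split "ip", "ip:port" or "[ipv6]:port" into (host, port)."""
    if address.startswith("["):  # bracketed IPv6, optionally followed by :port
        host, _, rest = address[1:].partition("]")
        if rest.startswith(":"):
            return host, int(rest[1:])
        return host, port
    if address.count(":") == 1:  # IPv4 or hostname with a port
        host, _, port_str = address.partition(":")
        return host, int(port_str)
    return address, port  # bare IPv4/hostname, or unbracketed IPv6 without a port


print(parse_address("192.168.1.1:65432"))    # ('192.168.1.1', 65432)
print(parse_address("[2001:db8::1]:65432"))  # ('2001:db8::1', 65432)
print(parse_address("192.168.1.1", 65432))   # ('192.168.1.1', 65432)
```

The brackets matter because an IPv6 address already contains colons; without them, “2001:db8::1:65432” would be ambiguous.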

deserializer(data: bytes) → Any

Deserialize the provided data using the configured deserializer.

This method attempts to convert a bytes object back into its original form (such as an object, string, or other data structures) using the deserializer function specified at the node’s initialization.

Parameters:

data (bytes) – The serialized data as a bytes object that needs to be deserialized.

Returns:

The deserialized data, converted back to its original form. This can be any data type that was originally serialized.

Return type:

Any

Notes

If the deserialization process fails, this method returns the raw data as a fallback.
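The fallback behaviour described in the note can be sketched as follows. `safe_deserialize` is a hypothetical helper; it uses pickle.loads, the documented default deserializer.

```python
import pickle
from typing import Any, Callable


def safe_deserialize(data: bytes,
                     deserializer: Callable[[bytes], Any] = pickle.loads) -> Any:
    """Return the deserialized object, or the raw bytes if deserialization fails."""
    try:
        return deserializer(data)
    except Exception:
        # Any failure (corrupt data, wrong format) falls back to the raw payload.
        return data


print(safe_deserialize(pickle.dumps({"a": 1})))  # {'a': 1}
print(safe_deserialize(b"not a pickle"))         # b'not a pickle'
```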

disable_message_propagation() → None

Disable the message propagation feature for the node.

This method turns off the node’s ability to forward received messages to other connected nodes. Once disabled, the node will stop relaying messages but will continue to receive and process incoming messages.

async discovery(node: ChaskiNode | None = None, on_pair: str | Literal['none', 'disconnect'] = 'none', timeout: int | float = 10) → None

Conduct a network-wide discovery process.

Executes a discovery process across the network to find and potentially connect with other ChaskiNodes. This function is used to find nodes with overlapping subscriptions to establish a peer-to-peer connection. It allows the node to expand its network by connecting to more nodes, which may be of interest based on the subscriptions. Depending on the ‘on_pair’ setting, nodes may connect permanently or just acknowledge the presence of each other.

Parameters:
  • node (Optional['ChaskiNode'], optional) – A reference to a ChaskiNode instance to start the discovery process from. If None, discovery will be attempted using the current node’s server pairs. Defaults to None.

  • on_pair (Union[str, Literal['none', 'disconnect']], optional) – The action to take when a peer is discovered. ‘none’ means no action is taken, while ‘disconnect’ causes the node to disconnect after pairing. Defaults to ‘none’.

  • timeout (Union[int, float], optional) – The maximum time in seconds to wait for the discovery process to complete before considering the node as paired. Defaults to 10 seconds.

enable_message_propagation() → None

Enable the message propagation feature for the node.

When this method is called, it sets the propagate attribute to True. This allows the node to forward received messages to other connected nodes. By enabling message propagation, the node helps in disseminating messages across the network, except for the edge from which the message was received.

get_edge(ip: str, port: int) → Edge | None

Retrieve an existing edge by its ip and port.

This method looks up and returns an Edge instance that matches the given ip and port. If no such edge is found, it returns None.

Parameters:
  • ip (str) – The IP address of the edge to find.

  • port (int) – The port number of the edge to find.

Returns:

The Edge instance with the specified ip and port if found, else None.

Return type:

Optional[Edge]

get_edge_by_name(node_name: str) → Edge | None

Retrieve an existing edge by its node name.

This method looks up and returns an Edge instance that matches the given node name. If no such edge is found, it returns None.

Parameters:

node_name (str) – The name of the node to find.

Returns:

The Edge instance with the specified node name if found, else None.

Return type:

Optional[Edge]

get_free_port() → int

Get a free port for the node to use.

This method creates a temporary socket to bind to a port with value 0, which causes the operating system to allocate an available port. Once the socket is bound, the port number assigned by the operating system is retrieved and the socket is closed. This port number can be used for subsequent network operations requiring a free and available port.

Returns:

The port number assigned by the operating system that is free and available for use.

Return type:

int
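The port-0 technique described above can be written with the standard `socket` module alone. This is a minimal sketch of the same idea, not necessarily the library's exact code. The `host` parameter is an assumption added for the illustration.

```python
import socket


def get_free_port(host: str = "127.0.0.1") -> int:
    """Ask the operating system for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))          # port 0: let the OS pick a free port
        return sock.getsockname()[1]  # the port the OS actually assigned
    # Leaving the `with` block closes the socket and releases the port.


print(get_free_port())
```

Because the socket is closed before the port is used, another process could claim the same port in the meantime, so the returned number is a hint rather than a reservation.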

async handshake(server_edge: Edge, delay: float = 0, response: bool = False)

Initiate or respond to a handshake with the given edge.

This method sends a handshake message to the specified edge and optionally awaits a handshake response. It is used to initiate or confirm connection establishment between two ChaskiNodes.

Parameters:
  • server_edge (Edge) – The edge instance to which the handshake message is to be sent.

  • delay (float, optional) – The amount of time (in seconds) to wait before sending the handshake message. Defaults to 0.

  • back_delay (TODO) –

  • response (bool, optional) – Indicates whether a response is expected. Set to True if waiting for a handshake back. Defaults to False.

is_connected_to(node: ChaskiNode) → bool

Check if this node is connected to another specified node.

Determines whether the current ChaskiNode instance has an established TCP connection with the given node. It checks the server pairs list for a matching ip and port pair to confirm connectivity.

Parameters:

node (ChaskiNode) – The node to check for connectivity with the current node.

Returns:

True if the current node is connected to the specified node; otherwise, False.

Return type:

bool

is_port_available(port: int) → bool

Check if a specific port is available for use.

This method attempts to bind to a given port to determine if it is available for use. It creates a temporary socket and tries to bind it to the specified port on the current node’s ip. If the binding is successful, the port is considered available. Otherwise, it is in use.

Parameters:

port (int) – The port number to check for availability.

Returns:

True if the port is available; False if the port is already in use.

Return type:

bool

Raises:

OSError – If the port binding operation encounters an error other than the port being in use.
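The bind-and-check approach can be sketched with the standard library as follows. This is an illustrative version under two assumptions: "in use" is detected through `errno.EADDRINUSE`, and the node's IP is passed in as a hypothetical `host` argument. Any other `OSError` is re-raised, mirroring the Raises entry above.

```python
import errno
import socket


def is_port_available(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if `port` can be bound on `host`, False if it is already in use."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError as exc:
            if exc.errno == errno.EADDRINUSE:
                return False  # another socket already holds this port
            raise  # any other binding failure is propagated to the caller
        return True
```

The temporary socket is closed on exit from the `with` block, so a successful check does not keep the port reserved.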

property paired: bool

Check if the node is paired for all its subscriptions.

This property returns True only when the node is paired for every topic it is subscribed to.

Returns:

True if the node is paired for all subscriptions, otherwise False.

Return type:

bool

paired_for(subscription) → bool

async ping(server_edge: Edge | None = None, size: int = 0) → None

Send ping messages to one or all connected edges.

This method sends a ping message either to a specified edge or to every connected edge in the server_pairs list. It is used to measure network latency and to verify connectivity. The size parameter controls how much dummy payload is attached to each ping message.

Parameters:
  • server_edge (Optional[Edge], optional) – The specific edge to which the ping message should be sent. If None, the ping message is sent to all edges in the server_pairs list. Defaults to None.

  • size (int, optional) – The size of the dummy data to be sent with the ping message in bytes. This allows simulating payload sizes and their effect on latency. Defaults to 0.

async propagate(message: Message, source_edge: Edge) → None

Propagate a message to other connected edges, excluding the source edge.

This method is used to propagate messages received by a node to other connected edges in the network, excluding the edge from which the message was received. This helps in disseminating the message across the network while preventing immediate looping of the message back to its origin.

Parameters:
  • message (Message) – The message to be propagated. This contains the command, topic, data, timestamp, TTL, and UUID of the message.

  • source_edge (Edge) – The edge from which the message was originally received. This edge will be excluded from the propagation to prevent looping.

Notes

  • The message’s TTL (Time-to-Live) will be decremented by one. If the TTL reaches zero or less, the message will not be propagated further.

  • The message’s UUID will be appended to the message pool to track its propagation and avoid duplicate processing.
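The forwarding rules above (skip the source edge, decrement the TTL, stop at zero, remember the UUID) can be sketched with small stand-in classes. `Msg` and `Peer` are hypothetical substitutes for Message and Edge, and a plain `set` replaces MessagesPool; the duplicate check at the top is an assumption about how the tracked UUIDs are used.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, List, Set


@dataclass
class Msg:
    """Hypothetical stand-in for chaski.node.Message (only the fields used here)."""
    command: str
    data: Any = None
    ttl: int = 64
    uuid: str = ""


@dataclass
class Peer:
    """Hypothetical stand-in for an Edge that records what it is sent."""
    name: str
    inbox: List[Msg] = field(default_factory=list)

    async def send(self, msg: Msg) -> None:
        self.inbox.append(msg)


async def propagate(msg: Msg, source: Peer, peers: List[Peer], seen: Set[str]) -> None:
    """Forward `msg` to every peer except `source`, honouring TTL and UUIDs."""
    if msg.uuid in seen:      # already handled: avoid duplicate processing
        return
    seen.add(msg.uuid)        # a plain set stands in for MessagesPool here
    msg.ttl -= 1              # one hop consumed
    if msg.ttl <= 0:          # expired: do not forward any further
        return
    for peer in peers:
        if peer is not source:  # never echo the message back to its sender
            await peer.send(msg)


a, b, c = Peer("a"), Peer("b"), Peer("c")
seen: Set[str] = set()
msg = Msg("publish", data="hi", ttl=2, uuid="m-1")
asyncio.run(propagate(msg, a, [a, b, c], seen))
print([p.name for p in (a, b, c) if p.inbox])  # ['b', 'c']
```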

async remove_duplicated_connections() → None

Remove duplicate connections from the server pairs.

Iterates over the list of server pairs and closes connections that have the same ip and port as an already seen connection. This ensures that each peer is connected to the node only once, avoiding redundant connections.
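The "first one wins" rule can be sketched as a single pass with a set of seen `(ip, port)` keys. `Conn` and `split_duplicates` are hypothetical names. Instead of closing anything, the sketch returns the duplicates so a caller could close their writers.

```python
from dataclasses import dataclass
from typing import List, Set, Tuple


@dataclass
class Conn:
    """Hypothetical stand-in for an Edge: just the peer's address."""
    ip: str
    port: int


def split_duplicates(conns: List[Conn]) -> Tuple[List[Conn], List[Conn]]:
    """Keep the first connection per (ip, port) and collect the later repeats."""
    seen: Set[Tuple[str, int]] = set()
    keep: List[Conn] = []
    dupes: List[Conn] = []
    for conn in conns:
        key = (conn.ip, conn.port)
        if key in seen:
            dupes.append(conn)  # these are the connections that would be closed
        else:
            seen.add(key)
            keep.append(conn)
    return keep, dupes
```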

remove_propagation_command(command: str) → None

Remove a command from the list of commands that should be propagated.

This method removes the given command from the internal list of commands that are allowed to be propagated to other nodes. If the command is not in the list, the method returns silently without making any changes.

Parameters:

command (str) – The command to remove from the propagation list. This command should be a string representing a specific type of message that was previously added for propagation to other nodes.

async request_ssl_certificate(ca_address: str) → None

Request an SSL certificate from a Certificate Authority (CA).

This coroutine initiates a request for an SSL certificate from the specified Certificate Authority (CA) address. It generates a Certificate Signing Request (CSR), sends it to the CA, and waits for the CA to sign the CSR and return the signed certificate. Additionally, it retrieves the CA’s certificate and updates the node’s SSL context.

Parameters:

ca_address (str) – The address of the Certificate Authority (CA) node to request the SSL certificate from.

Raises:

Exception – If there is an error during the SSL certificate request or signing process.

Notes

The method performs the following steps:

  1. Generates a CSR locally.

  2. Establishes a connection with the CA node.

  3. Sends the CSR to the CA for signing.

  4. Receives the signed certificate and the CA’s certificate from the CA.

  5. Writes the signed certificate and the CA’s certificate to disk.

  6. Updates the SSL context of the node with the new certificate.

  7. Closes the connection with the CA node.

  8. Restarts the node with the newly updated SSL context.

async run() → None

Launch TCP and UDP servers for the node.

This coroutine starts the TCP and UDP server tasks to listen for incoming connections and handle UDP datagrams. It is an essential part of the node’s operation, enabling it to accept connections from other nodes and exchange messages over the network.

serialize_message(message: Message) → bytes

Serialize a Message instance into a bytes object.

This method takes a Message instance and converts it into a bytes object that can be transmitted over the network. It first serializes the message and its topic, and then prepends the lengths of these serialized components in bytes for proper framing.

Parameters:

message (Message) – The message object to be serialized. It contains the command, topic, data, timestamp, TTL, and UUID associated with the message.

Returns:

A bytes object that represents the serialized message. The format ensures both the message and its topic can be accurately reconstructed on the receiving end.

Return type:

bytes
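Length-prefixed framing, as described above, lets the receiver know exactly where each serialized component ends. The sketch below shows one way to do it. The wire layout (two unsigned 32-bit big-endian lengths, then the body, then the UTF-8 topic), the use of `pickle`, and the helper names `frame`/`unframe` are all assumptions for illustration, not the library's actual format.

```python
import pickle
import struct
from typing import Any, Tuple

# Two unsigned 32-bit lengths in network byte order: (body length, topic length).
HEADER = struct.Struct("!II")


def frame(topic: str, payload: Any) -> bytes:
    """Serialize payload and topic, then prepend both lengths."""
    body = pickle.dumps(payload)
    topic_bytes = topic.encode("utf-8")
    return HEADER.pack(len(body), len(topic_bytes)) + body + topic_bytes


def unframe(data: bytes) -> Tuple[str, Any]:
    """Reverse `frame`: read the lengths, then slice out each component."""
    body_len, topic_len = HEADER.unpack_from(data)
    start = HEADER.size
    body = data[start:start + body_len]
    topic = data[start + body_len:start + body_len + topic_len]
    return topic.decode("utf-8"), pickle.loads(body)


print(unframe(frame("topic1", {"x": 1})))  # ('topic1', {'x': 1})
```

On a stream, a receiver would typically read `HEADER.size` bytes first (for example with `StreamReader.readexactly`) and then read exactly the announced number of bytes.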

serializer(obj: Any) → bytes

Serialize the provided object using the configured serializer.

This method converts a Python object into a bytes representation using the serializer function specified at node initialization. This is typically used for preparing data to be sent over the network.

Parameters:

obj (Any) – The Python object to be serialized. This can be any serializable object, such as dictionaries, lists, or custom objects.

Returns:

The serialized representation of the input object as bytes.

Return type:

bytes

Notes

If the serialization process fails, this method returns the string representation of the object as a fallback.
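A serializer with a string fallback might look like the sketch below. Two points are assumptions: that the configured serializer defaults to `pickle.dumps`, and that the fallback string is UTF-8 encoded so the result still matches the documented `bytes` return type. `serialize_with_fallback` is a hypothetical name.

```python
import pickle
from typing import Any, Callable


def serialize_with_fallback(obj: Any,
                            dumps: Callable[[Any], bytes] = pickle.dumps) -> bytes:
    """Serialize `obj` with `dumps`; on failure, fall back to its string form."""
    try:
        return dumps(obj)
    except Exception:
        # Fallback: the object's str() representation, encoded as UTF-8 (assumed).
        return str(obj).encode("utf-8")
```

Lambdas, for example, cannot be pickled, so `serialize_with_fallback(lambda: 0)` takes the fallback path and returns bytes starting with `b"<function"`.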

property status: dict

Retrieve the current status of the ChaskiNode.

This property compiles and returns a dictionary containing the current status details of the node. The status includes information about the node’s paired events for each subscription, whether the server is closing, and whether the node is in the process of attempting to reconnect.

Returns:

A dictionary containing the status details of the node. The keys include:

  • ‘paired’: A dictionary where keys are subscription topics and values are booleans indicating whether the node is paired for that subscription.

  • ‘serving’: Boolean value indicating whether the server is closing (False) or not (True).

  • ‘reconnecting’: Boolean value indicating whether the node is currently attempting to reconnect to a peer (True) or not (False).

Return type:

dict

async stop() → None

Stop all activities of the node, ensuring proper cleanup.

This coroutine is responsible for gracefully stopping all network services of the node. It closes both TCP and UDP connections, cancels background tasks such as keep-alive checks, and finalizes any pending operations. After invoking this function, the node will no longer serve as part of the network until restarted.

subscribe(subscriptions: List[str], paired: bool = False) → None

Subscribe the node to specified topics.

This method allows the node to subscribe to additional topics for receiving messages. If the node is already paired for the given subscriptions, it does not duplicate the subscriptions.

Parameters:
  • subscriptions (List[str]) – A list of subscription topics to which the node should subscribe.

  • paired (bool, optional) – A flag indicating whether the subscriptions should be marked as paired. Defaults to False.

Notes

Subscriptions are tracked within the node, allowing it to receive relevant messages from other nodes that share similar interests or topics.

Examples

>>> node = ChaskiNode()
>>> node.subscribe(['topic1', 'topic2'], paired=True)
>>> assert 'topic1' in node.subscriptions
>>> assert 'topic2' in node.subscriptions
track_task(coro)

async try_to_reconnect(edge: Edge) → None

Continuously attempt to reconnect to a given edge.

This coroutine repeatedly tries to re-establish a TCP connection with the specified edge after the original connection has been lost. Attempts are made at 1-second intervals until a connection succeeds or the coroutine is explicitly cancelled. This keeps connections persistent within a network of ChaskiNodes.

Parameters:

edge (Edge) – The edge to which the reconnection attempts will be made. This represents the lost connection that needs to be restored.
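The retry behaviour described above (fixed interval, no attempt limit, stopped only by cancellation) can be sketched as a loop around any awaitable connect function. `reconnect_loop` and `flaky_connect` are hypothetical names, and treating every `OSError` as "try again" is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def reconnect_loop(connect: Callable[[], Awaitable[Any]],
                         interval: float = 1.0) -> Any:
    """Retry `connect()` every `interval` seconds until it succeeds.

    There is no attempt limit: cancelling the task that runs this
    coroutine is what stops the retries.
    """
    while True:
        try:
            return await connect()
        except OSError:
            # Refused or unreachable (ConnectionError subclasses OSError): wait, retry.
            await asyncio.sleep(interval)


attempts = 0


async def flaky_connect() -> str:
    """Hypothetical peer that refuses the first two connection attempts."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionRefusedError("peer not up yet")
    return "connected"


result = asyncio.run(reconnect_loop(flaky_connect, interval=0.01))
print(result)  # connected
```

Against a real peer, the connect function could be `lambda: asyncio.open_connection(ip, port)`, which raises `ConnectionRefusedError` while nothing is listening.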

uuid() → str

Generate a unique identifier for the node.

This method generates and returns a universally unique identifier (UUID) as a string. The UUID is used to uniquely identify objects and events within the network communication context, ensuring that each identifier is distinct across the distributed node system.

Returns:

A string representation of a UUID, which can be used to uniquely identify an object, event, or message within the node network.

Return type:

str

Notes

The UUID generation is based on Python’s uuid module, which provides a way to create universally unique identifiers compliant with RFC 4122.

See also

uuid.uuid4

Generates a random UUID.


class chaski.node.Edge(writer: asyncio.streams.StreamWriter, reader: asyncio.streams.StreamReader, latency: float = 0, jitter: float = 0, name: str = '', ip: str = '', port: int = 0, subscriptions: set = <factory>, ping_in_progress: bool = False, paired: bool = False)

Bases: object

Represents a connection to a peer node, encompassing essential communication features and performance metrics.

The Edge class is designed to manage and provide detailed insights into network connections between nodes. This class focuses on TCP connections, offering methods for performance evaluation, address management, and connection properties.

writer

StreamWriter for sending data to the connected peer node.

Type:

asyncio.StreamWriter

reader

StreamReader for reading data from the connected peer node.

Type:

asyncio.StreamReader

latency

Current latency in the connection, default is 0 milliseconds.

Type:

float, optional

jitter

Variation in latency, default is 0 milliseconds.

Type:

float, optional

name

The name identifier of the edge, typically used for logging and monitoring.

Type:

str, optional

ip

The IP of the connected peer node.

Type:

str, optional

port

The port number of the connected peer node.

Type:

int, optional

subscriptions

Set of topics this edge subscribes to.

Type:

set, optional

ping_in_progress

A flag to indicate if a ping operation is in progress, default is False.

Type:

bool, optional

paired

A flag to indicate if the node is paired, default is False.

Type:

bool, optional

__repr__() → str

Return a string representation of the Edge.

Generates a human-readable string that includes the Edge’s name, latency, jitter, ip, and port. The string format highlights the state of the Edge in terms of network performance and connection details.

Returns:

A formatted string characterizing the Edge instance with details like name, latency (in milliseconds), jitter (in milliseconds), IP, and port.

Return type:

str

property address: tuple[str, int]

Retrieve the address of the connected remote peer.

This property returns the remote address to which the edge’s writer is connected. It extracts the ‘peername’ information from the underlying socket associated with the StreamWriter instance held by the edge.

Returns:

A tuple of two elements where the first element is the IP of the remote peer as a string, and the second element is the port number as an integer.

Return type:

tuple[str, int]

ip: str = ''
jitter: float = 0
latency: float = 0
property local_address: Tuple[str, int]

Retrieve the local address of the edge’s writer socket.

This cached property returns a tuple containing the local IP address or hostname, and the local port number, obtained from the writer socket’s information. It represents the local end of the connection managed by the edge.

Returns:

A tuple containing the local address of the writer socket. The first element is the IP address or hostname as a string, and the second element is the port number as an integer.

Return type:

Tuple[str, int]
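Both address properties read socket information that asyncio exposes through `StreamWriter.get_extra_info`. The loopback sketch below shows the two values side by side. That `address` uses `'peername'` comes from the text above; that `local_address` uses `'sockname'` is an assumption, and `connection_addresses` is a hypothetical helper.

```python
import asyncio
from typing import Tuple

Address = Tuple[str, int]


async def connection_addresses() -> Tuple[Address, Address, int]:
    """Open a loopback connection and read both ends from the client's StreamWriter."""

    async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        writer.close()  # the server side does nothing but hang up

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    server_port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", server_port)
    remote = writer.get_extra_info("peername")  # counterpart of Edge.address
    local = writer.get_extra_info("sockname")   # assumed counterpart of Edge.local_address
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return remote, local, server_port


remote, local, server_port = asyncio.run(connection_addresses())
print(remote == ("127.0.0.1", server_port))  # True
```

The remote port is the server's listening port, while the local port is an ephemeral port chosen by the operating system for the client end.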

name: str = ''
paired: bool = False
ping_in_progress: bool = False
port: int = 0
reader: StreamReader
reset_latency() → None

Reset the latency and jitter values for the edge.

This function resets the latency and jitter values to their default initial state, which is 0 for latency and 100 for jitter. This is usually called to clear any existing latency and jitter measurements and start fresh, typically before starting a new set of latency tests or after a significant network event.

subscriptions: set
update_latency(new_latency: float) → None

Update the edge latency based on a new latency measurement.

This method updates the edge latency statistics by combining the new latency value with the existing latency and jitter information. It uses a simple Bayesian update approach to compute a new posterior mean and variance for the edge latency, representing the updated belief about the edge’s latency characteristics given the new data.

Parameters:

new_latency (float) – The new latency measurement to incorporate into the edge’s latency statistics.
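A "simple Bayesian update" of a mean and variance is commonly the Gaussian conjugate update: precisions add, and the posterior mean is a precision-weighted average of the prior mean and the new measurement. The sketch shows that textbook update. How update_latency maps latency and jitter onto the prior (for instance, whether jitter is a variance or a standard deviation) and what measurement noise it assumes are not stated above, so the `measurement_var` parameter and the helper name are assumptions.

```python
from typing import Tuple


def bayesian_latency_update(prior_mean: float, prior_var: float,
                            measurement: float, measurement_var: float = 1.0
                            ) -> Tuple[float, float]:
    """Gaussian conjugate update of a latency estimate (hypothetical helper).

    Precisions (inverse variances) add; the posterior mean weights the prior
    mean and the new measurement by their respective precisions.
    """
    precision = 1.0 / prior_var + 1.0 / measurement_var
    post_var = 1.0 / precision
    post_mean = post_var * (prior_mean / prior_var + measurement / measurement_var)
    return post_mean, post_var


# Prior mean 0 and variance 100, one 20 ms sample with noise variance 100:
print(bayesian_latency_update(0.0, 100.0, 20.0, 100.0))  # (10.0, 50.0)
```

With equal prior and measurement variances the posterior mean lands halfway between them, and the variance halves, so each new ping narrows the estimate.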

writer: StreamWriter
class chaski.node.Message(command: str, topic: str = '', data: Any = None, timestamp: datetime = 0, ttl: int = 64, uuid: str = None)

Bases: object

A class to represent a message with a specific command and associated data, along with a timestamp indicating when it was created, a topic for categorization, a TTL (time-to-live) value, and a unique identifier (UUID).

This class is designed to encapsulate all necessary details of a message within a network communication context. Each message carries a command that indicates the action to be performed, the data required to execute the action, the time at which the message was instantiated, an optional topic to categorize the message, a TTL value to indicate the lifespan of the message in hops, and a UUID to uniquely identify the message.

Parameters:
  • command (str) – The specific command or instruction that this message signifies. Commands are typically predefined and understood by both the sender and receiver in the communication protocol being implemented.

  • topic (str, optional) – An optional topic string used to categorize the message. Defaults to an empty string.

  • data (Any, optional) – The payload of the message containing the data that the command operates on. This can be any type of data structure, such as a string, dictionary, or a custom object, and its structure depends on the specific needs of the command. Defaults to None.

  • timestamp (datetime, optional) – The exact date and time when the message was created, represented as a datetime object. The timestamp provides chronological context for the message’s creation, aiding in message tracking, ordering, and latency calculations. Defaults to 0.

  • ttl (int, optional) – Time-to-live value for the message, indicating the maximum number of hops the message can take. Defaults to 64.

  • uuid (str, optional) – A unique identifier for the message, represented as a string. Defaults to None.

command: str
data: Any = None
decrement_ttl() → None
timestamp: datetime = 0
topic: str = ''
ttl: int = 64
uuid: str = None
class chaski.node.MessagesPool(maxzise: int = 128)

Bases: object

A pool for managing a collection of messages with a fixed maximum size.

The MessagesPool class is designed to provide a thread-safe environment for storing, retrieving, and managing messages in a fixed-size list. When the pool exceeds its maximum size, the oldest message is removed to accommodate new ones.

__init__(maxzise: int = 128)

Initialize a new MessagesPool instance.

Parameters:

maxzise (int, optional) – The maximum size of the message pool. If the pool exceeds this size, the oldest messages will be removed to make room for new ones. The default maximum size is 128.

async append(item: Any) → None

Append an item to the message pool.

This coroutine safely appends an item to the message pool, ensuring that the pool size does not exceed the defined maximum size (self.maxzise). If the pool is full, the oldest item in the pool is removed to make space for the new item. If the item already exists in the pool, it is moved to the end of the pool.

Parameters:

item (Any) – The item to append to the message pool. This can be any object that is comparable to the items stored in the pool.

async contains(item: Any) → bool

Check if the specified item is present in the message pool.

This coroutine acquires a lock to ensure thread safety while checking the presence of an item in the pool. It returns True if the item is found, otherwise False.

Parameters:

item (Any) – The item to check for presence in the pool. This can be any object that is comparable to the items stored in the pool.

Returns:

True if the item is present in the pool, False otherwise.

Return type:

bool
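The append and contains behaviour described above (fixed capacity, evict the oldest item, move an existing item to the end, guard access with a lock) can be sketched with a `deque` and an `asyncio.Lock`. `BoundedPool` is a hypothetical class, and it spells its parameter `maxsize` where the library uses `maxzise`. Note that an `asyncio.Lock` serializes coroutines on one event loop; it does not protect against access from other threads.

```python
import asyncio
from collections import deque
from typing import Any, Deque


class BoundedPool:
    """Hypothetical sketch of a fixed-size, lock-protected message pool."""

    def __init__(self, maxsize: int = 128) -> None:
        self.maxsize = maxsize
        self._items: Deque[Any] = deque()
        self._lock = asyncio.Lock()

    async def append(self, item: Any) -> None:
        async with self._lock:
            if item in self._items:
                self._items.remove(item)   # re-appended below, i.e. moved to the end
            elif len(self._items) >= self.maxsize:
                self._items.popleft()      # pool is full: evict the oldest item
            self._items.append(item)

    async def contains(self, item: Any) -> bool:
        async with self._lock:
            return item in self._items


async def demo() -> list:
    pool = BoundedPool(maxsize=2)
    for uid in ("a", "b", "a", "c"):
        await pool.append(uid)  # "a" moves to the end, so "b" is evicted by "c"
    return [await pool.contains(uid) for uid in ("a", "b", "c")]


print(asyncio.run(demo()))  # [True, False, True]
```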

class chaski.node.UDPProtocol(node: ChaskiNode, on_message_received: Awaitable)

Bases: DatagramProtocol

An asyncio protocol class for processing UDP packets.

This class defines a custom protocol that handles UDP communication for a node. It implements the asyncio callbacks that deliver received datagrams to the node, report errors, and respond to the UDP endpoint being closed.

connection_lost(exc: Exception | None) → None

Respond to a lost connection or the closing of the UDP endpoint.

This event handler is called when the UDP connection used by the protocol is no longer connected or has been explicitly closed. Connection loss can have a variety of causes, such as network issues or the remote end closing the connection. If the connection was closed because of an error, the exception is passed to this handler; if it was closed cleanly, the handler receives None.

Parameters:

exc (Optional[Exception]) – The exception object if the connection was lost due to an error, or None if the connection was closed cleanly.

connection_made(transport)

Called when a connection is made.

The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.

datagram_received(message: bytes, addr: tuple[str, int]) → None

Handle incoming datagram messages and dispatch them for processing.

This method is invoked automatically whenever a UDP packet is received. It is responsible for creating a coroutine that will handle the incoming message asynchronously. This allows the event loop to continue handling other tasks while the message is processed.

Parameters:
  • message (bytes) – The datagram message received from the sender. The content is raw bytes and is expected to be deserialized and processed by the designated handler.

  • addr (tuple[str, int]) – The sender’s address where the first element is a string representing the IP address or hostname of the sender and the second element is an integer representing the port number.
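Scheduling a coroutine from `datagram_received`, as described above, keeps the callback non-blocking. The sketch below shows the pattern with standard asyncio calls. `DispatchingUDP` and the handler signature are hypothetical. Keeping a reference to each task until it finishes follows the asyncio recommendation against tasks being garbage-collected mid-flight; whether `track_task` serves that purpose in Chaski is not stated here.

```python
import asyncio
from typing import Awaitable, Callable, Set, Tuple

Handler = Callable[[bytes, Tuple[str, int]], Awaitable[None]]


class DispatchingUDP(asyncio.DatagramProtocol):
    """Hypothetical sketch: hand each datagram to a coroutine handler."""

    def __init__(self, on_message: Handler) -> None:
        self.on_message = on_message
        self._tasks: Set[asyncio.Task] = set()

    def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
        # Protocol callbacks must not block, so schedule the handler and return.
        task = asyncio.get_running_loop().create_task(self.on_message(data, addr))
        self._tasks.add(task)                        # keep a strong reference
        task.add_done_callback(self._tasks.discard)  # drop it once finished

    def error_received(self, exc: Exception) -> None:
        print(f"UDP error: {exc!r}")


async def demo() -> Tuple[bytes, Tuple[str, int]]:
    loop = asyncio.get_running_loop()
    received: asyncio.Future = loop.create_future()

    async def handle(data: bytes, addr: Tuple[str, int]) -> None:
        received.set_result((data, addr))

    server, _ = await loop.create_datagram_endpoint(
        lambda: DispatchingUDP(handle), local_addr=("127.0.0.1", 0))
    port = server.get_extra_info("sockname")[1]
    client, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, remote_addr=("127.0.0.1", port))
    client.sendto(b"ping")
    try:
        return await asyncio.wait_for(received, timeout=2)
    finally:
        client.close()
        server.close()


data, addr = asyncio.run(demo())
print(data)  # b'ping'
```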

error_received(exc: Exception | None) → None

Handle any errors received during the UDP transaction.

This method is called automatically when an error is encountered during UDP communication. It logs the error using the UDP-specific logger. The method is part of the asyncio protocol interface and provides a standardized hook for error handling in asynchronous UDP operations.

Parameters:

exc (Optional[Exception]) – The exception that occurred during UDP operations, if any. It is None if the error was triggered by something other than an Exception, such as a connection problem.

node: ChaskiNode
on_message_received: Awaitable
pause_writing()

Called when the transport’s buffer goes over the high-water mark.

Pause and resume calls are paired – pause_writing() is called once when the buffer goes strictly over the high-water mark (even if subsequent writes increase the buffer size even more), and eventually resume_writing() is called once when the buffer size reaches the low-water mark.

Note that if the buffer size equals the high-water mark, pause_writing() is not called – it must go strictly over. Conversely, resume_writing() is called when the buffer size is equal or lower than the low-water mark. These end conditions are important to ensure that things go as expected when either mark is zero.

NOTE: This is the only Protocol callback that is not called through EventLoop.call_soon() – if it were, it would have no effect when it’s most needed (when the app keeps writing without yielding until pause_writing() is called).

resume_writing()

Called when the transport’s buffer drains below the low-water mark.

See pause_writing() for details.

track_task(coro)