ChaskiStreamer: Asynchronous Message Streaming with a Distributed Network¶
The ChaskiStreamer module provides the functionality to stream messages asynchronously within a distributed network environment. It leverages the base class ChaskiNode and extends its capabilities to handle an internal message queue for efficient and scalable message processing.
Classes¶
ChaskiStreamer: Extends ChaskiNode to provide asynchronous message streaming capabilities.
- class chaski.streamer.ChaskiStreamer(destination_folder: str = '.', chunk_size: int = 8192, file_handling_callback: callable = None, allow_incoming_files: bool = False, sync: bool = False, persistent_storage: bool = False, *args: tuple, **kwargs: dict)[source]¶
Bases:
ChaskiNodeStream messages with ChaskiStreamer.
The ChaskiStreamer class inherits from ChaskiNode and provides an implementation to handle asynchronous message streaming within a distributed network. It sets up an internal message queue to manage incoming messages, processes these messages, and allows the asynchronous sending of messages to designated topics.
- async __aenter__() AsyncGenerator[source]¶
Enter the asynchronous context for streaming messages.
This method is called when entering the asynchronous context using the async with statement. It returns the message stream generator which will yield messages asynchronously from the internal message queue.
- Returns:
A generator that yields Message objects as they arrive in the message queue.
- Return type:
AsyncGenerator[Message, None, None]
- async __aexit__(exception_type: type, exception_value: BaseException, exception_traceback: TracebackType) None[source]¶
Exit the runtime context related to this object and stop the streamer.
This method is invoked to exit the asynchronous runtime context, typically used in conjunction with an asynchronous context manager. It ensures that any resources or operations related to this object are properly cleaned up and stopped.
- Parameters:
exception_type (type, optional) – The exception type if an exception was raised, otherwise None.
exception_value (BaseException, optional) – The exception instance if an exception was raised, otherwise None.
exception_traceback (TracebackType, optional) – The traceback object if an exception was raised, otherwise None.
Notes
This method ensures that the streamer is stopped and any pending messages are handled gracefully. It is intended to be used within an asynchronous context that supports the asynchronous context manager protocol.
- __repr__()[source]¶
Provide a string representation of the ChaskiStreamer instance.
This method returns a string that includes the class name and network information such as the IP address and port. If the instance is a root node, it prepends an asterisk (*) to the string.
- async _connect_to_peer(node: ChaskiNode, peer_port: int | None = None, paired: bool = False, data: dict = {}) None¶
Asynchronously establish a TCP connection to a peer node.
Initiate a TCP connection to the specified peer node. If a connection is already established, or if the target node is the same as the current one, the function will produce a warning and not proceed further. This function also supports marking a connection as ‘paired’, updating corresponding state information about the peer node.
- Parameters:
node ('ChaskiNode') – The target node instance or the ip string to connect to.
peer_port (Optional[int]) – The port number of the target node if the node parameter is not a ChaskiNode instance.
paired (bool) – Flag indicating whether the connection should be marked as ‘paired’.
data (dict) – Additional data to include in the report_paired command if the connection is paired.
- async _connected(reader: StreamReader, writer: StreamWriter) None¶
Handle an incoming TCP connection.
This coroutine is called when a new TCP connection is established. It will create a new Edge instance representing this connection and start listening to incoming messages from the peer.
- Parameters:
reader (asyncio.StreamReader) – The StreamReader object to read data from the connection.
writer (asyncio.StreamWriter) – The StreamWriter object to write data to the connection.
- async _generic_request_udp(callback: str, kwargs: dict[str, Any] = {}, edge: Edge | None = None) Any¶
Make a generic UDP request to a peer node and await the response.
This method sends a UDP request with a specified callback function and additional keyword arguments. It generates a unique identifier for the request, sends the message, and waits for a response from the peer node. The response is then retrieved and returned as the result of the method call.
- Parameters:
callback (str) – The name of the method to call on the peer node.
kwargs (dict[str, Any], optional) – A dictionary of keyword arguments to pass to the callback method on the peer node.
- Returns:
The response data received from the peer node.
- Return type:
Any
- Raises:
asyncio.TimeoutError – If the response is not received within a specified timeout period.
Notes
This method is typically used for internal communication between nodes in the network. It helps in sending requests and receiving responses asynchronously over UDP.
- _get_status(**kwargs) dict[source]¶
Retrieve the status of the node.
This method compiles and returns a dictionary containing the current status details of the node. The status includes information about the node’s paired events for each subscription, whether the server is closing, and whether the node is in the process of attempting to reconnect.
- Parameters:
**kwargs (dict, optional) – Additional status information that can be passed as key-value pairs and will be included in the returned status dictionary.
- Returns:
A dictionary containing the status details of the node. The keys include: - ‘paired’: A dictionary where keys are subscription topics and values are boolean
indicating whether the node is paired for that subscription.
’serving’: Boolean value indicating whether the server is closing (False) or not (True).
- ’reconnecting’: Boolean value indicating whether the node is currently attempting to
reconnect to a peer (True) or not (False).
- Return type:
dict
- async _keep_alive(interval: int = 7) None¶
Keep the node alive by sending periodic heartbeat messages.
This coroutine sends a heartbeat message at defined intervals to ensure that the connection remains active. It is useful for preventing timeouts and maintaining the session state.
- Parameters:
interval (int) – The number of seconds between each heartbeat message.
- async _loop_message(message: Message, edge: Edge) None¶
Asynchronously process a received message and invoke the appropriate handler.
This method acts as a dispatcher, delegating the received message to a specific method based on the command of the message. It utilizes dynamic method resolution to determine the handler for each command. If no specific handler is found for the command, a warning is logged indicating the missing processor.
- async _ping(server_edge: Edge, delay: float = 0, latency_update: bool = True, size: int = 0) None¶
Send a ping message to measure latency and connectivity.
This method sends a single ping message to a specified edge or to all server pairs if no edge is specified. It also allows for setting a size for the payload in bytes and a delay before sending the ping. If the response option is true, a pong message will be sent back immediately after receiving a ping.
- Parameters:
server_edge (Edge, optional) – The edge (network connection) to ping. If provided, the ping will be sent only to this edge. If None, pings will be sent to all server_pairs.
delay (float, optional) – The delay in seconds before sending the ping message. Defaults to 0 seconds.
latency_update (bool, optional) – If True, the latency information for the edge will be updated based on the ping response. Defaults to True.
size (int, optional) – The size of the dummy payload data in bytes to be included in the ping message. Defaults to 0 bytes, meaning no additional data is sent.
- async _process_ChaskiFile(message: Message, edge: Edge) None[source]¶
Process an incoming ChaskiFile message and append each chunk of data to the target file.
This method handles the processing of incoming file chunks sent over the network within a ChaskiFile message. Each chunk is appended to the file specified by the filename in the message data. When all chunks have been received, the callback function specified by file_input_callback is invoked.
- Parameters:
Notes
This method performs asynchronous file I/O using the open function with the ‘ab’ mode to append each chunk of data. It checks if the chunk data is empty, indicating that all chunks have been received, and then invokes the file_input_callback function, if provided.
- async _process_ChaskiMessage(message: Message, edge: Edge) None[source]¶
Process an incoming Chaski message and place it onto the message queue.
This method is responsible for handling Chaski messages received from the network. Upon receiving a message, it places the message into the internal message queue for further processing.
- Parameters:
Notes
This method operates asynchronously to ensure non-blocking behavior. The received message is added to the internal message queue using the put method. Once placed in the queue, the message can be retrieved and processed by other components of the application.
- async _process_ChaskiStorageRequest(message: Message, edge: Edge) None[source]¶
Process a storage request received via Chaski.
- async _process_discovery(message: Message, edge: Edge | None = None) None¶
Processes a network discovery message and propagates it if necessary.
This method is responsible for processing discovery messages as part of a network-wide search for ChaskiNodes with matching subscriptions. The method checks if the message should be propagated based on the TTL and visited nodes. If the current node’s subscriptions match the origin’s, a connection is attempted. Otherwise, the discovery message is forwarded to other ChaskiNodes, avoiding nodes that have already been visited.
- Parameters:
message (Message) – The discovery message containing details about the discovery process, including the sender’s information, visited nodes, and TTL.
edge (Optional[Edge], optional) – The edge where the discovery message was received from. It may be used to avoid sending the discovery message back to the sender. Defaults to None.
- async _process_handshake(message: Message, edge: Edge) None¶
Process a handshake command received from a peer node.
This coroutine is triggered upon receiving a handshake message, indicating an initiation of communication protocol by another ChaskiNode. It prepares and sends a handshake response back to the origin node to acknowledge the handshake and completes the two-way communication setup.
- Parameters:
message (Message) – The handshake message received, containing the timestamp and data that includes the peer’s name, ip, port, and subscription information.
edge (Edge) – The edge associated with the peer node from which the handshake message was received, representing the communication connection to the peer.
- async _process_handshake_back(message: Message, edge: Edge) None¶
Handle a handshake response (back) from a peer node after an initial handshake.
This coroutine is invoked upon receiving a handshake response from a peer node in the network. It updates the edge information with the name, ip, port, and subscriptions of the responding node and adds the edge to the server’s active connections list.
- async _process_ping(message: Message, edge: Edge) None¶
Handle incoming ping messages and optionally send a pong response.
When a ping message is received, this method processes the message and sends a pong response back to the sender if requested. The method updates the edge’s latency measurements based on the round trip time of the ping-pong exchange if the latency_update flag in the message is set to True. It also sets the edge’s name, ip, port, and subscriptions based on the information received in the pong message.
- async _process_pong(message: Message, edge: Edge) None¶
Process a pong message and update edge latency measurements.
This coroutine is triggered when a pong message is received in response to a ping request. It uses the time difference between the pong message’s timestamp and the current time to calculate the round-trip latency. If the ‘latency_update’ flag in the message data is True, this latency value will be used to update the edge’s latency statistics. Additionally, the edge’s identifying information such as name, ip, and port is updated based on the pong message data.
- async _process_report_paired(message: Message, edge: Edge) None¶
Process a ‘report_paired’ network message.
This method gets executed when a ‘report_paired’ command is received, indicating that a pairing action has occurred. Depending on the ‘on_pair’ behavior specified in the message, the node may disconnect after pairing or take no action.
- Parameters:
message (Message) – The message instance containing the ‘report_paired’ command and associated data, such as pairing status and actions to take upon pairing.
edge (Edge) – The edge from which the ‘report_paired’ message was received. It provides context for where to apply the action specified in the message data.
- async _process_request_udp(message: Message, edge: Edge) None¶
Process an incoming UDP request and dispatch a response.
This method handles the reception of a UDP request message. It uses the callback function specified in the message to generate a response and then sends this response back to the requester.
- async _process_response_udp(message: Message, edge: Edge) None¶
Process a response to a UDP request.
This method is invoked when a response to a UDP request is received. It extracts the corresponding request ID from the message, retrieves the response data, and sets the corresponding event to indicate that the response has been processed.
- async _process_udp_message(data: bytes, addr: Tuple[str, int]) None¶
Process incoming UDP messages routed to this node’s UDP server.
This asynchronous handler is called when the UDP server receives a new message. It deserializes the received bytes back into a message object and processes it according to the command it contains. The method handles ‘status’ and ‘response’ commands used for node status checks and responses.
- Parameters:
data (bytes) – The raw bytes received from the UDP client.
addr (Tuple[str, int]) – A tuple containing the sender’s IP address as a string and the port number as an integer.
- Raises:
ValueError – If the received message cannot be processed or contains an invalid command not supported by the node.
- _process_udp_storage(message: Message) None[source]¶
Process a storage response received via UDP.
- Parameters:
message (Message) – The incoming message containing the storage response details.
- async _reader_loop(edge: Edge) None¶
Listen and process messages from a given edge in the network.
This asynchronous method is the main loop for managing incoming messages from the connected peer node represented by the provided ‘edge’. It constantly reads data from the StreamReader of the edge until the connection is closed, or an error is encountered. It handles framing, deserialization, and dispatching of the messages using the ‘_process_message’ coroutine for further handling.
- Parameters:
edge (Edge) – The network edge (connection) object from which the messages are read and processed. It contains the StreamReader and StreamWriter for network I/O.
- async _remove_closing_connection() None¶
Identify and remove server pairs that have closed connections.
This coroutine iterates through the server pairs of the ChaskiNode instance and filters out any edges where the StreamWriter’s associated connection is determined to be closed. This serves to maintain an accurate list of active connections on the server and ensures that operations are not attempted on closed connections.
- async _request_status(dest_ip: str, dest_port: int) Message¶
Request the status of a node via UDP and wait for a response.
This asynchronous method sends a UDP message to the target ip and port, requesting its status. It generates a unique identifier for the request, sends the message, and then waits for a response that matches the identifier. Once the response is received, it is returned as a Message object.
- Parameters:
dest_ip (str) – The IP address of the destination node to query for status.
dest_port (int) – The port number of the destination node to communicate the status request.
- Returns:
The status response message from the destination node, containing information such as whether it is paired and actively serving.
- Return type:
- async _send_udp_message(command: str, message: Any, dest_ip: str, dest_port: int) None¶
Send a UDP message to the specified destination ip and port.
This coroutine sends a pre-formatted message over UDP to a given destination ip and port. It serializes the message, which includes a command and its associated data, before transmission. This method is utilized for communication protocols that require UDP for message passing, like status checks or discovery procedures.
- Parameters:
command (str) – The command type that dictates the kind of operation to perform, included in the message.
message (Any) – The payload associated with the command that contains data necessary for carrying out the operation.
dest_ip (str) – The destination IP address to which the message will be sent.
dest_port (int) – The port number on the destination host to which the message should be directed.
- async _start_tcp_server() None¶
Configure and start the asyncio TCP server.
A coroutine that sets up and starts the asyncio TCP server on the ip and port attributes of the ChaskiNode instance. The server will handle incoming client connections using the ‘connected’ coroutine as the protocol factory. In addition, a background keep-alive task is started to manage node heartbeat and connectivity. The server will run until explicitly stopped or an unhandled exception occurs.
- async _start_udp_server() None¶
Start an asyncio UDP server to handle incoming datagrams.
This coroutine is responsible for creating and binding a UDP socket to listen for incoming datagram messages. It then creates a UDP protocol endpoint, providing mechanics for handling UDP communications. The protocol handler, defined by the UDPProtocol class, specifies how incoming datagrams and error events are processed.
- Raises:
ValueError – If the address provided for the UDP socket can’t be resolved.
- async _test_generic_request_udp(echo_data: dict[str, Any] = {}) Any¶
Send a test UDP request and await the response.
This method constructs and sends a generic UDP request with the provided echo_data to a peer node and waits for the response. It is useful for testing the UDP communication mechanism between nodes.
- Parameters:
echo_data (dict, optional) – A dictionary of data to be included in the UDP request. The default is an empty dictionary.
- Returns:
The response data received from the peer node.
- Return type:
Any
- async _test_generic_response_udp(**echo_data: Any) Any¶
Respond to a test UDP request by echoing the received data.
This coroutine processes a generic UDP request for testing purposes. It simply echoes back the provided echo_data to the requester. This method can be used to verify the correct handling of UDP requests and responses between nodes.
- Parameters:
**echo_data (dict) – A dictionary of data received in the UDP request. This data will be echoed back in the response.
- Returns:
The echo_data dictionary received in the request, echoed back to the requester.
- Return type:
dict
- async _write(command: str, data: Any, edge: Edge | None = None, topic: str = 'All') None¶
Write data to the specified writer or all connected peers.
Sends a packaged message with a particular command and associated data to either a single specified writer or broadcast it to all connected server peers. The message includes the command name and data, which gets serialized before being sent. This method ensures the data is properly framed with its length for transmission over TCP.
- Parameters:
command (str) – The name of the command or type of the message to be sent.
data (Any) – The payload of the message, which may consist of any type of data compatible with the serializer.
writer (Optional[asyncio.StreamWriter], optional) – The stream writer to which the message should be sent. If None, the message will be sent to all server pairs. Defaults to None.
- async _write_data(data: bytes, edges: List[Edge] | None = None, excluded_edges: List[Edge] = []) None¶
Send serialized data to specified edges or all connected edges, excluding optional edges.
This method writes serialized data to the provided list of edges or all connected edges. It ensures that data is only sent to edges that are not closing their connections and have not been explicitly excluded.
- Parameters:
data (bytes) – The serialized data to be sent to the edges.
edges (Optional[List[Edge]], optional) – A list of Edge instances to which the data should be sent. If None, the data is sent to all edges. Defaults to None.
excluded_edges (List[Edge], optional) – A list of Edge instances to exclude from data transmission. Defaults to an empty list.
- Raises:
ConnectionResetError – If an edge connection is lost while writing data, an attempt to reconnect is made.
- activate_file_transfer() None[source]¶
Enable the processing of incoming file chunks.
This method sets the allow_incoming_files flag to True, allowing the ChaskiStreamer to process incoming file chunks. When enabled, the ChaskiStreamer can receive and handle file transfers as messages containing file chunks are received.
- add_propagation_command(command: str) None¶
Add a command to the list of commands that should be propagated.
This method appends a given command to the internal list of commands that are allowed to be propagated to other nodes. It ensures that each command is unique within the list by converting it to a set and back to a list.
- Parameters:
command (str) – The command to add to the propagation list. This command should be a string representing a specific type of message that can be propagated to other nodes.
- property address: str¶
Get the address of the ChaskiStreamer instance.
This property returns the address of the ChaskiStreamer in the format “ChaskiStreamer@ip:port”.
- Returns:
A string representation of the ChaskiStreamer address.
- Return type:
str
- async close_connection(edge: Edge, port: int | None = None) None¶
Close the connection associated with a given edge, optionally specifying a port.
This coroutine handles the termination of a network connection that corresponds to the provided edge. If a port number is specified, the connection to that port will be closed. All resources associated with the connection, such as stream writers, are properly finalized. If the current node ends up without any connections, a warning is logged, and an attempt to reconnect is made.
- Parameters:
edge (Edge) – The edge object representing the network connection to be closed. If a port is specified, only the connection to that port is closed.
port (Optional[int]) – An optional port number to specifically close the connection to. If None, all connections associated with the edge are closed.
- async connect(address_or_ip_or_node: str | ChaskiNode, port: int | None = None, discovery=False, discovery_timeout=0.5) Edge | None¶
Establish a connection to the specified node or address.
This method initiates a TCP connection to a specified node or an IP address and port. It leverages the _connect_to_peer method to create the connection. The input can be an instance of ChaskiNode, a string representing the IP address, or an address string in the format “ip:port” or “[ipv6]:port”.
- Parameters:
address_or_ip_or_node (Union[str, ChaskiNode]) – The target node instance or IP string to connect to. Acceptable formats include: - ChaskiNode instance - IP address string (e.g., “192.168.1.1”) - Address string with port (e.g., “192.168.1.1:65432” or “[2001:db8::1]:65432”)
port (Optional[int]) – The port number of the target node if an IP address string is provided. Ignored if address_or_ip_or_node includes port information or is a ChaskiNode instance.
- Raises:
ValueError – If the address cannot be resolved.
- Returns:
Optional[Edge]
The matching Edge instance if found and discovery is False; otherwise, None.
- deactivate_file_transfer() None[source]¶
Disable the processing of incoming file chunks.
This method sets the allow_incoming_files flag to False, preventing the ChaskiStreamer from processing any incoming file chunks until re-enabled.
- deserializer(data: bytes) Any¶
Deserialize the provided data using the configured deserializer.
This method attempts to convert a bytes object back into its original form (such as an object, string, or other data structures) using the deserializer function specified at the node’s initialization.
- Parameters:
data (bytes) – The serialized data as a bytes object that needs to be deserialized.
- Returns:
The deserialized data, converted back to its original form. This can be any data type that was originally serialized.
- Return type:
Any
Notes
If the deserialization process fails, this method returns the raw data as a fallback.
- disable_message_propagation() None¶
Disable the message propagation feature for the node.
This method turns off the node’s ability to forward received messages to other connected nodes. Once disabled, the node will stop relaying messages but will continue to receive and process incoming messages.
- async discovery(node: ChaskiNode | None = None, on_pair: str | Literal['none', 'disconnect'] = 'none', timeout: int | float = 10) None¶
Conducts a network-wide discovery process.
Executes a discovery process across the network to find and potentially connect with other ChaskiNodes. This function is used to find nodes with overlapping subscriptions to establish a peer-to-peer connection. It allows the node to expand its network by connecting to more nodes, which may be of interest based on the subscriptions. Depending on the ‘on_pair’ setting, nodes may connect permanently or just acknowledge the presence of each other.
- Parameters:
node (Optional['ChaskiNode'], optional) – A reference to a ChaskiNode instance to start the discovery process from. If None, discovery will be attempted using the current node’s server pairs. Defaults to None.
on_pair (Union[str, Literal['none', 'disconnect']], optional) – The action to take when a peer is discovered. ‘none’ means no action is taken, while ‘disconnect’ causes the node to disconnect after pairing. Defaults to ‘none’.
timeout (int, optional) – The maximum time in seconds to wait for the discovery process to complete before considering the node as paired. Defaults to 10 seconds.
- enable_message_propagation() None¶
Enable the message propagation feature for the node.
When this method is called, it sets the propagate attribute to True. This allows the node to forward received messages to other connected nodes. By enabling message propagation, the node helps in disseminating messages across the network, except for the edge from which the message was received.
- async fetch_storage(id_: str) None[source]¶
Request to fetch storage data.
- Parameters:
id (str) – The unique identifier of the data to fetch.
- get_edge(ip: str, port: int) Edge | None¶
Retrieve an existing edge by its ip and port.
This method looks up and returns an Edge instance that matches the given ip and port. If no such edge is found, it returns None.
- Parameters:
ip (str) – The IP address of the edge to find.
port (int) – The port number of the edge to find.
- Returns:
The Edge instance with the specified ip and port if found, else None.
- Return type:
Optional[Edge]
- get_edge_by_name(node_name: str) Edge | None¶
Retrieve an existing edge by its node name.
This method looks up and returns an Edge instance that matches the given node name. If no such edge is found, it returns None.
- Parameters:
node_name (str) – The name of the node to find.
- Returns:
The Edge instance with the specified node name if found, else None.
- Return type:
Optional[Edge]
- get_free_port() int¶
Get a free port for the node to use.
This method creates a temporary socket to bind to a port with value 0, which causes the operating system to allocate an available port. Once the socket is bound, the port number assigned by the operating system is retrieved and the socket is closed. This port number can be used for subsequent network operations requiring a free and available port.
- Returns:
The port number assigned by the operating system that is free and available for use.
- Return type:
int
- classmethod get_hash(file: str, algorithm: str = 'sha256') str[source]¶
Compute the hash of a file using the specified algorithm.
This method reads the file in chunks and computes the hash digest using the provided hashing algorithm. The default algorithm is SHA-256.
- Parameters:
file (str) – The path to the file for which to compute the hash.
algorithm (str, optional) – The hashing algorithm to use. Defaults to ‘sha256’. Other common algorithms include ‘md5’, ‘sha1’, ‘sha512’, etc.
- Returns:
The hexadecimal hash digest of the file.
- Return type:
str
- async handshake(server_edge: Edge, delay: float = 0, response: bool = False)¶
Initiate or respond to a handshake with the given edge.
This method sends a handshake message to the specified edge and optionaly awaits for a handshake response. It is used to initiate or confirm a connection establishment between two ChaskiNodes.
- Parameters:
server_edge (Edge) – The edge instance to which the handshake message is to be sent.
delay (float, optional) – The amount of time (in seconds) to wait before sending the handshake message.
back_delay (TODO) –
response (bool, optional) – Indicates whether a response is expected. Set to True if waiting for a handshake back.
- is_connected_to(node: ChaskiNode) bool¶
Check if this node is connected to another specified node.
Determines whether the current ChaskiNode instance has an established TCP connection with the given node. It checks the server pairs list for a matching ip and port pair to confirm connectivity.
- Parameters:
node (ChaskiNode) – The node to check for connectivity with the current node.
- Returns:
True if the current node is connected to the specified node; otherwise, False.
- Return type:
bool
- is_port_available(port: int) bool¶
Check if a specific port is available for use.
This method attempts to bind to a given port to determine if it is available for use. It creates a temporary socket and tries to bind it to the specified port on the current node’s ip. If the binding is successful, the port is considered available. Otherwise, it is in use.
- Parameters:
port (int) – The port number to check for availability.
- Returns:
True if the port is available; False if the port is already in use.
- Return type:
bool
- Raises:
OSError – If the port binding operation encounters an error other than the port being in use.
- async message_stream() AsyncGenerator[source]¶
Asynchronously generate messages from the message queue.
This coroutine listens for incoming messages on the internal message queue and yields each message as it arrives. This method is intended to be used within an asynchronous context, allowing the consumer to retrieve messages in a non-blocking manner.
- Yields:
AsyncGenerator – A Message object retrieved from the message queue.
Notes
This method runs indefinitely until the message queue is exhausted or the coroutine is explicitly stopped. Ensure proper cancellation to avoid hanging coroutines.
- property paired: bool¶
Check if the node is paired for all its subscriptions.
This property returns a boolean value indicating whether the node is paired for all the subscriptions it subscribes to.
- Returns:
True if the node is paired for all subscriptions, otherwise False.
- Return type:
bool
- paired_for(subscription) bool¶
- async ping(server_edge: Edge | None = None, size: int = 0) None¶
Send ping messages to one or all connected edges.
This method sends a ping message either to a specified edge or broadcasts it to all connected edges in the server_pairs list. It is used to measure network latency and can be used to ensure connectivity. The method allows specifying the size of each ping message and the number of times it should be repeated.
- Parameters:
server_edge (Optional[Edge], optional) – The specific edge to which the ping message should be sent. If None, the ping message is sent to all edges in the server_pairs list. Defaults to None.
size (int, optional) – The size of the dummy data to be sent with the ping message in bytes. This allows simulating payload sizes and their effect on latency. Defaults to 0.
- async propagate(message: Message, source_edge: Edge) None¶
Propagate a message to other connected edges, excluding the source edge.
This method is used to propagate messages received by a node to other connected edges in the network, excluding the edge from which the message was received. This helps in disseminating the message across the network while preventing immediate looping of the message back to its origin.
- Parameters:
Notes
The message’s TTL (Time-to-Live) will be decremented by one. If the TTL reaches zero or less, the message will not be propagated further.
The message’s UUID will be appended to the message pool to track its propagation and avoid duplicate processing.
- async push(topic: str, data: bytes = None) None[source]¶
Write a message to the specified topic.
This method allows the asynchronous sending of messages to a designated topic. The message data, if provided, is encapsulated in a ChaskiMessage and dispatched to the relevant subscribers within the network.
- Parameters:
topic (str) – The topic to which the message is to be sent. Each message is delivered exclusively to the nodes subscribing to this topic.
data (bytes, optional) – The byte-encoded data to be sent with the message. This could be any binary payload that subscribers are expected to process.
- async push_file(topic: str, file: BytesIO, filename: str = None, data: dict = None)[source]¶
Asynchronously sends a file chunk by chunk to the specified topic.
This method reads the file in chunks and sends each chunk as a separate message to the specified topic. The file’s metadata, including filename and hash, can also be sent along with the chunks. This allows for efficient and scalable file transfer in a distributed network.
- Parameters:
topic (str) – The topic to which the file chunks are to be sent. Nodes subscribing to this topic will receive the file chunks.
file (BytesIO) – A file-like object (must support read method). The file from which the data is read and sent in chunks.
filename (str, optional) – The name of the file being sent. If not provided, the name attribute of the file object is used.
Notes
This method uses asynchronous I/O to read the file in chunks and send each chunk without blocking the event loop. It ensures that the entire file is processed and sent even if the process involves multiple chunks.
This method uses a lock to ensure only one file transfer happens at a time.
- async remove_duplicated_connections() None¶
Remove duplicate connections from the server pairs.
Iterates over the list of server pairs and closes connections that have the same ip and port as an already seen connection. This ensures that each peer is connected to the node only once, avoiding redundant connections.
- remove_propagation_command(command: str) None¶
Remove a command from the list of commands that should be propagated.
This method removes a given command from the internal list of commands that are allowed to be propagated to other nodes. It ensures that the command is removed if it exists in the list. If the command does not exist in the list, the method completes silently without any changes.
- Parameters:
command (str) – The command to remove from the propagation list. This command should be a string representing a specific type of message that was previously added for propagation to other nodes.
- async request_ssl_certificate(ca_address: str) None¶
Request an SSL certificate from a Certificate Authority (CA).
This coroutine initiates a request for an SSL certificate from the specified Certificate Authority (CA) address. It generates a Certificate Signing Request (CSR), sends it to the CA, and waits for the CA to sign the CSR and return the signed certificate. Additionally, it retrieves the CA’s certificate and updates the node’s SSL context.
- Parameters:
ca_address (str) – The address of the Certificate Authority (CA) node to request the SSL certificate from.
- Raises:
Exception – If there is an error during the SSL certificate request or signing process.
Notes
The method performs the following steps: 1. Generates a CSR locally. 2. Establishes a connection with the CA node. 3. Sends the CSR to the CA for signing. 4. Receives the signed certificate and CA’s certificate from the CA. 5. Writes the signed certificate and CA’s certificate to disk. 6. Updates the SSL context of the node with the new certificate. 7. Closes the connection with the CA node. 8. Restarts the node with the newly updated SSL context.
- async run() None¶
Launch TCP and UDP servers for the node.
This coroutine starts the TCP and UDP server tasks to listen for incoming connections and handle UDP datagrams. It is an essential part of the node’s operation, enabling it to accept connections from other nodes and exchange messages over the network.
- serialize_message(message: Message) bytes¶
Serialize a Message instance into a bytes object.
This method takes a Message instance and converts it into a bytes object that can be transmitted over the network. It first serializes the message and its topic, and then prepends the lengths of these serialized components in bytes for proper framing.
- Parameters:
message (Message) – The message object to be serialized. It contains the command, topic, data, timestamp, TTL, and UUID associated with the message.
- Returns:
A bytes object that represents the serialized message. The format ensures both the message and its topic can be accurately reconstructed on the receiving end.
- Return type:
bytes
- serializer(obj: Any) bytes¶
Serialize the provided object using the configured serializer.
This method converts a Python object into a bytes representation using the serializer function specified at node initialization. This is typically used for preparing data to be sent over the network.
- Parameters:
obj (Any) – The Python object to be serialized. This can be any serializable object, such as dictionaries, lists, or custom objects.
- Returns:
The serialized representation of the input object as bytes.
- Return type:
bytes
Notes
If the serialization process fails, this method returns the string representation of the object as a fallback.
- property status: dict¶
Retrieve the current status of the ChaskiNode.
This property compiles and returns a dictionary containing the current status details of the node. The status includes information about the node’s paired events for each subscription, whether the server is closing, and whether the node is in the process of attempting to reconnect.
- Returns:
A dictionary containing the status details of the node. The keys include: - ‘paired’: A dictionary where keys are subscription topics and values are boolean
indicating whether the node is paired for that subscription.
’serving’: Boolean value indicating whether the server is closing (False) or not (True).
- ’reconnecting’: Boolean value indicating whether the node is currently attempting to
reconnect to a peer (True) or not (False).
- Return type:
dict
- async stop() None¶
Stop all activities of the node, ensuring proper cleanup.
This coroutine is responsible for gracefully stopping all network services of the node. It closes both TCP and UDP connections, cancels background tasks such as keep-alive checks, and finalizes any pending operations. After invoking this function, the node will no longer serve as part of the network until restarted.
- store_data(id_: str, value: Any) None[source]¶
Store data in the persistent storage.
- Parameters:
id (str) – The unique identifier for the data.
value (Any) – The value to be stored.
- subscribe(subscriptions: List[str], paired: bool = False) None¶
Subscribe the node to specified topics.
This method allows the node to subscribe to additional topics for receiving messages. If the node is already paired for the given subscriptions, it does not duplicate the subscriptions.
- Parameters:
subscriptions (List[str]) – A list of subscription topics to which the node should subscribe.
paired (bool) – A flag indicating whether the subscriptions should be marked as paired.
Notes
Subscriptions are tracked within the node, allowing it to receive relevant messages from other nodes that share similar interests or topics.
Examples
>>> node = ChaskiNode() >>> node.subscribe(['topic1', 'topic2'], paired=True) >>> assert 'topic1' in node.subscriptions >>> assert 'topic2' in node.subscriptions
- terminate_stream() None[source]¶
Terminate the message streaming process.
This method sets the terminate_stream_flag to True, signaling the message_stream coroutine to stop generating further messages. This is typically used to gracefully shut down the message streaming process.
- track_task(coro)¶
- async try_to_reconnect(edge: Edge) None¶
Continuously attempt to reconnect to a given edge.
This coroutine will retry to establish a TCP connection with the specified edge in case the initial connection has been lost. The attempts will be made at 1-second intervals until a successful connection is established or the coroutine is explicitly cancelled. This method is useful for maintaining persistent connections in a network of ChaskiNodes.
- Parameters:
edge (Edge) – The edge to which the reconnection attempts will be made. This represents the lost connection that needs to be restored.
- uuid() str¶
Generate a unique identifier for the node.
This method generates and returns a universally unique identifier (UUID) as a string. The UUID is used to uniquely identify objects and events within the network communication context, ensuring that each identifier is distinct across the distributed node system.
- Returns:
A string representation of a UUID, which can be used to uniquely identify an object, event, or message within the node network.
- Return type:
str
Notes
The UUID generation is based on Python’s uuid module, which provides a way to create universally unique identifiers compliant with RFC 4122.
See also
uuid.uuid4Generates a random UUID.
References