"""
=========================================================================
ChaskiStreamer: Asynchronous Message Streaming in a Distributed Network
=========================================================================

The `ChaskiStreamer` module provides the functionality to stream messages
asynchronously within a distributed network environment. It builds on the
base class `ChaskiNode` and extends it with an internal message queue for
efficient and scalable message processing, chunked file transfer, and a
request/response exchange backed by an optional persistent storage.

Classes
=======

- *ChaskiStreamer*: Extends ChaskiNode to provide asynchronous message streaming capabilities.
"""
import asyncio
import hashlib
import os
from asyncio import Queue
from io import BytesIO
from queue import Empty as SyncQueueEmpty
from queue import Queue as SyncQueue
from types import TracebackType
from typing import Any
from typing import AsyncGenerator
from typing import Callable, Optional

from chaski.node import ChaskiNode, Message, Edge
from chaski.utils.persistent_storage import PersistentStorage
class ChaskiStreamer(ChaskiNode):
    """
    Stream messages with ChaskiStreamer.

    The ChaskiStreamer class inherits from ChaskiNode and provides an implementation
    to handle asynchronous message streaming within a distributed network. It sets up
    an internal message queue to manage incoming messages, processes these messages,
    and allows the asynchronous sending of messages to designated topics. It also
    supports chunked file transfer (`push_file`) and a request/response exchange
    against an optional persistent storage (`fetch_storage`).
    """

    # Placed on the asyncio message queue by `terminate_stream` so that a consumer
    # blocked in `message_stream` wakes up and observes the terminate flag.
    # It is never yielded to callers.
    _STREAM_SENTINEL = object()

    def __init__(
        self,
        destination_folder: str = ".",
        chunk_size: int = 8192,
        file_handling_callback: Optional[Callable[..., Any]] = None,
        allow_incoming_files: bool = False,
        sync: bool = False,
        persistent_storage: bool = False,
        *args: tuple,
        **kwargs: dict,
    ):
        """
        Initialize a new instance of ChaskiStreamer.

        Parameters
        ----------
        destination_folder : str, optional
            The folder where incoming files are written. Defaults to the current directory ('.').
        chunk_size : int, optional
            The size, in bytes, of the chunks in which outgoing files are read and sent.
            Defaults to 8192 bytes.
        file_handling_callback : callable, optional
            Called once an incoming file transfer completes. It receives, as keyword
            arguments, the fields of the final file message (as sent by `push_file`:
            `filename`, `hash`, `data`, `chunk_size`, `size`) plus `destiny_folder`.
        allow_incoming_files : bool, optional
            Flag to enable or disable processing of incoming file chunks. Defaults to False.
        sync : bool, optional
            If True, incoming messages are stored in a thread-safe `queue.Queue`
            instead of an `asyncio.Queue`. Defaults to False.
        persistent_storage : bool, optional
            If True, a `PersistentStorage` instance is created and used to serve
            `ChaskiStorageRequest` messages. Defaults to False.
        *args : tuple
            Additional positional arguments to pass to the superclass initializer.
        **kwargs : dict
            Additional keyword arguments to pass to the superclass initializer.
        """
        # Lock that serialises outgoing transfers in `push_file`.
        self._file_transfer_lock = asyncio.Lock()
        super().__init__(*args, **kwargs)
        self.sync = sync
        if self.sync:
            self.message_queue = SyncQueue()
        else:
            self.message_queue = Queue()
        self.chunk_size = chunk_size
        self.destination_folder = destination_folder
        self.file_handling_callback = file_handling_callback
        self.allow_incoming_files = allow_incoming_files
        self.terminate_stream_flag = False
        self.enable_message_propagation()
        self.add_propagation_command("ChaskiMessage")
        self.add_propagation_command("ChaskiStorageRequest")
        if persistent_storage:
            self.persistent_storage = PersistentStorage()

    def __repr__(self) -> str:
        """
        Provide a string representation of the ChaskiStreamer instance.

        The representation is the node's `address` ("ChaskiStreamer@ip:port"),
        prefixed with an asterisk (*) when `self.paired` is truthy.
        """
        h = "*" if self.paired else ""
        return h + self.address

    @property
    def address(self) -> str:
        """
        Get the address of the ChaskiStreamer instance.

        This property returns the address of the ChaskiStreamer in the format
        "ChaskiStreamer@ip:port".

        Returns
        -------
        str
            A string representation of the ChaskiStreamer address.
        """
        return f"ChaskiStreamer@{self.ip}:{self.port}"

    @classmethod
    def get_hash(cls, file: str, algorithm: str = "sha256") -> str:
        """
        Compute the hash of a file using the specified algorithm.

        This method reads the file in chunks and computes the hash digest
        using the provided hashing algorithm. The default algorithm is SHA-256.

        Parameters
        ----------
        file : str
            The path to the file for which to compute the hash.
        algorithm : str, optional
            The hashing algorithm to use. Defaults to 'sha256'.
            Other common algorithms include 'md5', 'sha1', 'sha512', etc.

        Returns
        -------
        str
            The hexadecimal hash digest of the file.
        """
        hash_func = hashlib.new(algorithm)
        with open(file, "rb") as f:
            while chunk := f.read(8192):  # Read the file in 8192 byte blocks
                hash_func.update(chunk)
        return hash_func.hexdigest()

    def _get_status(self, **kwargs) -> dict:
        """
        Retrieve the status of the node.

        Parameters
        ----------
        **kwargs : dict, optional
            Additional status information that can be passed as key-value pairs and
            will be included in the returned status dictionary.

        Returns
        -------
        dict
            A dictionary containing the status details of the node. The keys include:
            - 'paired': A dictionary where keys are subscription topics and values are boolean
              indicating whether the node is paired for that subscription.
            - 'serving': Boolean value indicating whether the server is closing (`False`) or not (`True`).
            - 'reconnecting': Boolean value indicating whether the node is currently attempting to
              reconnect to a peer (`True`) or not (`False`).
            - 'allow_incoming_files': The current value of `self.allow_incoming_files`.
        """
        return {
            # Get the status of paired events for each subscription
            "paired": {
                sub: self.paired_event[sub].is_set() for sub in self.subscriptions
            },
            # Check if the server is closing; 'True' means it's still serving.
            "serving": not self.server_closing,
            # Check if the node's reconnecting event is currently set, implying it is trying to reconnect to a peer.
            "reconnecting": self.reconnecting.is_set(),
            "allow_incoming_files": self.allow_incoming_files,
            **kwargs,
        }

    async def __aenter__(self) -> AsyncGenerator:
        """
        Enter the asynchronous context for streaming messages.

        Returns
        -------
        AsyncGenerator[Message, None]
            The generator returned by `message_stream`, which yields `Message`
            objects as they arrive in the message queue.
        """
        return self.message_stream()

    async def __aexit__(
        self,
        exception_type: Optional[type],
        exception_value: Optional[BaseException],
        exception_traceback: Optional[TracebackType],
    ) -> None:
        """
        Exit the runtime context and stop the message stream.

        Parameters
        ----------
        exception_type : type, optional
            The exception type if an exception was raised, otherwise None.
        exception_value : BaseException, optional
            The exception instance if an exception was raised, otherwise None.
        exception_traceback : TracebackType, optional
            The traceback object if an exception was raised, otherwise None.

        Notes
        -----
        This only calls `terminate_stream`; exceptions are not suppressed.
        """
        self.terminate_stream()

    async def push(self, topic: str, data: Optional[bytes] = None) -> None:
        """
        Write a message to the specified topic.

        The data, if provided, is sent as a `ChaskiMessage` to the given topic.

        Parameters
        ----------
        topic : str
            The topic to which the message is to be sent. Each message is delivered exclusively to the nodes subscribing to this topic.
        data : bytes, optional
            The byte-encoded data to be sent with the message. This could be any binary payload that subscribers are expected to process.
        """
        await self._write("ChaskiMessage", data=data, topic=topic)

    async def _process_ChaskiMessage(self, message: Message, edge: Edge) -> None:
        """
        Place an incoming Chaski message onto the internal message queue.

        Messages are dropped once `terminate_stream` has been called.

        Parameters
        ----------
        message : Message
            The Chaski message received from the network.
        edge : Edge
            The network edge (connection) from which the message was received.
        """
        if self.terminate_stream_flag:
            return
        if self.sync:
            # `queue.Queue.put` is synchronous and returns None, so it must not be
            # awaited. The queue is unbounded, so this call does not block.
            self.message_queue.put(message)
            await asyncio.sleep(0)
        else:
            await self.message_queue.put(message)

    def activate_file_transfer(self) -> None:
        """
        Enable the processing of incoming file chunks.

        This sets `allow_incoming_files` to True, so that `_process_ChaskiFile`
        writes received chunks to `destination_folder`.
        """
        self.allow_incoming_files = True

    def deactivate_file_transfer(self) -> None:
        """
        Disable the processing of incoming file chunks.

        This sets `allow_incoming_files` to False. Incoming file chunks are ignored
        until transfers are re-enabled.
        """
        self.allow_incoming_files = False

    async def _get_next_stream_item(self) -> Any:
        """
        Fetch the next item from the message queue.

        In sync mode this never blocks the event loop; it raises `queue.Empty`
        when nothing is queued.
        """
        if self.sync:
            return self.message_queue.get_nowait()
        return await self.message_queue.get()

    async def message_stream(self) -> AsyncGenerator:
        """
        Asynchronously generate messages from the message queue.

        Yields
        ------
        Message
            Each message retrieved from the internal message queue.

        Notes
        -----
        The stream ends once `terminate_stream` has been called. In async mode
        `terminate_stream` wakes a blocked consumer. In sync mode the queue is
        polled every 0.1 s while empty, so the flag is re-checked regularly.
        If the consuming task is cancelled while waiting, the generator simply
        returns.
        """
        while not self.terminate_stream_flag:
            try:
                message = await self._get_next_stream_item()
            except asyncio.CancelledError:
                return
            except SyncQueueEmpty:
                # Sync mode with nothing queued: back off briefly, then re-check the flag.
                await asyncio.sleep(0.1)
                continue
            if self.terminate_stream_flag or message is self._STREAM_SENTINEL:
                break
            yield message
        self.terminate_stream_flag = True

    def terminate_stream(self) -> None:
        """
        Terminate the message streaming process.

        This sets `terminate_stream_flag` to True. `message_stream` then stops
        yielding and `_process_ChaskiMessage` stops queueing new messages.
        """
        self.terminate_stream_flag = True
        if not self.sync:
            # Wake a consumer blocked on an empty asyncio queue so it can observe
            # the flag instead of waiting for the next network message.
            self.message_queue.put_nowait(self._STREAM_SENTINEL)

    async def push_file(
        self,
        topic: str,
        file: BytesIO,
        filename: Optional[str] = None,
        data: Optional[dict] = None,
    ):
        """
        Asynchronously send a file, chunk by chunk, to the specified topic.

        Each chunk is sent as a separate `ChaskiFile` message. Each message carries
        the filename, the running SHA-256 digest of all bytes sent so far, the
        cumulative size, the configured chunk size, and `data`. After the last data
        chunk, a final message with an empty chunk is sent. Its `hash` and `size`
        describe the whole file, and it tells receivers the transfer is complete.

        Parameters
        ----------
        topic : str
            The topic to which the file chunks are to be sent. Nodes subscribing to this
            topic will receive the file chunks.
        file : BytesIO
            A binary file-like object supporting `read`. The data is read from it in
            chunks of `self.chunk_size` bytes.
        filename : str, optional
            The name of the file being sent. If not provided, the basename of
            `file.name` is used.
        data : dict, optional
            Extra metadata attached unchanged to every chunk message.

        Notes
        -----
        A lock ensures that only one file transfer from this node runs at a time.
        """
        async with self._file_transfer_lock:
            name = filename if filename else os.path.split(file.name)[-1]
            size = 0
            # Running SHA-256 over every chunk sent so far.
            hash_func = hashlib.new("sha256")
            while True:
                chunk = file.read(self.chunk_size)
                size += len(chunk)
                hash_func.update(chunk)
                package_data = {
                    "filename": name,
                    "chunk": chunk,
                    "hash": hash_func.hexdigest(),
                    "data": data,
                    "chunk_size": self.chunk_size,
                    "size": size,
                }
                await self._write("ChaskiFile", data=package_data, topic=topic)
                await asyncio.sleep(0)  # very important sleep
                # The empty chunk has just been sent as the end-of-transfer marker
                # that `_process_ChaskiFile` uses to fire the completion callback.
                if not chunk:
                    break

    def _resolve_incoming_file_path(self, filename: Any) -> Optional[str]:
        """
        Resolve a sender-supplied file name against `destination_folder`.

        Returns the joined path. Returns None when `filename` is not a non-empty
        string, or when it resolves to `destination_folder` itself or to a path
        outside it (for example "../x" or an absolute path).
        """
        if not isinstance(filename, str) or not filename:
            return None
        base = os.path.realpath(self.destination_folder)
        target = os.path.realpath(os.path.join(base, filename))
        try:
            inside = os.path.commonpath([base, target]) == base
        except ValueError:  # e.g. different drives on Windows
            inside = False
        if not inside or target == base:
            return None
        return os.path.join(self.destination_folder, filename)

    async def _process_ChaskiFile(self, message: Message, edge: Edge) -> None:
        """
        Process an incoming ChaskiFile message.

        A non-empty chunk is appended to `destination_folder/filename`. An empty
        chunk marks the end of the transfer. In that case `file_handling_callback`,
        if it is callable, is called with the remaining message fields plus
        `destiny_folder`.

        Parameters
        ----------
        message : Message
            The message carrying the file chunk. `message.data` holds the
            `filename`, `chunk` and `hash` fields, among others.
        edge : Edge
            The network edge (connection) from which the message was received.

        Notes
        -----
        Nothing happens unless `allow_incoming_files` is True. Messages whose
        `filename` would resolve outside `destination_folder` are dropped.
        """
        if not self.allow_incoming_files:
            return
        # The filename is controlled by the remote sender, so never let it escape
        # the destination folder.
        target = self._resolve_incoming_file_path(message.data.get("filename"))
        if target is None:
            return
        if chunk := message.data.pop("chunk"):
            # NOTE(review): append mode means re-sending a file with the same name
            # appends to the existing file rather than replacing it.
            with open(target, "ab") as file:
                file.write(chunk)
        elif callable(self.file_handling_callback):
            self.file_handling_callback(
                **{
                    **message.data,
                    "destiny_folder": self.destination_folder,
                }
            )

    async def _process_ChaskiStorageRequest(self, message: Message, edge: Edge) -> None:
        """
        Process a storage request received via Chaski.

        If this node has persistent storage and holds the requested id, the entry
        is removed (`pop`) and sent back over UDP to the requester's `ip`/`port`.
        Nodes created without persistent storage ignore the request.

        Parameters
        ----------
        message : Message
            The incoming message. Its `data` has `request`, `ip` and `port` keys.
        edge : Edge
            The edge connection through which the message was received.
        """
        storage = getattr(self, "persistent_storage", None)
        if storage is None:
            # Storage requests are propagated network-wide, so nodes without
            # storage will see them too; they cannot answer.
            return
        request_id = message.data["request"]
        ip = message.data["ip"]
        port = message.data["port"]
        if request_id in storage:
            response = storage.pop(request_id)
            data = {"request": request_id, "response": response}
            await self._send_udp_message("storage", data, ip, port)

    def _process_udp_storage(self, message: Message) -> None:
        """
        Process a storage response received via UDP.

        Parameters
        ----------
        message : Message
            The incoming message. Its `data` has `request` and `response` keys.
            The response is stored under the request id via `store_data`.
        """
        requests = message.data["request"]
        response = message.data["response"]
        self.store_data(requests, response)

    def store_data(self, id_: str, value: Any) -> None:
        """
        Store data in the persistent storage.

        Parameters
        ----------
        id_ : str
            The unique identifier for the data.
        value : Any
            The value to be stored.

        Notes
        -----
        This requires the instance to have been created with `persistent_storage=True`.
        """
        self.persistent_storage.set(id_, value)

    async def fetch_storage(self, id_: str) -> None:
        """
        Request storage data from the network.

        A `ChaskiStorageRequest` message carrying this node's ip and port is sent
        on the "storage" topic. The response arrives asynchronously via UDP and is
        handled by `_process_udp_storage`.

        Parameters
        ----------
        id_ : str
            The unique identifier of the data to fetch.
        """
        data = {
            "request": id_,
            "ip": self.ip,
            "port": self.port,
        }
        await self._write("ChaskiStorageRequest", data=data, topic="storage")