Stream#

lit.sdk.services.stream #

This module defines the Stream class and associated functions for managing real-time data-streaming services. It provides functionality to start, stop, restart, and retrieve the status of streams, as well as a function to get a list of available streams.

__file__ = '/opt/lit-platform/lit-lib/src/lit/sdk/services/stream.cpython-312-x86_64-linux-gnu.so' module #

__name__ = 'lit.sdk.services.stream' module #

__package__ = 'lit.sdk.services' module #

get_streams(team) #

Gets a list of the available streams.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| team | str | The name of the team. |

Returns:

| Type | Description |
| --- | --- |
| list[Stream] | List of available streams. |

Examples:

>>> get_streams("contoso")
[Stream(team='contoso', name='bitcoin')]

LitServiceError #

Bases: lit.sdk.errors.LitError

A custom exception class for Lit service errors.

__module__ = 'lit.sdk.errors' class #

__init__(*args, code=None) method descriptor #

Initializes a new instance of LitServiceError.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| *args | object | Variable number of arguments to pass to the base class constructor. | required |
| code | int \| None | The error code. Defaults to None. | None |
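
The constructor signature above can be illustrated with a small, self-contained sketch. The two classes below are hypothetical stand-ins, not the real `lit.sdk.errors` classes, and storing the value on a `code` attribute is an assumption; the reference only documents that `code` is accepted as a keyword argument.

```python
from typing import Optional


class LitError(Exception):
    """Hypothetical stand-in for lit.sdk.errors.LitError."""


class LitServiceError(LitError):
    """Stand-in that mirrors the documented __init__(*args, code=None)."""

    def __init__(self, *args: object, code: Optional[int] = None) -> None:
        # Positional arguments are passed to the base class constructor.
        super().__init__(*args)
        # Assumption: the error code is kept on the instance as `code`.
        self.code = code


try:
    raise LitServiceError("Stream failed to start.", code=503)
except LitServiceError as err:
    summary = f"{err} (code={err.code})"

print(summary)
```

Keeping `code` keyword-only means it cannot be confused with the message arguments that go to the base exception.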

Service #

Bases: abc.ABC

Abstract base class for managing services.

__abstractmethods__ = frozenset({'stop', 'restart', 'status', 'start'}) class #

__annotations__ = {'team': 'str', 'name': 'str'} class #

__module__ = 'lit.sdk.services.service' class #

__init__(team, name) method descriptor #

Initializes a new instance of Service.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| team | str | The name of the team that owns the service. | required |
| name | str | The name of the service. | required |

restart() method descriptor #

Restarts the service.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| ServiceActionStatus | ServiceActionStatus | A dictionary containing the status of the operation. |

Raises:

| Type | Description |
| --- | --- |
| NotImplementedError | If the method is not implemented by a subclass. |

Examples:

>>> service = SomeServiceSubclass("contoso", "service_name")
>>> service.restart()
{'status': 'SUCCESS', 'message': 'Service restarted successfully.'}

start() method descriptor #

Starts the service.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| ServiceActionStatus | ServiceActionStatus | A dictionary containing the status of the operation. |

Raises:

| Type | Description |
| --- | --- |
| NotImplementedError | If the method is not implemented by a subclass. |

Examples:

>>> service = SomeServiceSubclass("contoso", "service_name")
>>> service.start()
{'status': 'SUCCESS', 'message': 'Service started successfully.'}

status() method descriptor #

Retrieves the status of the service.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| ServiceStatus | ServiceStatus | A dictionary containing the current status of the service. |

Raises:

| Type | Description |
| --- | --- |
| NotImplementedError | If the method is not implemented by a subclass. |

Examples:

>>> service = SomeServiceSubclass("contoso", "service_name")
>>> service.status()
{'name': 'service_name', 'active': 'active', 'status': 'running', 'started': 'Wed 2024-07-24 13:32:23 CDT', 'pid': 3687984, 'tasks': 5, 'memory': '3.3G'}

stop() method descriptor #

Stops the service.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| ServiceActionStatus | ServiceActionStatus | A dictionary containing the status of the operation. |

Raises:

| Type | Description |
| --- | --- |
| NotImplementedError | If the method is not implemented by a subclass. |

Examples:

>>> service = SomeServiceSubclass("contoso", "service_name")
>>> service.stop()
{'status': 'SUCCESS', 'message': 'Service stopped successfully.'}
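
To make the abstract contract concrete, here is a minimal sketch of a subclass that implements all four abstract methods. `Service` below is a hypothetical stand-in written with `abc`, not the real `lit.sdk.services.service.Service`. `InMemoryService`, its `_running` flag, and the `'stopped'` status value are assumptions for illustration only.

```python
from abc import ABC, abstractmethod
from typing import Dict


class Service(ABC):
    """Hypothetical stand-in for the documented abstract base class."""

    def __init__(self, team: str, name: str) -> None:
        self.team = team
        self.name = name

    @abstractmethod
    def start(self) -> Dict[str, str]:
        raise NotImplementedError

    @abstractmethod
    def stop(self) -> Dict[str, str]:
        raise NotImplementedError

    @abstractmethod
    def restart(self) -> Dict[str, str]:
        raise NotImplementedError

    @abstractmethod
    def status(self) -> Dict[str, object]:
        raise NotImplementedError


class InMemoryService(Service):
    """Toy subclass that tracks state in a flag instead of a real process."""

    def __init__(self, team: str, name: str) -> None:
        super().__init__(team, name)
        self._running = False

    def start(self) -> Dict[str, str]:
        self._running = True
        return {"status": "SUCCESS", "message": "Service started successfully."}

    def stop(self) -> Dict[str, str]:
        self._running = False
        return {"status": "SUCCESS", "message": "Service stopped successfully."}

    def restart(self) -> Dict[str, str]:
        self.stop()
        self.start()
        return {"status": "SUCCESS", "message": "Service restarted successfully."}

    def status(self) -> Dict[str, object]:
        # Only a subset of the documented ServiceStatus keys is modelled here.
        return {"name": self.name, "status": "running" if self._running else "stopped"}


service = InMemoryService("contoso", "service_name")
print(service.start())
print(service.status())
```

Because the stand-in uses `abc.ABC`, instantiating `Service` directly raises `TypeError` before any method can be called. The real class may behave differently.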

ServiceActionStatus #

Bases: builtins.dict

Represents the status of a service action.

Examples:

>>> action_status: ServiceActionStatus = {
...     "status": "SUCCESS",
...     "message": "Service started successfully."
... }
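
A caller will usually want to branch on the `status` key. The sketch below declares a stand-in `TypedDict` with the two keys shown in the example and adds a hypothetical helper, `ensure_success`, which is not part of the SDK. Raising `RuntimeError` on `"ERROR"` is one possible policy, not documented behaviour.

```python
from typing import Literal, TypedDict


class ServiceActionStatus(TypedDict):
    """Stand-in with the two keys shown in the example above."""

    status: Literal["SUCCESS", "ERROR"]
    message: str


def ensure_success(result: ServiceActionStatus) -> str:
    """Return the message on success; raise RuntimeError otherwise."""
    if result["status"] != "SUCCESS":
        raise RuntimeError(result["message"])
    return result["message"]


ok: ServiceActionStatus = {
    "status": "SUCCESS",
    "message": "Service started successfully.",
}
print(ensure_success(ok))
```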

__annotations__ = {'status': ForwardRef("Literal['SUCCESS', 'ERROR']", module='lit.sdk.services.service'), 'message': ForwardRef('str', module='lit.sdk.services.service')} class #

__module__ = 'lit.sdk.services.service' class #

__optional_keys__ = frozenset() class #

__orig_bases__ = (<function TypedDict at 0x7fef89f62e80>,) class #

__required_keys__ = frozenset({'status', 'message'}) class #

__total__ = True class #

ServiceStatus #

Bases: builtins.dict

Represents the status of a service.

Examples:

>>> service_status: ServiceStatus = {
...     "name": "my_service",
...     "active": "active",
...     "status": "running",
...     "started": "Wed 2024-07-24 13:32:23 CDT",
...     "pid": 3687984,
...     "tasks": 5,
...     "memory": "3.3G"
... }
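
As a rough illustration of consuming such a dictionary, the sketch below declares a stand-in `TypedDict` with the seven keys from the example and two hypothetical helpers, `is_running` and `summarize`. Treating `active == "active"` together with `status == "running"` as "healthy" is an assumption based only on the sample values shown here.

```python
from typing import TypedDict


class ServiceStatus(TypedDict):
    """Stand-in with the seven keys shown in the example above."""

    name: str
    active: str
    status: str
    started: str
    pid: int
    tasks: int
    memory: str


def is_running(status: ServiceStatus) -> bool:
    """Assumption: a healthy service reports 'active' and 'running'."""
    return status["active"] == "active" and status["status"] == "running"


def summarize(status: ServiceStatus) -> str:
    """Build a one-line, human-readable summary of the status."""
    return (
        f"{status['name']}: {status['status']} "
        f"(pid {status['pid']}, {status['tasks']} tasks, {status['memory']})"
    )


service_status: ServiceStatus = {
    "name": "my_service",
    "active": "active",
    "status": "running",
    "started": "Wed 2024-07-24 13:32:23 CDT",
    "pid": 3687984,
    "tasks": 5,
    "memory": "3.3G",
}
print(is_running(service_status))
print(summarize(service_status))
```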

__annotations__ = {'name': ForwardRef('str', module='lit.sdk.services.service'), 'active': ForwardRef('str', module='lit.sdk.services.service'), 'status': ForwardRef('str', module='lit.sdk.services.service'), 'started': ForwardRef('str', module='lit.sdk.services.service'), 'pid': ForwardRef('int', module='lit.sdk.services.service'), 'tasks': ForwardRef('int', module='lit.sdk.services.service'), 'memory': ForwardRef('str', module='lit.sdk.services.service')} class #

__module__ = 'lit.sdk.services.service' class #

__optional_keys__ = frozenset() class #

__orig_bases__ = (<function TypedDict at 0x7fef89f62e80>,) class #

__required_keys__ = frozenset({'active', 'name', 'pid', 'started', 'status', 'tasks', 'memory'}) class #

__total__ = True class #

Stream #

Bases: lit.sdk.services.service.Service

Represents a real-time data-streaming service worker.

__abstractmethods__ = frozenset() class #

__annotations__ = {'team': 'str', 'name': 'str'} class #

__module__ = 'lit.sdk.services.stream' class #

__init__(team, name) method descriptor #

Initializes a new instance of Stream.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| team | str | The name of the team that owns the Stream. | required |
| name | str | The name of the Stream. | required |

restart() method descriptor #

Restarts the stream.

Returns:

| Type | Description |
| --- | --- |
| ServiceActionStatus | An object containing the status of the operation. |

Examples:

>>> stream = Stream("contoso", "bitcoin")
>>> stream.restart()
{'status': 'SUCCESS', 'message': 'Stream restarted successfully.'}

start() method descriptor #

Starts the stream.

Returns:

| Type | Description |
| --- | --- |
| ServiceActionStatus | An object containing the status of the operation. |

Examples:

>>> stream = Stream("contoso", "bitcoin")
>>> stream.start()
{'status': 'SUCCESS', 'message': 'Stream started successfully.'}

status() method descriptor #

Retrieves the status of the stream.

Returns:

| Type | Description |
| --- | --- |
| ServiceStatus | An object containing the current status of the stream. |

Examples:

>>> stream = Stream("contoso", "bitcoin")
>>> stream.status()
{'name': 'bitcoin', 'active': 'active', 'status': 'running', 'started': 'Wed 2024-07-24 13:32:23 CDT', 'pid': 3687984, 'tasks': 5, 'memory': '3.3G'}

stop() method descriptor #

Stops the stream.

Returns:

| Type | Description |
| --- | --- |
| ServiceActionStatus | An object containing the status of the operation. |

Examples:

>>> stream = Stream("contoso", "bitcoin")
>>> stream.stop()
{'status': 'SUCCESS', 'message': 'Stream stopped successfully.'}
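
The methods above combine naturally into a "restart only if needed" check. The sketch below is written against a small `Protocol` so it runs without the SDK. `FakeStream` and `ensure_running` are hypothetical names, and the `'stopped'` status value is an assumption. With the real SDK, a `Stream("contoso", "bitcoin")` instance would presumably be passed instead.

```python
from typing import Dict, Protocol


class Restartable(Protocol):
    """The two documented methods that the helper below relies on."""

    def status(self) -> Dict[str, object]: ...

    def restart(self) -> Dict[str, str]: ...


def ensure_running(stream: Restartable) -> str:
    """Restart the stream unless status() already reports 'running'."""
    if stream.status().get("status") == "running":
        return "already running"
    return stream.restart()["message"]


class FakeStream:
    """Hypothetical in-memory stand-in for Stream, for demonstration only."""

    def __init__(self, team: str, name: str) -> None:
        self.team = team
        self.name = name
        self.running = False

    def status(self) -> Dict[str, object]:
        state = "running" if self.running else "stopped"
        return {"name": self.name, "status": state}

    def restart(self) -> Dict[str, str]:
        self.running = True
        return {"status": "SUCCESS", "message": "Stream restarted successfully."}


stream = FakeStream("contoso", "bitcoin")
first = ensure_running(stream)   # stream is stopped, so it is restarted
second = ensure_running(stream)  # stream is now running, so nothing happens
print(first)
print(second)
```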

lit_error_handler(error_type=LitError) method descriptor #

A decorator function that catches and re-raises exceptions with a custom error type.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| error_type | Type[LitError] | The type of exception to raise. Defaults to LitError. | LitError |

Returns:

| Type | Description |
| --- | --- |
| Callable | A wrapper function that catches exceptions and raises the specified error type. |

Examples:

>>> @lit_error_handler()
... def raises_error():
...     assert False
...
>>> raises_error()
Traceback (most recent call last):
  File "/opt/lit/src/lit/sdk/errors.py", line 130, in wrapper
  File "<stdin>", line 3, in raises_error
AssertionError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/lit/src/lit/sdk/errors.py", line 132, in wrapper
lit.sdk.errors.LitError: An error occurred in function 'raises_error'
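
A decorator with this behaviour could be written roughly as follows. This is a sketch under stated assumptions, not the SDK's actual implementation. `LitError` is a local stand-in, the message format is copied from the traceback above, and catching every `Exception` (including errors that are already `LitError` instances) is an assumption.

```python
import functools
from typing import Any, Callable, Type


class LitError(Exception):
    """Hypothetical stand-in for lit.sdk.errors.LitError."""


def lit_error_handler(error_type: Type[LitError] = LitError) -> Callable:
    """Return a decorator that re-raises exceptions as `error_type`."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                # `from exc` sets __cause__, which produces the
                # "direct cause" chaining seen in the traceback.
                raise error_type(
                    f"An error occurred in function '{func.__name__}'"
                ) from exc

        return wrapper

    return decorator


@lit_error_handler()
def raises_error() -> None:
    raise AssertionError("boom")


try:
    raises_error()
except LitError as err:
    caught = err
    print(f"{err} (caused by {type(err.__cause__).__name__})")
```

`functools.wraps` preserves the wrapped function's name and docstring, so the name in the error message refers to the original function rather than to `wrapper`.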