Interface#

Server#

MOSEC server interface.

This module provides a way to define the service components for machine learning model serving.

Dynamic Batching#

The user may enable dynamic batching for any stage when the corresponding worker is appended, by setting max_batch_size in append_worker().

Multiprocess#

The user may spawn multiple processes for any stage when the corresponding worker is appended, by setting num in append_worker().

IPC Wrapper#

The user may wrap inter-process communication to use shared memory, e.g. pyarrow plasma, by providing an IPC wrapper to the server.

class mosec.server.Server(ipc_wrapper=None)[source]#

MOSEC server interface.

It allows users to sequentially append workers they implemented, builds the workflow pipeline automatically and starts up the server.

__init__(ipc_wrapper=None)[source]#

Initialize a MOSEC Server.

Parameters:

ipc_wrapper (Union[IPCWrapper, partial, None]) – wrapper applied before sending and after receiving data over IPC

register_daemon(name, proc)[source]#

Register a daemon to be monitored.

Parameters:
  • name (str) – the name of this daemon

  • proc (Popen) – the process handle of the daemon

append_worker(worker, num=1, max_batch_size=1, max_wait_time=0, start_method='spawn', env=None, timeout=0)[source]#

Sequentially appends workers to the workflow pipeline.

Parameters:
  • worker (Type[Worker]) – your subclass of Worker, which implements forward

  • num (int) – the number of processes for parallel computing (>=1)

  • max_batch_size (int) – the maximum batch size allowed (>=1); dynamic batching is enabled if it is greater than 1

  • max_wait_time (int) – the maximum wait time (in milliseconds) for dynamic batching; it only takes effect together with max_batch_size. If not configured, the CLI argument --wait is used (default=10ms)

  • start_method (str) – the process starting method (“spawn” or “fork”)

  • env (Optional[List[Dict[str, str]]]) – the environment variables to set before starting the process

  • timeout (int) – the timeout (second) for each worker forward processing (>=1)
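The env parameter is typed as a list of dictionaries. A minimal sketch of building such a list follows, assuming (this is not stated above) that the list holds one dictionary per worker process, so its length matches num. The helper name per_process_env and the CUDA_VISIBLE_DEVICES variable are illustrative choices only.

```python
from typing import Dict, List


def per_process_env(num: int) -> List[Dict[str, str]]:
    """Hypothetical helper: build one environment dict per worker process.

    Assumption: append_worker(env=...) expects len(env) == num, with the
    i-th dict applied to the i-th process.
    """
    return [{"CUDA_VISIBLE_DEVICES": str(i)} for i in range(num)]


env = per_process_env(2)

# Intended use (not executed here, MyWorker is a placeholder name):
#   server.append_worker(MyWorker, num=2, env=env)
```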

run()[source]#

Start the mosec model server.

Worker#

MOSEC worker interface.

This module provides the interface to define a worker with such behaviors:

  1. initialize

  2. serialize/deserialize data to/from another worker

  3. serialize/deserialize data to/from the client side

  4. data processing

class mosec.worker.Worker[source]#

MOSEC worker interface.

It provides default IPC (de)serialization methods, stores the worker metadata including its stage and maximum batch size, and leaves the forward method to be implemented by the users.

By default, JSON encoding is used. Users can customize this by overriding the deserialize method in the first stage (the ingress stage) and/or the serialize method in the last stage (the egress stage).

Encoding options include MessagePack, Protocol Buffers and many other out-of-the-box protocols. Users can even define their own protocol and use it to manipulate the raw bytes. A naive customization can be found in this PyTorch example.

__init__()[source]#

Initialize the worker.

Child classes are not required to override this method.

serialize_ipc(data)[source]#

Define IPC serialization method.

Parameters:

data (Any) – returned data from forward()

Return type:

bytes

deserialize_ipc(data)[source]#

Define IPC deserialization method.

Parameters:

data (bytes) – input data for forward()

Return type:

Any

property stage: str#

Return the stage name.

property max_batch_size: int#

Return the maximum batch size.

property worker_id: int#

Return the ID of this worker instance.

This property returns the worker ID in the range of [1, … , num] (num as configured in append_worker(num)) to differentiate workers in the same stage.

serialize(data)[source]#

Serialize the last stage (egress).

Default response serialization method: JSON.

Check mosec.mixin for more information.

Parameters:

data (Any) – the same type as the output of the forward()

Return type:

bytes

Returns:

the bytes you want to put into the response body

Raises:

EncodingError – if the data cannot be serialized with JSON

deserialize(data)[source]#

Deserialize the first stage (ingress).

Default request deserialization method: JSON.

Check mosec.mixin for more information.

Parameters:

data (bytes) – the raw bytes extracted from the request body

Return type:

Any

Returns:

the same type as the input of the forward()

Raises:

DecodingError – if the data cannot be deserialized with JSON

abstract forward(data)[source]#

Model inference, data processing or computation logic.

Parameters:

data (Any) – input data to be processed

Return type:

Any

Must be overridden by the subclass.

If any code in this forward() needs to access other resources (e.g. a model, a memory cache, etc.), the user should initialize these resources as attributes of the class in the __init__.

Note

For a stage that enables dynamic batching, the returned results must have the same length and the same order as the input data.
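The batching requirement can be illustrated with a minimal sketch. It assumes that, with dynamic batching enabled, forward receives a list of inputs and must return a list of outputs. BatchedWorker is a stand-in class so the sketch runs without mosec installed; in real use it would inherit from mosec.worker.Worker.

```python
from typing import List


class BatchedWorker:
    """Stand-in for a Worker subclass used with max_batch_size > 1."""

    def forward(self, data: List[str]) -> List[int]:
        # One output per input, produced in input order, so that each
        # result can be matched back to the request it came from.
        return [len(text) for text in data]


batch = ["a", "abc", ""]
results = BatchedWorker().forward(batch)
```

Filtering out items or sorting inside forward would break the one-to-one correspondence between inputs and outputs.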

Note

  • for a single-stage worker, data will go through

    <deserialize> -> <forward> -> <serialize>

  • for a multi-stage worker that is neither ingress nor egress, data will go through

    <deserialize_ipc> -> <forward> -> <serialize_ipc>
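The single-stage path can be traced with a short sketch. EchoWorker and handle are hypothetical stand-ins written with the standard library only; they mimic the default JSON behavior described above rather than reproducing the mosec implementation.

```python
import json
from typing import Any


class EchoWorker:
    """Stand-in for a single-stage Worker with JSON (de)serialization."""

    def deserialize(self, data: bytes) -> Any:
        # Ingress: raw request body -> structured data.
        return json.loads(data)

    def forward(self, data: Any) -> Any:
        # Processing step implemented by the user.
        return {"length": len(data["text"])}

    def serialize(self, data: Any) -> bytes:
        # Egress: structured data -> response body.
        return json.dumps(data).encode()


def handle(worker: EchoWorker, body: bytes) -> bytes:
    """Hypothetical driver: <deserialize> -> <forward> -> <serialize>."""
    return worker.serialize(worker.forward(worker.deserialize(body)))


response = handle(EchoWorker(), b'{"text": "hello"}')
```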

Errors#

Exceptions used in the Worker.

Suppose the input dataflow of our model server is as follows:

bytes -> deserialize -> data -> parse -> valid data

If the raw bytes cannot be successfully deserialized, the DecodingError is raised; if the decoded data cannot pass the validation check (usually implemented by users), the ValidationError should be raised.
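The two failure points in this dataflow can be sketched as follows. The exception classes here are local stand-ins so the example runs without mosec installed; in real code they would be imported from mosec.errors. The validation rule (an object with a string "text" field) is an illustrative assumption.

```python
import json
from typing import Any, Dict


class DecodingError(Exception):
    """Stand-in for mosec.errors.DecodingError."""


class ValidationError(Exception):
    """Stand-in for mosec.errors.ValidationError."""


def deserialize(raw: bytes) -> Any:
    # bytes -> data: failure here means the body is not valid JSON.
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError) as err:
        raise DecodingError(str(err)) from err


def parse(data: Any) -> Dict[str, Any]:
    # data -> valid data: failure here means the JSON was well-formed
    # but does not have the shape this (hypothetical) service expects.
    if not isinstance(data, dict) or not isinstance(data.get("text"), str):
        raise ValidationError("expected an object with a string 'text' field")
    return data


valid = parse(deserialize(b'{"text": "hi"}'))
```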

exception mosec.errors.MosecError[source]#

Bases: Exception

Mosec basic exception.

exception mosec.errors.ClientError[source]#

Bases: MosecError

Client side error.

This error indicates that the server cannot or will not process the request due to something that is perceived to be a client error. It will return the details to the client side with HTTP 400.

exception mosec.errors.ServerError[source]#

Bases: MosecError

Server side error.

This error indicates that the server encountered an unexpected condition that prevented it from fulfilling the request. It will return the details to the client side with HTTP 500.

Attention: be careful about the returned message since it may contain sensitive information. If you don’t want to return the details, raise an exception that does not inherit from mosec.errors.MosecError.

exception mosec.errors.EncodingError[source]#

Bases: ServerError

Serialization error.

The EncodingError should be raised in user-implemented code when the serialization of the response bytes fails. This error will set the status code to HTTP 500 and show the details in the response.

exception mosec.errors.DecodingError[source]#

Bases: ClientError

De-serialization error.

The DecodingError should be raised in user-implemented code when the de-serialization of the request bytes fails. This error will set the status code to HTTP 400 in the response.

exception mosec.errors.ValidationError[source]#

Bases: MosecError

Request data validation error.

The ValidationError should be raised in user-implemented code when the validation of the input data fails. Usually, it should be raised after the data de-serialization, which converts the raw bytes into structured data. This error will set the status code to HTTP 422 in the response.

exception mosec.errors.MosecTimeoutError[source]#

Bases: BaseException

Exception raised when a MOSEC worker operation times out.

If a bug in the forward code causes the worker to hang indefinitely, a timeout can be used to ensure that the worker eventually returns control to the main program. When a timeout occurs, the MosecTimeoutError exception is raised. This exception can be caught and handled appropriately to perform any necessary cleanup tasks or return a response indicating that the operation timed out.

Note that MosecTimeoutError is a subclass of BaseException, not Exception. This is because timeouts should not be caught and handled in the same way as other exceptions. Instead, they should be handled in a separate except block which isn’t designed to break the working loop.
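The effect of inheriting from BaseException can be seen in a small sketch. The class below is a local stand-in for mosec.errors.MosecTimeoutError, and run_once, buggy and timed_out are hypothetical names; the point is only that a generic except Exception handler does not swallow the timeout.

```python
class MosecTimeoutError(BaseException):
    """Stand-in for mosec.errors.MosecTimeoutError."""


def run_once(task):
    # Generic handler, as user code often writes it. Because the timeout
    # derives from BaseException, it passes straight through this block.
    try:
        return task()
    except Exception as err:
        return f"handled: {err}"


def buggy():
    raise ValueError("bad input")


def timed_out():
    raise MosecTimeoutError("forward timed out")


outcomes = []
for task in (buggy, timed_out):
    try:
        outcomes.append(run_once(task))
    except MosecTimeoutError:
        # Separate block for timeouts; the loop keeps running.
        outcomes.append("timeout")
```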

Mixin#

Plugins#

Wrapper layer for IPC between workers.

This will be called before sending data or after receiving data through the Protocol.

class mosec.ipc.IPCWrapper[source]#

This public class defines the mosec IPC wrapper plugin interface.

The wrapper has to implement at least put and get methods.

abstract put(data)[source]#

Store the bytes somewhere and return their IDs, which are sent via the protocol.

Parameters:

data (List[bytes]) – List of bytes data.

Return type:

List[bytes]

Returns: List of bytes ID.

abstract get(ids)[source]#

Retrieve the bytes from the store by their IDs, which are received via the protocol.

Parameters:

ids (List[bytes]) – List of bytes ID.

Return type:

List[bytes]

Returns: List of bytes data.
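A minimal sketch of the put/get contract follows. The abstract class is a local stand-in mirroring the interface described above, so the example runs without mosec installed. DictIPCWrapper is a hypothetical name, and its in-process dictionary only illustrates the contract: a real wrapper needs a store that is shared between processes, such as shared memory.

```python
import uuid
from abc import ABC, abstractmethod
from typing import Dict, List


class IPCWrapper(ABC):
    """Stand-in mirroring the mosec.ipc.IPCWrapper interface."""

    @abstractmethod
    def put(self, data: List[bytes]) -> List[bytes]:
        ...

    @abstractmethod
    def get(self, ids: List[bytes]) -> List[bytes]:
        ...


class DictIPCWrapper(IPCWrapper):
    """Illustration only: a plain dict is NOT shared across processes."""

    def __init__(self) -> None:
        self._store: Dict[bytes, bytes] = {}

    def put(self, data: List[bytes]) -> List[bytes]:
        ids = []
        for item in data:
            key = uuid.uuid4().bytes  # 16-byte random ID
            self._store[key] = item
            ids.append(key)
        return ids

    def get(self, ids: List[bytes]) -> List[bytes]:
        # Remove entries as they are read so the store does not grow.
        return [self._store.pop(key) for key in ids]


wrapper = DictIPCWrapper()
payloads = [b"first", b"second"]
ids = wrapper.put(payloads)
restored = wrapper.get(ids)
```

Only the IDs travel through the protocol, which keeps large payloads out of the message channel.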