Authoring operators#
What is an Operator?#
Operators are pieces of code that will perform some action on data coming from other operators. They are the bits of code that can be chained together to build a data pipeline.
Creating an operator#
At the very least, three files are required to make an operator: run.py, a Containerfile, and operator.json. Each is described in its own section below.
As a part of our cli tool, we have some templates that can get you started. Using uv:
cd cli
uv venv .venv
uv pip install .
source .venv/bin/activate
interactem operator new
Alternatively, install with poetry install or with a plain pip install . (either way should work).
run.py file#
We need to define the code that will operate on incoming messages. Incoming messages arrive as BytesMessage objects, which contain data, metadata, and tracking information.
Here is an example of the run.py file for the partial center of mass operator. The parameters (defined in the spec, see below) are passed to the function as a dictionary.
@operator
def com_partial(
    inputs: BytesMessage | None, parameters: dict[str, Any]
) -> BytesMessage | None:
    if not inputs:
        logger.warning("No input provided to the partial center of mass operator.")
        return None

    # Use an initial center only when both coordinates are provided.
    center = None
    init_center_x = parameters.get("init_center_x")
    init_center_y = parameters.get("init_center_y")
    if init_center_x is not None and init_center_y is not None:
        center = (init_center_x, init_center_y)

    # Crop only when both crop dimensions are provided.
    crop = None
    crop_to_x = parameters.get("crop_to_x")
    crop_to_y = parameters.get("crop_to_y")
    if crop_to_x is not None and crop_to_y is not None:
        crop = (crop_to_x, crop_to_y)

    batch = BatchedFrames.from_bytes_message(inputs)
    com = com_sparse(batch, init_center=center, crop_to=crop, replace_nans=False)
    return COMPartial(header=batch.header, array=com).to_bytes_message()
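The paired-parameter handling in the example above (a value is used only when both its x and y parts are present) can be factored into a small helper. This is a minimal sketch, not part of the interactem API: `optional_pair` is a hypothetical name introduced here for illustration.

```python
from __future__ import annotations

from typing import Any


def optional_pair(
    parameters: dict[str, Any], x_key: str, y_key: str
) -> tuple[Any, Any] | None:
    """Return (x, y) only when both keys hold a non-None value.

    Hypothetical helper: mirrors the "both or nothing" checks in the
    com_partial example, it is not provided by interactem.
    """
    x = parameters.get(x_key)
    y = parameters.get(y_key)
    if x is None or y is None:
        return None
    return (x, y)


# Example: the center is fully specified, the crop is only half specified.
params = {"init_center_x": 255, "init_center_y": 255, "crop_to_x": 128}
center = optional_pair(params, "init_center_x", "init_center_y")
crop = optional_pair(params, "crop_to_x", "crop_to_y")
print(center)  # (255, 255)
print(crop)    # None
```

Comparing against None rather than relying on truthiness keeps a legitimate value of 0 from being treated as missing, which matches the checks in the operator code.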
Containerfile#
We need to use the operator base image (ghcr.io/nersc/interactem/operator) as the parent image for our Containerfile. In this case, we are using the distiller-streaming image as the base, as it contains a lot of utilities for processing 4D Camera frames.
# distiller-streaming image, built on the operator base image
FROM ghcr.io/nersc/interactem/operator
WORKDIR /app
COPY ./pyproject.toml ./poetry.lock ./README.md /app/
RUN poetry install --no-root --without test
COPY ./distiller_streaming/ /app/distiller_streaming/
RUN poetry install --without test

# Operator image, built on distiller-streaming
FROM ghcr.io/nersc/interactem/distiller-streaming
COPY ./run.py /app/run.py
Specification#
Operator specifications need to be defined in a JSON file. The specification model is defined in spec.py:
class OperatorSpec(BaseModel):
    id: OperatorSpecID
    label: str  # Human readable name of the operator
    description: str  # Human readable description of the operator
    image: str  # Container image for operator
    inputs: list[OperatorSpecInput] | None = None  # List of inputs
    outputs: list[OperatorSpecOutput] | None = None  # List of outputs
    parameters: list[OperatorSpecParameter] | None = None  # List of parameters
    tags: list[OperatorSpecTag] | None = None  # List of tags to match on
    parallel_config: ParallelConfig | None = None  # Parallel execution config
Here’s an example of an operator.json for the partial center of mass operator.
operator.json#
{
  "id": "70dd71a7-5ebf-4515-8bf9-941d1284328c",
  "image": "ghcr.io/nersc/interactem/center-of-mass-partial",
  "label": "Partial Center of Mass",
  "description": "Calculates the center of mass for a frame",
  "inputs": [
    {
      "name": "in",
      "label": "The input",
      "type": "frame",
      "description": "Input frame"
    }
  ],
  "outputs": [
    {
      "name": "com_partial",
      "label": "The output",
      "type": "com_partial",
      "description": "Partial center of mass"
    }
  ],
  "parameters": [
    {
      "name": "crop_to_x",
      "label": "Crop To X",
      "type": "int",
      "default": "255",
      "description": "X-coordinate to crop to",
      "required": false
    },
    {
      "name": "crop_to_y",
      "label": "Crop To Y",
      "type": "int",
      "default": "255",
      "description": "Y-coordinate to crop to",
      "required": false
    },
    {
      "name": "init_center_x",
      "label": "Initial Center X",
      "default": "255",
      "type": "int",
      "description": "Initial Center X-coordinate for center of mass calculation",
      "required": false
    },
    {
      "name": "init_center_y",
      "label": "Initial Center Y",
      "default": "255",
      "type": "int",
      "description": "Initial Center Y-coordinate for center of mass calculation",
      "required": false
    }
  ],
  "parallel_config": {
    "type": "embarrassing"
  }
}
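Before building, it can help to sanity-check an operator.json against the fields that OperatorSpec lists without a default (id, label, description, image). The following is a rough standard-library sketch, not the validation that spec.py performs: `check_operator_spec` is a hypothetical helper, and treating id as a UUID is an assumption based only on the example above.

```python
from __future__ import annotations

import json
import uuid
from typing import Any

# Fields that OperatorSpec declares without a default value.
REQUIRED_FIELDS = ("id", "label", "description", "image")
# Fields that OperatorSpec declares as an optional list.
LIST_FIELDS = ("inputs", "outputs", "parameters", "tags")


def check_operator_spec(spec: dict[str, Any]) -> list[str]:
    """Return a list of human-readable problems (empty if none were found).

    Hypothetical helper for illustration; the real model lives in spec.py.
    """
    problems: list[str] = []
    for field in REQUIRED_FIELDS:
        value = spec.get(field)
        if not isinstance(value, str) or not value:
            problems.append(f"missing or empty required field: {field}")
    # Assumption: ids are UUID strings, as in the example operator.json.
    spec_id = spec.get("id")
    if isinstance(spec_id, str) and spec_id:
        try:
            uuid.UUID(spec_id)
        except ValueError:
            problems.append("id is not a valid UUID")
    for field in LIST_FIELDS:
        value = spec.get(field)
        if value is not None and not isinstance(value, list):
            problems.append(f"{field} must be a list or omitted")
    return problems


example = json.loads(
    """
    {
      "id": "70dd71a7-5ebf-4515-8bf9-941d1284328c",
      "image": "ghcr.io/nersc/interactem/center-of-mass-partial",
      "label": "Partial Center of Mass",
      "description": "Calculates the center of mass for a frame",
      "parallel_config": {"type": "embarrassing"}
    }
    """
)
print(check_operator_spec(example))  # []
```

To check a file on disk, pass `json.loads(Path("operator.json").read_text())` to the same function.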
Building locally#
Operators are located (for now) in the operators/ directory. After you add an operator.json to any subdirectory of operators and refresh your frontend, it will appear in the list of operators.
You still have to build these operators and make sure that your local podman can see them.
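To see which specs are present under operators/, a short script can walk the directory. This is only an illustrative sketch: how the platform itself discovers specs is not shown here, and `find_operator_specs` is a hypothetical helper.

```python
from __future__ import annotations

import json
from pathlib import Path


def find_operator_specs(operators_dir: Path) -> dict[str, Path]:
    """Map each operator label to the operator.json that defines it.

    Hypothetical helper: searches every subdirectory of operators_dir and
    falls back to the directory name when a spec has no "label".
    """
    found: dict[str, Path] = {}
    for spec_path in sorted(operators_dir.rglob("operator.json")):
        spec = json.loads(spec_path.read_text())
        label = spec.get("label", spec_path.parent.name)
        found[label] = spec_path
    return found


# Usage, from the repository root:
#   for label, path in find_operator_specs(Path("operators")).items():
#       print(f"{label}: {path}")
```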
MacOS#
1. Get docker desktop and podman desktop.
2. Set up a local docker registry by running:
   docker run -d -p 5001:5000 --restart always --name docker-registry registry:3
3. Use bake.sh to build all containers with docker. This includes the base image, operator, and distiller-streaming. Run the following:
   ./bake --push-local --build-base
   This will push everything to the local registry instead of pushing up to GitHub packages. You can also omit --build-base to avoid building base images for faster iteration.
4. Set your .env file in the operator directory to have the correct podman socket (see .env.example).
5. Use the poetry environment from the root directory pyproject.toml and run the following:
   poetry run python pull_images_from_bake.py
   This will pull local registry images into podman and tag them appropriately. This also runs at the end of bake.sh if --pull-local is given.
The build_all.sh script was used before, but it is cumbersome so I am not updating it.
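After pushing to the local registry, one way to confirm that the images arrived is to query the registry's catalog endpoint (`/v2/_catalog`, part of the Docker Registry HTTP API). The sketch below assumes the registry is reachable on localhost port 5001, as in the docker run command above; the function names are hypothetical and nothing here is part of bake.sh or pull_images_from_bake.py.

```python
from __future__ import annotations

import json
from urllib.request import urlopen


def catalog_url(host: str = "localhost", port: int = 5001) -> str:
    """Build the catalog URL for a registry published on host:port."""
    return f"http://{host}:{port}/v2/_catalog"


def parse_catalog(body: bytes | str) -> list[str]:
    """Extract the sorted repository names from a catalog response body."""
    return sorted(json.loads(body).get("repositories", []))


def list_local_repositories(
    host: str = "localhost", port: int = 5001, timeout: float = 5.0
) -> list[str]:
    """Query the registry and return the repositories it currently holds."""
    with urlopen(catalog_url(host, port), timeout=timeout) as response:
        return parse_catalog(response.read())


# Usage (requires the local registry container to be running):
#   print(list_local_repositories())
```

An empty list means the registry is running but nothing has been pushed yet; a connection error usually means the registry container is not up.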