Operators#

What is an Operator?#

Operators are pieces of code that perform some action on data coming from other operators. They can be chained together to build a data pipeline.

Getting operators into podman storage#

Premade operators are currently located in the operators/ directory. To run an operator pipeline, podman must have the operator images in its local storage. This can be done using docker bake or podman build.

docker bake + podman pull#

We can use docker bake and podman pull for operators listed in the bake hcl file.

cd interactEM
make setup # no need to run this if you already have

# Build all operators and pull them into podman storage
make operators

# Or build a specific operator and pull it into podman storage
make operator target=center-of-mass-partial

For faster iteration during development, you can omit the --build-base flag by using the lower-level bake.sh script directly:

./operators/bake.sh --push-local --pull-local --target center-of-mass-partial

If you make an operator, you can add it to the hcl file and it will be part of this build process. This doesn’t happen automatically: you will have to edit that file manually. Look at the format of the existing entries and use your judgment (or ask an agent).

podman build#

If you create an operator using the instructions below, you can also use a regular podman build.

For example, if we have an operator named my-operator in the my-operator/ directory, you can build it like this:

cd my-operator
# tag should match what is found in the "image" field in your operator.json
podman build -t ghcr.io/nersc/interactem/my-operator:latest .  
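
The comment above says the tag should match the "image" field in operator.json. A minimal Python sketch of deriving the build command from that field follows. It assumes operator.json sits in the operator directory and that "latest" is the tag you want; podman_build_command is a hypothetical helper, not part of interactEM.

```python
import json
from pathlib import Path


def podman_build_command(operator_dir: str, tag: str = "latest") -> list[str]:
    """Build the `podman build` argument list from operator.json's "image" field."""
    # Assumption: the spec file is named operator.json and lives in operator_dir.
    spec = json.loads((Path(operator_dir) / "operator.json").read_text())
    image = spec["image"]
    # Append the tag only if the last path component does not already carry one.
    last_component = image.rsplit("/", 1)[-1]
    if ":" not in last_component:
        image = f"{image}:{tag}"
    return ["podman", "build", "-t", image, str(operator_dir)]
```

The returned list can be passed to subprocess.run(..., check=True), which keeps the tag and the spec from drifting apart.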

Creating an operator#

At the very least, three files are required to make an operator:

  1. run.py

  2. Containerfile

  3. Specification (operator.json)
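
A quick way to confirm that an operator directory has all three files is sketched below. It assumes the specification file is named operator.json, as in the examples on this page; missing_operator_files is a hypothetical helper, not part of the interactem CLI.

```python
from pathlib import Path

# The three files listed above; operator.json is the assumed spec file name.
REQUIRED_FILES = ("run.py", "Containerfile", "operator.json")


def missing_operator_files(operator_dir: str) -> list[str]:
    """Return the required file names that are absent from operator_dir."""
    root = Path(operator_dir)
    return [name for name in REQUIRED_FILES if not (root / name).is_file()]
```

An empty list means the directory has the minimum needed to attempt a build.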

As part of our CLI tool, we provide some templates that can get you started. Using uv:

uv sync
uv run interactem operator new

or poetry:

poetry install
poetry run interactem operator new

Or if you are feeling dangerous:

pip install . 
interactem operator new

This will generate the files you need under whatever directory you specify. The default output directory is operators/, so if you run the commands above from the git root, the new operator will appear in the operators directory (interactEM/operators/). You can then refresh the operators in the frontend, and the list will include your new operator.

Note

Operators will be updated ONLY in the operators panel during a refresh. Your existing pipelines will NOT be updated. You must manually replace the operators in existing pipelines with the refreshed operator.

You can build the operators as discussed above.

run.py#

We need to define the code that will operate on incoming messages. The incoming messages will come in as BytesMessages, which contain data, metadata, and tracking information.

Here is an example of the run.py file for the partial center of mass operator. You can see that the parameters (defined in the spec, see below) are read by name from the parameters dict.

Prior to building the operator, you can quickly check whether run.py compiles under Python by running python -m py_compile run.py.

@operator
def com_partial(
    inputs: BytesMessage | None, parameters: dict[str, Any]
) -> BytesMessage | None:
    if not inputs:
        logger.warning("No input provided to the partial center of mass operator.")
        return None

    center = None
    init_center_x = parameters.get("init_center_x")
    init_center_y = parameters.get("init_center_y")
    if init_center_x is not None and init_center_y is not None:
        center = (init_center_x, init_center_y)

    crop = None
    crop_to_x = parameters.get("crop_to_x")
    crop_to_y = parameters.get("crop_to_y")
    if crop_to_x is not None and crop_to_y is not None:
        crop = (crop_to_x, crop_to_y)

    batch = BatchedFrames.from_bytes_message(inputs)
    com = com_sparse(batch, init_center=center, crop_to=crop, replace_nans=False)

    return COMPartial(header=batch.header, array=com).to_bytes_message()

Containerfile#

We need to use the operator base image as the parent image for our Containerfile. In this case, the center-of-mass-partial image uses the distiller-streaming image as its base, which is itself built from the operator image and contains a lot of utilities for processing 4D Camera frames.

ghcr.io/nersc/interactem/distiller-streaming#
FROM ghcr.io/nersc/interactem/operator

WORKDIR /app
COPY ./pyproject.toml ./poetry.lock ./README.md /app/

# Base image installs interactem-core at /interactem/core. 
# Locally, the project uses ../../backend/core which
# resolves to /backend/core inside the container. We symlink it here 
# so poetry can find it.
RUN mkdir -p /backend && \
    if [ -d /interactem/core ]; then \
        ln -sfn /interactem/core /backend/core; \
    fi && \
    if [ -d /interactem/operators ]; then \
        ln -sfn /interactem/operators /backend/operators; \
    fi

RUN poetry install --no-root --without dev

COPY ./distiller_streaming/ /app/distiller_streaming/
RUN poetry install --only-root

ghcr.io/nersc/interactem/center-of-mass-partial#
FROM ghcr.io/nersc/interactem/distiller-streaming

COPY ./run.py /app/run.py

Specification#

Operator specifications need to be defined in a JSON file. The specification model can be found in spec.py.

Below are the specification model and an example operator.json for the partial center of mass operator.

Specification model#
class OperatorSpec(BaseModel):
    id: OperatorSpecID
    label: str  # Human readable name of the operator
    description: str  # Human readable description of the operator
    image: str  # Container image for operator
    inputs: list[OperatorSpecInput] | None = None  # List of inputs
    outputs: list[OperatorSpecOutput] | None = None  # List of outputs
    parameters: list[OperatorSpecParameter] | None = None  # List of parameters
    tags: list[OperatorSpecTag] | None = None  # List of tags to match on
    parallel_config: ParallelConfig | None = None  # Parallel execution config

Example operator.json#
{
  "id": "70dd71a7-5ebf-4515-8bf9-941d1284328c",
  "image": "ghcr.io/nersc/interactem/center-of-mass-partial",
  "label": "Partial Center of Mass",
  "description": "Calculates the center of mass for a frame",
  "inputs": [
    {
      "name": "in",
      "label": "The input",
      "type": "frame",
      "description": "Input frame"
    }
  ],
  "outputs": [
    {
      "name": "com_partial",
      "label": "The output",
      "type": "com_partial",
      "description": "Partial center of mass"
    }
  ],
  "parameters": [
    {
      "name": "crop_to_x",
      "label": "Crop To X",
      "type": "int",
      "default": "255",
      "description": "X-coordinate to crop to",
      "required": false
    },
    {
      "name": "crop_to_y",
      "label": "Crop To Y",
      "type": "int",
      "default": "255",
      "description": "Y-coordinate to crop to",
      "required": false
    },
    {
      "name": "init_center_x",
      "label": "Initial Center X",
      "default": "255",
      "type": "int",
      "description": "Initial Center X-coordinate for center of mass calculation",
      "required": false
    },
    {
      "name": "init_center_y",
      "label": "Initial Center Y",
      "default": "255",
      "type": "int",
      "description": "Initial Center Y-coordinate for center of mass calculation",
      "required": false
    }
  ],
  "parallel_config": {
    "type": "embarrassing"
  }
}
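
Before refreshing the frontend, it can help to sanity-check an operator.json. The sketch below uses only the standard library and checks the four fields that have no default in the model shown above (id, label, description, image). Treating the id as a UUID is an assumption based on the example, and the duplicate-name check is an extra sanity check rather than something the spec model is known to enforce. check_operator_spec is a hypothetical helper, not part of interactEM.

```python
import json
import uuid

# Fields without a default value in the OperatorSpec model shown above.
REQUIRED_FIELDS = ("id", "label", "description", "image")


def check_operator_spec(text: str) -> list[str]:
    """Return a list of problems found in an operator.json document."""
    spec = json.loads(text)
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in spec]
    if "id" in spec:
        try:
            # Assumption: OperatorSpecID is a UUID, as the example id suggests.
            uuid.UUID(str(spec["id"]))
        except ValueError:
            problems.append("id is not a valid UUID")
    # Extra check (assumption): run.py looks parameters up by name, so
    # duplicate names would be ambiguous.
    names = [p["name"] for p in spec.get("parameters") or []]
    if len(names) != len(set(names)):
        problems.append("duplicate parameter names")
    return problems
```

An empty list means none of these checks failed; it does not guarantee the spec passes the real model in spec.py.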