Automation with Python

You can automate your Lightly Worker workflows directly from Python. This allows you to seamlessly integrate the Lightly Worker into your machine learning pipeline. Lightly also integrates with pipeline tools such as Metaflow.
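As a minimal, hypothetical sketch (assuming Metaflow is installed; the flow and step names are placeholders), the automation script shown further below could be wrapped in a single Metaflow step:

from metaflow import FlowSpec, step


class LightlySelectionFlow(FlowSpec):
    @step
    def start(self):
        # Run the Lightly automation here: create the dataset, configure the
        # datasource, schedule the run, start the worker, download artifacts.
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    LightlySelectionFlow()

You would then execute the flow with python my_flow.py run, letting Metaflow handle orchestration, retries, and versioning around the Lightly step.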

Prerequisites

In order to use the Lightly Worker in any automated workflow, the following prerequisites are necessary:

- You have a valid Lightly API token.
- The Lightly Worker is installed and registered, so you know its worker ID and, optionally, a worker label (see the registration sketch below).
- Your input data is stored in a cloud bucket (for example, S3) and you have the corresponding credentials.
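The worker ID and label used in the script below come from registering a Lightly Worker. As a hedged sketch (the worker name and label are placeholders), registration with the Lightly Python client looks roughly like this:

from lightly.api import ApiWorkflowClient

client = ApiWorkflowClient(token="YOUR_LIGHTLY_TOKEN")
# Registration returns the worker ID; scheduled jobs can target the label via runs_on.
worker_id = client.register_compute_worker(name="my-worker", labels=["YOUR_WORKER_LABEL"])
print(worker_id)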

Automation with Python

The Lightly Worker is shipped as a Docker image. Therefore, if you want to run the Lightly Worker from Python, you need to install the Docker Python API first:

pip install docker
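To verify that the Docker SDK can reach your local Docker daemon, you can run a quick sanity check (a minimal sketch; it simply pings the daemon):

import docker

# Connect to the local Docker daemon and check that it responds.
docker_client = docker.from_env()
assert docker_client.ping()
print("Docker daemon is reachable.")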

Now, you can use the following script to run the Lightly Worker directly from Python. Don't forget to change the settings according to your setup!

The script will execute the following steps:

1. Create a new dataset in the Lightly Platform.
2. Configure the S3 bucket as the input and Lightly datasource of the dataset.
3. Schedule a run with the selection configuration.
4. Start the Lightly Worker in a Docker container and wait until it has processed the job.
5. Download the artifacts of the finished run.

πŸ“˜

Example for S3

Note that the code below shows how to automate the Lightly Worker if your data is stored on S3. Of course, you can also use other cloud storage providers. Simply replace the lines configuring the datasource with your preferred setup.
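For example, if your data is stored in Azure Blob Storage instead of S3, the datasource configuration could look roughly like the following (a hedged sketch; the container name, account name, and SAS token are placeholders, and the set_azure_config parameters may differ slightly depending on your SDK version):

from lightly.api import ApiWorkflowClient
from lightly.openapi_generated.swagger_client import DatasourcePurpose

client = ApiWorkflowClient(token="YOUR_LIGHTLY_TOKEN")
# Configure the same container as both the INPUT and the LIGHTLY datasource.
for purpose in (DatasourcePurpose.INPUT, DatasourcePurpose.LIGHTLY):
    client.set_azure_config(
        container_name="my-container/my-dataset",
        account_name="YOUR_AZURE_ACCOUNT_NAME",
        sas_token="YOUR_AZURE_SAS_TOKEN",
        purpose=purpose,
    )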

from datetime import datetime
from pathlib import Path

import docker

from lightly.api import ApiWorkflowClient
from lightly.openapi_generated.swagger_client import DatasetType
from lightly.openapi_generated.swagger_client import DatasourcePurpose


# Settings
LIGHTLY_TOKEN = "YOUR_LIGHTLY_TOKEN"
IMAGE_NAME: str = "lightly/worker:latest"
now = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")
DATASET_NAME: str = f"My_Dataset_{now}"  # Timestamp keeps dataset names unique
# Lightly Worker config
SELECTION_CONFIG = {
    "proportion_samples": 0.5,
    "strategies": [
        {"input": {"type": "EMBEDDINGS"}, "strategy": {"type": "DIVERSITY"}}
    ],
}
WORKER_CONFIG = {
    "shutdown_when_job_finished": True,  # Important!
}
# S3
S3_REGION = "YOUR_S3_REGION"
S3_ACCESS_KEY_ID = "YOUR_S3_ACCESS_KEY_ID"
S3_SECRET_ACCESS_KEY = "YOUR_S3_SECRET_ACCESS_KEY"
S3_BUCKET_PATH = "YOUR_S3_BUCKET_PATH"
# Worker
WORKER_ID = "YOUR_WORKER_ID"
WORKER_LABEL = "YOUR_WORKER_LABEL"
# Outputs
OUTPUT_DIR = Path()  # Current working directory; mounted as the worker's output directory


if __name__ == "__main__":

    # Create a dataset
    client = ApiWorkflowClient(token=LIGHTLY_TOKEN)
    client.create_dataset(DATASET_NAME, DatasetType.VIDEOS)

    # Configure S3
    client.set_s3_config(
        resource_path=S3_BUCKET_PATH,
        region=S3_REGION,
        access_key=S3_ACCESS_KEY_ID,
        secret_access_key=S3_SECRET_ACCESS_KEY,
        purpose=DatasourcePurpose.INPUT,
    )
    client.set_s3_config(
        resource_path=S3_BUCKET_PATH,
        region=S3_REGION,
        access_key=S3_ACCESS_KEY_ID,
        secret_access_key=S3_SECRET_ACCESS_KEY,
        purpose=DatasourcePurpose.LIGHTLY,
    )

    # Schedule a run
    scheduled_run_id = client.schedule_compute_worker_run(
        worker_config=WORKER_CONFIG,
        selection_config=SELECTION_CONFIG,
        runs_on=[WORKER_LABEL],
    )

    # Start the Lightly Worker and process the job. Without detach=True,
    # containers.run() blocks until the container exits and returns its console logs.
    docker_client = docker.from_env()
    volumes = [f"{OUTPUT_DIR.resolve()}:/home/output_dir"]
    console_logs = docker_client.containers.run(
        IMAGE_NAME,
        f"token={LIGHTLY_TOKEN} worker.worker_id={WORKER_ID}",
        labels={"lightly_worker_label": WORKER_LABEL},
        volumes=volumes,
    )

    # Download artifacts
    run = client.get_compute_worker_run_from_scheduled_run(scheduled_run_id=scheduled_run_id)
    client.download_compute_worker_run_artifacts(run=run, output_dir=OUTPUT_DIR / "artifacts")
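
If you prefer not to block on the Docker container, or want to follow the job from a separate process, you can also monitor the scheduled run's state (a hedged sketch reusing client and scheduled_run_id from the script above):

# Follow the run through its state transitions until it reaches an end state.
for run_info in client.compute_worker_run_info_generator(scheduled_run_id=scheduled_run_id):
    print(f"Run is in state {run_info.state} with message {run_info.message}")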

πŸ“˜

It is important to set shutdown_when_job_finished=True in the worker config when scheduling the job! Otherwise, the worker will keep running and listen for a new job.