Automation with Python

You can automate your Lightly Worker workflows directly from Python. This allows you to seamlessly integrate the Lightly Worker into your machine learning pipeline. Lightly also integrates with pipeline tools such as Metaflow.

Prerequisites

In order to use the Lightly Worker in any automated workflow, the following prerequisites are necessary (they correspond to the settings at the top of the script below):

- A Lightly API token.
- A registered Lightly Worker, together with its worker ID and label.
- Docker installed on the machine running the script.
- Credentials for the cloud storage holding your data (S3 in this example).

Automation with Python

The Lightly Worker is shipped as a Docker image. Therefore, if you want to run the Lightly Worker from Python, you first need to install the Docker SDK for Python:

pip install docker
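To quickly verify that the SDK can reach your local Docker daemon, you can run the following check (a minimal sketch; ping() returns True if the daemon is responsive):

import docker

docker_client = docker.from_env()  # reads connection settings from the environment
assert docker_client.ping()  # True if the Docker daemon is responsive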

Now, you can use the following script to run the Lightly Worker directly from Python. Don't forget to change the settings according to your setup!

The script will execute the following steps:

1. Create a new dataset in the Lightly Platform.
2. Configure the S3 datasources (input and Lightly).
3. Schedule a Lightly Worker run.
4. Start the Lightly Worker in a Docker container and wait until the run finishes.
5. Download the artifacts of the run.

📘

Example for S3

Note that the code below shows how to automate the Lightly Worker if your data is stored on S3. You can also use any other supported cloud storage provider: simply replace the lines configuring the datasource with your preferred setup, as in the sketch below.
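For example, if your data is stored in Google Cloud Storage instead, replacing the two set_s3_config calls with something along these lines should work (a sketch using the client's set_gcs_config method; check the Lightly documentation for your client version):

import json

# Hypothetical GCS settings, analogous to the S3 settings in the script below.
GCS_PROJECT_ID = "YOUR_GCS_PROJECT_ID"
GCS_BUCKET_PATH = "gs://YOUR_BUCKET/PATH"
GCS_CREDENTIALS_FILE = "YOUR_CREDENTIALS.json"

with open(GCS_CREDENTIALS_FILE) as f:
    gcs_credentials = json.dumps(json.load(f))

client.set_gcs_config(
    resource_path=GCS_BUCKET_PATH,
    project_id=GCS_PROJECT_ID,
    credentials=gcs_credentials,
    purpose=DatasourcePurpose.INPUT,
)
client.set_gcs_config(
    resource_path=GCS_BUCKET_PATH,
    project_id=GCS_PROJECT_ID,
    credentials=gcs_credentials,
    purpose=DatasourcePurpose.LIGHTLY,
)

The full example script for S3: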

import time
import contextlib
from pathlib import Path
from collections.abc import Iterator
from datetime import datetime

import docker
from docker.models.containers import Container

from lightly.api import ApiWorkflowClient
from lightly.openapi_generated.swagger_client import DatasetType
from lightly.openapi_generated.swagger_client import DatasourcePurpose

# Timestamp used to make the dataset name unique.
now = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")


# Settings
LIGHTLY_TOKEN: str = "YOUR_LIGHTLY_TOKEN"
IMAGE_NAME: str = "lightly/worker:latest"
DATASET_NAME: str = f"My_Dataset_{now}"
# Lightly Worker config
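# Select the 50% most diverse samples based on their embeddings.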
SELECTION_CONFIG = {
    "proportion_samples": 0.5,
    "strategies": [
        {"input": {"type": "EMBEDDINGS"}, "strategy": {"type": "DIVERSITY"}}
    ],
}
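# Additional worker options; an empty dict keeps the defaults.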
WORKER_CONFIG = {}
# S3
S3_REGION = "YOUR_S3_REGION"
S3_ACCESS_KEY_ID = "YOUR_S3_ACCESS_KEY_ID"
S3_SECRET_ACCESS_KEY = "YOUR_S3_SECRET_ACCESS_KEY"
S3_BUCKET_PATH = "YOUR_S3_BUCKET_PATH"
# Worker
WORKER_ID = "YOUR_WORKER_ID"
WORKER_LABEL = "YOUR_WORKER_LABEL"
# Outputs
OUTPUT_DIR = Path()


@contextlib.contextmanager
def start_worker() -> Iterator[Container]:
    """Context manager starting a new container.

    The container is killed once the context is exited.

    """
    docker_client = docker.from_env()
    volumes = [f"{OUTPUT_DIR.resolve()}:/home/output_dir"]
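    # Run the worker image in the background, passing the API token and
    # worker ID as command-line arguments and mounting the output directory.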
    container = docker_client.containers.run(
        IMAGE_NAME,
        f"token={LIGHTLY_TOKEN} worker.worker_id={WORKER_ID}",
        detach=True,
        labels={"lightly_worker_label": WORKER_LABEL},
        volumes=volumes,
    )
    try:
        yield container
    finally:
        try:
            container.kill()
        except docker.errors.APIError:
            # if a container was killed from outside, we don't care
            pass


if __name__ == "__main__":

    # Create a dataset
    client = ApiWorkflowClient(token=LIGHTLY_TOKEN)
    client.create_dataset(DATASET_NAME, DatasetType.VIDEOS)

    # Configure the S3 datasources
    client.set_s3_config(
        resource_path=S3_BUCKET_PATH,
        region=S3_REGION,
        access_key=S3_ACCESS_KEY_ID,
        secret_access_key=S3_SECRET_ACCESS_KEY,
        purpose=DatasourcePurpose.INPUT,
    )
    client.set_s3_config(
        resource_path=S3_BUCKET_PATH,
        region=S3_REGION,
        access_key=S3_ACCESS_KEY_ID,
        secret_access_key=S3_SECRET_ACCESS_KEY,
        purpose=DatasourcePurpose.LIGHTLY,
    )

    # Schedule a run
    scheduled_run_id = client.schedule_compute_worker_run(
        worker_config=WORKER_CONFIG,
        selection_config=SELECTION_CONFIG,
        runs_on=[WORKER_LABEL],
    )

    # Start the Lightly Worker and process the job
    with start_worker():
        last_run_info = None
        no_update_count = 0
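        # Poll the run status every 30 seconds; abort if there is no update
        # for 10 consecutive polls (i.e. at least 5 minutes).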
        while True:
            run_info = client.get_compute_worker_run_info(scheduled_run_id=scheduled_run_id)
            if run_info.in_end_state():
                assert run_info.ended_successfully(), "Run did not end successfully"
                break
            if run_info != last_run_info:
                no_update_count = 0
            else:
                no_update_count += 1
                if no_update_count >= 10:
                    raise RuntimeError(
                        f"Timeout: no run_info update for at least 5 minutes\n"
                        f"last_run_info: {last_run_info}, run_info: {run_info}"
                    )
            last_run_info = run_info
            time.sleep(30)

    # Download artifacts
    run = client.get_compute_worker_run_from_scheduled_run(scheduled_run_id=scheduled_run_id)
    client.download_compute_worker_run_artifacts(run=run, output_dir=OUTPUT_DIR / "artifacts")
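
Once the run has finished, the artifacts of the run (such as the report and logs) are stored in the artifacts directory. To verify the download, you can list the files, for example by appending this to the script:

for path in sorted((OUTPUT_DIR / "artifacts").rglob("*")):
    print(path)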