Metaflow is a tool originally developed by Netflix. It allows data scientists to scale their pipelines from a simple script on their local machine to a production-ready, end-to-end workflow. Compute can be handled by AWS Batch or a custom Kubernetes cluster, and workflows can be orchestrated with AWS Step Functions.

This guide demonstrates how the Lightly Worker can be used with Metaflow on AWS Batch.

Prerequisites

In order to use the Lightly Worker in any automated workflow, the following prerequisites are necessary:

  • A Lightly API token (LIGHTLY_TOKEN).
  • A registered Lightly Worker with a worker ID and a worker label.
  • Credentials for the cloud bucket holding your data (for S3: region, access key ID, and secret access key).
  • Docker and an AWS container registry (for example Amazon ECR) to host the wrapped Lightly Worker image.

Wrap the Lightly Worker

Metaflow expects the Lightly Worker Docker image to have no explicit entrypoint. Since the Lightly Worker image comes with an entrypoint, you'll have to wrap it in an image of your own and host it in your AWS container registry. You can use the following Dockerfile to get the desired image:

FROM lightly/worker:latest

# Install additional dependencies here if required

# Remove the entrypoint from the Lightly Worker
ENTRYPOINT []

Next, build the Docker image, tag it, and push it to your AWS container registry. Make sure you are authenticated with the registry before pushing, and don't forget to replace {YOUR_IDENTIFIER} in the commands below:

docker build -t my-lightly/worker .
docker tag my-lightly/worker:latest {YOUR_IDENTIFIER}.amazonaws.com/my-lightly/worker:latest
docker push {YOUR_IDENTIFIER}.amazonaws.com/my-lightly/worker:latest

Automation with Metaflow

For this guide you need to have Metaflow deployed to AWS or Kubernetes. If you haven't done that already, follow the instructions here. Additionally, you need to install the Metaflow Python client:

pip install metaflow

Now, you can use the following flow to run the Lightly Worker directly on AWS Batch. Don't forget to change the settings according to your setup! If you save the flow as, for example, lightly_worker_flow.py, you can execute it with python lightly_worker_flow.py run.

The flow will execute the following steps:

  • Create a Dataset
  • Configure a Datasource
  • Schedule a Run
  • Start the Lightly Worker and process the scheduled run

📘 Example for S3

Note that the code below shows how to automate the Lightly Worker if your data is stored on S3. Of course, you can also use other cloud storage providers. Simply replace the lines configuring the datasource with your preferred setup.
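
For example, if your data is stored on Azure Blob Storage, the two set_s3_config calls in the flow below could be replaced by something like the following sketch (container name, account name, and SAS token are placeholders; check the Lightly documentation for the exact parameters of your provider):

# Hedged sketch: configure an Azure datasource instead of S3.
client.set_azure_config(
    container_name="my-container/datasets",
    account_name="YOUR_AZURE_ACCOUNT_NAME",
    sas_token="YOUR_AZURE_SAS_TOKEN",
    purpose=DatasourcePurpose.INPUT,
)
client.set_azure_config(
    container_name="my-container/datasets",
    account_name="YOUR_AZURE_ACCOUNT_NAME",
    sas_token="YOUR_AZURE_SAS_TOKEN",
    purpose=DatasourcePurpose.LIGHTLY,
)

The rest of the flow stays unchanged.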

import os
import time
from datetime import datetime
from multiprocessing import Process
from pathlib import Path

from omegaconf import OmegaConf

# Metaflow
from metaflow import FlowSpec, step, batch, environment

now = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")

# Settings
LIGHTLY_TOKEN = "YOUR_LIGHTLY_TOKEN"
IMAGE_NAME: str = "my-lightly/worker:latest"
DATASET_NAME: str = f"My_Dataset_{now}"
# Lightly Worker config
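# Keep 50% of the input samples, selected for embedding diversity.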
SELECTION_CONFIG = {
    "proportion_samples": 0.5,
    "strategies": [
        {"input": {"type": "EMBEDDINGS"}, "strategy": {"type": "DIVERSITY"}}
    ],
}
WORKER_CONFIG = {}
# S3
S3_REGION = "YOUR_S3_REGION"
S3_ACCESS_KEY_ID = "YOUR_S3_ACCESS_KEY_ID"
S3_SECRET_ACCESS_KEY = "YOUR_S3_SECRET_ACCESS_KEY"
S3_BUCKET_PATH = "YOUR_S3_BUCKET_PATH"
# Worker
WORKER_ID = "YOUR_WORKER_ID"
WORKER_LABEL = "YOUR_WORKER_LABEL"

# Constants (do not change)
HOME_DIR = Path("/home/boris")
HYDRA_DIR = Path(".hydra")
METAFLOW_DIR = Path(".metaflow")
RESOURCE_PATH = Path("lightly_worker/src/lightly_worker/resources")
DOCKER_CONFIG = Path("docker/docker.yaml")
LIGHTLY_CONFIG = Path("lightly/lightly.yaml")


class LightlyWorkerFlow(FlowSpec):
    """Flow which processes videos in an S3 bucket using the Lightly Worker. """

    @batch(
        image=IMAGE_NAME,
        cpu=8,
        gpu=1,
        memory=120000,
    )
    @environment(
        vars={
            "LIGHTLY_TOKEN": LIGHTLY_TOKEN,
        }
    )
    @step
    def start(self):
        """Schedules a job and processes it with the Lightly Worker. """

        # Set up the environment
        os.chdir(HOME_DIR)
        HYDRA_DIR.mkdir(exist_ok=True)
        METAFLOW_DIR.mkdir(exist_ok=True)
        try:
            # These packages are only available inside the Lightly Worker
            # image; the try/except keeps local linters happy.
            from lightly_worker import main
            from lightly.api import ApiWorkflowClient
            from lightly.openapi_generated.swagger_client import DatasetType
            from lightly.openapi_generated.swagger_client import DatasourcePurpose
        except ImportError:
            pass

        # Create a dataset
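        # The client picks up the LIGHTLY_TOKEN environment variable set
        # via the @environment decorator above.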
        client = ApiWorkflowClient()
        client.create_dataset(DATASET_NAME, DatasetType.VIDEOS)
        self.dataset_id = client.dataset_id

        # Configure s3
        client.set_s3_config(
            resource_path=S3_BUCKET_PATH,
            region=S3_REGION,
            access_key=S3_ACCESS_KEY_ID,
            secret_access_key=S3_SECRET_ACCESS_KEY,
            purpose=DatasourcePurpose.INPUT,
        )
        client.set_s3_config(
            resource_path=S3_BUCKET_PATH,
            region=S3_REGION,
            access_key=S3_ACCESS_KEY_ID,
            secret_access_key=S3_SECRET_ACCESS_KEY,
            purpose=DatasourcePurpose.LIGHTLY,
        )

        # Schedule a run
        self.scheduled_run_id = client.schedule_compute_worker_run(
            worker_config=WORKER_CONFIG,
            selection_config=SELECTION_CONFIG,
            runs_on=[WORKER_LABEL]
        )

        # Run the Lightly Worker
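        # Load the worker configuration and start the worker in a background
        # process so that this step can keep monitoring the scheduled run.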
        cfg = OmegaConf.load(RESOURCE_PATH / DOCKER_CONFIG)
        cfg.lightly = OmegaConf.load(RESOURCE_PATH / LIGHTLY_CONFIG)
        cfg.worker.worker_id = WORKER_ID

        process = Process(target=main.main, args=(cfg,))
        process.start()

        # Monitor the scheduled run
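        # Poll every 30 seconds; abort if the run info has not changed for
        # 10 consecutive polls (5 minutes).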
        last_run_info = None
        while True:
            run_info = client.get_compute_worker_run_info(
                scheduled_run_id=self.scheduled_run_id
            )
            if run_info.in_end_state():
                assert run_info.ended_successfully(), "Run did not end successfully"
                break
            if run_info != last_run_info:
                no_update_count = 0
            else:
                no_update_count += 1
                if no_update_count >= 10:
                    raise RuntimeError(
                        f"Timeout: no run_info update for at least 5 minutes\n"
                        f"last_run_info: {str(last_run_info)}, run_info: {str(run_info)}"
                    )
            last_run_info = run_info
            time.sleep(30)

        # Finish up
        process.terminate()
        self.next(self.end)


    @step
    def end(self):
        """You can access selected images using the Lightly dataset_id. """
        
        # Continue working with the selected images using the dataset_id
        print(f"My Lightly dataset_id is {self.dataset_id}")


if __name__ == "__main__":
    LightlyWorkerFlow()
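
Once the flow has finished, you can use the printed dataset_id to continue working with the selected images through the Lightly Python client. The following is a minimal, hypothetical follow-up script; it assumes the default "initial-tag" that Lightly creates for a dataset, so adjust the tag name to your setup:

from lightly.api import ApiWorkflowClient

# Connect to the dataset created by the flow (placeholders below).
client = ApiWorkflowClient(
    token="YOUR_LIGHTLY_TOKEN",
    dataset_id="YOUR_DATASET_ID",  # the dataset_id printed by the flow
)

# Export the filenames in the dataset as a newline-separated string.
filenames = client.export_filenames_by_tag_name("initial-tag")
print(filenames)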