Automation with Python
You can automate your LightlyOne Worker workflows directly from Python, which lets you integrate the LightlyOne Worker into your machine learning pipeline. LightlyOne also integrates with pipeline tools such as Metaflow.
Prerequisites
To use the LightlyOne Worker in an automated workflow, the following prerequisites must be met:
- The LightlyOne Worker is installed.
- The LightlyOne Worker is registered with a label of your choice.
Automation with Python
The LightlyOne Worker is shipped as a Docker image. To run it from Python, install the Docker SDK for Python first:
pip install docker
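Before running the automation, it can help to confirm that the pieces it depends on are present. The following is a minimal sketch, not part of LightlyOne: the helper name `missing_prerequisites` is hypothetical, and it only checks that the `docker` and `lightly` Python packages are importable and that a `docker` executable is on the PATH. It does not verify that the Docker daemon is running or that a worker is registered.

```python
import importlib.util
import shutil


def missing_prerequisites(modules=("docker", "lightly"), executables=("docker",)):
    """Return human-readable descriptions of prerequisites that were not found."""
    missing = []
    for name in modules:
        # find_spec returns None when a top-level module cannot be found
        if importlib.util.find_spec(name) is None:
            missing.append(f"Python package '{name}'")
    for exe in executables:
        # shutil.which returns None when the executable is not on PATH
        if shutil.which(exe) is None:
            missing.append(f"executable '{exe}'")
    return missing


if __name__ == "__main__":
    problems = missing_prerequisites()
    if problems:
        print("Missing:", ", ".join(problems))
    else:
        print("All prerequisites found.")
```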
Now you can use the following script to run the LightlyOne Worker directly from Python. Don't forget to change the settings according to your setup!
The script will execute the following steps:
- Create a Dataset
- Configure a Datasource
- Schedule a Run
- Start the LightlyOne Worker and process the scheduled run
- Download Artifacts
Example for S3
Note that the code below shows how to automate the LightlyOne Worker if your data is stored on S3. Of course, you can also use other cloud storage providers. Simply replace the lines configuring the datasource with your preferred setup.
from datetime import datetime
from pathlib import Path

import docker
from lightly.api import ApiWorkflowClient
from lightly.openapi_generated.swagger_client import DatasetType, DatasourcePurpose

# Timestamp used to give each dataset a unique name
now = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")

# Settings
LIGHTLY_TOKEN = "YOUR_LIGHTLY_TOKEN"
IMAGE_NAME: str = "lightly/worker:latest"
DATASET_NAME: str = f"My_Dataset_{now}"
# LightlyOne Worker config
SELECTION_CONFIG = {
    "proportion_samples": 0.5,
    "strategies": [
        {"input": {"type": "EMBEDDINGS"}, "strategy": {"type": "DIVERSITY"}}
    ],
}
WORKER_CONFIG = {
    "shutdown_when_job_finished": True,  # Important!
}
# S3
S3_REGION = "YOUR_S3_REGION"
S3_ACCESS_KEY_ID = "YOUR_S3_ACCESS_KEY_ID"
S3_SECRET_ACCESS_KEY = "YOUR_S3_SECRET_ACCESS_KEY"
S3_BUCKET_PATH = "YOUR_S3_BUCKET_PATH"
# Worker
WORKER_ID = "YOUR_WORKER_ID"
WORKER_LABEL = "YOUR_WORKER_LABEL"
# Outputs
OUTPUT_DIR = Path()

if __name__ == "__main__":
    # Create a dataset
    client = ApiWorkflowClient(token=LIGHTLY_TOKEN)
    client.create_dataset(DATASET_NAME, DatasetType.VIDEOS)

    # Configure S3: one datasource for reading the input data,
    # one for LightlyOne to write its outputs
    client.set_s3_config(
        resource_path=S3_BUCKET_PATH,
        region=S3_REGION,
        access_key=S3_ACCESS_KEY_ID,
        secret_access_key=S3_SECRET_ACCESS_KEY,
        purpose=DatasourcePurpose.INPUT,
    )
    client.set_s3_config(
        resource_path=S3_BUCKET_PATH,
        region=S3_REGION,
        access_key=S3_ACCESS_KEY_ID,
        secret_access_key=S3_SECRET_ACCESS_KEY,
        purpose=DatasourcePurpose.LIGHTLY,
    )

    # Schedule a run
    scheduled_run_id = client.schedule_compute_worker_run(
        worker_config=WORKER_CONFIG,
        selection_config=SELECTION_CONFIG,
        runs_on=[WORKER_LABEL],
    )

    # Start the LightlyOne Worker and process the job.
    # This call blocks until the container exits.
    docker_client = docker.from_env()
    volumes = [f"{OUTPUT_DIR.resolve()}:/home/output_dir"]
    console_logs = docker_client.containers.run(
        IMAGE_NAME,
        f"token={LIGHTLY_TOKEN} worker.worker_id={WORKER_ID}",
        labels={"lightly_worker_label": WORKER_LABEL},
        volumes=volumes,
    )

    # Download artifacts
    run = client.get_compute_worker_run_from_scheduled_run(scheduled_run_id=scheduled_run_id)
    client.download_compute_worker_run_artifacts(run=run, output_dir=OUTPUT_DIR / "artifacts")
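
If your data lives in Google Cloud Storage rather than S3, the two `set_s3_config` calls in the script could be swapped for something like the sketch below. It assumes that the client's `set_gcs_config` method accepts `resource_path`, `project_id`, `credentials` (the service-account key as a JSON string), and `purpose`; please check the signature in your installed `lightly` version. The helper name `configure_gcs_datasources` and its parameters are hypothetical. The client and the two purpose values are passed in, so the snippet itself needs only the standard library.

```python
import json
from pathlib import Path
from typing import Any


def configure_gcs_datasources(
    client: Any,
    input_purpose: Any,
    lightly_purpose: Any,
    input_path: str,
    lightly_path: str,
    project_id: str,
    credentials_file: Path,
) -> None:
    """Configure an input and a LightlyOne datasource on GCS (assumed API)."""
    # Load the key file and re-serialize it as a compact JSON string
    credentials = json.dumps(json.loads(credentials_file.read_text()))
    # Datasource the worker reads the raw data from
    client.set_gcs_config(
        resource_path=input_path,
        project_id=project_id,
        credentials=credentials,
        purpose=input_purpose,
    )
    # Datasource LightlyOne writes its outputs to
    client.set_gcs_config(
        resource_path=lightly_path,
        project_id=project_id,
        credentials=credentials,
        purpose=lightly_purpose,
    )
```

In the script, this would be called with the existing `client`, `DatasourcePurpose.INPUT`, and `DatasourcePurpose.LIGHTLY`, plus your own bucket paths, project ID, and key file.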
It is important to set shutdown_when_job_finished=True in the worker config when scheduling the job! Otherwise, the worker will keep running and listen for new jobs.
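
Because a worker that never shuts down would make a blocking `containers.run` call wait indefinitely, one possible safeguard is to start the container detached and wait with a timeout. The sketch below assumes the Docker SDK for Python, using `containers.run(detach=True)`, `Container.wait`, `Container.logs`, and `Container.stop`. The function name `run_worker_with_timeout` and the one-hour default are hypothetical. The Docker client is passed in as an argument, for example the result of `docker.from_env()`.

```python
from typing import Any


def run_worker_with_timeout(
    docker_client: Any,
    image: str,
    command: str,
    labels: dict,
    volumes: list,
    timeout_s: int = 3600,
) -> str:
    """Run the worker container and return its logs, stopping it on failure to finish."""
    # detach=True makes run() return a Container object immediately
    container = docker_client.containers.run(
        image, command, labels=labels, volumes=volumes, detach=True
    )
    try:
        # wait() blocks until the container exits or the timeout elapses
        result = container.wait(timeout=timeout_s)
    except Exception:
        # Most likely a timeout: stop the still-running worker, then re-raise
        container.stop()
        raise
    logs = container.logs().decode("utf-8", errors="replace")
    if result["StatusCode"] != 0:
        raise RuntimeError(
            f"Worker exited with status {result['StatusCode']}:\n{logs}"
        )
    return logs
```

With the names from the script, a call might look like `run_worker_with_timeout(docker.from_env(), IMAGE_NAME, f"token={LIGHTLY_TOKEN} worker.worker_id={WORKER_ID}", {"lightly_worker_label": WORKER_LABEL}, volumes)`. This complements setting shutdown_when_job_finished and does not replace it.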