Automation with Python
You can automate your Lightly Worker workflows directly from Python, which lets you seamlessly integrate the Lightly Worker into your machine learning pipeline. Lightly also integrates with pipeline tools such as Metaflow.
Prerequisites
To use the Lightly Worker in an automated workflow, the following prerequisites must be met:
- You need to have the Lightly Worker installed.
- You need to have the Lightly Worker registered with a label of your choice.
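If you have not registered a worker yet, a minimal sketch of the registration looks like this (the worker name and label strings are placeholders for your own values):

from lightly.api import ApiWorkflowClient

client = ApiWorkflowClient(token="YOUR_LIGHTLY_TOKEN")
# Register the worker once; the returned ID is the WORKER_ID used below.
worker_id = client.register_compute_worker(
    name="my-worker", labels=["YOUR_WORKER_LABEL"]
)
print(worker_id)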
Automation with Python
The Lightly Worker is shipped as a Docker image. Therefore, if you want to run the Lightly Worker from Python, you need to install the Docker Python API first:
pip install docker
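To verify that the Docker SDK can reach your Docker daemon, a quick sanity check like the following can help (this is optional and not part of the Lightly API):

import docker

# Raises an exception if the Docker daemon is not reachable.
docker_client = docker.from_env()
assert docker_client.ping()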
Now you can use the following script to run the Lightly Worker directly from Python. Don't forget to change the settings according to your setup!
The script will execute the following steps:
- Create a Dataset
- Configure a Datasource
- Schedule a Run
- Start the Lightly Worker and process the scheduled run
- Download Artifacts
Example for S3
Note that the code below shows how to automate the Lightly Worker if your data is stored on S3. Of course, you can also use other cloud storage providers: simply replace the lines configuring the datasource with your preferred setup (see the GCS sketch after the script).
import contextlib
import time
from collections.abc import Iterator
from datetime import datetime
from pathlib import Path

import docker
from docker.models.containers import Container

from lightly.api import ApiWorkflowClient
from lightly.openapi_generated.swagger_client import DatasetType
from lightly.openapi_generated.swagger_client import DatasourcePurpose

now = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")

# Settings
LIGHTLY_TOKEN = "YOUR_LIGHTLY_TOKEN"
IMAGE_NAME: str = "lightly/worker:latest"
DATASET_NAME: str = f"My_Dataset_{now}"

# Lightly Worker config
SELECTION_CONFIG = {
    "proportion_samples": 0.5,
    "strategies": [
        {"input": {"type": "EMBEDDINGS"}, "strategy": {"type": "DIVERSITY"}}
    ],
}
WORKER_CONFIG = {}

# S3
S3_REGION = "YOUR_S3_REGION"
S3_ACCESS_KEY_ID = "YOUR_S3_ACCESS_KEY_ID"
S3_SECRET_ACCESS_KEY = "YOUR_S3_SECRET_ACCESS_KEY"
S3_BUCKET_PATH = "YOUR_S3_BUCKET_PATH"

# Worker
WORKER_ID = "YOUR_WORKER_ID"
WORKER_LABEL = "YOUR_WORKER_LABEL"

# Outputs
OUTPUT_DIR = Path()


@contextlib.contextmanager
def start_worker() -> Iterator[Container]:
    """Context manager starting the Lightly Worker in a new container.

    The container is killed once the context is exited.
    """
    docker_client = docker.from_env()
    volumes = [f"{OUTPUT_DIR.resolve()}:/home/output_dir"]
    container = docker_client.containers.run(
        IMAGE_NAME,
        f"token={LIGHTLY_TOKEN} worker.worker_id={WORKER_ID}",
        detach=True,
        labels={"lightly_worker_label": WORKER_LABEL},
        volumes=volumes,
    )
    try:
        yield container
    finally:
        try:
            container.kill()
        except docker.errors.APIError:
            # If the container was already killed from outside, we don't care.
            pass


if __name__ == "__main__":
    # Create a dataset.
    client = ApiWorkflowClient(token=LIGHTLY_TOKEN)
    client.create_dataset(DATASET_NAME, DatasetType.VIDEOS)

    # Configure the S3 datasource.
    client.set_s3_config(
        resource_path=S3_BUCKET_PATH,
        region=S3_REGION,
        access_key=S3_ACCESS_KEY_ID,
        secret_access_key=S3_SECRET_ACCESS_KEY,
        purpose=DatasourcePurpose.INPUT,
    )
    client.set_s3_config(
        resource_path=S3_BUCKET_PATH,
        region=S3_REGION,
        access_key=S3_ACCESS_KEY_ID,
        secret_access_key=S3_SECRET_ACCESS_KEY,
        purpose=DatasourcePurpose.LIGHTLY,
    )

    # Schedule a run.
    scheduled_run_id = client.schedule_compute_worker_run(
        worker_config=WORKER_CONFIG,
        selection_config=SELECTION_CONFIG,
        runs_on=[WORKER_LABEL],
    )

    # Start the Lightly Worker and wait for it to process the scheduled run.
    with start_worker():
        last_run_info = None
        no_update_count = 0
        while True:
            run_info = client.get_compute_worker_run_info(
                scheduled_run_id=scheduled_run_id
            )
            if run_info.in_end_state():
                assert run_info.ended_successfully(), "Run did not end successfully"
                break
            # Track for how long the run info has not changed to detect a stuck run.
            if run_info != last_run_info:
                no_update_count = 0
            else:
                no_update_count += 1
                if no_update_count >= 10:
                    raise RuntimeError(
                        f"Timeout: no run_info update for at least 5 minutes\n"
                        f"last_run_info: {last_run_info}, run_info: {run_info}"
                    )
            last_run_info = run_info
            time.sleep(30)

    # Download artifacts.
    run = client.get_compute_worker_run_from_scheduled_run(
        scheduled_run_id=scheduled_run_id
    )
    client.download_compute_worker_run_artifacts(
        run=run, output_dir=OUTPUT_DIR / "artifacts"
    )
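For example, if your data lives on Google Cloud Storage instead of S3, the two set_s3_config calls could be replaced with set_gcs_config calls along these lines (a sketch; the bucket path, project ID, and credentials file name are placeholders for your own values):

import json

# Replace the S3 configuration above with a GCS datasource.
with open("credentials_read.json") as f:
    credentials = json.dumps(json.load(f))

client.set_gcs_config(
    resource_path="gs://YOUR_BUCKET/PATH",
    project_id="YOUR_GCP_PROJECT_ID",
    credentials=credentials,
    purpose=DatasourcePurpose.INPUT,
)
# Note: the LIGHTLY datasource needs write access, so you may need
# separate credentials with write permissions here.
client.set_gcs_config(
    resource_path="gs://YOUR_BUCKET/PATH",
    project_id="YOUR_GCP_PROJECT_ID",
    credentials=credentials,
    purpose=DatasourcePurpose.LIGHTLY,
)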