Metaflow
Metaflow is a tool originally developed by Netflix. It allows data scientists to scale their pipelines from a simple script on their local machine to a production-ready end-to-end workflow. The compute can be handled by AWS Batch, AWS Step Functions, or a custom Kubernetes cluster.
This guide demonstrates how the LightlyOne Worker can be used with Metaflow on AWS Batch.
Prerequisites
In order to use the LightlyOne Worker in any automated workflow, the following prerequisites are necessary:
- You need to have the LightlyOne Worker installed.
- You need to have the LightlyOne Worker registered with a label of your choice (see the registration sketch below).
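If you have not registered a worker yet, this is typically done once with the Lightly Python client. The following is a minimal sketch, assuming the ApiWorkflowClient.register_compute_worker method; the token, worker name, and label are placeholders you need to replace with your own values:
from lightly.api import ApiWorkflowClient

# Register a worker once and note the returned worker_id and the label.
client = ApiWorkflowClient(token="YOUR_LIGHTLY_TOKEN")
worker_id = client.register_compute_worker(
    name="my-metaflow-worker",     # hypothetical worker name
    labels=["YOUR_WORKER_LABEL"],  # label referenced later via runs_on
)
print(worker_id)  # use this as WORKER_ID in the flow below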
Wrap the LightlyOne Worker
Metaflow expects the LightlyOne Worker Docker image to have no explicit entrypoint. Since the LightlyOne Worker comes with an entrypoint, you'll have to wrap the image in your own and host it in your AWS container registry. You can use the following Dockerfile (saved as Dockerfile.metaflow to match the build command below) to get the desired image:
FROM lightly/worker:latest
# Install additional dependencies here if required
# Remove the entrypoint from the LightlyOne Worker
ENTRYPOINT []
Next, build the Docker image, tag it, and push it to your registry. Don't forget to replace {YOUR_IDENTIFIER} in the commands below with the identifier of your Amazon Elastic Container Registry (ECR).
docker build --file Dockerfile.metaflow --tag my-lightly/worker ./
docker tag my-lightly/worker:latest {YOUR_IDENTIFIER}.amazonaws.com/my-lightly/worker:latest
docker push {YOUR_IDENTIFIER}.amazonaws.com/my-lightly/worker:latest
Automation with Metaflow
For this guide you need to have Metaflow deployed to AWS or Kubernetes. If you haven't done that already, follow the instructions in the Metaflow deployment documentation. Additionally, you need to install the Metaflow Python client:
pip install metaflow
Now, you can use the following flow to run the LightlyOne Worker directly on AWS Batch. Don't forget to change the settings according to your setup!
The flow will execute the following steps:
- Create a Dataset
- Configure a Datasource
- Schedule a Run
- Start the LightlyOne Worker and process the scheduled run
You can execute the flow locally with python lightly_worker_flow.py run. Because the start step is annotated with the @batch decorator, the heavy lifting still runs on AWS Batch even though the flow is triggered from your local machine.
Example for S3
Note that the code below shows how to automate the LightlyOne Worker if your data is stored on S3. Of course, you can also use other cloud storage providers. Simply replace the lines configuring the datasource with your preferred setup.
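For example, if your data were stored in Azure Blob Storage instead, the two set_s3_config calls in the flow below could be swapped for something along these lines (a sketch, assuming the client's set_azure_config method; check the Lightly documentation for the exact parameter names):
# Assumes `client` and `DatasourcePurpose` from the flow below.
client.set_azure_config(
    container_name="my-container/input",    # hypothetical container path
    account_name="YOUR_AZURE_ACCOUNT_NAME",
    sas_token="YOUR_AZURE_SAS_TOKEN",
    purpose=DatasourcePurpose.INPUT,
)
client.set_azure_config(
    container_name="my-container/lightly",  # hypothetical container path
    account_name="YOUR_AZURE_ACCOUNT_NAME",
    sas_token="YOUR_AZURE_SAS_TOKEN",
    purpose=DatasourcePurpose.LIGHTLY,
)
The full flow for S3 follows below.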
import os
import time
from pathlib import Path
from omegaconf import OmegaConf
from multiprocessing import Process
from datetime import datetime
from lightly.cli.config import get_config
now = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")
# Metaflow
from metaflow import FlowSpec, step, batch, environment
# Settings
LIGHTLY_TOKEN = "YOUR_LIGHTLY_TOKEN"
IMAGE_NAME: str = "my-lightly/worker:latest"
DATASET_NAME: str = f"My_Dataset_{now}"
# LightlyOne Worker config
SELECTION_CONFIG = {
    "proportion_samples": 0.5,
    "strategies": [
        {"input": {"type": "EMBEDDINGS"}, "strategy": {"type": "DIVERSITY"}}
    ],
}
WORKER_CONFIG = {}
# S3
S3_REGION = "YOUR_S3_REGION"
S3_ACCESS_KEY_ID = "YOUR_S3_ACCESS_KEY_ID"
S3_SECRET_ACCESS_KEY = "YOUR_S3_SECRET_ACCESS_KEY"
S3_BUCKET_PATH = "YOUR_S3_BUCKET_PATH"
# Worker
WORKER_ID = "YOUR_WORKER_ID"
WORKER_LABEL = "YOUR_WORKER_LABEL"
# Constants (do not change)
HOME_DIR = Path("/home/lightly_worker") # prior to worker version 2.11 this must be "/home/boris"
HYDRA_DIR = Path(".hydra")
METAFLOW_DIR = Path(".metaflow")
RESOURCE_PATH = Path("onprem-docker/lightly_worker/src/lightly_worker/resources")
DOCKER_CONFIG = Path("docker/docker.yaml")
class LightlyWorkerFlow(FlowSpec):
    """Flow which processes videos in an S3 bucket using the LightlyOne Worker."""

    @batch(
        image=IMAGE_NAME,
        cpu=8,
        gpu=1,
        memory=120000,
    )
    @environment(
        vars={
            "LIGHTLY_TOKEN": LIGHTLY_TOKEN,
        }
    )
    @step
    def start(self):
        """Schedules a job and processes it with the LightlyOne Worker."""
        # Set up the environment
        os.chdir(HOME_DIR)
        HYDRA_DIR.mkdir(exist_ok=True)
        METAFLOW_DIR.mkdir(exist_ok=True)

        try:
            # Wrap this in a try-except to make the linter happy
            from lightly_worker import main
            from lightly.api import ApiWorkflowClient
            from lightly.openapi_generated.swagger_client import DatasetType
            from lightly.openapi_generated.swagger_client import DatasourcePurpose
        except ImportError:
            pass

        # Create a dataset
        client = ApiWorkflowClient()
        client.create_dataset(DATASET_NAME, DatasetType.VIDEOS)
        self.dataset_id = client.dataset_id

        # Configure S3
        client.set_s3_config(
            resource_path=S3_BUCKET_PATH,
            region=S3_REGION,
            access_key=S3_ACCESS_KEY_ID,
            secret_access_key=S3_SECRET_ACCESS_KEY,
            purpose=DatasourcePurpose.INPUT,
        )
        client.set_s3_config(
            resource_path=S3_BUCKET_PATH,
            region=S3_REGION,
            access_key=S3_ACCESS_KEY_ID,
            secret_access_key=S3_SECRET_ACCESS_KEY,
            purpose=DatasourcePurpose.LIGHTLY,
        )

        # Schedule a run
        self.scheduled_run_id = client.schedule_compute_worker_run(
            worker_config=WORKER_CONFIG,
            selection_config=SELECTION_CONFIG,
            runs_on=[WORKER_LABEL],
        )

        # Run the LightlyOne Worker
        cfg = OmegaConf.load(RESOURCE_PATH / DOCKER_CONFIG)
        cfg.lightly = get_config.get_lightly_config()
        cfg.worker.worker_id = WORKER_ID
        process = Process(target=main.main, args=(cfg,))
        process.start()

        # Monitor the scheduled run
        last_run_info = None
        no_update_count = 0
        while True:
            run_info = client.get_compute_worker_run_info(
                scheduled_run_id=self.scheduled_run_id
            )
            if run_info.in_end_state():
                assert run_info.ended_successfully(), "Run did not end successfully"
                break
            if run_info != last_run_info:
                no_update_count = 0
            else:
                no_update_count += 1
                if no_update_count >= 10:
                    raise RuntimeError(
                        f"Timeout: no run_info update for at least 5 minutes\n"
                        f"last_run_info: {str(last_run_info)}, run_info: {str(run_info)}"
                    )
            last_run_info = run_info
            time.sleep(30)

        # Finish up
        process.terminate()
        self.next(self.end)

    @step
    def end(self):
        """You can access the selected images using the Lightly dataset_id."""
        # Continue working with the selected images using the dataset_id
        print(f"My Lightly dataset_id is {self.dataset_id}")


if __name__ == "__main__":
    LightlyWorkerFlow()
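Once the flow has finished, you can continue working with the selected data outside of Metaflow. The snippet below is a minimal sketch, assuming the client's export_filenames_by_tag_name method and the default "initial-tag" created by the worker (tag names may differ depending on your run):
from lightly.api import ApiWorkflowClient

# Connect to the dataset created by the flow and list the filenames in a tag.
client = ApiWorkflowClient(token="YOUR_LIGHTLY_TOKEN", dataset_id="YOUR_DATASET_ID")
filenames = client.export_filenames_by_tag_name("initial-tag")  # assumption: default tag name
print(filenames)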