This guide covers advanced patterns and techniques for building sophisticated, high-reliability data pipelines with the Lume Python SDK.

Controlling Run Behavior at Runtime

While most configuration lives in the Flow Version, you can override certain settings at runtime for specific, one-off tasks.

Forcing a Rerun (Idempotency Bypass)

By default, Lume prevents re-processing the same source_path for a given Flow Version to ensure idempotency. For disaster recovery or reprocessing corrected data, you can bypass this check. See the Idempotency and Reliability section for more details on how Lume prevents duplicate runs.

# This will force Lume to re-process the data at this source_path,
# even if a successful run already exists for it.
run = lume.run(
    flow_version="customer_cleaner:v1",
    source_path="failed_batch_abc_123",
    force_rerun=True 
)

Use with caution: Forcing a rerun can lead to duplicate data in your target system if not managed carefully.
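
One way to manage that risk is to wrap the override in a small helper that makes the bypass explicit and leaves an audit trail. The sketch below uses only the lume.run call shown above; the required reason argument and the logging around it are illustrative conventions, not SDK features.

import logging

import lume

logger = logging.getLogger("pipeline.reruns")

def force_rerun(flow_version, source_path, reason):
    """
    Re-processes a source_path that already has a successful run,
    recording why, so any duplicate-data incident can be traced.
    """
    if not reason:
        raise ValueError("A reason is required when bypassing idempotency.")

    logger.warning(
        "Forcing rerun of %s for %s. Reason: %s",
        source_path, flow_version, reason,
    )
    return lume.run(
        flow_version=flow_version,
        source_path=source_path,
        force_rerun=True,
    )

# Usage:
run = force_rerun(
    "customer_cleaner:v1",
    "failed_batch_abc_123",
    reason="Source file was corrected upstream",
)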

Advanced Run Monitoring

For production systems, polling in a fixed-interval time.sleep() loop is not ideal. A better approach is to poll with exponential backoff, which reduces network load and gives slow runs and transient API issues time to resolve.

import time

import lume

def wait_for_run_completion(run_id, max_attempts=10, initial_delay=5):
    """
    Waits for a Lume run to complete using exponential backoff.
    """
    delay = initial_delay
    for attempt in range(max_attempts):
        run = lume.run_status(run_id)

        if run.status in ["SUCCEEDED", "PARTIAL_FAILED", "FAILED", "CRASHED"]:
            print(f"Run {run.id} finished with status: {run.status}")
            return run

        print(f"Attempt {attempt + 1}: Status is {run.status}. Retrying in {delay}s...")
        time.sleep(delay)
        delay = min(delay * 2, 60)  # Double the delay, capped at 60 seconds

    raise TimeoutError(f"Run {run_id} did not complete after {max_attempts} attempts.")

# Usage:
initial_run = lume.run(...)
try:
    final_run = wait_for_run_completion(initial_run.id)
    # Now you can inspect the final metadata
    print(final_run.metadata)
except TimeoutError as e:
    print(e)
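
If many workers poll the API concurrently, their retries can synchronize and still produce load spikes. A common refinement is to add random jitter to each delay and to tolerate occasional failures of the status call itself. This is a sketch of that pattern: it assumes only the lume.run_status call shown above, and the retry-on-exception behavior is an illustrative convention, not a documented SDK feature.

import random
import time

import lume

def wait_with_jitter(run_id, max_attempts=10, initial_delay=5, max_delay=60):
    """
    Polls a run with jittered exponential backoff, tolerating
    transient errors from the status call.
    """
    delay = initial_delay
    last_error = None
    for attempt in range(max_attempts):
        try:
            run = lume.run_status(run_id)
            if run.status in ["SUCCEEDED", "PARTIAL_FAILED", "FAILED", "CRASHED"]:
                return run
        except Exception as exc:  # Assumption: treat any SDK error as transient
            last_error = exc
            print(f"Attempt {attempt + 1}: status check failed ({exc}). Retrying...")

        # Sleep for a random duration between half the delay and the full delay,
        # so concurrent pollers spread their requests out.
        time.sleep(random.uniform(delay / 2, delay))
        delay = min(delay * 2, max_delay)

    raise TimeoutError(
        f"Run {run_id} did not complete after {max_attempts} attempts "
        f"(last error: {last_error})."
    )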

Understanding Run Metadata

The run.metadata attribute contains a rich, structured object detailing the outcome of a completed run. It is essential for building robust monitoring, alerting, and automated downstream workflows.

For the complete, detailed schema of this object, see the LumeRun Class Reference.
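
As a quick orientation, the snippet below reads the handful of fields used in the examples later in this section (target_locations with its mapped and rejects entries, and error_message). Treat any other key as undocumented here and consult the LumeRun Class Reference for the authoritative schema.

# Assume final_run came from wait_for_run_completion() above.
meta = final_run.metadata

if final_run.status in ["SUCCEEDED", "PARTIAL_FAILED"]:
    print("Mapped output:", meta["target_locations"]["mapped"])

if final_run.status == "PARTIAL_FAILED":
    print("Rejects written to:", meta["target_locations"]["rejects"])

if final_run.status in ["FAILED", "CRASHED"]:
    print("Error:", meta.get("error_message"))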

Handling Partial Failures

A PARTIAL_FAILED status is not necessarily an error condition. It means some data was processed successfully, while some was rejected. This is a common outcome in production pipelines dealing with messy real-world data.

Your workflow should be designed to handle this state gracefully. In the example below, trigger_reporting_job, notify_data_stewards, and alert_on_call are placeholders for your own downstream hooks.

run = wait_for_run_completion("run_01HY...")

if run.status == "SUCCEEDED":
    # All data processed, trigger downstream processes on the mapped output.
    trigger_reporting_job(run.metadata["target_locations"]["mapped"])

elif run.status == "PARTIAL_FAILED":
    # Some data is ready. You might process the good data...
    trigger_reporting_job(run.metadata["target_locations"]["mapped"])
    
    # ...and send the rejected data to a separate workflow for manual review.
    notify_data_stewards(
        run_id=run.id,
        rejects_location=run.metadata["target_locations"]["rejects"]
    )

elif run.status in ["FAILED", "CRASHED"]:
    # No data was processed. Alert the on-call engineer.
    alert_on_call(
        run_id=run.id,
        error_message=run.metadata.get("error_message")
    )

This pattern ensures that you maximize the value of successfully processed data while isolating problematic records for further investigation.
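
Putting the pieces together, a production entry point might look like the following sketch. It is built only from the calls shown in this guide; trigger_reporting_job, notify_data_stewards, and alert_on_call are the same placeholder hooks used above.

import lume

def process_batch(source_path, flow_version="customer_cleaner:v1"):
    """
    Runs a batch end-to-end: start the run, poll with backoff,
    then route the outcome to the appropriate downstream handler.
    """
    run = lume.run(flow_version=flow_version, source_path=source_path)
    final_run = wait_for_run_completion(run.id)

    if final_run.status in ["SUCCEEDED", "PARTIAL_FAILED"]:
        trigger_reporting_job(final_run.metadata["target_locations"]["mapped"])

    if final_run.status == "PARTIAL_FAILED":
        notify_data_stewards(
            run_id=final_run.id,
            rejects_location=final_run.metadata["target_locations"]["rejects"],
        )

    if final_run.status in ["FAILED", "CRASHED"]:
        alert_on_call(
            run_id=final_run.id,
            error_message=final_run.metadata.get("error_message"),
        )

    return final_run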