This guide provides best practices for integrating the Lume Python SDK into production environments, focusing on security, reliability, and monitoring.

Security: A Secure-by-Design Architecture

Lume’s architecture is fundamentally designed to minimize access to your sensitive systems. The Sync-Transform-Sync model ensures that the core transformation engine never has direct, standing access to your production data stores.

The Principle of Least Privilege

  • Transactional Access: Lume’s access to your systems is transactional and short-lived. The Connectors only require temporary permissions to perform a specific data synchronization task. They connect, ingest the necessary data, and disconnect.
  • Isolated Transformation: The actual data transformation process runs in a completely isolated Lume environment that has no network path to your infrastructure.
  • Reduced Attack Surface: This model reduces the attack surface and simplifies security audits. Instead of granting broad, standing permissions to a third-party service, you authorize only narrowly scoped, temporary data sync operations.

Managing API Keys

Never hardcode your Lume API key in your source code. Use a secure mechanism to manage it.

  • Environment Variables: The SDK automatically detects the LUME_API_KEY environment variable. This is the recommended approach for most environments.

    export LUME_API_KEY="your_api_key_here"
    
  • Secrets Management Systems: For production systems, use a secrets manager like AWS Secrets Manager, Google Secret Manager, or HashiCorp Vault. Your application should fetch the key at runtime.

    import os
    import my_secrets_manager # Your preferred client
    
    # Fetch key from secrets manager and set it as an env var
    api_key = my_secrets_manager.get_secret("LUME_API_KEY")
    os.environ["LUME_API_KEY"] = api_key
    
    import lume
    # The SDK will now use the key
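As one concrete illustration of the pattern above, here is a minimal sketch for AWS Secrets Manager. It assumes the key is stored as a plain-string secret. The helper name load_lume_api_key and the secret name used in the note below are hypothetical. The client is passed in as an argument so the helper can be exercised without AWS access.

```python
import os


def load_lume_api_key(client, secret_id: str) -> str:
    """Fetch the API key from a Secrets Manager client and export it.

    `client` is expected to behave like boto3.client("secretsmanager").
    """
    response = client.get_secret_value(SecretId=secret_id)
    api_key = response["SecretString"]  # assumes a plain-string secret
    if not api_key:
        raise RuntimeError(f"Secret {secret_id!r} is empty")
    # Export the key so the SDK can pick it up from the environment.
    os.environ["LUME_API_KEY"] = api_key
    return api_key
```

In an application this might be called as load_lume_api_key(boto3.client("secretsmanager"), "prod/lume/api-key") before the SDK is imported, mirroring the ordering in the snippet above.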
    

Orchestration: Integrating with Airflow

The Lume SDK is a lightweight client, which makes it a good fit for orchestration tools like Airflow. Your DAG’s role is to prepare the data batch, trigger the run, and monitor it to completion.

This example shows an Airflow DAG that processes daily sales data.

from __future__ import annotations

import pendulum
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
import boto3 # To upload data
import lume
from lume.api.classes.exceptions import RunFailedError, RunTimeoutError # Import specific exceptions

def _prepare_and_trigger_lume_run(**context):
    """
    Prepares a data batch by uploading to S3 and then triggers a Lume run.
    """
    # 1. Prepare the data batch. In a real DAG, this might pull from a database.
    # We create a deterministic source_path to leverage idempotency.
    logical_date = context["data_interval_start"].to_date_string()
    source_bucket = "my-lume-source-data"
    s3_key = f"daily_sales/{logical_date}/sales.csv"
    source_path = f"s3://{source_bucket}/{s3_key}"

    # Here, we would generate and upload the actual sales data file.
    # For this example, we'll assume it's already there.
    print(f"Ensuring data exists at {source_path}")

    # 2. Trigger the Lume run
    run = lume.run(
        flow_version="sales_transformer:v3",
        source_path=source_path
    )
    print(f"Lume run {run.id} started for source_path: {source_path}.")
    
    # Push the run_id to XComs so the monitoring task can use it
    context["ti"].xcom_push(key="lume_run_id", value=run.id)

def _monitor_lume_run(**context):
    """Monitors the Lume run until it completes and checks the final status."""
    run_id = context["ti"].xcom_pull(task_ids="trigger_lume_run", key="lume_run_id")
    if not run_id:
        raise ValueError("Could not find lume_run_id in XComs.")

    run = lume.run_status(run_id) # Get the initial run object

    try:
        # Use the SDK's built-in wait method.
        # This is cleaner than a custom polling loop.
        run.wait(timeout=3600) # Wait for up to 1 hour
    
    except RunTimeoutError as exc:
        # The run took too long. Fail the Airflow task.
        raise RuntimeError(f"Lume run {run.id} timed out after 1 hour.") from exc

    except RunFailedError as exc:
        # The run failed on the Lume platform. Access the error details
        # from the run object, which is updated by wait() even on failure.
        error_details = run.metadata.get("errors", [])
        if error_details:
            first_error = error_details[0]
            error_info = f"Stage: {first_error['stage']}, Error: {first_error['message']}"
        else:
            error_info = "No detailed error message available."
        raise RuntimeError(f"Lume run {run.id} failed. Details: {error_info}") from exc

    print(f"Lume run {run.id} completed successfully with status: {run.status}.")


with DAG(
    dag_id="lume_production_pipeline",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
    doc_md="A production DAG to trigger and monitor Lume data transformations.",
) as dag:
    trigger_task = PythonOperator(
        task_id="trigger_lume_run",
        python_callable=_prepare_and_trigger_lume_run,
    )
    
    monitor_task = PythonOperator(
        task_id="monitor_lume_run",
        python_callable=_monitor_lume_run,
    )

    trigger_task >> monitor_task

Asynchronous Processing with Webhooks

For high-throughput, event-driven workflows, polling for run status is inefficient. Lume supports webhooks to notify your application as soon as a run completes.

You can configure a webhook endpoint for your Flow in the Lume UI. When a run finishes, Lume will send a POST request to your URL with a payload summarizing the outcome.

For a complete, runnable example of a webhook-driven application, see the Webhook-Driven Application Example.

Webhook Payload Schema

The webhook payload contains the essential information about the completed run.

{
  "run_id": "run_01HY...",
  "status": "SUCCEEDED",
  "run_metadata": {
  }
}

You can use the run_id to call lume.run_status() to retrieve the full, detailed metadata object if needed.
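A minimal sketch of reading this payload follows. It relies only on the fields shown in the schema above. The helper name summarize_webhook and the needs_details flag are hypothetical, and the set of failure statuses is taken from the statuses named elsewhere in this guide rather than from a complete list.

```python
import json

# Statuses named elsewhere in this guide; the full set is an assumption.
FAILURE_STATUSES = {"FAILED", "CRASHED"}


def summarize_webhook(body: bytes) -> dict:
    """Extract the fields shown in the payload schema from a raw request body."""
    event = json.loads(body)
    status = event["status"]
    return {
        "run_id": event["run_id"],
        "status": status,
        # Fetch the full metadata via lume.run_status() only when needed,
        # for example to read error details for a failed run.
        "needs_details": status in FAILURE_STATUSES,
        "run_metadata": event.get("run_metadata", {}),
    }
```

Keeping the handler to this kind of cheap parsing, and deferring the lume.run_status() call to a background worker, helps the endpoint respond quickly.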

Securing Your Webhook Endpoint

To ensure that incoming webhook requests are genuinely from Lume, you must verify their signature.

  1. Secret Key: When you configure a webhook in the Lume UI, a unique secret key is generated. You must store this key securely in your application’s environment.
  2. Signature Header: Every webhook request from Lume includes an X-Lume-Signature-256 header. This signature is an HMAC-SHA256 hash of the request body, created using your secret key.
  3. Verification: In your application, you must compute the same HMAC-SHA256 hash of the received request body using your stored secret key. If your computed signature matches the one in the header, the request is authentic.

# Example using Flask to verify a webhook signature
from flask import Flask, request, abort
import hmac
import hashlib
import os

app = Flask(__name__)

# The secret key you obtained from the Lume UI.
# Fail at startup if it is missing rather than on the first request.
LUME_WEBHOOK_SECRET = os.environ["LUME_WEBHOOK_SECRET"]

@app.route("/lume-webhook", methods=["POST"])
def handle_lume_webhook():
    # 1. Get the signature from the header
    signature_header = request.headers.get("X-Lume-Signature-256")
    if not signature_header:
        abort(400, "Missing signature header")

    # 2. Compute your own signature
    payload = request.get_data()
    computed_signature = hmac.new(
        key=LUME_WEBHOOK_SECRET.encode("utf-8"),
        msg=payload,
        digestmod=hashlib.sha256
    ).hexdigest()

    # 3. Compare the signatures securely
    if not hmac.compare_digest(computed_signature, signature_header):
        abort(403, "Invalid signature")

    # The request is authentic. You can now process the payload.
    event_data = request.get_json()
    print(f"Received valid webhook for Run ID: {event_data['run_id']}")
    
    # Add to a message queue or trigger a downstream process
    # ...

    return {"status": "received"}, 200

Rejecting requests with invalid signatures is critical to protect your system from forged payloads.
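The same check can be separated from the web framework so it is easy to unit test. The sketch below assumes, as the Flask example does, that the header carries the hex digest of the raw body with no prefix. The function names sign_payload and is_valid_signature are hypothetical.

```python
import hashlib
import hmac
from typing import Optional


def sign_payload(secret: str, body: bytes) -> str:
    """Return the hex HMAC-SHA256 of the body (hex encoding is assumed)."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def is_valid_signature(secret: str, body: bytes, header_value: Optional[str]) -> bool:
    """Check a received signature header against the raw request body."""
    if not header_value:
        return False
    expected = sign_payload(secret, body)
    # Compare as bytes in constant time. Passing str values that contain
    # non-ASCII characters would make compare_digest raise TypeError.
    return hmac.compare_digest(expected.encode("utf-8"), header_value.encode("utf-8"))
```

In tests, sign_payload can generate a valid header for a fixture body, and changing a single byte of the body should make is_valid_signature return False.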

Idempotency and Reliability

To prevent processing the same data twice, ensure your source_path is unique and deterministic for each batch of data.

  • For time-based batches, include the timestamp in the source_path. Example: daily_invoices/2024-07-30.
  • For event-driven workflows, use a unique identifier from the triggering event (e.g., a message ID or a transaction ID).
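
The two naming schemes above might be sketched as follows. The helper names and the prefix argument are hypothetical; the point is only that the same batch always maps to the same path.

```python
from datetime import date


def daily_source_path(prefix: str, batch_date: date) -> str:
    """Time-based batch: the same date always maps to the same path."""
    return f"{prefix}/{batch_date.isoformat()}"


def event_source_path(prefix: str, event_id: str) -> str:
    """Event-driven batch: key the path on the triggering event's ID."""
    if not event_id:
        raise ValueError("event_id must be non-empty")
    return f"{prefix}/{event_id}"
```

For example, daily_source_path("daily_invoices", date(2024, 7, 30)) returns "daily_invoices/2024-07-30". Values such as uuid.uuid4() or the current wall-clock time would defeat the purpose, because a retry of the same batch would produce a different path.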

If you attempt to create a run with a source_path that has already been successfully processed for a given Flow Version, the Lume platform will reject the request with an InvalidRequestError, preventing duplicate pipeline executions.

To bypass this for a legitimate reprocessing, use the force_rerun=True parameter in your lume.run() call.
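
A rough sketch of how a caller might handle the duplicate rejection is shown below. The run function is passed in and is expected to behave like lume.run. The InvalidRequestError class here is a local stand-in so the sketch runs on its own; in real code you would import the SDK's exception instead. The helper name trigger_run is hypothetical.

```python
class InvalidRequestError(Exception):
    """Stand-in for the SDK's exception so this sketch runs on its own."""


def trigger_run(run_fn, flow_version, source_path, reprocess=False):
    """Start a run, treating a rejected request as 'already processed'.

    `run_fn` is expected to behave like lume.run.
    Returns the run object, or None when the platform rejected the request.
    """
    kwargs = {"flow_version": flow_version, "source_path": source_path}
    if reprocess:
        # Only for deliberate reprocessing of an already-processed batch.
        kwargs["force_rerun"] = True
    try:
        return run_fn(**kwargs)
    except InvalidRequestError:
        return None
```

A real handler should confirm that the rejection is about a duplicate source_path before ignoring it, since the same exception type may cover other invalid requests. How to tell them apart is not covered here.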

Monitoring and Alerting

Integrate Lume monitoring into your existing observability stack.

  • Check the Final Status: After a run completes, always check the terminal status (SUCCEEDED, PARTIAL_FAILED, FAILED, or CRASHED).
  • Alert on Failures: Configure alerts in your orchestration tool (like Airflow’s on_failure_callback) or monitoring system to trigger when a run enters a FAILED or CRASHED state.
  • Log Key Information: In your application logs, always include the run.id, flow_version, and source_path. This makes debugging much easier.

# A simple alerting function
def alert_on_failure(run):
    if run.status in ["FAILED", "CRASHED"]:
        # Access structured error data for more precise alerting
        errors = run.metadata.get("errors", [])
        if errors:
            first_error = errors[0]
            error_summary = f"Failed at stage '{first_error['stage']}' with error '{first_error['error_code']}'"
            error_details = first_error['message']
        else:
            error_summary = f"Lume run {run.id} failed with an unspecified error."
            error_details = f"Status: {run.status}"

        # Use your preferred alerting mechanism (e.g., PagerDuty, Slack).
        # send_alert is a placeholder for that integration.
        send_alert(
            summary=error_summary,
            details=error_details
        )
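
The logging recommendation above could be sketched like this, using only the standard library. The logger name and the helper format_run_log are hypothetical; the three identifiers are the ones this guide recommends including.

```python
import logging

logger = logging.getLogger("lume_pipeline")  # hypothetical logger name


def format_run_log(message: str, run_id: str, flow_version: str, source_path: str) -> str:
    """Append the run identifiers to a log message as key=value pairs."""
    return (
        f"{message} run_id={run_id} "
        f"flow_version={flow_version} source_path={source_path}"
    )


def log_run_event(message: str, run_id: str, flow_version: str, source_path: str) -> None:
    """Log an event with the identifiers needed to trace it back to a run."""
    logger.info(format_run_log(message, run_id, flow_version, source_path))
```

Using the same key names in every log line makes it straightforward to search for all events belonging to one run or one source_path.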

SDK Initialization

The SDK requires your Lume API key to authenticate. The recommended approach is to set the LUME_API_KEY environment variable.

export LUME_API_KEY="your_api_key_here"

The SDK will automatically detect this key. If you need to manage keys for multiple environments (e.g., development, staging, production), we recommend using a secret management tool like HashiCorp Vault, AWS Secrets Manager, or Doppler, and injecting the appropriate key as an environment variable at runtime.
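
If the key is injected at runtime, it can help to check for it at startup so a missing secret produces a clear error immediately. The following is a small sketch of such a check. The helper name require_lume_api_key is hypothetical and is not part of the SDK.

```python
import os


def require_lume_api_key(environ=os.environ) -> str:
    """Return the API key from the environment or raise a clear error."""
    api_key = environ.get("LUME_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError(
            "LUME_API_KEY is not set; inject it from your secrets manager "
            "before starting the application."
        )
    return api_key
```

Calling this once during application startup surfaces configuration mistakes before any run is triggered.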

For cases where environment variables are not feasible, you can initialize the SDK programmatically. See lume.init() for details.