> ## Documentation Index
> Fetch the complete documentation index at: https://docs.lume.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Database Transformation

> An example of processing data from a relational database like PostgreSQL or Snowflake.

This example demonstrates how to trigger a run that processes data already loaded into a relational database like PostgreSQL or Snowflake.

**Scenario**: An external process loads invoice data into a `raw_invoices` table in your PostgreSQL database. All records for a given batch are tagged with the same `batch_id`.

The Flow Version `invoice_processor:v1` is configured in the Lume UI with:

* **Source Connector**: Your PostgreSQL database.
* **Source Tables**: `raw_invoices`.
* **Filtering Logic**: A `WHERE` clause like `batch_id = '{source_path}'`.
* **Target Connector**: Your Snowflake data warehouse.
* **Target Table**: `clean_invoices`.

```python theme={null}
import lume
import uuid
import psycopg2 # Example using the PostgreSQL driver

# --- 1. Prepare and Insert the Batch Data ---
# In a real workflow, this data would come from an upstream source.
# The `db_batch_id` is just a string that uniquely identifies the records for this run.
db_batch_id = str(uuid.uuid4())
new_invoices = [
    (db_batch_id, "inv_123", "cust_abc", 100.50),
    (db_batch_id, "inv_124", "cust_xyz", 75.00)
]

# Connect to your database and insert the new records.
# This part of the code runs on your infrastructure.
conn = psycopg2.connect("dbname=yourdb user=youruser password=yourpass host=yourhost")
with conn.cursor() as cur:
    for record in new_invoices:
        cur.execute(
            "INSERT INTO raw_invoices (batch_id, invoice_id, customer_id, amount) VALUES (%s, %s, %s, %s)",
            record
        )
conn.commit()
print(f"Inserted {len(new_invoices)} records with batch_id: {db_batch_id}")


# --- 2. Trigger the Run ---
# Lume uses the pre-configured Connector and injects the `source_path`
# into the filtering logic defined in the Flow Version.
run = lume.run(
    flow_version="invoice_processor:v1",
    source_path=db_batch_id
)
print(f"Started run {run.id} for database batch_id: {db_batch_id}")

# --- 3. Monitor and Finalize ---
# `run.wait()` blocks execution until the pipeline is complete.
print("Waiting for run to complete...")
run.wait()

print(f"Run {run.id} finished with status: {run.status}")

if run.status == 'SUCCEEDED':
    metrics = run.metadata.get('metrics', {})
    results = run.metadata.get('results', {})
    print(f"  - Input Rows: {results.get('input_rows')}")
    print(f"  - Mapped Rows: {results.get('mapped_rows')}")
    print(f"  - Duration: {metrics.get('duration_seconds', 0):.2f}s")
    # The transformed data is now available in the `clean_invoices` table in Snowflake.
else:
    error_info = run.metadata.get('error', {})
    print(f"  - Error Code: {error_info.get('code')}")
    print(f"  - Error Message: {error_info.get('message')}")
```

#### Sample Input

(Records in the PostgreSQL `raw_invoices` table where `batch_id` matches the `source_path`)

| batch\_id | invoice\_id | customer\_id | amount |
| --------- | ----------- | ------------ | ------ |
| ...       | inv\_123    | cust\_abc    | 100.50 |
| ...       | inv\_124    | cust\_xyz    | 75.00  |

#### Sample Output

(New records created in the Snowflake `clean_invoices` table by the Lume pipeline)

| invoiceId | customerId | amountCents | processedAt          |
| --------- | ---------- | ----------- | -------------------- |
| INV-123   | CUST-ABC   | 10050       | 2024-07-30T10:00:00Z |
| INV-124   | CUST-XYZ   | 7500        | 2024-07-30T10:00:00Z |

*Notice how the Flow Version transformed the data: it standardized IDs, converted the amount to cents, and added a processing timestamp.*

<Tip>
  For production workflows, consider using [Webhooks](/pages/libraries/python-ent/production#asynchronous-processing-with-webhooks) instead of `run.wait()` for more efficient, event-driven monitoring.
</Tip>

## Final Result
