Batch ETL Integration Guide

Learn how to process large volumes of AL3 files using Integra in batch mode.

Use Case

You have thousands of historical AL3 files (e.g., daily dumps from an agency management system) that need to be converted to a modern format for warehousing.


Strategy 1: Sequential Scripting (Simple)

For moderate volumes (< 10GB), a simple shell script or Python script running locally or on a single VM is sufficient. Integra processes typical AL3 files in milliseconds.

#!/bin/bash
# Bulk process all .AL3 files in a directory

INPUT_DIR="./raw_al3"
OUTPUT_DIR="./processed_json"
INTEGRA_URL="http://localhost:8080/v1/parse"

mkdir -p "$OUTPUT_DIR"

for file in "$INPUT_DIR"/*.AL3; do
    filename=$(basename "$file")
    echo "Processing $filename..."

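    # --fail makes curl exit non-zero on HTTP 4xx/5xx responses;
    # without it, the exit code only reflects transport errors.
    if curl -sS --fail -X POST \
         --data-binary "@$file" \
         "$INTEGRA_URL" > "$OUTPUT_DIR/${filename}.json"; then
        echo "✓ Success"
    else
        echo "✗ Failed"
        # Remove the empty or partial output so it is not loaded downstream
        rm -f "$OUTPUT_DIR/${filename}.json"
    fi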
done
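
The same sequential loop can be sketched in Python, which makes it easier to keep a list of failed files for a later retry. This is a minimal sketch using only the standard library. The endpoint is copied from the shell script above, while the helper names `http_parse` and `convert_directory` are hypothetical and not part of Integra. Like `curl --data-binary`, `urllib` sends the body with its default content type.

```python
import urllib.request
from pathlib import Path

# Assumed endpoint, copied from the shell script above.
INTEGRA_URL = "http://localhost:8080/v1/parse"


def http_parse(payload, url=INTEGRA_URL, timeout=60.0):
    """POST raw AL3 bytes to Integra and return the response body as text."""
    request = urllib.request.Request(url, data=payload, method="POST")
    # urlopen raises HTTPError on 4xx/5xx responses.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")


def convert_directory(input_dir, output_dir, parse=http_parse):
    """Convert every *.AL3 file in input_dir.

    Returns (succeeded, failed): two lists of file names.
    """
    in_path, out_path = Path(input_dir), Path(output_dir)
    out_path.mkdir(parents=True, exist_ok=True)
    succeeded, failed = [], []
    for al3_file in sorted(in_path.glob("*.AL3")):
        try:
            result = parse(al3_file.read_bytes())
        except (OSError, ValueError):
            # URLError and HTTPError are OSError subclasses, so network and
            # HTTP failures land here along with unreadable files.
            failed.append(al3_file.name)
            continue
        target = out_path / f"{al3_file.name}.json"
        target.write_text(result, encoding="utf-8")
        succeeded.append(al3_file.name)
    return succeeded, failed
```

Calling `convert_directory("./raw_al3", "./processed_json")` mirrors the shell script. Passing a different `parse` callable makes the loop easy to exercise without a running Integra instance.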

Strategy 2: Parallel Processing (High Volume)

For high volumes, leverage parallel workers. Integra handles concurrent requests efficiently.

Using GNU Parallel

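mkdir -p ./processed
find ./raw_al3 -name "*.AL3" -print0 | parallel -0 -j 8 \
  "curl -sS --fail --data-binary @{} http://localhost:8080/v1/parse > ./processed/{/.}.json"

-j 8: Runs up to 8 uploads concurrently.
-print0 / -0: Pass file names NUL-delimited, so names containing spaces or newlines are handled correctly.
{/.}: Expands to the input file's base name without its extension, so ./raw_al3/policy.AL3 is written to ./processed/policy.json.
--fail: Makes curl exit non-zero on HTTP error responses, so parallel can report failed jobs.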

Using Python Multiprocessing

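from multiprocessing import Pool
import os

import requests

RAW_DIR = './raw'
OUT_DIR = './processed'
PARSE_URL = 'http://localhost:8080/v1/parse'

def process_file(filename):
    """Send one AL3 file to Integra; return True if the JSON was saved."""
    try:
        with open(os.path.join(RAW_DIR, filename), 'rb') as f:
            # A timeout keeps one stuck request from stalling a worker forever
            resp = requests.post(PARSE_URL, data=f.read(), timeout=60)
    except (OSError, requests.RequestException):
        # Unreadable file or network error: count as a failure, don't crash the pool
        return False
    if not resp.ok:
        return False
    with open(os.path.join(OUT_DIR, f"{filename}.json"), 'w', encoding='utf-8') as out:
        out.write(resp.text)
    return True

if __name__ == '__main__':
    os.makedirs(OUT_DIR, exist_ok=True)
    # List the files here so worker processes don't re-scan the directory on import
    files = [f for f in os.listdir(RAW_DIR) if f.endswith('.AL3')]
    with Pool(processes=16) as pool:
        results = pool.map(process_file, files)
    print(f"Processed {sum(results)} of {len(files)} files.")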
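
The script above only reports how many files succeeded. For a large backfill it is usually more useful to know which files failed so they can be retried. The following is one possible sketch, not part of Integra: `process_all` is a hypothetical helper that takes any worker with the same contract as `process_file` (return a truthy value on success). It uses a thread pool rather than processes, on the assumption that the client spends most of its time waiting on HTTP responses.

```python
from concurrent.futures import ThreadPoolExecutor


def process_all(filenames, process_file, max_workers=16):
    """Run process_file over filenames concurrently.

    Returns (success_count, failed_names). A worker that raises is treated
    the same as one that returns a falsy value.
    """
    names = list(filenames)

    def safe(name):
        try:
            return bool(process_file(name))
        except Exception:
            # Deliberately broad: one bad file should not abort the batch.
            return False

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() preserves input order, so results line up with names.
        results = list(pool.map(safe, names))

    failed = [name for name, ok in zip(names, results) if not ok]
    return len(names) - len(failed), failed
```

The returned `failed_names` list can be written to a file and fed back into the same function for a second pass.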

Strategy 3: Pipeline Orchestration (Airflow/Prefect)

Integrate Integra as a task in your ETL DAG.

Airflow Example:

  1. Sensor: Watch S3 bucket for new zip archives.
  2. Unzip: Extract AL3 files to a temporary location.
  3. Map Task: Send each AL3 file to Integra (HTTP Operator).
  4. Load: Upload resulting JSONs to Snowflake/Redshift.
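
The unzip, map, and load steps can be sketched in plain Python to show how data flows between tasks. This is not Airflow code and does not use any Airflow API. `parse` and `load` are hypothetical callables standing in for the Integra HTTP call and the warehouse upload, and the sensor step is assumed to have already delivered the archive bytes.

```python
import io
import zipfile


def extract_al3_members(archive_bytes):
    """Yield (name, payload) for each .AL3 member of an in-memory zip archive."""
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
        for info in archive.infolist():
            # Skip directories and anything that is not an AL3 file.
            if info.is_dir() or not info.filename.endswith(".AL3"):
                continue
            yield info.filename, archive.read(info)


def run_pipeline(archive_bytes, parse, load):
    """Unzip, parse each AL3 file, then hand all results to load.

    parse: callable taking AL3 bytes and returning the parsed output.
    load:  callable taking a list of (name, parsed_output) tuples.
    Returns the number of files passed to load.
    """
    records = [
        (name, parse(payload))
        for name, payload in extract_al3_members(archive_bytes)
    ]
    load(records)
    return len(records)
```

In a real DAG each function would typically become its own task, with the map step fanned out per file so that a single failed parse can be retried without reprocessing the whole archive.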


Recommendations

  • Resource Sizing: Integra is CPU-bound. If running locally, allocate roughly 1 CPU core per concurrent worker (for example, 8 cores for -j 8).
  • Memory: Per-request memory usage is low, but the JSON response for a large file can be big. Request NDJSON output (Accept: application/x-ndjson) to minimize client-side memory overhead during ETL.
  • Aggregation: If loading into a data warehouse, consider concatenating many small JSON outputs into a single large NDJSON/JSONL file before running the COPY command; bulk loads generally perform better with fewer, larger files.
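
The last two recommendations can be sketched with the standard library. `iter_ndjson` and `aggregate_to_ndjson` are hypothetical helper names, and the sketch assumes each per-file output holds exactly one JSON document. If your outputs are arrays of records, you may prefer to write one line per record instead.

```python
import json
from pathlib import Path


def iter_ndjson(lines):
    """Yield one parsed record per non-empty NDJSON line (bytes or str)."""
    for line in lines:
        if line.strip():
            yield json.loads(line)


def aggregate_to_ndjson(json_dir, output_file):
    """Write each *.json document in json_dir as one line of output_file.

    Returns the number of documents written. Keep output_file outside
    json_dir (or give it a non-.json extension) so it is not picked up
    as an input.
    """
    count = 0
    with open(output_file, "w", encoding="utf-8") as out:
        for path in sorted(Path(json_dir).glob("*.json")):
            document = json.loads(path.read_text(encoding="utf-8"))
            # json.dumps without indent emits a single line, as NDJSON requires.
            out.write(json.dumps(document, ensure_ascii=False) + "\n")
            count += 1
    return count
```

With the requests library, `iter_ndjson` can consume a streamed response directly: send the request with `headers={"Accept": "application/x-ndjson"}` and `stream=True`, then pass `resp.iter_lines()` as `lines` so only one record is held in memory at a time.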