Pipeline Mode Integration Guide

Overview

Pipeline Mode is designed for high-throughput, asynchronous ingestion of ACORD AL3 files. It operates as a continuous background process, monitoring a source (S3 bucket or directory) and producing standardized JSON/Parquet outputs.

Configuration Reference

Integra is configured entirely via environment variables, following 12-Factor App principles.

| Variable | Required | Default | Description |
| --- | --- | --- | --- |
| INTEGRA_MODE | Yes | api | Set to pipeline to enable high-throughput batch processing. |
| INTEGRA_PIPELINE_SOURCE | Yes | - | Input location. Supports local paths (/mnt/data/in) or S3 URIs (s3://bucket/prefix). The watcher polls this location for new .al3 files. |
| INTEGRA_PIPELINE_DEST | Yes | - | Output location. Successfully processed JSON/Parquet files are written here. |
| INTEGRA_PIPELINE_ERROR | No | $DEST/error | Failure location. If a file is corrupt or fails parsing, it is moved here to unblock the pipeline. |
| INTEGRA_WORKER_POOL_SIZE | No | 2 * vCPU | Concurrency control. Number of files to process in parallel. Increase this for I/O-bound workloads (S3); decrease for CPU-bound workloads. |
| INTEGRA_PIPELINE_MAX_FILE_SIZE | No | 100MB | Protection. Files larger than this are skipped and logged as errors to prevent memory exhaustion (OOM). |
| INTEGRA_PIPELINE_ENABLE_VALIDATION_REPORT | No | false | Audit. If true, a .report.json file is generated for every processed file (valid or invalid), containing checksums and validation errors. |
| INTEGRA_OUTPUT_FORMAT | No | json | Format. Comma-separated list of formats: json (hierarchical), ndjson (flat), parquet (columnar), csv. |
| INTEGRA_PORT | No | 8080 | Observability. Port on which Prometheus metrics (/metrics) are exposed. |
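
For illustration, the variables above can be assembled into the process environment before launching Integra. This is a minimal sketch, not the only way to deploy: the integra executable name, the acme-al3 bucket, and the chosen values are placeholders to adapt to your own environment.

import os
import subprocess

# Pipeline-mode configuration, per the reference table above.
env = {
    **os.environ,
    "INTEGRA_MODE": "pipeline",
    "INTEGRA_PIPELINE_SOURCE": "s3://acme-al3/in",        # hypothetical bucket/prefix
    "INTEGRA_PIPELINE_DEST": "s3://acme-al3/out",
    "INTEGRA_PIPELINE_ENABLE_VALIDATION_REPORT": "true",
    "INTEGRA_OUTPUT_FORMAT": "json,parquet",
    "INTEGRA_WORKER_POOL_SIZE": "8",
}

# "integra" stands in for however the service is started in your setup
# (bare binary, docker run, ECS/Kubernetes pod spec, etc.).
subprocess.run(["integra"], env=env, check=True)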

Tuning Guide

  • Throughput: For maximum throughput on multi-core machines, set INTEGRA_WORKER_POOL_SIZE to twice your CPU core count; AL3 parsing also involves I/O (reading inputs, writing outputs), so the extra workers keep cores busy while others wait on I/O.
  • Latency: If processing small files from S3, a larger pool size (e.g., 4 * vCPU) helps mask network latency.
  • Memory: Processing a 10MB AL3 file may require ~50MB of RAM during parsing. Ensure your container memory limit allows for POOL_SIZE * 50MB of overhead; the sketch below shows the sizing math.
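
As a rough illustration of that sizing rule (the ~50MB-per-file figure above is a guideline, not a measured constant for your data):

import os

PER_FILE_MB = 50                                   # guideline from the tuning note above

vcpus = os.cpu_count() or 1
pool_size = int(os.environ.get("INTEGRA_WORKER_POOL_SIZE", 2 * vcpus))

# Parsing overhead only; the base process footprint comes on top of this.
parsing_overhead_mb = pool_size * PER_FILE_MB
print(f"{pool_size} workers -> budget at least {parsing_overhead_mb} MB for parsing")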

Integration Workflow

File Naming & Concurrency

  • Prefix Sharding: For high availability, configure multiple worker containers watching different prefixes (e.g., in/shard-1/, in/shard-2/).
  • Idempotency: Integra processes each file once and then deletes/archives it from the source. Ensure unique filenames to prevent output overwrites if multiple producers feed the same destination; one way to generate collision-free names is sketched below.
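
A sketch of an upstream producer generating collision-free keys under sharded prefixes; boto3, the acme-al3 bucket, and the two-shard layout are assumptions for illustration only.

import uuid
from datetime import datetime, timezone

import boto3

s3 = boto3.client("s3")

def upload_al3(local_path: str, bucket: str = "acme-al3", shards: int = 2) -> str:
    # Spread files across shard prefixes so separate workers can each watch
    # a disjoint prefix (in/shard-1/, in/shard-2/, ...).
    shard = uuid.uuid4().int % shards + 1
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    # Timestamp + UUID keeps names unique so outputs are never overwritten.
    key = f"in/shard-{shard}/{stamp}-{uuid.uuid4().hex}.al3"
    s3.upload_file(local_path, bucket, key)
    return key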

Output Formats

In the default json format, Integra preserves the AL3 group hierarchy:

{
  "1MHG": { ... },
  "2TRG": { ... },
  ...
}
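
Downstream consumers can load the hierarchical output and address AL3 groups by identifier. A minimal sketch; the output filename and the fields inside each group depend on your source files and are not spelled out here.

import json

# One JSON document is written per source AL3 file.
with open("policy123.json") as fh:
    doc = json.load(fh)

header = doc.get("1MHG", {})        # message header group
transaction = doc.get("2TRG", {})   # transaction group
print(sorted(doc.keys()))           # top-level AL3 groups present in this file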

Audit Report Schema (.report.json)

When INTEGRA_PIPELINE_ENABLE_VALIDATION_REPORT is enabled, every processed file generates a report containing data lineage information. Integrators must check this file to confirm validation status before consuming the corresponding output.

{
  "source_file": "/data/in/policy123.al3",
  "timestamp": "2024-01-31T12:00:00Z",
  "status": "success",
  "duration_ms": 150,
  "size_bytes": 10240,
  "sha256": "a1b2c3d4...",
  "validation": {
    "valid": true,
    "errors": []
  }
}
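
A minimal sketch of such a check against the schema above; the /data/out destination path is an assumption.

import json
from pathlib import Path

def is_trusted(report_path: Path) -> bool:
    report = json.loads(report_path.read_text())
    ok = report["status"] == "success" and report["validation"]["valid"]
    if not ok:
        print(f"{report['source_file']}: {report['validation']['errors']}")
    return ok

# Scan every report written alongside the converted outputs.
for path in Path("/data/out").glob("*.report.json"):
    is_trusted(path)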

Monitoring

Integra exposes Prometheus metrics on the configured INTEGRA_PORT (default 8080).

Key Metrics

  • integra_pipeline_files_processed_total: Counter (labels: status).
    • status="success": Successfully converted.
    • status="parse_failed": Corrupt AL3 structure.
    • status="read_failed": IO error.
  • integra_pipeline_active_workers: Gauge of currently busy workers.
  • integra_pipeline_processing_duration_seconds: Histogram of latency.

Alerts

  • High Error Rate: Alert when integra_pipeline_files_processed_total{status!="success"} is increasing.
  • Stalled Pipeline: Alert when integra_pipeline_active_workers > 0 but integra_pipeline_files_processed_total does not increase for N minutes; the sketch below shows one way to check this without a full alerting stack.
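
If Prometheus alert rules are not yet in place, the stalled-pipeline condition can be approximated with a small script against the /metrics endpoint. A sketch assuming the default port, a local host, and a 5-minute window; it parses the plain-text exposition format naively.

import time
import urllib.request

METRICS_URL = "http://localhost:8080/metrics"   # INTEGRA_PORT default; adjust per deployment

def metric_sum(name: str) -> float:
    """Sum all samples of a metric across its label sets."""
    text = urllib.request.urlopen(METRICS_URL).read().decode()
    total = 0.0
    for line in text.splitlines():
        if line.startswith(name + "{") or line.startswith(name + " "):
            total += float(line.rsplit(" ", 1)[1])
    return total

# Stalled-pipeline check: the processed counter should grow while workers are busy.
before = metric_sum("integra_pipeline_files_processed_total")
time.sleep(300)                                  # "N minutes" from the alert above; 5 is an assumption
after = metric_sum("integra_pipeline_files_processed_total")
if metric_sum("integra_pipeline_active_workers") > 0 and after == before:
    print("ALERT: pipeline appears stalled")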