# Pipeline Mode Integration Guide
## Overview
Pipeline Mode is designed for high-throughput, asynchronous ingestion of ACORD AL3 files. It operates as a continuous background process, monitoring a source (S3 bucket or directory) and producing standardized JSON/Parquet outputs.
## Configuration Reference
Integra is configured entirely via environment variables, in keeping with 12-Factor App principles.
| Variable | Required | Default | Description |
|---|---|---|---|
| `INTEGRA_MODE` | Yes | `api` | Set to `pipeline` to enable high-throughput batch processing. |
| `INTEGRA_PIPELINE_SOURCE` | Yes | - | Input location. Supports local paths (`/mnt/data/in`) or S3 URIs (`s3://bucket/prefix`). The watcher polls this location for new `.al3` files. |
| `INTEGRA_PIPELINE_DEST` | Yes | - | Output location. Successfully processed JSON/Parquet files are written here. |
| `INTEGRA_PIPELINE_ERROR` | No | `$DEST/error` | Failure location. If a file is corrupt or fails parsing, it is moved here to unblock the pipeline. |
| `INTEGRA_WORKER_POOL_SIZE` | No | `2 * vCPU` | Concurrency control. Number of files processed in parallel. Increase for I/O-bound workloads (S3); decrease for CPU-bound ones. |
| `INTEGRA_PIPELINE_MAX_FILE_SIZE` | No | `100MB` | Protection. Files larger than this are skipped and logged as errors to prevent memory exhaustion (OOM). |
| `INTEGRA_PIPELINE_ENABLE_VALIDATION_REPORT` | No | `false` | Audit. If `true`, a `.report.json` file is generated for every processed file (valid or invalid), containing checksums and validation errors. |
| `INTEGRA_OUTPUT_FORMAT` | No | `json` | Format. Comma-separated list of formats: `json` (hierarchical), `ndjson` (flat), `parquet` (columnar), `csv`. |
| `INTEGRA_PORT` | No | `8080` | Observability. Port on which Prometheus metrics are exposed (`/metrics`). |
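Because a missing required variable only surfaces once the pipeline starts, a container entrypoint may want to fail fast. The sketch below is a hypothetical pre-flight check (not part of Integra itself) covering the required variables from the table above:

```python
import os

# Required variables for pipeline mode, per the configuration table.
REQUIRED = ["INTEGRA_MODE", "INTEGRA_PIPELINE_SOURCE", "INTEGRA_PIPELINE_DEST"]

def check_pipeline_env(env):
    """Return a list of configuration problems for pipeline mode."""
    problems = [f"{k} is not set" for k in REQUIRED if not env.get(k)]
    if env.get("INTEGRA_MODE") not in (None, "pipeline"):
        problems.append("INTEGRA_MODE must be 'pipeline' for pipeline mode")
    return problems

if __name__ == "__main__":
    for p in check_pipeline_env(os.environ):
        print("config error:", p)
```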
## Tuning Guide
- Throughput: For maximum throughput on multi-core machines, set `INTEGRA_WORKER_POOL_SIZE` to 2x your CPU core count, since AL3 parsing involves I/O (reading inputs, writing outputs).
- Latency: When processing many small files from S3, a larger pool size (e.g., `4 * vCPU`) helps mask network latency.
- Memory: Processing a 10MB AL3 file may require ~50MB of RAM during parsing. Ensure your container memory limit allows for `POOL_SIZE * 50MB` of overhead.
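The memory note above can be turned into a quick sizing calculation. The ~50MB-per-worker figure is the heuristic from the note, not a guaranteed bound, and the baseline process overhead below is an assumption:

```python
def pipeline_memory_budget_mb(vcpus, per_worker_mb=50, base_mb=256):
    """Rough container memory estimate for the default pool size (2 * vCPU).

    base_mb is an assumed baseline for the process itself; adjust as needed.
    """
    pool_size = 2 * vcpus
    return base_mb + pool_size * per_worker_mb

# e.g. a 4-vCPU container: 256 + (8 workers * 50MB) = 656MB
```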
## Integration Workflow
### File Naming & Concurrency
- Prefix Sharding: For high availability, run multiple worker containers watching different prefixes (e.g., `in/shard-1/`, `in/shard-2/`).
- Idempotency: Integra processes each file once and then deletes/archives it from the source. Ensure filenames are unique to prevent output overwrites when writing to the same destination.
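One way to guarantee the uniqueness the idempotency note calls for is to derive destination names from the full source path. This is a hypothetical naming scheme an upstream producer might use, not an Integra feature:

```python
import hashlib
from pathlib import Path

def unique_output_name(source_path, shard):
    """Derive a destination filename that cannot collide across shards.

    Prefixes the original stem with the shard name and a short hash of the
    full source path, so 'policy.al3' from two shards maps to two outputs.
    """
    digest = hashlib.sha256(source_path.encode()).hexdigest()[:8]
    stem = Path(source_path).stem
    return f"{shard}-{digest}-{stem}.json"
```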
### Output Formats
In the default `json` format, Integra preserves the AL3 group hierarchy:

```json
{
  "1MHG": { ... },
  "2TRG": { ... },
  ...
}
```
### Audit Report Schema (`.report.json`)
When `INTEGRA_PIPELINE_ENABLE_VALIDATION_REPORT` is enabled, every processed file generates a report containing data lineage information. Integrators must check this file to confirm validation status.

```json
{
  "source_file": "/data/in/policy123.al3",
  "timestamp": "2024-01-31T12:00:00Z",
  "status": "success",
  "duration_ms": 150,
  "size_bytes": 10240,
  "sha256": "a1b2c3d4...",
  "validation": {
    "valid": true,
    "errors": []
  }
}
```
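The check an integrator performs on each report could look like the following sketch; field names are taken from the schema above, and the helper itself is hypothetical:

```python
import json

def assert_file_valid(report_text):
    """Raise if a .report.json indicates the conversion failed validation."""
    report = json.loads(report_text)
    if report["status"] != "success" or not report["validation"]["valid"]:
        raise ValueError(
            f"{report['source_file']}: {report['validation']['errors']}"
        )
    return report["sha256"]  # checksum is useful for downstream lineage
```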
## Monitoring
Integra exposes Prometheus metrics at `/metrics` on the configured `INTEGRA_PORT` (default 8080).
### Key Metrics
- `integra_pipeline_files_processed_total`: Counter (label: `status`).
  - `status="success"`: successfully converted.
  - `status="parse_failed"`: corrupt AL3 structure.
  - `status="read_failed"`: I/O error.
- `integra_pipeline_active_workers`: Gauge of currently busy workers.
- `integra_pipeline_processing_duration_seconds`: Histogram of processing latency.
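A lightweight health check might scrape `/metrics` and read these counters directly. The helper below is a hypothetical parser for the Prometheus text exposition format (a real integration would more likely use `prometheus_client` or PromQL); the sample lines in the test mirror the metric names above:

```python
def read_counter(metrics_text, name, **labels):
    """Sum samples of a counter from a Prometheus text-format scrape,
    keeping only lines whose label set contains the given label pairs."""
    total = 0.0
    for line in metrics_text.splitlines():
        if not line.startswith(name):
            continue  # skips HELP/TYPE comments and other metrics
        if all(f'{k}="{v}"' in line for k, v in labels.items()):
            total += float(line.rsplit(" ", 1)[1])
    return total
```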
### Alerts
- High Error Rate: Alert if `files_processed_total{status!="success"}` increases.
- Stalled Pipeline: Alert if `active_workers > 0` but `files_processed_total` does not increase for N minutes.