Skip to content

Data Lake Integration Guide

Learn how to integrate Integra with your data lake (AWS S3, Google Cloud Storage, Azure Blob) for scalable ACORD AL3 processing.


Architecture Overview

Integra acts as a transformation layer between your raw AL3 files and your data analytics platform.

graph LR
    A[Raw AL3 Files] -->|Ingest| B[Integra]
    B -->|Convert| C[NDJSON/Parquet]
    C -->|Store| D[Data Lake (S3)]
    D -->|Query| E[Athena/Snowflake]

1. File Ingestion

Upload raw AL3 files to an "Incoming" bucket or folder.

2. Processing (Lambda/Batch)

Trigger Integra to process files. For high volume, use the /v1/parse endpoint with NDJSON output.

curl -X POST \
  --data-binary "@policy.al3" \
  -H "Accept: application/x-ndjson" \
  "http://integra:8080/v1/parse" > output.ndjson

3. Storage Strategy

Store processed files in your data lake using a hive-partitioned structure for performance.

Structure: s3://my-datalake/al3-processed/year=2025/month=01/day=17/policy_type=commercial/

4. Schema Catalog (AWS Glue)

Use simpler schemas for analytics. Since AL3 is hierarchical, we recommend flattening key fields or using databases with native JSON support (Snowflake, BigQuery).


AWS Integration Example

S3 Event Trigger (Lambda)

import boto3
import requests
import os

def lambda_handler(event, context):
    s3 = boto3.client('s3')
    integra_url = os.environ.get('INTEGRA_URL', 'http://integra-lb:8080')

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        # 1. Get AL3
        obj = s3.get_object(Bucket=bucket, Key=key)
        al3_data = obj['Body'].read()

        # 2. Call Integra
        try:
            resp = requests.post(
                f'{integra_url}/v1/parse',
                data=al3_data,
                headers={'Accept': 'application/x-ndjson'},
                timeout=30
            )
            resp.raise_for_status()

            # 3. Save NDJSON to Processed Bucket
            new_key = key.replace('.al3', '.ndjson').replace('incoming/', 'processed/')
            s3.put_object(
                Bucket='my-processed-bucket',
                Key=new_key,
                Body=resp.content,
                ContentType='application/x-ndjson'
            )
            print(f"Processed {key} -> {new_key}")

        except Exception as e:
            print(f"Error processing {key}: {e}")
            # Move to error bucket...

Snowflake Integration

Snowflake can query NDJSON data directly from an external stage (S3).

-- Create File Format
CREATE OR REPLACE FILE FORMAT al3_ndjson_format
  TYPE = 'JSON';

-- Copy into Table
COPY INTO policies_table
  FROM @my_s3_stage
  FILE_FORMAT = (FORMAT_NAME = al3_ndjson_format)
  PATTERN='.*.ndjson';

-- Query JSON Data
SELECT 
  v:code::string as group_code,
  v:dataElements."Premium Amount"::float as premium
FROM policies_table;

Best Practices

  • Use NDJSON: It is split-table and ideal for distributed processing engines (Spark, Presto).
  • Compression: Gzip your NDJSON files before uploading to S3 (output.ndjson.gz) to save storage and transfer costs.
  • Error Handling: Capture 4xx/5xx responses from Integra and move failed AL3 files to a "Dead Letter" bucket for manual inspection.