Data Lake Integration Guide
Learn how to integrate Integra with your data lake (AWS S3, Google Cloud Storage, Azure Blob Storage) for scalable ACORD AL3 processing.
Architecture Overview
Integra acts as a transformation layer between your raw AL3 files and your data analytics platform.
```mermaid
graph LR
    A[Raw AL3 Files] -->|Ingest| B[Integra]
    B -->|Convert| C[NDJSON/Parquet]
    C -->|Store| D["Data Lake (S3)"]
    D -->|Query| E[Athena/Snowflake]
```
Recommended Workflow
1. File Ingestion
Upload raw AL3 files to an "Incoming" bucket or folder.
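For example, a minimal boto3 upload sketch; the `my-datalake` bucket and `incoming/` prefix are assumptions — use whatever incoming location your pipeline watches.

```python
import boto3
from pathlib import Path

s3 = boto3.client("s3")

def upload_raw_al3(local_path: str, bucket: str = "my-datalake", prefix: str = "incoming/") -> str:
    """Upload a raw AL3 file to the incoming prefix and return its S3 key."""
    key = f"{prefix}{Path(local_path).name}"
    s3.upload_file(local_path, bucket, key)
    return key

# Example: upload_raw_al3("policy.al3") -> "incoming/policy.al3"
```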
2. Processing (Lambda/Batch)
Trigger Integra to process files. For high volume, use the /v1/parse endpoint with NDJSON output.
```bash
curl -X POST \
  -H "Accept: application/x-ndjson" \
  --data-binary "@policy.al3" \
  "http://integra:8080/v1/parse" > output.ndjson
```
3. Storage Strategy
Store processed files in your data lake using a Hive-style partitioned layout so query engines can prune partitions instead of scanning the full dataset.
Structure:
s3://my-datalake/al3-processed/year=2025/month=01/day=17/policy_type=commercial/
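As an illustration of that layout, here is a small helper that builds the partitioned key and writes an NDJSON payload under it. The bucket name, `al3-processed/` prefix, and `policy_type` value are assumptions taken from the example path above.

```python
import boto3
from datetime import date

s3 = boto3.client("s3")

def partitioned_key(d: date, policy_type: str, filename: str) -> str:
    """Build a Hive-style partition path: year=YYYY/month=MM/day=DD/policy_type=..."""
    return (
        f"al3-processed/year={d.year:04d}/month={d.month:02d}/day={d.day:02d}/"
        f"policy_type={policy_type}/{filename}"
    )

def store_ndjson(ndjson_bytes: bytes, policy_type: str, filename: str,
                 bucket: str = "my-datalake") -> str:
    """Write processed NDJSON into the partitioned area of the data lake."""
    key = partitioned_key(date.today(), policy_type, filename)
    s3.put_object(Bucket=bucket, Key=key, Body=ndjson_bytes,
                  ContentType="application/x-ndjson")
    return key

# Example: store_ndjson(output, "commercial", "policy.ndjson")
# -> "al3-processed/year=2025/month=01/day=17/policy_type=commercial/policy.ndjson"
```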
4. Schema Catalog (AWS Glue)
Keep analytics schemas simple. Since AL3 is hierarchical, we recommend either flattening key fields into columns or loading the JSON into a database with native semi-structured support (Snowflake, BigQuery).
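A sketch of the "flatten key fields" approach. The field names (`code`, `dataElements`, `Premium Amount`) mirror the Snowflake query later in this guide; treat them as placeholders for whatever your AL3 group records actually contain.

```python
import json

def flatten_record(record: dict) -> dict:
    """Pull a few analytics-friendly columns out of a hierarchical AL3 group record."""
    elements = record.get("dataElements", {})
    return {
        "group_code": record.get("code"),
        "premium_amount": elements.get("Premium Amount"),
        # Keep the full record for anything not promoted to a column
        "raw": json.dumps(record),
    }

def flatten_ndjson(path: str) -> list[dict]:
    """Flatten every line of an NDJSON file into rows ready for a flat table."""
    with open(path, encoding="utf-8") as f:
        return [flatten_record(json.loads(line)) for line in f if line.strip()]
```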
AWS Integration Example
S3 Event Trigger (Lambda)
```python
import boto3
import requests
import os
from urllib.parse import unquote_plus


def lambda_handler(event, context):
    s3 = boto3.client('s3')
    integra_url = os.environ.get('INTEGRA_URL', 'http://integra-lb:8080')

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # S3 event notifications URL-encode the object key (spaces arrive as '+')
        key = unquote_plus(record['s3']['object']['key'])

        # 1. Get AL3
        obj = s3.get_object(Bucket=bucket, Key=key)
        al3_data = obj['Body'].read()

        # 2. Call Integra
        try:
            resp = requests.post(
                f'{integra_url}/v1/parse',
                data=al3_data,
                headers={'Accept': 'application/x-ndjson'},
                timeout=30
            )
            resp.raise_for_status()

            # 3. Save NDJSON to Processed Bucket
            new_key = key.replace('.al3', '.ndjson').replace('incoming/', 'processed/')
            s3.put_object(
                Bucket='my-processed-bucket',
                Key=new_key,
                Body=resp.content,
                ContentType='application/x-ndjson'
            )
            print(f"Processed {key} -> {new_key}")
        except Exception as e:
            print(f"Error processing {key}: {e}")
            # Move to error bucket...
```
Snowflake Integration
Snowflake can load NDJSON directly from an external stage (S3) and query it as semi-structured data.
```sql
-- Create File Format
CREATE OR REPLACE FILE FORMAT al3_ndjson_format
  TYPE = 'JSON';

-- Copy into Table (assumes policies_table has a single VARIANT column named "v")
COPY INTO policies_table
FROM @my_s3_stage
FILE_FORMAT = (FORMAT_NAME = al3_ndjson_format)
PATTERN = '.*\\.ndjson';

-- Query JSON Data
SELECT
    v:code::string AS group_code,
    v:dataElements."Premium Amount"::float AS premium
FROM policies_table;
```
Best Practices
- Use NDJSON: It is splittable and ideal for distributed processing engines (Spark, Presto).
- Compression: Gzip your NDJSON files before uploading to S3 (`output.ndjson.gz`) to save storage and transfer costs; see the sketch below.
- Error Handling: Capture 4xx/5xx responses from Integra and move failed AL3 files to a "Dead Letter" bucket for manual inspection.
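A sketch of the compression step from the list above, reusing the processed-bucket naming from the Lambda example; the bucket, key, and whether you set `ContentEncoding` are assumptions to adapt to your setup.

```python
import gzip
import boto3

s3 = boto3.client("s3")

def upload_gzipped_ndjson(ndjson_bytes: bytes, bucket: str, key: str) -> None:
    """Gzip NDJSON in memory and upload it so query engines can read it by extension."""
    compressed = gzip.compress(ndjson_bytes)
    s3.put_object(
        Bucket=bucket,
        Key=key if key.endswith(".gz") else key + ".gz",
        Body=compressed,
        ContentType="application/x-ndjson",
        ContentEncoding="gzip",
    )

# Example:
# upload_gzipped_ndjson(resp.content, "my-processed-bucket", "processed/policy.ndjson.gz")
```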