Data Lake Integration Guide
Learn how to integrate IOSetu with your data lake (AWS S3, Google Cloud Storage, Azure Blob) for scalable ACORD AL3 processing.
Architecture Overview
IOSetu acts as a transformation layer between your raw AL3 files and your data analytics platform.
graph LR
    A[Raw AL3 Files] -->|Ingest| B[IOSetu]
    B -->|Convert| C[NDJSON/Parquet]
    C -->|Store| D["Data Lake (S3)"]
    D -->|Query| E[Athena/Snowflake]
Recommended Workflow
1. File Ingestion
Upload raw AL3 files to an "Incoming" bucket or folder.
2. Processing (Lambda/Batch)
Trigger IOSetu to process files. For high volume, use the /v1/parse endpoint with NDJSON output.
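NDJSON carries one JSON object per line, so a consumer can handle a response record by record instead of loading one large document. A minimal Python sketch of such a reader follows; the function name `iter_ndjson` and the sample records are hypothetical and are not part of IOSetu.

```python
import json
from typing import Iterator


def iter_ndjson(text: str) -> Iterator[dict]:
    """Yield one parsed JSON object per non-empty line of NDJSON text."""
    for line in text.split("\n"):
        line = line.strip()
        if line:  # skip blank lines, e.g. the one after a trailing newline
            yield json.loads(line)


# Hypothetical sample; real records come from the /v1/parse response.
sample = '{"code": "GRP1"}\n{"code": "GRP2"}\n'
for record in iter_ndjson(sample):
    print(record["code"])
```

Because each line is independent, the same loop works on a file handle or a streamed HTTP response with little change.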
Example request:
curl -X POST \
  --data-binary "@policy.al3" \
  -H "Accept: application/x-ndjson" \
  "http://iosetu:8080/v1/parse" > output.ndjson
3. Storage Strategy
Store processed files in your data lake using a Hive-style partitioned layout (key=value path segments), so that query engines can skip partitions that a query does not touch.
Structure:
s3://my-datalake/al3-processed/year=2025/month=01/day=17/policy_type=commercial/
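A prefix in this layout can be built from a date and a policy type. The following is a minimal sketch; the helper name `partition_prefix` is hypothetical, and the partition keys are taken from the example path above.

```python
from datetime import date


def partition_prefix(base: str, day: date, policy_type: str) -> str:
    """Build a Hive-style partition prefix with zero-padded month and day."""
    return (
        f"{base.rstrip('/')}/year={day:%Y}/month={day:%m}/day={day:%d}/"
        f"policy_type={policy_type}/"
    )


print(partition_prefix("s3://my-datalake/al3-processed", date(2025, 1, 17), "commercial"))
# -> s3://my-datalake/al3-processed/year=2025/month=01/day=17/policy_type=commercial/
```

Zero-padding the month and day keeps prefixes in chronological order when they are listed lexicographically.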
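4. Schema Catalog (AWS Glue)
Keep analytics schemas simple. Because AL3 is hierarchical, we recommend either flattening key fields into top-level columns before registering the data in your catalog (for example, AWS Glue), or loading the nested JSON into a database with native JSON support (Snowflake, BigQuery).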
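Flattening can be sketched as a small function that lifts selected elements into top-level columns. The record shape is an assumption: a `code` field and a `dataElements` object, as used in the Snowflake query later in this guide. Check it against your actual IOSetu output. The function name `flatten_record` is hypothetical.

```python
from typing import Any, Dict, List


def flatten_record(record: Dict[str, Any], fields: List[str]) -> Dict[str, Any]:
    """Lift selected dataElements entries into snake_case top-level columns.

    Assumes each record has a "code" field and a "dataElements" object;
    elements that are missing from a record become None.
    """
    elements = record.get("dataElements", {})
    row: Dict[str, Any] = {"group_code": record.get("code")}
    for name in fields:
        column = name.lower().replace(" ", "_")
        row[column] = elements.get(name)
    return row


# Hypothetical record for illustration only.
record = {"code": "GRP1", "dataElements": {"Premium Amount": "1250.00"}}
print(flatten_record(record, ["Premium Amount"]))
```

Listing the fields explicitly keeps the flattened schema stable even when individual records carry extra or missing elements.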
AWS Integration Example
S3 Event Trigger (Lambda)
import os
import urllib.parse

import boto3
import requests


def lambda_handler(event, context):
    s3 = boto3.client('s3')
    iosetu_url = os.environ.get('IOSETU_URL', 'http://iosetu-lb:8080')

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # Object keys in S3 event notifications are URL-encoded
        key = urllib.parse.unquote_plus(record['s3']['object']['key'])

        # 1. Get AL3
        obj = s3.get_object(Bucket=bucket, Key=key)
        al3_data = obj['Body'].read()

        # 2. Call IOSetu
        try:
            resp = requests.post(
                f'{iosetu_url}/v1/parse',
                data=al3_data,
                headers={'Accept': 'application/x-ndjson'},
                timeout=30
            )
            resp.raise_for_status()

            # 3. Save NDJSON to Processed Bucket
            new_key = key.replace('.al3', '.ndjson').replace('incoming/', 'processed/')
            s3.put_object(
                Bucket='my-processed-bucket',
                Key=new_key,
                Body=resp.content,
                ContentType='application/x-ndjson'
            )
            print(f"Processed {key} -> {new_key}")
        except Exception as e:
            print(f"Error processing {key}: {e}")
            # Move to error bucket...
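Snowflake Integration
Snowflake can load NDJSON data from an external stage (S3) into a VARIANT column and query it with path expressions.
-- Create File Format
CREATE OR REPLACE FILE FORMAT al3_ndjson_format
  TYPE = 'JSON';

-- Target table (assumption: a single VARIANT column named v, as the query below expects)
CREATE TABLE IF NOT EXISTS policies_table (v VARIANT);

-- Copy into Table
-- PATTERN is a regular expression: [.] matches a literal dot,
-- and the optional group also accepts gzipped files (.ndjson.gz)
COPY INTO policies_table
  FROM @my_s3_stage
  FILE_FORMAT = (FORMAT_NAME = al3_ndjson_format)
  PATTERN = '.*[.]ndjson([.]gz)?';

-- Query JSON Data
SELECT
  v:code::string AS group_code,
  v:dataElements."Premium Amount"::float AS premium
FROM policies_table;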
Best Practices
- Use NDJSON: It is splittable (one record per line) and ideal for distributed processing engines (Spark, Presto).
- Compression: Gzip your NDJSON files before uploading to S3 (output.ndjson.gz) to save storage and transfer costs. A gzip file cannot be split by a reader, so keep individual files moderately sized.
- Error Handling: Capture 4xx/5xx responses from IOSetu and move failed AL3 files to a "Dead Letter" bucket for manual inspection.
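The compression and error-handling practices can be combined in one routing step. The sketch below is illustrative only: the function name `route_result` and the `processed/` and `dead-letter/` key prefixes are assumptions, and the actual upload (for example with boto3) is left to the caller.

```python
import gzip
from typing import Tuple


def route_result(key: str, status_code: int, al3_bytes: bytes,
                 response_body: bytes) -> Tuple[str, bytes]:
    """Return (destination key, payload) for one processed AL3 file.

    4xx/5xx responses route the original AL3 bytes to a dead-letter
    prefix; any other status stores the gzipped NDJSON response.
    """
    if 400 <= status_code < 600:
        # Keep the raw input unchanged so it can be inspected manually.
        return f"dead-letter/{key}", al3_bytes
    stem = key.removesuffix(".al3")  # str.removesuffix needs Python 3.9+
    return f"processed/{stem}.ndjson.gz", gzip.compress(response_body)


dest, payload = route_result("2025/policy.al3", 200, b"raw", b'{"code": "GRP1"}\n')
print(dest, len(payload))
```

Returning the destination and payload instead of uploading inside the function keeps the routing rule easy to test without AWS credentials.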