This system implements real-time Acute Kidney Injury (AKI) detection by processing HL7 messages from hospital systems. It monitors patient creatinine levels and triggers alerts when potential AKI cases are detected.
- F3 Score: 0.9843
- Average Latency: 0.026 seconds
- System Architecture
- Prerequisites
- Installation & Setup
- Testing
- Monitoring & Metrics
- Key Implementation Details
- Safety Features
- Project Structure
- Hospital systems send HL7 messages (ADT and ORU) via MLLP
- Messages are parsed and validated for message structure (does it have all the required fields) and data integrity (do the required fields contain sensible data in typical formats)
- Data routing based on message type:
- ADT^A01 (Admission):
- The patient is added to the Redis cache (currently admitted patients database)
- The patient's historical test data is loaded from the SQLite database into Redis (if they were admitted before and have previous test results prior to the current admission)
- Any unmatched test results that arrived before the patient's admission are retrieved from the dedicated SQLite database, processed, and moved to Redis
- ADT^A03 (Discharge):
- The patient's data is moved from Redis to the SQLite historical database
- The patient is removed from the Redis cache
- ORU^R01 (Test Result):
- If the patient is already admitted (in the Redis cache): The test is added to the Redis database and triggers ML inference (checks if the patient is at risk of AKI)
- If the patient is not admitted: The test is stored in the unmatched LIMS (unmatched messages) database for later processing, waiting for the patient to be admitted
- ADT^A01 (Admission):
- ML inference runs when new creatinine results are added for admitted patients
- Pager alerts triggered for positive predictions
The system employs three distinct databases for different purposes:
- In-memory cache with persistence (AOF - Append Only File)
- Allows fast lookups for real-time ML inference
- Stores patient demographics (sex and date of birth) and all of the patient's test results (even historical ones prior to the patient's current admission)
- Data persists across container restarts via
/state/redisdata(in the Docker/Kubernetes setup. It may have a different location if run locally using the simulator)
- Persistent storage of all patient test history
- Source of historical test data loaded to Redis on patient admission (if any)
- Destination for the patient's test results on discharge
- Allows long-term data retention
- Buffer for test results arriving before the patient's admission
- Handles out-of-order message scenarios
- Automatically reconciled on patient admission
- Prevents data loss from timing issues
# 1. Patient Info (Hash)
Key: patient_info:{mrn}
Key Example: patient_info:123456
Fields (with example):
- dob: "19840203" # Date of birth (YYYYMMDD format)
- sex: "F" # M or F
# 2. Patient Tests (List of JSON-encoded dictionaries)
Key: patient_tests:{mrn}
Key Example: patient_tests:123456
Values (with example): [
{"date": "20240301120000", "value": 1.2}, # date: test date in YYYYMMDDHHMMSS format, value: test creatinine level result
{"date": "20240302093000", "value": 1.4},
{"date": "20240303151500", "value": 1.8}
]
Table: history
Columns:
- mrn (TEXT) -- Patient ID, e.g., "123456"
- test_date (TEXT) -- YYYYMMDDHHMMSS format
- creatinine_result (REAL) -- Test value, e.g., 1.8
Example Row:
| mrn | test_date | creatinine_result |
|--------|----------------|-------------------|
| 123456 | 20240303151500 | 1.8 |Table: unmatched_lims
Columns: (same structure as history table)
Example: Test arrives before patient admission
| mrn | test_date | creatinine_result |
|--------|----------------|-------------------|
| 789012 | 20240304080000 | 2.1 |- Docker and Docker Compose (for containerized deployment)
- Python 3.12+ (for local development), but should work for Python 3.8+
- 900 MiB available storage for persistent volumes
- Access to ports 8000 (metrics), 8440 (MLLP), 8441 (pager)
The production Kubernetes deployment is now offline as the project is complete. The instructions below are for running the system locally with the simulator for testing/demonstration purposes.
- AKI Detection System: Connects to the MLLP server to receive messages and sends alerts to pager
- Simulator: Mimics a hospital system by sending HL7 messages and receiving pager alerts
- For Testing: Run simulator first, then connect AKI Detection system to it
# Clone the repository
git clone https://github.com/AliBoukind13/aki-detection.git
cd aki-detection# Create virtual environment
python3 -m venv venv
# Activate virtual environment
source venv/bin/activate # On macOS/Linux
# OR
venv\Scripts\activate # On Windows
# Install Python dependencies
pip install -r requirements.txt# Build the Docker image (make sure Docker is running on your machine)
docker build -t aki-detection .
# Terminal 1: Start the simulator
python simulator.py --mllp=8440 --pager=8441 --messages=messages.mllp
# Terminal 2: Start the AKI system in Docker
docker run -d \
-p 8000:8000 \
-e MLLP_ADDRESS="host.docker.internal:8440" \
-e PAGER_ADDRESS="host.docker.internal:8441" \
-v $(pwd)/data:/state \
--name aki-system \
aki-detection
# Note: Docker (Option A) uses data/*.db database files (NOT Databases/*.db)
# Local main.py (option B) uses Databases/*.db files# Terminal 1: Start the simulator (same as Option A)
python simulator.py --mllp=8440 --pager=8441 --messages=messages.mllp
# Terminal 2: Start the AKI system locally (make sure venv is activated)
MLLP_ADDRESS="localhost:8440" PAGER_ADDRESS="localhost:8441" python3 main.py
# Note: Local setup (Option B) uses Databases/*.db files# For Docker setup (Option A) - check logs
docker logs -f aki-system
# For local setup (Option B) - logs appear directly in terminal
# Check Prometheus metrics endpoint (works for both setups):
http://localhost:8000/metrics
# Paste the URL into a browser# Run all tests
python -m pytest unit_tests/
# Run specific test module (without .py extension)
python -m unit_tests.test_db
python -m unit_tests.test_parser
python -m unit_tests.test_message_validator
python -m unit_tests.test_aki_detection
python -m unit_tests.test_clientThe system includes comprehensive unit tests across all critical components:
test_db.py - Database Handler Tests
- Patient admission/discharge workflows
- Handling unmatched test results (tests arriving before admission)
- Redis and SQLite interaction integrity
- Invalid data validation (bad dates, invalid sex values)
- Transaction rollback scenarios
test_parser.py - HL7 Message Parser Tests
- Valid ADT and ORU message parsing
- Multiple OBR/OBX pairs handling
- Missing segment detection
- ACK message generation (success/failure)
test_message_validator.py - Message Validation Tests
- ADT^A01 admission validation (requires DOB, sex)
- ADT^A03 discharge validation (minimal requirements)
- ORU^R01 lab result validation
- Missing required fields detection (MSH.7, PID.3, OBR.7, etc.)
test_aki_detection.py - ML Inference Engine Tests
- Model loading and initialization
- Feature preprocessing pipeline
- Creatinine statistics computation
- Pager alert triggering with mock HTTP requests
- Z-score standardization validation
test_client.py - MLLP Client Tests
- TCP socket connection handling
- MLLP frame parsing (0x0B start, 0x1C 0x0D end)
- Multiple message processing in single connection
- Incomplete frame buffering
- ACK response verification
The system exposes Prometheus metrics on port 8000 (http://localhost:8000/metrics) for production monitoring.
- Clinical:
aki_predictions_total(positive/negative predictions),pagers_sent_total(alerts sent) - Message Processing:
hl7_messages_total(by type),blood_tests_total,hl7_parse_errors_total - System Health:
redis_operation_failures,mllp_reconnections_total,pager_errors_total
- Two-stage validation pipeline:
- Structure validation (HL7Parser): Ensures valid HL7 message format
- Data validation (DatabaseHandler): Validates content (dates, values, etc.)
- Supports multiple OBR/OBX pairs in ORU messages (multiple test results)
- Graceful handling of missing fields
Model Architecture & Deployment:
- Type: Neural Network trained for binary classification (AKI risk prediction)
- Storage Format: ONNX (Open Neural Network Exchange) -
model.onnx - Inference Pipeline: Patient data → Feature extraction → Z-score normalization → Neural network → Risk score (0-1)
- Decision Threshold: 0.4 (values > 0.4 trigger pager alerts)
The neural network uses 7 engineered features derived from patient data:
| Feature | Description | Source |
|---|---|---|
age |
Patient age in years | Computed from DOB |
sex |
Binary encoding (0=M, 1=F) | From PID segment |
creatinine_mean |
Average of all historical values | Computed from test history |
creatinine_max |
Maximum historical value | Computed from test history |
creatinine_min |
Minimum historical value | Computed from test history |
creatinine_std |
Standard deviation of values | Computed from test history |
creatinine_last |
Most recent test result | Latest ORU message |
Preprocessing Details:
- Features are standardized using Z-score normalization
- Training distribution parameters stored in
preprocessing_params.json - Mean and standard deviation values computed during training are applied to production data
-
Data Validation
- Strict HL7 message validation
- Type checking for numerical values
- Date/time format verification
-
Error Recovery
- Graceful handling of network issues: If the hospital system disconnects, the client automatically reconnects up to 13 times without losing data. Messages in progress are safely stored for retry.
- Transaction-based database updates: All database operations are atomic - if moving a patient's 100 test results from Redis to SQLite fails halfway, everything rolls back. No partial transfers that could corrupt patient data.
- Exponential backoff for failed operations: When the pager system is down, retries happen at increasing intervals (1s, 2s, 4s, 8s, 16s) rather than hammering the server. Prevents overwhelming recovering systems.
- Logging of all critical operations: Every patient admission, test result, and alert is logged with timestamps and outcomes. If something goes wrong, there's a complete audit trail for debugging.
-
Clinical Safety
- Conservative ML threshold (0.4) - prioritizes catching all AKI cases over reducing false alarms, which is common in medical ML applications
- Automatic retrying of unmatched results
- Complete audit trail via logging
- Fail-safe defaults for missing data
.
├── hl7_aki_system/ # Core system modules
│ ├── __init__.py # Package initialization
│ ├── aki_detection.py # ML inference engine and pager alerts
│ ├── config.py # Environment variables and system configuration
│ ├── DatabaseHandler.py # Redis and SQLite database operations (along with some Data Verification)
│ ├── hl7_parser.py # HL7 message parsing and ACK generation
│ ├── message_validator.py # HL7 message validation (ADT/ORU)
│ ├── metrics.py # Prometheus metrics definitions
│ ├── mllp_client.py # MLLP client - receives and processes HL7 messages
│ ├── model.onnx # Trained neural network model for AKI prediction
│ └── preprocessing_params.json # Feature normalization parameters (mean/std)
├── Databases/ # Database files and initialization
│ ├── __init__.py # Package initialization
│ ├── setup_db.py # Initialize SQLite databases (historical & unmatched)
│ ├── history.csv # Seed data for historical patient tests
│ ├── hospital_data.db # SQLite database for historical test results (when run locally using main.py)
│ └── unmatched_lims.db # SQLite database for tests awaiting admission (when run locally using main.py)
├── unit_tests/ # Test suite
│ ├── test_db.py # DatabaseHandler class tests
│ ├── test_parser.py # HL7Parser class tests
│ ├── test_message_validator.py # MessageValidator class tests
│ ├── test_aki_detection.py # InferenceEngine class tests
│ ├── test_client.py # MLLPClient class tests
│ └── hospital_data.db # Test database for unit tests
├── data/ # Runtime data directory (Docker/K8s only)
│ └── [Created at runtime for Redis persistence and databases]
├── main.py # Application entry point - starts MLLP client
├── __init__.py # Package initialization
├── simulator.py # Hospital system simulator for testing
├── simulator_test.py # Test suite for simulator
├── requirements.txt # Python package dependencies
├── Dockerfile # Container image definition
├── start.sh # Container startup script (Redis + app)
├── coursework4.yaml # Kubernetes deployment for Azure AKS
├── messages.mllp # Sample HL7 messages in MLLP format
├── aki.csv # Reference data: known AKI cases (MRN, timestamp)
├── .gitignore # Git exclusion patterns
└── README.md # Project documentation (this file)