Skip to content

Real-time AKI detection system using ML inference on HL7 hospital data. Achieves 0.9843 F3 score with 26ms latency. Production-deployed on Kubernetes.

Notifications You must be signed in to change notification settings

AliBoukind13/aki-detection

Repository files navigation

AKI Detection System

Overview

This system implements real-time Acute Kidney Injury (AKI) detection by processing HL7 messages from hospital systems. It monitors patient creatinine levels and triggers alerts when potential AKI cases are detected.

Live Deployment Performance

  • F3 Score: 0.9843
  • Average Latency: 0.026 seconds

Table of Contents

System Architecture

System Flow

  1. Hospital systems send HL7 messages (ADT and ORU) via MLLP
  2. Messages are parsed and validated for message structure (does it have all the required fields) and data integrity (do the required fields contain sensible data in typical formats)
  3. Data routing based on message type:
    • ADT^A01 (Admission):
      • The patient is added to the Redis cache (currently admitted patients database)
      • The patient's historical test data is loaded from the SQLite database into Redis (if they were admitted before and have previous test results prior to the current admission)
      • Any unmatched test results that arrived before the patient's admission are retrieved from the dedicated SQLite database, processed, and moved to Redis
    • ADT^A03 (Discharge):
      • The patient's data is moved from Redis to the SQLite historical database
      • The patient is removed from the Redis cache
    • ORU^R01 (Test Result):
      • If the patient is already admitted (in the Redis cache): The test is added to the Redis database and triggers ML inference (checks if the patient is at risk of AKI)
      • If the patient is not admitted: The test is stored in the unmatched LIMS (unmatched messages) database for later processing, waiting for the patient to be admitted
  4. ML inference runs when new creatinine results are added for admitted patients
  5. Pager alerts triggered for positive predictions

Data Storage Architecture

The system employs three distinct databases for different purposes:

1. Redis (Currently Admitted Patients)

  • In-memory cache with persistence (AOF - Append Only File)
  • Allows fast lookups for real-time ML inference
  • Stores patient demographics (sex and date of birth) and all of the patient's test results (even historical ones prior to the patient's current admission)
  • Data persists across container restarts via /state/redisdata (in the Docker/Kubernetes setup. It may have a different location if run locally using the simulator)

2. SQLite - Historical Database (hospital_data.db)

  • Persistent storage of all patient test history
  • Source of historical test data loaded to Redis on patient admission (if any)
  • Destination for the patient's test results on discharge
  • Allows long-term data retention

3. SQLite - Unmatched LIMS Database (unmatched_lims.db)

  • Buffer for test results arriving before the patient's admission
  • Handles out-of-order message scenarios
  • Automatically reconciled on patient admission
  • Prevents data loss from timing issues

Database Schemas & Examples

Currently Admitted Patients' Redis Data Structures

# 1. Patient Info (Hash)
Key: patient_info:{mrn}
Key Example: patient_info:123456

Fields (with example):
  - dob: "19840203" # Date of birth (YYYYMMDD format)
  - sex: "F"        # M or F

# 2. Patient Tests (List of JSON-encoded dictionaries)
Key: patient_tests:{mrn}
Key Example: patient_tests:123456

Values (with example): [
  {"date": "20240301120000", "value": 1.2}, # date: test date in YYYYMMDDHHMMSS format, value: test creatinine level result
  {"date": "20240302093000", "value": 1.4},
  {"date": "20240303151500", "value": 1.8}
]

SQLite - Historical Database (hospital_data.db)

Table: history
Columns:
  - mrn (TEXT)              -- Patient ID, e.g., "123456"
  - test_date (TEXT)        -- YYYYMMDDHHMMSS format
  - creatinine_result (REAL) -- Test value, e.g., 1.8

Example Row:
| mrn    | test_date      | creatinine_result |
|--------|----------------|-------------------|
| 123456 | 20240303151500 | 1.8               |

SQLite - Unmatched LIMS Database (unmatched_lims.db)

Table: unmatched_lims
Columns: (same structure as history table)

Example: Test arrives before patient admission
| mrn    | test_date      | creatinine_result |
|--------|----------------|-------------------|
| 789012 | 20240304080000 | 2.1               |

Prerequisites

  • Docker and Docker Compose (for containerized deployment)
  • Python 3.12+ (for local development), but should work for Python 3.8+
  • 900 MiB available storage for persistent volumes
  • Access to ports 8000 (metrics), 8440 (MLLP), 8441 (pager)

Installation & Setup

Note: Original Production System

The production Kubernetes deployment is now offline as the project is complete. The instructions below are for running the system locally with the simulator for testing/demonstration purposes.

System Architecture Overview

  • AKI Detection System: Connects to the MLLP server to receive messages and sends alerts to pager
  • Simulator: Mimics a hospital system by sending HL7 messages and receiving pager alerts
  • For Testing: Run simulator first, then connect AKI Detection system to it

Quick Start with Simulator

Step 1: Clone the Repository

# Clone the repository
git clone https://github.com/AliBoukind13/aki-detection.git
cd aki-detection

Step 2: Set up Python Environment

# Create virtual environment
python3 -m venv venv

# Activate virtual environment
source venv/bin/activate  # On macOS/Linux
# OR
venv\Scripts\activate     # On Windows

# Install Python dependencies
pip install -r requirements.txt

Step 3: Choose Your Setup Method

Option A: Run with Docker (Recommended)
# Build the Docker image (make sure Docker is running on your machine)
docker build -t aki-detection .

# Terminal 1: Start the simulator
python simulator.py --mllp=8440 --pager=8441 --messages=messages.mllp

# Terminal 2: Start the AKI system in Docker
docker run -d \
  -p 8000:8000 \
  -e MLLP_ADDRESS="host.docker.internal:8440" \
  -e PAGER_ADDRESS="host.docker.internal:8441" \
  -v $(pwd)/data:/state \
  --name aki-system \
  aki-detection

# Note: Docker (Option A) uses data/*.db database files (NOT Databases/*.db)
#       Local main.py (option B) uses Databases/*.db files
Option B: Run Locally
# Terminal 1: Start the simulator (same as Option A)
python simulator.py --mllp=8440 --pager=8441 --messages=messages.mllp

# Terminal 2: Start the AKI system locally (make sure venv is activated)
MLLP_ADDRESS="localhost:8440" PAGER_ADDRESS="localhost:8441" python3 main.py

# Note: Local setup (Option B) uses Databases/*.db files

Step 4: Verify Everything is Running Correctly

# For Docker setup (Option A) - check logs
docker logs -f aki-system

# For local setup (Option B) - logs appear directly in terminal

# Check Prometheus metrics endpoint (works for both setups):
http://localhost:8000/metrics
# Paste the URL into a browser

Testing

Running Unit Tests

# Run all tests
python -m pytest unit_tests/

# Run specific test module (without .py extension)
python -m unit_tests.test_db
python -m unit_tests.test_parser
python -m unit_tests.test_message_validator
python -m unit_tests.test_aki_detection
python -m unit_tests.test_client

Test Coverage

The system includes comprehensive unit tests across all critical components:

test_db.py - Database Handler Tests

  • Patient admission/discharge workflows
  • Handling unmatched test results (tests arriving before admission)
  • Redis and SQLite interaction integrity
  • Invalid data validation (bad dates, invalid sex values)
  • Transaction rollback scenarios

test_parser.py - HL7 Message Parser Tests

  • Valid ADT and ORU message parsing
  • Multiple OBR/OBX pairs handling
  • Missing segment detection
  • ACK message generation (success/failure)

test_message_validator.py - Message Validation Tests

  • ADT^A01 admission validation (requires DOB, sex)
  • ADT^A03 discharge validation (minimal requirements)
  • ORU^R01 lab result validation
  • Missing required fields detection (MSH.7, PID.3, OBR.7, etc.)

test_aki_detection.py - ML Inference Engine Tests

  • Model loading and initialization
  • Feature preprocessing pipeline
  • Creatinine statistics computation
  • Pager alert triggering with mock HTTP requests
  • Z-score standardization validation

test_client.py - MLLP Client Tests

  • TCP socket connection handling
  • MLLP frame parsing (0x0B start, 0x1C 0x0D end)
  • Multiple message processing in single connection
  • Incomplete frame buffering
  • ACK response verification

Monitoring & Metrics

The system exposes Prometheus metrics on port 8000 (http://localhost:8000/metrics) for production monitoring.

Key Metrics

  • Clinical: aki_predictions_total (positive/negative predictions), pagers_sent_total (alerts sent)
  • Message Processing: hl7_messages_total (by type), blood_tests_total, hl7_parse_errors_total
  • System Health: redis_operation_failures, mllp_reconnections_total, pager_errors_total

Key Implementation Details

Message Validation

  • Two-stage validation pipeline:
    1. Structure validation (HL7Parser): Ensures valid HL7 message format
    2. Data validation (DatabaseHandler): Validates content (dates, values, etc.)
  • Supports multiple OBR/OBX pairs in ORU messages (multiple test results)
  • Graceful handling of missing fields

ML Model

Model Architecture & Deployment:

  • Type: Neural Network trained for binary classification (AKI risk prediction)
  • Storage Format: ONNX (Open Neural Network Exchange) - model.onnx
  • Inference Pipeline: Patient data → Feature extraction → Z-score normalization → Neural network → Risk score (0-1)
  • Decision Threshold: 0.4 (values > 0.4 trigger pager alerts)

The neural network uses 7 engineered features derived from patient data:

Feature Description Source
age Patient age in years Computed from DOB
sex Binary encoding (0=M, 1=F) From PID segment
creatinine_mean Average of all historical values Computed from test history
creatinine_max Maximum historical value Computed from test history
creatinine_min Minimum historical value Computed from test history
creatinine_std Standard deviation of values Computed from test history
creatinine_last Most recent test result Latest ORU message

Preprocessing Details:

  • Features are standardized using Z-score normalization
  • Training distribution parameters stored in preprocessing_params.json
  • Mean and standard deviation values computed during training are applied to production data

Safety Features

  1. Data Validation

    • Strict HL7 message validation
    • Type checking for numerical values
    • Date/time format verification
  2. Error Recovery

    • Graceful handling of network issues: If the hospital system disconnects, the client automatically reconnects up to 13 times without losing data. Messages in progress are safely stored for retry.
    • Transaction-based database updates: All database operations are atomic - if moving a patient's 100 test results from Redis to SQLite fails halfway, everything rolls back. No partial transfers that could corrupt patient data.
    • Exponential backoff for failed operations: When the pager system is down, retries happen at increasing intervals (1s, 2s, 4s, 8s, 16s) rather than hammering the server. Prevents overwhelming recovering systems.
    • Logging of all critical operations: Every patient admission, test result, and alert is logged with timestamps and outcomes. If something goes wrong, there's a complete audit trail for debugging.
  3. Clinical Safety

    • Conservative ML threshold (0.4) - prioritizes catching all AKI cases over reducing false alarms, which is common in medical ML applications
    • Automatic retrying of unmatched results
    • Complete audit trail via logging
    • Fail-safe defaults for missing data

Project Structure

.
├── hl7_aki_system/               # Core system modules
│   ├── __init__.py               # Package initialization
│   ├── aki_detection.py          # ML inference engine and pager alerts
│   ├── config.py                 # Environment variables and system configuration
│   ├── DatabaseHandler.py        # Redis and SQLite database operations (along with some Data Verification)
│   ├── hl7_parser.py             # HL7 message parsing and ACK generation
│   ├── message_validator.py      # HL7 message validation (ADT/ORU)
│   ├── metrics.py                # Prometheus metrics definitions
│   ├── mllp_client.py            # MLLP client - receives and processes HL7 messages
│   ├── model.onnx                # Trained neural network model for AKI prediction
│   └── preprocessing_params.json # Feature normalization parameters (mean/std)
├── Databases/                   # Database files and initialization
│   ├── __init__.py              # Package initialization
│   ├── setup_db.py              # Initialize SQLite databases (historical & unmatched)
│   ├── history.csv              # Seed data for historical patient tests
│   ├── hospital_data.db         # SQLite database for historical test results (when run locally using main.py)
│   └── unmatched_lims.db        # SQLite database for tests awaiting admission (when run locally using main.py)
├── unit_tests/                   # Test suite
│   ├── test_db.py                # DatabaseHandler class tests
│   ├── test_parser.py            # HL7Parser class tests
│   ├── test_message_validator.py # MessageValidator class tests
│   ├── test_aki_detection.py     # InferenceEngine class tests
│   ├── test_client.py            # MLLPClient class tests
│   └── hospital_data.db          # Test database for unit tests
├── data/                         # Runtime data directory (Docker/K8s only)
│   └── [Created at runtime for Redis persistence and databases]
├── main.py                      # Application entry point - starts MLLP client
├── __init__.py                  # Package initialization
├── simulator.py                 # Hospital system simulator for testing
├── simulator_test.py            # Test suite for simulator
├── requirements.txt             # Python package dependencies
├── Dockerfile                   # Container image definition
├── start.sh                     # Container startup script (Redis + app)
├── coursework4.yaml             # Kubernetes deployment for Azure AKS
├── messages.mllp                # Sample HL7 messages in MLLP format
├── aki.csv                      # Reference data: known AKI cases (MRN, timestamp)
├── .gitignore                   # Git exclusion patterns
└── README.md                    # Project documentation (this file)

About

Real-time AKI detection system using ML inference on HL7 hospital data. Achieves 0.9843 F3 score with 26ms latency. Production-deployed on Kubernetes.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages