Clinic Data Flow Pipeline

A high-performance, production-ready data pipeline for processing large volumes of clinic and healthcare data, built with Go.

Features

  • High Performance: Concurrent processing with configurable worker pools
  • Scalable: Handles very large datasets efficiently with batched processing
  • Robust: Comprehensive validation and error handling
  • Observable: Built-in metrics and monitoring
  • Well-Tested: Extensive unit tests, integration tests, and benchmarks
  • Professional: Clean architecture following Go best practices

Architecture

The pipeline consists of several layers:

┌─────────────────────────────────────────────────────────────┐
│                        Data Sources                          │
│              (JSON files, APIs, databases)                   │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│                    Ingestion Layer                           │
│  - Concurrent reading with worker pools                     │
│  - Streaming JSON parsing                                   │
│  - Batch buffering                                           │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│                 Transformation Layer                         │
│  - Data validation (formats, ranges, business rules)        │
│  - Normalization (text formatting, case conversion)         │
│  - Enrichment (computed fields, timestamps)                 │
│  - Deduplication                                             │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│                   Storage Layer                              │
│  - Batched writes for efficiency                            │
│  - Buffered I/O                                              │
│  - Multiple output formats                                   │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│                    Monitoring                                │
│  - Real-time metrics (throughput, error rates)              │
│  - Performance tracking                                      │
│  - Structured logging                                        │
└─────────────────────────────────────────────────────────────┘
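
For orientation, here is a minimal, self-contained sketch of how stages like these are commonly wired together in Go with channels and a worker pool (the record and process names are illustrative placeholders, not the repository's actual types):

package main

import (
    "fmt"
    "strings"
    "sync"
)

// record stands in for a parsed domain object (Patient, Appointment, ...).
type record struct {
    ID   string
    Name string
}

// process stands in for the transformation layer: validate, normalize, enrich.
func process(r record) (record, error) {
    if r.ID == "" {
        return record{}, fmt.Errorf("missing ID")
    }
    r.Name = strings.TrimSpace(r.Name)
    return r, nil
}

func main() {
    in := make(chan record, 100)  // ingestion -> transform
    out := make(chan record, 100) // transform -> storage

    // Transformation layer: a pool of workers draining the input channel.
    var wg sync.WaitGroup
    for w := 0; w < 4; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for r := range in {
                if clean, err := process(r); err == nil {
                    out <- clean
                }
            }
        }()
    }

    // Ingestion layer: feed records in, then close the channel.
    go func() {
        for i := 0; i < 10; i++ {
            in <- record{ID: fmt.Sprintf("p-%d", i), Name: " Alice "}
        }
        close(in)
    }()

    // Close the output once all workers are done.
    go func() { wg.Wait(); close(out) }()

    // Storage layer: consume and "write" the results.
    for r := range out {
        fmt.Println("write:", r.ID, r.Name)
    }
}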

Project Structure

clinic-data-flow/
├── cmd/
│   └── pipeline/              # Main application entry point
│       └── main.go
├── internal/
│   ├── domain/                # Core domain models
│   │   └── models.go          # Patient, Appointment, MedicalRecord, etc.
│   ├── ingestion/             # Data reading and ingestion
│   │   └── reader.go          # Concurrent readers with worker pools
│   ├── transform/             # Data transformation and validation
│   │   ├── validator.go       # Validation logic
│   │   └── transformer.go     # Transformation and enrichment
│   ├── storage/               # Data storage and output
│   │   └── writer.go          # Batched writers
│   ├── metrics/               # Monitoring and metrics
│   │   └── metrics.go         # Performance tracking
│   ├── pipeline/              # Pipeline orchestration
│   │   ├── pipeline.go        # Main pipeline coordinator
│   │   └── pipeline_test.go   # Integration tests
│   └── generator/             # Test data generation
│       ├── generator.go       # Synthetic data generator
│       └── generator_test.go  # Generator tests
├── go.mod
├── go.sum
└── README.md

Domain Models

Patient

  • Personal information (name, DOB, gender)
  • Contact details (email, phone, address)
  • Insurance information
  • Timestamps
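
As a rough illustration, the Patient model in internal/domain/models.go might look something like the following (field names and JSON tags are assumptions based on the list above, not the actual definition):

package domain

import "time"

// Patient is an illustrative shape for the patient model described above.
type Patient struct {
    ID          string    `json:"id"`
    FirstName   string    `json:"first_name"`
    LastName    string    `json:"last_name"`
    DateOfBirth time.Time `json:"date_of_birth"`
    Gender      string    `json:"gender"` // "M", "F", or "O"

    Email   string `json:"email"`
    Phone   string `json:"phone"`
    Address string `json:"address"`

    InsuranceProvider string `json:"insurance_provider"`
    InsuranceNumber   string `json:"insurance_number"`

    CreatedAt time.Time `json:"created_at"`
    UpdatedAt time.Time `json:"updated_at"`
}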

Appointment

  • References to patient and doctor
  • Scheduling information
  • Appointment type and status
  • Clinical notes

Medical Record

  • Patient and doctor references
  • Diagnosis and symptoms
  • Prescriptions and medications
  • Laboratory results
  • Vital signs measurements
  • Clinical notes

Doctor

  • Professional information
  • Specialization
  • License number
  • Contact details

Installation

# Clone the repository
git clone https://github.com/yourusername/clinic-data-flow.git
cd clinic-data-flow

# Download dependencies
go mod download

# Build the application
go build -o bin/pipeline ./cmd/pipeline

Usage

Basic Usage

# Process 10,000 patient records with default settings
./bin/pipeline -type patients -count 10000

# Process appointments with custom worker count
./bin/pipeline -type appointments -count 5000 -workers 20

# Process medical records with output to file
./bin/pipeline -type medical_records -count 1000 -output output.json

# Large-scale processing with optimized batch size
./bin/pipeline -type patients -count 1000000 -workers 50 -batch 500

Command-Line Options

  • -type: Data type to process (patients, appointments, medical_records)
  • -count: Number of records to generate and process (default: 10000)
  • -workers: Number of concurrent workers (default: 10)
  • -batch: Batch size for processing (default: 100)
  • -output: Output file path (empty for stdout)
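
These options would typically be parsed with the standard flag package; a minimal sketch of what cmd/pipeline/main.go might do with them (illustrative only):

package main

import (
    "flag"
    "fmt"
)

func main() {
    dataType := flag.String("type", "patients", "data type: patients, appointments, medical_records")
    count := flag.Int("count", 10000, "number of records to generate and process")
    workers := flag.Int("workers", 10, "number of concurrent workers")
    batch := flag.Int("batch", 100, "batch size for processing")
    output := flag.String("output", "", "output file path (empty for stdout)")
    flag.Parse()

    fmt.Printf("type=%s count=%d workers=%d batch=%d output=%q\n",
        *dataType, *count, *workers, *batch, *output)
    // ... construct the pipeline with these settings and run it.
}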

Programmatic Usage

package main

import (
    "context"
    "os"
    "time"

    "github.com/clinic-data-flow/internal/pipeline"
    "go.uber.org/zap"
)

func main() {
    // Create logger
    logger, err := zap.NewProduction()
    if err != nil {
        panic(err)
    }
    defer logger.Sync()

    // Configure pipeline
    p := pipeline.New(pipeline.Config{
        WorkerCount:        20,           // 20 concurrent workers
        BatchSize:          500,          // Process 500 records per batch
        BufferSize:         2000,         // Channel buffer size
        MonitoringInterval: 10 * time.Second,
        Logger:             logger,
    })

    // Open input and output
    input, err := os.Open("patients.json")
    if err != nil {
        logger.Fatal("open input", zap.Error(err))
    }
    defer input.Close()

    output, err := os.Create("processed.json")
    if err != nil {
        logger.Fatal("create output", zap.Error(err))
    }
    defer output.Close()

    // Process data
    ctx := context.Background()
    if err := p.ProcessPatients(ctx, input, output); err != nil {
        logger.Fatal("pipeline failed", zap.Error(err))
    }

    // Get metrics
    metrics := p.GetMetrics()
    snapshot := metrics.GetSnapshot()

    logger.Info("completed",
        zap.Int64("records_processed", snapshot.RecordsProcessed),
        zap.Float64("throughput_rps", snapshot.Throughput),
    )
}

Performance

The pipeline is optimized for high-throughput processing:

Benchmarks

Performance on a typical development machine (results may vary):

Dataset Size   Workers   Batch Size   Throughput      Time
10,000         10        100          ~50,000 rps     200ms
100,000        20        500          ~80,000 rps     1.25s
1,000,000      50        1000         ~100,000 rps    10s

Performance Tips

  1. Adjust Worker Count: Increase workers for I/O-bound operations
  2. Optimize Batch Size: Larger batches reduce overhead but use more memory
  3. Buffer Size: Increase for high-throughput scenarios
  4. Validation: Run in non-strict mode to skip the strictest validation checks when input quality is already known, trading some safety for speed

Testing

Run All Tests

# Run all tests
go test ./...

# Run with coverage
go test -cover ./...

# Generate coverage report
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out

Run Benchmarks

# Run all benchmarks
go test -bench=. ./...

# Run specific benchmark
go test -bench=BenchmarkPipelineThroughput ./internal/pipeline

# Run with memory profiling
go test -bench=. -benchmem ./...
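
A throughput benchmark such as BenchmarkPipelineThroughput follows the standard testing.B pattern; a hedged sketch is shown below (processBatch is a hypothetical stand-in, not the package's real API):

package pipeline

import "testing"

// processBatch is a hypothetical stand-in for the work measured per iteration.
func processBatch(n int) int {
    sum := 0
    for i := 0; i < n; i++ {
        sum += i
    }
    return sum
}

func BenchmarkPipelineThroughput(b *testing.B) {
    b.ReportAllocs()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        processBatch(1000) // process one batch per iteration
    }
}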

Example Test Output

=== RUN   TestPipelineProcessPatients
--- PASS: TestPipelineProcessPatients (0.15s)
=== RUN   TestValidatePatient
--- PASS: TestValidatePatient (0.00s)
=== RUN   TestGeneratePatients
--- PASS: TestGeneratePatients (0.05s)

PASS
coverage: 85.2% of statements

Validation Rules

Patient Validation

  • Required fields: ID, first name, last name, date of birth
  • Date of birth must be in the past and reasonable (<150 years)
  • Gender must be M, F, or O
  • Email format validation
  • Phone number format validation
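
A minimal sketch of how these rules might be expressed in internal/transform/validator.go (the function signature and regular expression are illustrative assumptions):

package transform

import (
    "errors"
    "regexp"
    "time"
)

var emailRe = regexp.MustCompile(`^[^@\s]+@[^@\s]+\.[^@\s]+$`)

// ValidatePatient applies the rules listed above; parameter names are illustrative.
func ValidatePatient(id, firstName, lastName, gender, email string, dob time.Time) error {
    if id == "" || firstName == "" || lastName == "" || dob.IsZero() {
        return errors.New("missing required field")
    }
    now := time.Now()
    if dob.After(now) || now.Sub(dob) > 150*365*24*time.Hour {
        return errors.New("date of birth must be in the past and within 150 years")
    }
    switch gender {
    case "M", "F", "O":
    default:
        return errors.New("gender must be M, F, or O")
    }
    if email != "" && !emailRe.MatchString(email) {
        return errors.New("invalid email format")
    }
    return nil
}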

Appointment Validation

  • Required fields: ID, patient ID, doctor ID, appointment time
  • Duration must be positive and ≤480 minutes
  • Status must be: scheduled, completed, cancelled, or no-show

Medical Record Validation

  • Required fields: ID, patient ID, doctor ID, date
  • Date cannot be in the future
  • Vital signs must be within reasonable medical ranges:
    • Temperature: 35-42°C
    • Blood pressure: Systolic 70-250, Diastolic 40-150 mmHg
    • Heart rate: 40-200 bpm
    • Respiratory rate: 8-40 breaths/min
    • Oxygen saturation: 70-100%
    • Weight: 1-300 kg
    • Height: 30-250 cm
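
These checks reduce to simple bounds comparisons; a hedged sketch (the VitalSigns struct and field names are assumptions):

package transform

import "fmt"

// VitalSigns is an illustrative shape for the measurements described above.
type VitalSigns struct {
    TemperatureC     float64
    SystolicBP       int
    DiastolicBP      int
    HeartRate        int
    RespiratoryRate  int
    OxygenSaturation float64
    WeightKg         float64
    HeightCm         float64
}

func inRange(name string, v, lo, hi float64) error {
    if v < lo || v > hi {
        return fmt.Errorf("%s %.1f outside range [%.1f, %.1f]", name, v, lo, hi)
    }
    return nil
}

// ValidateVitals applies the ranges listed above.
func ValidateVitals(v VitalSigns) error {
    checks := []error{
        inRange("temperature", v.TemperatureC, 35, 42),
        inRange("systolic bp", float64(v.SystolicBP), 70, 250),
        inRange("diastolic bp", float64(v.DiastolicBP), 40, 150),
        inRange("heart rate", float64(v.HeartRate), 40, 200),
        inRange("respiratory rate", float64(v.RespiratoryRate), 8, 40),
        inRange("oxygen saturation", v.OxygenSaturation, 70, 100),
        inRange("weight", v.WeightKg, 1, 300),
        inRange("height", v.HeightCm, 30, 250),
    }
    for _, err := range checks {
        if err != nil {
            return err
        }
    }
    return nil
}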

Monitoring and Metrics

The pipeline provides comprehensive metrics:

  • Records Read: Total records ingested
  • Records Processed: Successfully processed records
  • Records Written: Records written to output
  • Records Failed: Failed validations
  • Bytes Read/Written: I/O volume
  • Processing Time: Total processing duration
  • Throughput: Records per second
  • Error Rate: Percentage of failed records
  • Elapsed Time: Total pipeline runtime

Metrics are logged periodically and available via the API:

metrics := pipeline.GetMetrics()
snapshot := metrics.GetSnapshot()

fmt.Printf("Throughput: %.2f records/sec\n", snapshot.Throughput)
fmt.Printf("Error Rate: %.2f%%\n", snapshot.ErrorRate)

Configuration

Pipeline Configuration

type Config struct {
    WorkerCount        int           // Number of concurrent workers (default: 10)
    BatchSize          int           // Records per batch (default: 100)
    BufferSize         int           // Channel buffer size (default: 1000)
    MonitoringInterval time.Duration // Metrics logging interval (default: 10s)
    Logger             *zap.Logger   // Structured logger
}

Tuning Guidelines

  • CPU-bound workloads: Set workers = CPU cores
  • I/O-bound workloads: Set workers = 2-4× CPU cores
  • Memory constraints: Reduce batch size and buffer size
  • High throughput: Increase batch size and workers
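
As a starting point, the worker count can be derived from the machine's CPU count; a hedged sketch using the Config fields from the Configuration section (values are examples, not recommendations for every workload):

package main

import (
    "fmt"
    "runtime"
    "time"

    "github.com/clinic-data-flow/internal/pipeline"
)

// configFor returns an illustrative Config tuned per the guidelines above.
func configFor(ioBound bool) pipeline.Config {
    workers := runtime.NumCPU() // CPU-bound: one worker per core
    if ioBound {
        workers *= 4 // I/O-bound: oversubscribe 2-4x
    }
    return pipeline.Config{
        WorkerCount:        workers,
        BatchSize:          500,  // larger batches = fewer flushes, more memory
        BufferSize:         2000, // larger buffers help sustain high throughput
        MonitoringInterval: 10 * time.Second,
    }
}

func main() {
    fmt.Printf("%+v\n", configFor(true))
}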

Error Handling

The pipeline implements multiple levels of error handling:

  1. Input Validation: Catches malformed data early
  2. Transformation Errors: Logged and tracked in metrics
  3. Write Errors: Propagated up the chain
  4. Context Cancellation: Graceful shutdown support

Failed records are logged with details but don't stop the pipeline.
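
Context cancellation can be wired to OS signals so an interrupted run shuts down gracefully; a minimal sketch using only the standard library (the run function is a placeholder for building the pipeline and calling ProcessPatients with the cancellable context):

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    // ctx is cancelled when the process receives SIGINT or SIGTERM,
    // letting the pipeline drain in-flight batches and exit cleanly.
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    if err := run(ctx); err != nil {
        log.Fatal(err)
    }
}

// run stands in for building the pipeline and calling p.ProcessPatients(ctx, ...).
func run(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
        return nil
    }
}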

Contributing

Contributions are welcome! Please follow these guidelines:

  1. Write tests for new features
  2. Maintain code coverage above 80%
  3. Follow Go best practices and conventions
  4. Add comments for exported functions
  5. Update documentation

License

MIT License - See LICENSE file for details

Contact

For questions or support, please open an issue on GitHub.


Built with ❤️ for healthcare data processing
