A high-performance, production-ready data pipeline for processing large volumes of clinic and healthcare data, built with Go.
- High Performance: Concurrent processing with configurable worker pools
- Scalable: Handles very large datasets efficiently with batched processing
- Robust: Comprehensive validation and error handling
- Observable: Built-in metrics and monitoring
- Well-Tested: Extensive unit tests, integration tests, and benchmarks
- Professional: Clean architecture following Go best practices
The pipeline consists of several layers:
┌─────────────────────────────────────────────────────────────┐
│                        Data Sources                         │
│                (JSON files, APIs, databases)                 │
└──────────────────────────────┬──────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                       Ingestion Layer                        │
│  - Concurrent reading with worker pools                      │
│  - Streaming JSON parsing                                    │
│  - Batch buffering                                           │
└──────────────────────────────┬──────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                    Transformation Layer                      │
│  - Data validation (formats, ranges, business rules)         │
│  - Normalization (text formatting, case conversion)          │
│  - Enrichment (computed fields, timestamps)                  │
│  - Deduplication                                             │
└──────────────────────────────┬──────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                        Storage Layer                         │
│  - Batched writes for efficiency                             │
│  - Buffered I/O                                              │
│  - Multiple output formats                                   │
└──────────────────────────────┬──────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                         Monitoring                           │
│  - Real-time metrics (throughput, error rates)               │
│  - Performance tracking                                      │
│  - Structured logging                                        │
└──────────────────────────────────────────────────────────────┘
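The real orchestration lives in internal/pipeline, but the layering above boils down to a fan-out/fan-in pattern over channels. The following is a minimal, self-contained sketch of that pattern only; the Record type, the ToUpper "normalization", and the print-based write step are stand-ins, not the project's actual code.

package main

import (
    "fmt"
    "strings"
    "sync"
)

// Record stands in for a domain type such as Patient.
type Record struct {
    ID   string
    Name string
}

// transformStage fans work out to `workers` goroutines, each normalizing
// records from `in` and sending the results to the returned channel.
func transformStage(in <-chan Record, workers int) <-chan Record {
    out := make(chan Record)
    var wg sync.WaitGroup
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            for r := range in {
                r.Name = strings.ToUpper(r.Name) // stand-in for real normalization
                out <- r
            }
        }()
    }
    // Close the output once every worker has drained the input.
    go func() { wg.Wait(); close(out) }()
    return out
}

// writeStage collects records into batches before "writing" them.
func writeStage(in <-chan Record, batchSize int) {
    batch := make([]Record, 0, batchSize)
    flush := func() {
        if len(batch) > 0 {
            fmt.Printf("writing batch of %d records\n", len(batch))
            batch = batch[:0]
        }
    }
    for r := range in {
        batch = append(batch, r)
        if len(batch) == batchSize {
            flush()
        }
    }
    flush()
}

func main() {
    in := make(chan Record)
    go func() { // ingestion stage: emit records into the pipeline
        defer close(in)
        for i := 0; i < 10; i++ {
            in <- Record{ID: fmt.Sprintf("p-%d", i), Name: "alice"}
        }
    }()
    writeStage(transformStage(in, 4), 3)
}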
Project layout:

clinic-data-flow/
├── cmd/
│   └── pipeline/              # Main application entry point
│       └── main.go
├── internal/
│   ├── domain/                # Core domain models
│   │   └── models.go          # Patient, Appointment, MedicalRecord, etc.
│   ├── ingestion/             # Data reading and ingestion
│   │   └── reader.go          # Concurrent readers with worker pools
│   ├── transform/             # Data transformation and validation
│   │   ├── validator.go       # Validation logic
│   │   └── transformer.go     # Transformation and enrichment
│   ├── storage/               # Data storage and output
│   │   └── writer.go          # Batched writers
│   ├── metrics/               # Monitoring and metrics
│   │   └── metrics.go         # Performance tracking
│   ├── pipeline/              # Pipeline orchestration
│   │   ├── pipeline.go        # Main pipeline coordinator
│   │   └── pipeline_test.go   # Integration tests
│   └── generator/             # Test data generation
│       ├── generator.go       # Synthetic data generator
│       └── generator_test.go  # Generator tests
├── go.mod
├── go.sum
└── README.md
The pipeline works with the following domain models:

Patient:
- Personal information (name, DOB, gender)
- Contact details (email, phone, address)
- Insurance information
- Timestamps

Appointment:
- References to patient and doctor
- Scheduling information
- Appointment type and status
- Clinical notes

MedicalRecord:
- Patient and doctor references
- Diagnosis and symptoms
- Prescriptions and medications
- Laboratory results
- Vital signs measurements
- Clinical notes

Doctor:
- Professional information
- Specialization
- License number
- Contact details
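For orientation, a patient record along these lines might be modeled as below. This is a hypothetical sketch, not the actual definition in internal/domain/models.go; the field names and JSON tags are assumptions.

package domain

import "time"

// Patient is a sketch of the patient model described above; the real struct
// in models.go may name and type these fields differently.
type Patient struct {
    ID          string    `json:"id"`
    FirstName   string    `json:"first_name"`
    LastName    string    `json:"last_name"`
    DateOfBirth time.Time `json:"date_of_birth"`
    Gender      string    `json:"gender"` // "M", "F", or "O"
    Email       string    `json:"email"`
    Phone       string    `json:"phone"`
    Address     string    `json:"address"`
    InsuranceID string    `json:"insurance_id"`
    CreatedAt   time.Time `json:"created_at"`
    UpdatedAt   time.Time `json:"updated_at"`
}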
# Clone the repository
git clone https://github.com/yourusername/clinic-data-flow.git
cd clinic-data-flow
# Download dependencies
go mod download
# Build the application
go build -o bin/pipeline ./cmd/pipeline

# Process 10,000 patient records with default settings
./bin/pipeline -type patients -count 10000
# Process appointments with custom worker count
./bin/pipeline -type appointments -count 5000 -workers 20
# Process medical records with output to file
./bin/pipeline -type medical_records -count 1000 -output output.json
# Large-scale processing with optimized batch size
./bin/pipeline -type patients -count 1000000 -workers 50 -batch 500

Command-line flags:
- -type: Data type to process (patients, appointments, medical_records)
- -count: Number of records to generate and process (default: 10000)
- -workers: Number of concurrent workers (default: 10)
- -batch: Batch size for processing (default: 100)
- -output: Output file path (empty for stdout)
package main

import (
    "context"
    "os"
    "time"

    "github.com/clinic-data-flow/internal/pipeline"
    "go.uber.org/zap"
)

func main() {
    // Create logger
    logger, err := zap.NewProduction()
    if err != nil {
        panic(err)
    }
    defer logger.Sync()

    // Configure pipeline
    p := pipeline.New(pipeline.Config{
        WorkerCount:        20,               // 20 concurrent workers
        BatchSize:          500,              // Process 500 records per batch
        BufferSize:         2000,             // Channel buffer size
        MonitoringInterval: 10 * time.Second, // How often metrics are logged
        Logger:             logger,
    })

    // Open input and output
    input, err := os.Open("patients.json")
    if err != nil {
        logger.Fatal("failed to open input", zap.Error(err))
    }
    defer input.Close()

    output, err := os.Create("processed.json")
    if err != nil {
        logger.Fatal("failed to create output", zap.Error(err))
    }
    defer output.Close()

    // Process data
    ctx := context.Background()
    if err := p.ProcessPatients(ctx, input, output); err != nil {
        logger.Fatal("pipeline failed", zap.Error(err))
    }

    // Report final metrics
    metrics := p.GetMetrics()
    snapshot := metrics.GetSnapshot()
    logger.Info("completed",
        zap.Int64("records_processed", snapshot.RecordsProcessed),
        zap.Float64("throughput_rps", snapshot.Throughput),
    )
}

The pipeline is optimized for high-throughput processing:
Performance on a typical development machine (results may vary):
| Dataset Size | Workers | Batch Size | Throughput | Time |
|---|---|---|---|---|
| 10,000 | 10 | 100 | ~50,000 rps | 200ms |
| 100,000 | 20 | 500 | ~80,000 rps | 1.25s |
| 1,000,000 | 50 | 1000 | ~100,000 rps | 10s |
- Adjust Worker Count: Increase workers for I/O-bound operations
- Optimize Batch Size: Larger batches reduce overhead but use more memory
- Buffer Size: Increase for high-throughput scenarios
- Validation: Run in non-strict mode to relax validation when throughput matters more than strict data-quality checks
# Run all tests
go test ./...
# Run with coverage
go test -cover ./...
# Generate coverage report
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out

# Run all benchmarks
go test -bench=. ./...
# Run specific benchmark
go test -bench=BenchmarkPipelineThroughput ./internal/pipeline
# Run with memory profiling
go test -bench=. -benchmem ./...

Sample test output:

=== RUN   TestPipelineProcessPatients
--- PASS: TestPipelineProcessPatients (0.15s)
=== RUN   TestValidatePatient
--- PASS: TestValidatePatient (0.00s)
=== RUN   TestGeneratePatients
--- PASS: TestGeneratePatients (0.05s)
PASS
coverage: 85.2% of statements
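The BenchmarkPipelineThroughput benchmark referenced above could be structured roughly as follows. This is a sketch only: it assumes ProcessPatients accepts an io.Reader and io.Writer as in the library example, and the patientJSON payload (e.g. produced with internal/generator) is a placeholder filled elsewhere.

package pipeline_test

import (
    "bytes"
    "context"
    "io"
    "testing"
    "time"

    "github.com/clinic-data-flow/internal/pipeline"
    "go.uber.org/zap"
)

// patientJSON stands in for a pre-generated JSON payload; how it is
// populated is left out of this sketch.
var patientJSON []byte

func BenchmarkPipelineThroughput(b *testing.B) {
    p := pipeline.New(pipeline.Config{
        WorkerCount:        10,
        BatchSize:          100,
        BufferSize:         1000,
        MonitoringInterval: time.Minute,
        Logger:             zap.NewNop(), // keep benchmark output quiet
    })
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // Re-run the full pipeline over the same payload each iteration.
        if err := p.ProcessPatients(context.Background(), bytes.NewReader(patientJSON), io.Discard); err != nil {
            b.Fatal(err)
        }
    }
}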
Patient:
- Required fields: ID, first name, last name, date of birth
- Date of birth must be in the past and reasonable (<150 years)
- Gender must be M, F, or O
- Email format validation
- Phone number format validation

Appointment:
- Required fields: ID, patient ID, doctor ID, appointment time
- Duration must be positive and ≤480 minutes
- Status must be: scheduled, completed, cancelled, or no-show

Medical record:
- Required fields: ID, patient ID, doctor ID, date
- Date cannot be in the future
- Vital signs must be within reasonable medical ranges:
  - Temperature: 35-42°C
  - Blood pressure: systolic 70-250 mmHg, diastolic 40-150 mmHg
  - Heart rate: 40-200 bpm
  - Respiratory rate: 8-40 breaths/min
  - Oxygen saturation: 70-100%
  - Weight: 1-300 kg
  - Height: 30-250 cm
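As a rough illustration of the patient rules above, a validator could look like the sketch below. It reuses the hypothetical Patient struct shown earlier and is not the actual code in internal/transform/validator.go; the email regex in particular is a deliberately loose stand-in.

package transform

import (
    "errors"
    "fmt"
    "regexp"
    "time"

    "github.com/clinic-data-flow/internal/domain"
)

// Loose email check for illustration only; the real validator may be stricter.
var emailRe = regexp.MustCompile(`^[^@\s]+@[^@\s]+\.[^@\s]+$`)

// validatePatient applies the patient rules listed above to the hypothetical
// Patient sketch (ID, FirstName, LastName, DateOfBirth, Gender, Email).
func validatePatient(p domain.Patient) error {
    if p.ID == "" || p.FirstName == "" || p.LastName == "" || p.DateOfBirth.IsZero() {
        return errors.New("missing required field")
    }
    now := time.Now()
    if p.DateOfBirth.After(now) {
        return errors.New("date of birth must be in the past")
    }
    if now.Sub(p.DateOfBirth) > 150*365*24*time.Hour {
        return errors.New("date of birth implies an age over 150 years")
    }
    switch p.Gender {
    case "M", "F", "O":
        // valid
    default:
        return fmt.Errorf("invalid gender %q: must be M, F, or O", p.Gender)
    }
    if p.Email != "" && !emailRe.MatchString(p.Email) {
        return fmt.Errorf("invalid email format: %q", p.Email)
    }
    return nil
}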
The pipeline provides comprehensive metrics:
- Records Read: Total records ingested
- Records Processed: Successfully processed records
- Records Written: Records written to output
- Records Failed: Failed validations
- Bytes Read/Written: I/O volume
- Processing Time: Total processing duration
- Throughput: Records per second
- Error Rate: Percentage of failed records
- Elapsed Time: Total pipeline runtime
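Counters like these are typically maintained with atomic operations so that many workers can update them without locking. The sketch below shows one way a snapshot could be computed; it is illustrative only and not the actual internal/metrics/metrics.go (which tracks more counters than shown here).

package metrics

import (
    "sync/atomic"
    "time"
)

// Collector is a sketch of a lock-free metrics collector.
type Collector struct {
    start            time.Time
    recordsProcessed atomic.Int64
    recordsFailed    atomic.Int64
}

// Snapshot holds a subset of the derived metrics listed above.
type Snapshot struct {
    RecordsProcessed int64
    Throughput       float64 // records per second
    ErrorRate        float64 // percent of failed records
}

func New() *Collector { return &Collector{start: time.Now()} }

func (c *Collector) IncProcessed() { c.recordsProcessed.Add(1) }
func (c *Collector) IncFailed()    { c.recordsFailed.Add(1) }

// GetSnapshot derives throughput and error rate from the raw counters.
func (c *Collector) GetSnapshot() Snapshot {
    processed := c.recordsProcessed.Load()
    failed := c.recordsFailed.Load()
    elapsed := time.Since(c.start).Seconds()
    s := Snapshot{RecordsProcessed: processed}
    if elapsed > 0 {
        s.Throughput = float64(processed) / elapsed
    }
    if total := processed + failed; total > 0 {
        s.ErrorRate = 100 * float64(failed) / float64(total)
    }
    return s
}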
Metrics are logged periodically and available via the API:
metrics := pipeline.GetMetrics()
snapshot := metrics.GetSnapshot()
fmt.Printf("Throughput: %.2f records/sec\n", snapshot.Throughput)
fmt.Printf("Error Rate: %.2f%%\n", snapshot.ErrorRate)type Config struct {
WorkerCount int // Number of concurrent workers (default: 10)
BatchSize int // Records per batch (default: 100)
BufferSize int // Channel buffer size (default: 1000)
MonitoringInterval time.Duration // Metrics logging interval (default: 10s)
Logger *zap.Logger // Structured logger
}- CPU-bound workloads: Set workers = CPU cores
- I/O-bound workloads: Set workers = 2-4× CPU cores
- Memory constraints: Reduce batch size and buffer size
- High throughput: Increase batch size and workers
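A common way to apply these rules is to derive the worker count from runtime.NumCPU(). The helper below is a sketch: the 3x multiplier, batch size, and buffer size are illustrative values, and Config is the struct shown above.

package main

import (
    "fmt"
    "runtime"
    "time"

    "github.com/clinic-data-flow/internal/pipeline"
    "go.uber.org/zap"
)

// newIOBoundConfig sizes the worker pool for I/O-bound work (2-4x cores),
// following the guidance above.
func newIOBoundConfig(logger *zap.Logger) pipeline.Config {
    cores := runtime.NumCPU()
    return pipeline.Config{
        WorkerCount:        3 * cores, // I/O-bound: 2-4x CPU cores
        BatchSize:          500,
        BufferSize:         2000,
        MonitoringInterval: 10 * time.Second,
        Logger:             logger,
    }
}

func main() {
    cfg := newIOBoundConfig(zap.NewNop())
    fmt.Printf("using %d workers on %d cores\n", cfg.WorkerCount, runtime.NumCPU())
}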
The pipeline implements multiple levels of error handling:
- Input Validation: Catches malformed data early
- Transformation Errors: Logged and tracked in metrics
- Write Errors: Propagated up the chain
- Context Cancellation: Graceful shutdown support
Failed records are logged with details but don't stop the pipeline.
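Graceful shutdown falls out of the context support: cancel the context and the pipeline drains in-flight work instead of being killed mid-batch. A caller might wire OS signals to cancellation as in the sketch below; the *pipeline.Pipeline type name and the clinicflow package are assumptions for illustration, and ProcessPatients is the method shown in the library example above.

// Package clinicflow exists only for this sketch.
package clinicflow

import (
    "context"
    "os"
    "os/signal"
    "syscall"

    "github.com/clinic-data-flow/internal/pipeline"
)

// RunWithShutdown cancels the pipeline's context on SIGINT/SIGTERM so it can
// finish in-flight batches and return cleanly.
func RunWithShutdown(p *pipeline.Pipeline, input *os.File, output *os.File) error {
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()
    return p.ProcessPatients(ctx, input, output)
}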
Contributions are welcome! Please follow these guidelines:
- Write tests for new features
- Maintain code coverage above 80%
- Follow Go best practices and conventions
- Add comments for exported functions
- Update documentation
MIT License - See LICENSE file for details
For questions or support, please open an issue on GitHub.
Built with ❤️ for healthcare data processing