Improve Slow request metric calculation; Add bulkSync config to fine-tune #25275
Pull request overview
This PR improves slow request metric calculation and adds bulk operation configuration for fine-tuning concurrent database operations. The changes address connection pool exhaustion during bulk operations by introducing a semaphore-based throttling mechanism and improve metric accuracy across virtual threads using atomic operations.
Changes:
- Introduced `BulkOperationSemaphore` to limit concurrent DB operations during bulk processing and prevent connection pool starvation
- Modified `RequestLatencyContext` to use atomic operations (`AtomicLong`, `AtomicInteger`) for thread-safe metric accumulation across virtual threads
- Added `BulkOperationConfiguration` with auto-scaling support based on connection pool size
- Updated `EntityRepository.bulkCreateOrUpdateEntities()` to use semaphore throttling and propagate request latency context to virtual threads
- Added comprehensive test coverage for concurrency scenarios, semaphore behavior, and metrics tracking
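The atomic-counter change listed above can be illustrated with a minimal sketch. `LatencyAccumulator` and its methods are hypothetical names chosen for illustration, not the PR's `RequestLatencyContext` API, and a fixed platform-thread pool stands in for virtual threads so the example does not depend on a particular Java version.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a per-request metrics holder; not the PR's actual class.
class LatencyAccumulator {
  private final AtomicLong dbTimeNanos = new AtomicLong();
  private final AtomicInteger dbOperationCount = new AtomicInteger();

  // Safe to call from many worker threads at once: each update is atomic.
  void recordDbOperation(long elapsedNanos) {
    dbTimeNanos.addAndGet(elapsedNanos);
    dbOperationCount.incrementAndGet();
  }

  long totalDbTimeNanos() {
    return dbTimeNanos.get();
  }

  int dbOperations() {
    return dbOperationCount.get();
  }

  // Submits `tasks` concurrent recordings of `nanosEach` and returns the filled accumulator.
  static LatencyAccumulator simulate(int tasks, long nanosEach) {
    LatencyAccumulator acc = new LatencyAccumulator();
    ExecutorService pool = Executors.newFixedThreadPool(8);
    for (int i = 0; i < tasks; i++) {
      pool.submit(() -> acc.recordDbOperation(nanosEach));
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for workers", e);
    }
    return acc;
  }
}
```

With plain `long` and `int` fields, concurrent `+=` updates could be lost; the atomic types avoid that without an explicit lock.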
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| RequestLatencyContext.java | Changed timing counters from primitive longs/ints to AtomicLong/AtomicInteger for thread-safe metric accumulation; added context propagation methods (getContext/setContext/clearContext) for virtual threads; added reset() method for testing |
| EntityRepository.java | Integrated semaphore-based throttling for bulk operations; added request latency context propagation to virtual threads; replaced fixed thread pool with virtual thread executor; added timeout handling for overload scenarios |
| BulkOperationSemaphore.java | New singleton class managing concurrent DB operations using fair semaphore with configurable permits and timeouts; supports auto-scaling based on connection pool size |
| BulkOperationConfiguration.java | New configuration class with parameters for max concurrent operations, connection pool percentage allocation, auto-scaling flag, and acquire timeout |
| OpenMetadataApplicationConfig.java | Added bulkOperationConfiguration property with getter that returns default instance if not configured |
| OpenMetadataApplication.java | Initialized BulkOperationSemaphore during application startup using config and connection pool size |
| RequestLatencyContextTest.java | Added 480+ lines of new tests covering context propagation, bulk operations simulation, concurrent operations, stress testing, and timing accuracy |
| BulkOperationSemaphoreTest.java | New test file with 263 lines covering initialization, permit acquisition/release, concurrent access, timeouts, and auto-scaling calculations |
| BulkOperationIntegrationTest.java | New integration test file with 528 lines covering semaphore throttling, metrics tracking across virtual threads, real workload simulation, and concurrent bulk requests |
Comments suppressed due to low confidence (1)
openmetadata-service/src/test/java/org/openmetadata/service/monitoring/RequestLatencyContextTest.java:37
- The test uses reflection to clear static maps in `RequestLatencyContext`, but the production code now has a public `reset()` method (lines 382-389 in RequestLatencyContext.java) that does exactly this. Replace the reflection-based `clearStaticMaps()` method with a simple call to `RequestLatencyContext.reset()` to improve maintainability and avoid fragility.
private void clearStaticMaps() throws Exception {
java.lang.reflect.Field requestTimersField =
RequestLatencyContext.class.getDeclaredField("requestTimers");
requestTimersField.setAccessible(true);
((java.util.concurrent.ConcurrentHashMap<?, ?>) requestTimersField.get(null)).clear();
// Initialize bulk operation semaphore for connection-aware throttling
BulkOperationSemaphore.initialize(
    catalogConfig.getBulkOperationConfiguration(),
    catalogConfig.getDataSourceFactory().getMaxSize());
Copilot
AI
Jan 13, 2026
The call to getMaxSize() on DataSourceFactory may fail if the factory is of type HikariCPDataSourceFactory which uses maximumPoolSize instead. The getMaxSize() method is inherited from Dropwizard's DataSourceFactory base class, but HikariCPDataSourceFactory has its own maximumPoolSize field. Verify that getMaxSize() returns the correct value, or consider using HikariCPDataSourceFactory.getMaximumPoolSize() directly when the factory is of that type. If this is correct, consider adding a comment explaining the relationship between these properties.
List<CompletableFuture<Void>> futures = new ArrayList<>();
// Capture the request latency context from the parent thread for propagation to virtual threads
RequestLatencyContext.RequestContext parentLatencyContext = RequestLatencyContext.getContext();
Copilot
AI
Jan 13, 2026
The parent latency context is captured but there's no null check. If bulkCreateOrUpdateEntities is called outside of a request context (e.g., from a background job or during testing), parentLatencyContext will be null. While line 6891 uses setContext(parentLatencyContext) which has null protection, logging and debugging will be affected. Consider adding a null check with a debug log to indicate when bulk operations occur outside request tracking.
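The capture-and-propagate pattern with the suggested null check might look roughly like the sketch below. It is a miniature under stated assumptions: `ContextPropagationSketch` is a hypothetical class, a `String` stands in for the PR's `RequestContext`, and a single-thread executor stands in for the virtual thread executor. Only the null-tolerant `setContext` behavior is taken from the comment above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical miniature of thread-local context propagation; all names are illustrative.
class ContextPropagationSketch {
  private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

  static String getContext() {
    return CONTEXT.get();
  }

  // Null-tolerant, mirroring the null protection the review comment describes.
  static void setContext(String ctx) {
    if (ctx != null) {
      CONTEXT.set(ctx);
    }
  }

  static void clearContext() {
    CONTEXT.remove();
  }

  // Captures the caller's context and makes it visible inside the worker task.
  static String runInWorker(ExecutorService executor) {
    String parent = getContext();
    if (parent == null) {
      // The review suggests a debug log here so untracked bulk work is visible.
      System.out.println("Bulk operation running outside request tracking");
    }
    return CompletableFuture.supplyAsync(
            () -> {
              setContext(parent);
              try {
                String seen = getContext();
                return seen == null ? "untracked" : seen;
              } finally {
                clearContext(); // avoid leaking context into a reused worker thread
              }
            },
            executor)
        .join();
  }

  // Sets the given context on the calling thread, runs one worker task, and cleans up.
  static String demo(String requestContext) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      setContext(requestContext);
      return runInWorker(executor);
    } finally {
      clearContext();
      executor.shutdown();
    }
  }
}
```

Calling `demo("req-1")` returns the propagated context, while `demo(null)` takes the untracked path and emits the log line instead of failing.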
@Max(200)
private int maxConcurrentDbOperations = 10;
Copilot
AI
Jan 13, 2026
The maximum validation is set to 200, but the Javadoc recommendations go up to 80 for 16+ vCore databases. This discrepancy could be confusing. Either update the documentation to explain why 200 is the maximum allowed value (e.g., for very large enterprise deployments), or reduce the @Max constraint to align with the documented recommendations.
@Getter
- private static class RequestContext {
+ public static class RequestContext {
Copilot
AI
Jan 13, 2026
The RequestContext inner class visibility was changed from private to public to support context propagation. However, this exposes internal implementation details. Consider whether the public exposure is necessary, or if a narrower API (e.g., package-private or protected) would be sufficient. If public is required, add comprehensive Javadoc explaining the class's purpose and how external code should interact with it.
private BulkOperationSemaphore(int maxConcurrentOperations, long acquireTimeoutMs) {
  this.maxPermits = maxConcurrentOperations;
  this.acquireTimeoutMs = acquireTimeoutMs;
  this.semaphore = new Semaphore(maxConcurrentOperations, true); // Fair ordering
Copilot
AI
Jan 13, 2026
Using a fair semaphore (fairness = true) ensures FIFO ordering but has significant performance overhead compared to non-fair semaphores. For bulk operations, strict ordering may not be necessary. Consider whether fair ordering is required for your use case, or if switching to fairness = false would provide better throughput without impacting correctness. Document the reasoning for using fair mode if it's intentional.
Suggested change:
- this.semaphore = new Semaphore(maxConcurrentOperations, true); // Fair ordering
+ // Use non-fair semaphore for better throughput; FIFO ordering is not required for bulk operations.
+ this.semaphore = new Semaphore(maxConcurrentOperations, false);
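For readers unfamiliar with the pattern under discussion, a semaphore-based throttle with an acquire timeout can be sketched as follows. `DbOperationThrottle` is a hypothetical class, not the PR's `BulkOperationSemaphore`, whose API and error handling may differ. Fairness is exposed as a constructor flag so either choice from the comment above can be tried.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical throttle; the PR's BulkOperationSemaphore may differ in API and behavior.
class DbOperationThrottle {
  private final Semaphore semaphore;
  private final long acquireTimeoutMs;

  DbOperationThrottle(int maxConcurrentOperations, long acquireTimeoutMs, boolean fair) {
    this.semaphore = new Semaphore(maxConcurrentOperations, fair);
    this.acquireTimeoutMs = acquireTimeoutMs;
  }

  // Runs the operation under a permit, or throws if none frees up within the timeout.
  <T> T execute(Supplier<T> dbOperation) {
    boolean acquired;
    try {
      acquired = semaphore.tryAcquire(acquireTimeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for a permit", e);
    }
    if (!acquired) {
      throw new IllegalStateException("Timed out waiting for a permit");
    }
    try {
      return dbOperation.get();
    } finally {
      semaphore.release(); // always return the permit, even if the operation throws
    }
  }

  int availablePermits() {
    return semaphore.availablePermits();
  }
}
```

Releasing in `finally` keeps a failed operation from permanently shrinking the permit pool, and the timeout turns overload into a fast failure rather than an unbounded wait on the connection pool.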
LOG.info(
    "Starting bulk operation for {} entities with {}/{} permits available",
    entities.size(),
    semaphore.availablePermits(),
    semaphore.getMaxPermits());
Copilot
AI
Jan 13, 2026
This INFO-level log is emitted for every bulk operation request and could generate significant log volume in production environments with frequent bulk operations. Consider either changing this to DEBUG level, or adding rate limiting to prevent log flooding. This is particularly important since bulk operations may be frequent during ingestion workflows.
private static final ExecutorService VIRTUAL_THREAD_EXECUTOR =
    Executors.newVirtualThreadPerTaskExecutor();
Copilot
AI
Jan 13, 2026
The virtual thread executor is created as a static final field but is never explicitly shut down. While virtual thread executors don't strictly require shutdown for cleanup, it's good practice to shut down executors in test teardown to avoid resource leaks and ensure clean test isolation. Consider adding executor shutdown in the @AfterAll method or closing the executor after each test.
The Java checkstyle failed. Please run … You can install the pre-commit hooks with …
…o improve_slow_request_metric
🔍 CI failure analysis for f3fa73f: Two unrelated CI failures: (1) Flaky glossary test with status mismatch, (2) SonarCloud runner out of disk space. Both are infrastructure/environmental issues.
Issue
Two unrelated CI failures: (1) Maven PostgreSQL test flakiness, (2) Maven SonarCloud infrastructure failure.
Root Cause
- PostgreSQL CI: Flaky test in …
- SonarCloud CI: Infrastructure failure; the GitHub Actions runner ran out of disk space.
Details
PostgreSQL CI Failure
Test location: …
Status mismatch:
- Expected: ...Approved,"#FF5733"...
+ Actual: ...In Review,"#FF5733"...
Why unrelated: PR only modifies bulk operations, metrics tracking, and logging. No glossary, CSV import/export, or approval workflow code was changed.
SonarCloud CI Failure
Error: The GitHub Actions runner exhausted disk space after ~3.5 hours during the Maven build step. This is a transient infrastructure issue unrelated to code changes.
Both failures are environmental/infrastructure issues not caused by PR #25275's code changes.
Code Review ✅ Approved 8 resolved / 8 findings
Well-implemented bulk operation executor with proper thread safety and comprehensive metric instrumentation. All previous findings have been addressed.
Resolved ✅ 8 resolved
- Bug: Race condition in double-checked locking singleton initialization
- Bug: Unsafe reset of non-volatile isShutdown flag
- Bug: Format specifier in LOG.info will cause IllegalFormatException
- Edge Case: Auto-scaling may produce too few threads with small connection pools
- Bug: Race condition in internalTimerStartNanos check-then-act
- ...and 3 more from earlier reviews
…tune (#25275)
* Improve Slow request metric calculation; Add bulkSync config to fine-tune
* Add clear metric instrumentation for bulk operations
* Address gitar comments



Summary by Gitar
- Context propagation methods (`getContext`, `setContext`, `clearContext`) in `RequestLatencyContext.java` for virtual thread metric aggregation
- Use of `AtomicLong` and `AtomicInteger` ensures accurate metrics across concurrent operations
- `BulkExecutor.java` replaces `BulkOperationSemaphore` with clearer metric instrumentation for database and search operations
- … `ElasticSearchSearchManager`, `OpenSearchSearchManager`, and aggregation managers
- … `OMSqlLogger` with request context integration
- `BulkOperationConfiguration` with auto-scaling based on connection pool size (default: 20% for bulk ops, 80% for user traffic)
- … `conf/openmetadata.yaml`
- `BulkExecutorTest` (364 lines) with concurrency and throttling tests
- `OMSqlLoggerTest` added for SQL logging verification
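The percentage-based auto-scaling mentioned in the summary could be computed along these lines. This is a sketch under assumptions: `BulkPermitSizing` and `permitsFor` are hypothetical names, and the floor of one permit and the cap at the configured maximum are illustrative choices, since the actual `BulkOperationConfiguration` logic is not shown on this page.

```java
// Hypothetical sizing helper; the actual auto-scaling logic is not shown in the source.
class BulkPermitSizing {
  // Returns how many connections bulk work may use out of a pool of the given size.
  // Assumptions: at least one permit is always granted, and the result never
  // exceeds the configured maximum.
  static int permitsFor(int connectionPoolSize, int bulkPercentage, int maxConcurrentDbOperations) {
    if (connectionPoolSize <= 0) {
      throw new IllegalArgumentException("connectionPoolSize must be positive");
    }
    if (bulkPercentage < 0 || bulkPercentage > 100) {
      throw new IllegalArgumentException("bulkPercentage must be between 0 and 100");
    }
    if (maxConcurrentDbOperations <= 0) {
      throw new IllegalArgumentException("maxConcurrentDbOperations must be positive");
    }
    // Integer division rounds down; long arithmetic avoids overflow for large pools.
    int scaled = (int) ((long) connectionPoolSize * bulkPercentage / 100);
    return Math.max(1, Math.min(scaled, maxConcurrentDbOperations));
  }
}
```

With the 20% default, a pool of 100 connections would yield 20 permits under this sketch, leaving the remaining 80 for user traffic; a pool of 3 would round down to zero, which is why the floor of one is applied.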