fix: instance grpc client once per process in benchmarks #1725
base: main
Conversation
Summary of Changes
Hello @chandra-siri, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request refactors the microbenchmarking suite to optimize resource usage by ensuring that gRPC and Google Cloud Storage clients are initialized only once per worker process. This change prevents redundant client creation and destruction, which can improve the accuracy and efficiency of performance measurements, especially in multiprocessing contexts.
Code Review
This pull request refactors the multi-process benchmarks to instantiate gRPC and JSON clients only once per worker process, which should improve benchmark performance and accuracy by removing client creation overhead from the measured execution time. The use of multiprocessing.Pool's initializer is a good approach for this.
My main feedback is regarding resource cleanup in the worker processes. The newly created clients and event loops are not being closed, which can lead to resource leaks. I've added a suggestion to use the atexit module to ensure graceful cleanup when worker processes terminate.
Overall, this is a good optimization for the benchmarks.
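For readers less familiar with the pattern, here is a minimal, self-contained sketch of how such an initializer is typically wired into multiprocessing.Pool via its initializer and initargs arguments. The pool size, the _download_one task, and the object names are illustrative assumptions rather than the benchmark's actual code, and the sketch assumes google-cloud-storage is installed with application default credentials available:

import multiprocessing

from google.cloud import storage

# Process-global handle, populated once per worker by the initializer.
worker_json_client = None

def _worker_init(bucket_type):
    # Runs exactly once in each worker process when the pool starts,
    # so client construction stays out of the per-task (timed) path.
    global worker_json_client
    if bucket_type == "regional":
        worker_json_client = storage.Client()

def _download_one(object_name):
    # Hypothetical task: every call reuses the client built in _worker_init.
    return f"{object_name} via {type(worker_json_client).__name__}"

if __name__ == "__main__":
    with multiprocessing.Pool(
        processes=4, initializer=_worker_init, initargs=("regional",)
    ) as pool:
        print(pool.map(_download_one, ["obj-1", "obj-2", "obj-3"]))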
+def _worker_init(bucket_type):
+    """Initializes a persistent event loop and client for each worker process."""
+    global worker_loop, worker_client, worker_json_client
+    if bucket_type == "zonal":
-    loop = asyncio.new_event_loop()
-    asyncio.set_event_loop(loop)
-    client = loop.run_until_complete(create_client())
-    try:
-        # download_files_using_mrd_multi_coro returns max latency of coros
-        result = download_files_using_mrd_multi_coro(
-            loop, client, files_to_download, other_params, chunks
-        )
-    finally:
-        tasks = asyncio.all_tasks(loop=loop)
-        for task in tasks:
-            task.cancel()
-        loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
-        loop.close()
-    return result
+        worker_loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(worker_loop)
+        worker_client = worker_loop.run_until_complete(create_client())
+    else:  # regional
+        from google.cloud import storage
-    json_client = storage.Client()
+        worker_json_client = storage.Client()
The resources created in this initializer (worker_loop, worker_client, worker_json_client) are not being cleaned up. This can lead to resource leaks, such as open network connections.
While terminating worker processes will cause the OS to reclaim these resources, it's better to perform a graceful shutdown. You can use the atexit module to register cleanup functions that will be called when the worker process exits.
I suggest updating _worker_init to register cleanup functions. Please also add import atexit at the top of the file.
def _worker_init(bucket_type):
    """Initializes a persistent event loop and client for each worker process."""
    global worker_loop, worker_client, worker_json_client
    import atexit

    if bucket_type == "zonal":
        worker_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(worker_loop)
        worker_client = worker_loop.run_until_complete(create_client())

        def _cleanup_zonal():
            # Ensure resources are cleaned up when the worker process exits.
            if worker_client and worker_loop and not worker_loop.is_closed():
                try:
                    worker_loop.run_until_complete(worker_client.close())
                finally:
                    worker_loop.close()

        atexit.register(_cleanup_zonal)
    else:  # regional
        from google.cloud import storage

        worker_json_client = storage.Client()

        def _cleanup_regional():
            # Ensure resources are cleaned up when the worker process exits.
            if worker_json_client:
                worker_json_client.close()

        atexit.register(_cleanup_regional)
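Building on that suggestion, per-task code in the zonal path can then reuse the process-global loop and client instead of rebuilding them on every call. The _download_task wrapper and download_chunk coroutine below are hypothetical stand-ins for the benchmark's real task functions, shown only to illustrate the intended usage:

import asyncio

async def download_chunk(client, file_name):
    # Placeholder for the benchmark's real download coroutine.
    await asyncio.sleep(0)
    return f"{file_name} downloaded"

def _download_task(file_name):
    # Reuses the loop and gRPC client created once in _worker_init;
    # only the download itself runs (and is measured) per task.
    return worker_loop.run_until_complete(download_chunk(worker_client, file_name))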