Worker network stream optimizations #283
Conversation
```rust
/// Consumes all the provided streams in parallel sending their produced messages to a single
/// queue in random order. The resulting queue is returned as a stream.
pub(crate) fn spawn_select_all<T, El, Err>(
```
Same as before, just moved closer to where it's used now.
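For context, here is a hypothetical, simplified sketch of the behavior described by that doc comment, using a Tokio mpsc queue as the shared fan-in point. This is an illustration, not the PR's actual code: the real function takes `<T, El, Err>` and the signature, buffer handling, and error handling here are assumptions.

```rust
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

/// Simplified sketch: spawn one task per input stream, fan all produced
/// items into a single bounded queue, and return that queue as a stream.
/// Items arrive in whatever order the tasks happen to produce them.
pub(crate) fn spawn_select_all<S, El>(streams: Vec<S>, buffer: usize) -> ReceiverStream<El>
where
    S: Stream<Item = El> + Send + Unpin + 'static,
    El: Send + 'static,
{
    let (tx, rx) = mpsc::channel(buffer);
    for mut stream in streams {
        let tx = tx.clone();
        tokio::spawn(async move {
            while let Some(item) = stream.next().await {
                // Stop forwarding if the consumer dropped the output stream.
                if tx.send(item).await.is_err() {
                    break;
                }
            }
        });
    }
    ReceiverStream::new(rx)
}
```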
jayshrivastava
left a comment
Nice PR!
Interestingly, we had to do the exact same optimization in cockroachdb: cockroachdb/cockroach#75581
I really like all the simplification you've been doing lately. The network_*.rs files are much easier to read now. Plus, no more MetricsCollectingStream 👍🏽
Previously, when two workers needed to communicate, one gRPC stream per partition was used to move per-partition data from one worker to the other, something like this:
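(The original figure is not preserved here; the following is an illustrative reconstruction of the idea.)

```
Worker A                            Worker B
partition 0  ──── gRPC stream ────▶ partition 0
partition 1  ──── gRPC stream ────▶ partition 1
    ...                                 ...
partition N  ──── gRPC stream ────▶ partition N
```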
At first sight, one could think that opening so many streams is problematic, but fortunately, the way gRPC and Tonic work means that as long as we reuse the same Channel (which we do), Tonic will reuse the same underlying TCP connection, interleaving HTTP/2 frames in a very efficient manner.

There is a problem, though: because of how the project works, we do have some extra overhead in opening multiple gRPC streams that Tonic is not going to save us from:
For each gRPC stream, we need to send the serialized plan over the wire. For example, for a UNION with hundreds of children, where each child has tens of partitions, we send a serialized plan with hundreds of children over the wire hundreds * tens times (one per partition).
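For illustration only, here is a minimal sketch of the Channel-reuse property described above, using tonic's transport API; the address and function name are made up.

```rust
use tonic::transport::{Channel, Error};

// One Channel per remote worker; clones are cheap and every gRPC stream
// opened through them is multiplexed over the same HTTP/2 (TCP) connection.
async fn shared_channel() -> Result<(), Error> {
    let channel = Channel::from_static("http://worker-1:50051").connect().await?;

    // Both handles below reuse the same underlying connection; opening many
    // streams interleaves HTTP/2 frames instead of opening new sockets.
    let for_stream_a = channel.clone();
    let for_stream_b = channel.clone();
    let _ = (for_stream_a, for_stream_b);
    Ok(())
}
```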
This PR reworks the communication between workers by introducing new WorkerConnectionPool and WorkerConnection structs that automatically handle gRPC stream batching (a hedged sketch of the idea is included after the results below).

Executing the following query over TPCH SF1 data on the remote cluster yields a ~1.5x speedup:
UNION of multiple children
Running any other benchmark yields pretty much the same results as before, as the size of the data in those benchmarks completely dominates the size of the plans themselves.
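As mentioned above, here is a hedged sketch of the batching idea. The struct names come from the PR, but everything else (fields, methods, signatures) is an assumption, not the actual implementation.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::transport::{Channel, Error};

/// Sketch only: the pool caches one Channel per remote worker so that every
/// WorkerConnection to the same worker shares a single TCP connection.
#[derive(Clone, Default)]
pub struct WorkerConnectionPool {
    channels: Arc<Mutex<HashMap<String, Channel>>>,
}

/// Sketch only: this is where stream batching would live, e.g. one gRPC
/// stream carrying (partition id, payload) pairs so that the serialized plan
/// crosses the wire once per worker pair instead of once per partition.
#[derive(Clone)]
pub struct WorkerConnection {
    pub channel: Channel,
}

impl WorkerConnectionPool {
    /// Returns a connection to `addr`, dialing only on the first request.
    pub async fn connect(&self, addr: String) -> Result<WorkerConnection, Error> {
        // Holding the lock across the dial keeps the sketch simple; it also
        // serializes concurrent dials to the same worker.
        let mut channels = self.channels.lock().await;
        if let Some(channel) = channels.get(&addr) {
            return Ok(WorkerConnection { channel: channel.clone() });
        }
        let channel = Channel::from_shared(addr.clone())?.connect().await?;
        channels.insert(addr, channel.clone());
        Ok(WorkerConnection { channel })
    }
}
```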