Is your feature request related to a problem or challenge?
In many operators, once the input is exhausted the output is emitted all at once. This produces huge batches, which are inefficient in some cases and cause OOMs in others.
For example:
Consider a fair pool implementation where each operator gets 1GB of memory, and an aggregate whose total state is 0.99GB.
The aggregate will Emit::All and output a single 0.99GB batch, and a downstream sort will immediately try to allocate memory for sorting (say, 2x the batch size).
At this point the memory pool has no idea the aggregate is done, because its reservation still exists, so the pool keeps enforcing 1GB per operator.
The sort now attempts to reserve about 1.98GB, almost twice its fair share.
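The arithmetic above can be sketched as follows. This is a toy model, not DataFusion's actual `MemoryPool` API: `FairPool` and `try_grow` are hypothetical names, and the pool simply caps each consumer at `total / consumers` bytes, which is enough to show the sort's request failing while the aggregate's reservation is still alive.

```rust
use std::collections::HashMap;

// Hypothetical fair-pool model; names do not match DataFusion's real API.
struct FairPool {
    total: u64,
    consumers: u64,
    reserved: HashMap<&'static str, u64>,
}

impl FairPool {
    fn new(total: u64, consumers: u64) -> Self {
        Self { total, consumers, reserved: HashMap::new() }
    }

    // Each consumer may hold at most total / consumers bytes.
    fn try_grow(&mut self, who: &'static str, bytes: u64) -> Result<(), String> {
        let limit = self.total / self.consumers;
        let current = self.reserved.get(who).copied().unwrap_or(0);
        if current + bytes > limit {
            return Err(format!(
                "{who}: requested {bytes} B on top of {current} B, fair limit is {limit} B"
            ));
        }
        self.reserved.insert(who, current + bytes);
        Ok(())
    }
}

fn main() {
    const GB: u64 = 1 << 30;
    // 2 GB pool shared fairly by the aggregate and the sort: 1 GB each.
    let mut pool = FairPool::new(2 * GB, 2);

    // The aggregate's 0.99 GB reservation is still alive when it emits.
    pool.try_grow("agg", (0.99 * GB as f64) as u64).unwrap();

    // The sort needs ~2x the 0.99 GB batch, but its fair share is only 1 GB.
    let sort_needs = (1.98 * GB as f64) as u64;
    match pool.try_grow("sort", sort_needs) {
        Ok(()) => println!("sort reserved {sort_needs} B"),
        Err(e) => println!("allocation failed: {e}"),
    }
}
```

Because the pool cannot see that the aggregate has finished, the sort's request is rejected (or forces a spill) even though the aggregate's memory is effectively dead.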
Describe the solution you'd like
If the aggregate (and any other operator, for that matter) respected batch_size, it would output smaller batches, which are easily handled by ExternalSorter, which will spill when it needs to. Then, once poll returns Ready(None), the sort can drop its input stream; the aggregate's reservation is released and the sort gets even more memory.
That would make every application much more resilient under memory constraints.
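A minimal sketch of the emission side, with a hypothetical `emit_chunked` helper standing in for the aggregate's output path: instead of Emit::All producing one huge batch, the accumulated rows are handed out in batch_size slices. (In DataFusion this chunking would be cheap, since arrow's `RecordBatch::slice` is zero-copy; `Row` here is just a stand-in type.)

```rust
// Hypothetical sketch: chunk an operator's accumulated output into
// batch_size pieces instead of emitting everything at once.
// `Row` stands in for a row of the aggregate's final output.
type Row = u64;

/// Split `accumulated` into batches of at most `batch_size` rows.
/// In DataFusion this would be zero-copy via RecordBatch::slice.
fn emit_chunked(accumulated: Vec<Row>, batch_size: usize) -> Vec<Vec<Row>> {
    accumulated
        .chunks(batch_size)
        .map(|c| c.to_vec())
        .collect()
}

fn main() {
    // 10 rows with batch_size = 4 -> batches of 4, 4, and 2 rows.
    let rows: Vec<Row> = (0..10).collect();
    for batch in emit_chunked(rows, 4) {
        println!("emitting batch of {} rows", batch.len());
    }
}
```

Each emitted batch stays bounded by batch_size, so a downstream sort only ever has to grow its reservation incrementally and can spill between batches instead of facing one 0.99GB allocation.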
Describe alternatives you've considered
No response
Additional context
No response