Prevent potential GPU OOM in R2C with split retry #14073
Conversation
Signed-off-by: Haoyang Li <[email protected]>
@greptile full review
Pull request overview
This PR adds split retry functionality to Row-to-Columnar (R2C) conversion to prevent GPU out-of-memory (OOM) errors by implementing a mechanism to split batches in half when GPU OOM occurs during host-to-GPU data transfer.
Key Changes
- Introduced a new `HostColumnarBatchWithRowRange` class that wraps host columns with row range tracking and supports splitting
- Modified `RowToColumnarIterator` to use split retry logic for GPU OOM scenarios, allowing single input batches to produce multiple output batches
- Added a `buildHostColumnsWithoutOwnership()` method to transfer ownership of host columns to the retry framework
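As a rough illustration of the split-retry mechanism these changes add, here is a self-contained sketch. The types and method names (`HostBatch`, `copyToGpu`, `splitInHalf`) are stand-ins for illustration, not the plugin's actual API:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Minimal sketch of split-and-retry: on a (simulated) GPU OOM, split the
// pending batch in half and retry each half. All names are illustrative.
public class SplitRetrySketch {
    /** A batch is just a row range [start, start + numRows) here. */
    record HostBatch(int start, int numRows) {}

    /** Simulated transfer: "fails" when a batch is larger than the GPU can hold. */
    static int copyToGpu(HostBatch b, int gpuCapacityRows) {
        if (b.numRows() > gpuCapacityRows) {
            throw new IllegalStateException("GPU OOM"); // stands in for GpuSplitAndRetryOOM
        }
        return b.numRows(); // rows successfully transferred
    }

    /** Split a batch in half; a single row cannot be split further. */
    static List<HostBatch> splitInHalf(HostBatch b) {
        if (b.numRows() < 2) {
            throw new IllegalStateException("cannot split a single-row batch");
        }
        int half = b.numRows() / 2;
        return List.of(new HostBatch(b.start(), half),
                       new HostBatch(b.start() + half, b.numRows() - half));
    }

    /** Transfer all rows, splitting on OOM; returns the row count of each output batch. */
    static List<Integer> transferWithSplitRetry(HostBatch input, int gpuCapacityRows) {
        List<Integer> out = new ArrayList<>();
        Deque<HostBatch> pending = new ArrayDeque<>();
        pending.push(input);
        while (!pending.isEmpty()) {
            HostBatch b = pending.pop();
            try {
                out.add(copyToGpu(b, gpuCapacityRows));
            } catch (IllegalStateException oom) {
                List<HostBatch> halves = splitInHalf(b);
                pending.push(halves.get(1)); // second half waits
                pending.push(halves.get(0)); // first half is retried next
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 10 rows against a capacity of 3 rows: splits until each piece fits.
        System.out.println(transferWithSplitRetry(new HostBatch(0, 10), 3));
    }
}
```

This also shows why one input batch can produce multiple output batches, which is what forces the iterator changes described above.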
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala | New class that wraps host columns with row range support, implements splitting logic for GPU OOM retry, and handles slicing of various column types (LIST, STRUCT, STRING, fixed-width) |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala | Updated RowToColumnarIterator to build host columns separately, use split retry for TargetSize goals, and maintain a pending batch iterator for split outputs |
| sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java | Added buildHostColumnsWithoutOwnership() method to transfer host column ownership to the caller |
| tests/src/test/scala/com/nvidia/spark/rapids/RowToColumnarIteratorRetrySuite.scala | Added comprehensive test coverage for GPU OOM split retry scenarios, including single batch requirement enforcement, multiple batch production, multiple consecutive splits, and the single-row edge case |
Greptile Overview

Greptile Summary

This PR adds GPU OOM protection during Row-to-Columnar host-to-GPU transfer by introducing split-and-retry capability.

Key Implementation Details: The solution separates the row-to-columnar conversion into two phases: building the columns on the host, then transferring them to the GPU under the retry framework.

Memory Management: Reference counting is correctly implemented throughout the lifecycle.

Slicing Logic: The slicing code handles LIST, STRUCT, STRING, and fixed-width column types.

Edge Case Handling: The implementation properly handles empty strings, null values, and single-row batches.

Iterator Pattern: The iterator returns one output batch at a time, keeping the remaining splits in a pending iterator.

Test Coverage: Comprehensive tests cover simple retry, split-and-retry, multiple consecutive splits, unsplittable edge cases, and complex nested data structures with nulls.

Confidence Score: 4/5
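The reference-counting lifecycle summarized above can be sketched as follows; `HostData` and `RangeWrapper` are hypothetical stand-ins for the plugin's host columns and row-range wrapper, not real spark-rapids types:

```java
// Minimal sketch of the reference-counting scheme: the shared host memory is
// freed only when the last wrapper referencing it is closed. Illustrative only.
public class RefCountSketch {
    /** Stand-in for a shared host buffer with a reference count. */
    static class HostData {
        int refCount = 1;
        boolean freed = false;
        void incRef() { refCount++; }
        void decRef() {
            if (--refCount == 0) { freed = true; } // host memory is freed here
        }
    }

    /** Stand-in for a row-range wrapper over shared host data. */
    static class RangeWrapper implements AutoCloseable {
        final HostData data;
        RangeWrapper(HostData data) { this.data = data; data.incRef(); }
        /** Splitting creates two new wrappers, each taking its own reference. */
        RangeWrapper[] splitInHalf() {
            RangeWrapper a = new RangeWrapper(data);
            RangeWrapper b = new RangeWrapper(data);
            close(); // the original's reference is released
            return new RangeWrapper[] { a, b };
        }
        @Override public void close() { data.decRef(); }
    }

    public static void main(String[] args) {
        HostData data = new HostData();              // refCount = 1 (builder)
        RangeWrapper whole = new RangeWrapper(data); // refCount = 2
        data.decRef();                               // builder releases: 1
        RangeWrapper[] halves = whole.splitInHalf(); // +2 for halves, -1 original: 2
        halves[0].close();                           // 1: memory still alive
        halves[1].close();                           // 0: memory freed
        System.out.println(data.freed);
    }
}
```

The key property mirrored here is that closing either half alone never frees the shared memory; only closing the last reference does.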
Important Files Changed

File Analysis
Sequence Diagram

sequenceDiagram
participant Caller
participant RowToColumnarIterator
participant GpuColumnarBatchBuilder
participant HostColumnarBatchWithRowRange
participant RetryFramework
participant GPU
Caller->>RowToColumnarIterator: next()
RowToColumnarIterator->>RowToColumnarIterator: buildBatch()
RowToColumnarIterator->>GpuColumnarBatchBuilder: build rows on host
GpuColumnarBatchBuilder-->>RowToColumnarIterator: hostColumns (refCount=1)
RowToColumnarIterator->>HostColumnarBatchWithRowRange: create(hostColumns) [incRef to 2]
Note over RowToColumnarIterator: withResource closes hostColumns [decRef to 1]
RowToColumnarIterator->>RetryFramework: withRetry(hostBatch, splitPolicy)
RetryFramework->>HostColumnarBatchWithRowRange: copyToGpu()
HostColumnarBatchWithRowRange->>GPU: copy full batch
GPU-->>HostColumnarBatchWithRowRange: GPU OOM!
HostColumnarBatchWithRowRange-->>RetryFramework: throw GpuSplitAndRetryOOM
RetryFramework->>RetryFramework: call splitPolicy(hostBatch)
RetryFramework->>HostColumnarBatchWithRowRange: splitInHalf(hostBatch)
Note over HostColumnarBatchWithRowRange: Create 2 splits [incRef to 3]<br/>Close original [decRef to 2]
HostColumnarBatchWithRowRange-->>RetryFramework: [firstHalf, secondHalf]
RetryFramework->>HostColumnarBatchWithRowRange: copyToGpu() on firstHalf
HostColumnarBatchWithRowRange->>HostColumnarBatchWithRowRange: sliceHostColumn()
Note over HostColumnarBatchWithRowRange: Create sliced view<br/>(shared data, copied offsets/validity)
HostColumnarBatchWithRowRange->>GPU: copy sliced columns
GPU-->>HostColumnarBatchWithRowRange: ColumnarBatch (success)
HostColumnarBatchWithRowRange-->>RetryFramework: ColumnarBatch
Note over RetryFramework: Close firstHalf [decRef to 1]
RetryFramework-->>RowToColumnarIterator: Iterator[ColumnarBatch]
RowToColumnarIterator->>RetryFramework: it.next() (get first)
RetryFramework-->>RowToColumnarIterator: firstBatch
RowToColumnarIterator->>RowToColumnarIterator: pendingBatchIter = remaining
RowToColumnarIterator-->>Caller: firstBatch
Caller->>RowToColumnarIterator: next() (second call)
RowToColumnarIterator->>RetryFramework: pendingBatchIter.next()
RetryFramework->>HostColumnarBatchWithRowRange: copyToGpu() on secondHalf
HostColumnarBatchWithRowRange->>GPU: copy sliced columns
GPU-->>HostColumnarBatchWithRowRange: ColumnarBatch (success)
Note over RetryFramework: Close secondHalf [decRef to 0, free memory]
RetryFramework-->>RowToColumnarIterator: secondBatch
RowToColumnarIterator-->>Caller: secondBatch
Additional Comments (1)
-
sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala, line 176-184: logic: Workaround condition may not cover all cases. If `dataLen == 0` but `nullCount > 0` (empty strings with some null values), the code goes to the else branch and creates a 0-length slice, which could trigger the same `copyToDevice` issue mentioned in the comment.
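To illustrate why this case can arise, here is a sketch of how slicing a string column rebases its offsets; a slice containing only empty or null strings ends up with `dataLen == 0`. The int arrays are stand-ins for the real host offset buffers, not the actual slicing code:

```java
import java.util.Arrays;

// Sketch of how slicing a string column rebases its offsets, and why a slice
// holding only empty strings ends up with dataLen == 0. Illustrative only.
public class StringSliceSketch {
    /** Rebase offsets[start..start+numRows] so the slice's first offset is 0. */
    static int[] sliceOffsets(int[] offsets, int start, int numRows) {
        int[] out = new int[numRows + 1];
        int base = offsets[start];
        for (int i = 0; i <= numRows; i++) {
            out[i] = offsets[start + i] - base;
        }
        return out;
    }

    public static void main(String[] args) {
        // Column: ["ab", "", "", "cde"] -> offsets [0, 2, 2, 2, 5]
        int[] offsets = {0, 2, 2, 2, 5};

        // Slice rows [1, 3): two empty strings.
        int[] sliced = sliceOffsets(offsets, 1, 2);
        int dataLen = sliced[sliced.length - 1]; // last offset = bytes of character data
        System.out.println(Arrays.toString(sliced) + " dataLen=" + dataLen);
    }
}
```

Whether `dataLen == 0` comes from empty strings or from all-null rows is exactly the distinction the review comment above is asking about.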
4 files reviewed, 1 comment
Signed-off-by: Haoyang Li <[email protected]>
build
Signed-off-by: Haoyang Li <[email protected]>
build
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
(Resolved review threads on HostColumnarBatchWithRowRange.scala, RowToColumnarIteratorRetrySuite.scala, and GpuRowToColumnarExec.scala)
Signed-off-by: Haoyang Li <[email protected]>
Signed-off-by: Haoyang Li <[email protected]>
Additional Comments (1)
-
sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala, line 178-188: style: Check that this empty string workaround handles all edge cases correctly. The 1-byte allocation when `dataLen == 0 && nullCount == 0` works around `copyToDevice` behavior, but verify it handles: (1) all strings in the slice being empty (not null), (2) mixed empty strings at different positions. This edge case could cause issues with certain data distributions.
4 files reviewed, 1 comment
Signed-off-by: Haoyang Li <[email protected]>
Additional Comments (1)
-
sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala, line 180: syntax: typo: "Existsing" should be "Existing"
4 files reviewed, 1 comment
Signed-off-by: Haoyang Li <[email protected]>
Additional Comments (1)
-
sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala, line 178-186: style: Consider whether allocating a 1-byte dummy buffer for empty strings when `dataLen == 0` but `numRows > nullCount` is necessary, or whether cudf would accept `null` here to save allocations.
4 files reviewed, 1 comment
(Resolved review threads on GpuColumnVector.java and HostColumnarBatchWithRowRange.scala)
@greptileai can you review it again?
Signed-off-by: Haoyang Li <[email protected]>
Additional Comments (1)
-
sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala, line 180-188: style: Edge case handling for empty strings is correct: when `dataLen == 0` but there are non-null rows (`numRows > nullCount`), it allocates a 1-byte buffer to satisfy cuDF requirements. However, consider validating that this 1-byte allocation doesn't cause issues in a tight memory situation (though this is unlikely given the small size).
4 files reviewed, 1 comment
(Resolved review threads on RowToColumnarIteratorRetrySuite.scala and HostColumnarBatchWithRowRange.scala)
Signed-off-by: Haoyang Li <[email protected]>
build
Additional Comments (1)
-
sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala, line 271-282: logic: Documentation claims split ownership transfer but the implementation doesn't match: both splits increment the reference count, so memory isn't transferred to the last split as stated. Should the memory ownership transfer logic be implemented, or should the documentation be corrected to reflect the current reference counting approach?
3 files reviewed, 1 comment
Signed-off-by: Haoyang Li <[email protected]>
build
(Resolved review threads on HostColumnarBatchWithRowRange.scala and RowToColumnarIteratorRetrySuite.scala)
Signed-off-by: Haoyang Li <[email protected]>
Overall looks good to me. But nice to have more eyes on this.
 * @param numRows The number of rows in this range
 * @param dataTypes The Spark data types for each column
 */
class HostColumnarBatchWithRowRange private (
Should we add the slice functionality to cuDF instead of spark-rapids?
Yes, it looks more natural to put it in cuDF Java. I would prefer to do it as a follow-up PR, but happy to do it right now.
@revans2 any preference?
Discussed with Bobby that it would be nice to have this long term in cuDF, but it's ok to start here.
  numInputRows += rowCount
  ret
} else {
  val it = RmmRapidsRetryIterator.withRetry(
nit, import RmmRapidsRetryIterator.withRetry, like it was done for withRetryNoSplit
Done.
@thirtiseven I should have time to look at this tomorrow, if it can wait until then.
totalOutputBytes += GpuColumnVector.getTotalDeviceMemoryUsed(batch)
totalOutputRows += batch.numRows()
if (totalOutputRows > 0 && totalOutputBytes > 0) {
  targetRows =
should we call this "nextBatchTargetRows"?
Done.
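For context, the proportional row-count estimate being renamed here can be sketched as below; the real `GpuBatchUtils.estimateRowCount` may differ in details such as clamping or rounding:

```java
// Sketch of proportional row-count estimation: given the bytes-per-row observed
// so far, compute how many rows fit in the target batch size. Illustrative only.
public class RowEstimateSketch {
    static long estimateRowCount(long targetSizeBytes, long totalOutputBytes, long totalOutputRows) {
        double bytesPerRow = (double) totalOutputBytes / totalOutputRows;
        // Return at least one row, so progress is always possible.
        return Math.max(1, (long) (targetSizeBytes / bytesPerRow));
    }

    public static void main(String[] args) {
        // Observed 1000 rows occupying 64 KiB; target batch is 1 MiB.
        System.out.println(estimateRowCount(1L << 20, 64L << 10, 1000)); // 16000
    }
}
```

Feeding the observed output size back into the next batch's target row count is what keeps later batches near the configured target size.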
// Return the first batch now and keep the iterator for subsequent output batches.
// This ensures we only transfer one split at a time (avoid multiple device allocations).
closeOnExcept(it.next()) { first =>
  pendingBatchIter = it
should we check if it.hasNext before we set pendingBatchIter? e.g. set to None otherwise?
yes good catch
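The pending-iterator pattern under discussion can be sketched like this; the types are illustrative (strings stand in for output batches, and the inner lists stand in for the splits produced from one input batch):

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Sketch of the pending-iterator pattern: next() returns one output batch at a
// time; remaining splits are held so only one batch is transferred at once.
public class PendingIterSketch implements Iterator<String> {
    private Iterator<String> pendingBatchIter = Collections.emptyIterator();
    private final Iterator<List<String>> inputBatches;

    PendingIterSketch(Iterator<List<String>> inputBatches) {
        this.inputBatches = inputBatches;
    }

    @Override public boolean hasNext() {
        return pendingBatchIter.hasNext() || inputBatches.hasNext();
    }

    @Override public String next() {
        // Drain splits from the previous input batch before pulling a new one.
        if (pendingBatchIter.hasNext()) {
            return pendingBatchIter.next();
        }
        Iterator<String> it = inputBatches.next().iterator(); // may yield several splits
        String first = it.next();
        pendingBatchIter = it; // keep the rest for subsequent next() calls
        return first;
    }

    public static void main(String[] args) {
        // One input batch split into two outputs, followed by an unsplit batch.
        var iter = new PendingIterSketch(
            List.of(List.of("split-1a", "split-1b"), List.of("batch-2")).iterator());
        while (iter.hasNext()) {
            System.out.println(iter.next());
        }
    }
}
```

Note that `hasNext()` checks the pending iterator first, which is why an explicit empty-check before storing it is optional, as the later review comment points out.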
 *
 * Memory management uses reference counting: each instance increments the reference count
 * of the host columns on construction and decrements it on close. The host columns are
 * freed when the last reference is closed.
we should add that validity and offset buffers are copied, not logically sliced, in this description.
done.
Signed-off-by: Haoyang Li <[email protected]>
No files reviewed, no comments
build
Hi @abellina could you take another look?
NOTE: release/26.02 has been created from main. Please retarget your PR to release/26.02 if it should be included in the release.
build
if (totalOutputRows > 0 && totalOutputBytes > 0) {
  targetRows =
    GpuBatchUtils.estimateRowCount(targetSizeBytes, totalOutputBytes, totalOutputRows)
  val dataTypes = localSchema.fields.map(_.dataType)
it would be better if this was a class val.
  HostColumnarBatchWithRowRange(hostColumns, rowCount, dataTypes)
}

if (localGoal.isInstanceOf[RequireSingleBatchLike]) {
localGoal match {
  case _: RequireSingleBatchLike =>
    ...
  case _ => // other goals
    ...
}
// Return the first batch now and keep the iterator for subsequent output batches.
// This ensures we only transfer one split at a time (avoid multiple device allocations).
closeOnExcept(it.next()) { first =>
  pendingBatchIter = if (it.hasNext) it else Iterator.empty
nit, could just set pendingBatchIter = it, without checking, since we should be gating all the other logic in that it.hasNext returns false
Fixes #14018

Description

#13842 added retry for R2C `convert` to prevent host OOM. Following on from that, this PR aims to add split and retry for GPU OOM when copying the converted results to the GPU.

This PR:
- Adds `HostColumnarBatchWithRowRange`, a wrapper for host columns that supports logical slicing without copying the underlying host memory. This allows splitting a large host batch into smaller chunks for transfer.
- Updates `GpuRowToColumnarExec` to use split and retry when a GPU OOM occurs during transfer.

Note that when we split a batch into two halves, we can't free them until both halves are processed. So the first half just borrows the data, and we pass ownership of the host columns to the second half. This ensures the host memory stays alive exactly as long as needed and is freed when the last split is closed.
Checklists
(Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)