
Conversation

@thirtiseven (Collaborator) commented Dec 31, 2025

Fixes #14018

Description

#13842 added retry for the R2C conversion to prevent host OOM. Following on from that, this PR adds split and retry for GPU OOM when copying the converted results to the GPU.

This PR:

  • Introduced HostColumnarBatchWithRowRange, a wrapper for host columns that supports logical slicing without copying underlying host memory. This allows splitting a large host batch into smaller chunks for transfer.
  • Updated GpuRowToColumnarExec to use split and retry when a GPU OOM occurs during transfer.

Note that when we split a batch into two halves, we can't free the host data until both halves have been processed. So the first half only borrows the data, while ownership of the host columns is passed to the second half. This ensures the host memory stays alive exactly as long as needed and is freed when the last split is closed.
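
For illustration, here is a minimal, hypothetical sketch of the "freed when the last split is closed" behaviour, expressed with cuDF host-column reference counting. The class name, fields, and split method are stand-ins, not the exact code in this PR:

import ai.rapids.cudf.HostColumnVector

// Hypothetical stand-in for HostColumnarBatchWithRowRange: a logical row range over
// shared host columns. Nothing below copies the underlying host memory.
class HostSlice(cols: Array[HostColumnVector], val startRow: Long, val numRows: Long)
    extends AutoCloseable {
  cols.foreach(_.incRefCount())   // every slice holds one reference to the host buffers

  def splitInHalf(): Seq[HostSlice] = {
    require(numRows > 1, "a single row cannot be split any further")
    val half = numRows / 2
    // Both halves reference the same underlying host memory.
    val out = Seq(
      new HostSlice(cols, startRow, half),
      new HostSlice(cols, startRow + half, numRows - half))
    close()                       // the parent drops its reference after splitting
    out
  }

  // The host memory is actually freed only when the last outstanding slice is closed.
  override def close(): Unit = cols.foreach(_.close())
}

Reference counting is one way to realize the borrow/own scheme described above while giving the same lifetime guarantee.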

Checklists

  • This PR has added documentation for new or modified features or behaviors.
  • This PR has added new tests or modified existing tests to cover new code paths.
    (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)
  • Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description.

Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven (Collaborator, Author)

@greptile full review

Copilot AI (Contributor) left a comment

Pull request overview

This PR adds split-and-retry to the Row-to-Columnar (R2C) conversion to prevent GPU out-of-memory (OOM) failures: when a GPU OOM occurs during the host-to-GPU transfer, the batch is split in half and retried.

Key Changes

  • Introduced a new HostColumnarBatchWithRowRange class that wraps host columns with row range tracking and supports splitting
  • Modified RowToColumnarIterator to use split retry logic for GPU OOM scenarios, allowing single input batches to produce multiple output batches
  • Added buildHostColumnsWithoutOwnership() method to transfer ownership of host columns to the retry framework
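
A hedged sketch of how that ownership hand-off might be used; the builder variable, the Array[HostColumnVector] return type, and the surrounding rowCount/dataTypes values are assumptions based on this summary and the diff hunks quoted later in the conversation:

import ai.rapids.cudf.HostColumnVector
import com.nvidia.spark.rapids.Arm.withResource

// The builder gives up ownership of its host columns so the retry path can manage them.
val hostColumns: Array[HostColumnVector] = batchBuilder.buildHostColumnsWithoutOwnership()
// The wrapper takes its own reference to the columns; withResource then drops the
// caller's reference, so the host memory lives exactly as long as the wrapper's splits.
val hostBatch = withResource(hostColumns) { cols =>
  HostColumnarBatchWithRowRange(cols, rowCount, dataTypes)
}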

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.

File Description
sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala: New class that wraps host columns with row range support, implements splitting logic for GPU OOM retry, and handles slicing of various column types (LIST, STRUCT, STRING, fixed-width)
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala: Updated RowToColumnarIterator to build host columns separately, use split retry for TargetSize goals, and maintain a pending batch iterator for split outputs
sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java: Added buildHostColumnsWithoutOwnership() method to transfer host column ownership to caller
tests/src/test/scala/com/nvidia/spark/rapids/RowToColumnarIteratorRetrySuite.scala: Added comprehensive test coverage for GPU OOM split retry scenarios including single batch requirement enforcement, multiple batch production, multiple consecutive splits, and single row edge case


greptile-apps bot (Contributor) commented Dec 31, 2025

Greptile Summary

This PR adds GPU OOM protection during Row-to-Columnar host-to-GPU transfer by introducing split-and-retry capability. The implementation creates HostColumnarBatchWithRowRange, a zero-copy wrapper that enables logical slicing of host batches through reference counting, avoiding expensive memory copies during splits.

Key Implementation Details:

The solution separates the row-to-columnar conversion into two phases:

  1. Host conversion (lines 688-694 in GpuRowToColumnarExec.scala): Builds host columns with retry-only support (no split)
  2. GPU transfer (lines 708-724): Copies to GPU with full split-and-retry support via the new wrapper
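
Roughly, the second phase looks like the following hedged sketch; the withRetry signature is simplified, and copyToGpu / splitInHalf mirror the sequence diagram below rather than the exact method names in the PR:

import com.nvidia.spark.rapids.{HostColumnarBatchWithRowRange, RmmRapidsRetryIterator}
import org.apache.spark.sql.vectorized.ColumnarBatch

// Phase 2: copy to the GPU with split-and-retry. On GPU OOM the retry framework calls the
// split policy, which halves the row range instead of failing the task, so a single input
// batch may yield several output batches.
def copyWithSplitRetry(hostBatch: HostColumnarBatchWithRowRange): Iterator[ColumnarBatch] =
  RmmRapidsRetryIterator.withRetry(hostBatch, HostColumnarBatchWithRowRange.splitInHalf) {
    hb => hb.copyToGpu()
  }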

Memory Management: Reference counting is correctly implemented throughout the lifecycle:

  • Initial creation increments refCount (line 60 in HostColumnarBatchWithRowRange.scala)
  • Split creates two new instances sharing underlying columns (lines 310-313), properly managing refCounts
  • Each batch decrements refCount on close (line 278), freeing memory when the last reference is released

Slicing Logic: The sliceHostColumn method (lines 112-233) handles all data types with proper offset normalization:

  • Fixed-width types: Direct memory slicing using pointer arithmetic
  • STRING/LIST: Offsets normalized to start at 0, with copied offset arrays
  • STRUCT: Recursive slicing of child columns
  • Validity: Reconstructed for the row range, optimized away if no nulls
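
To make the STRING/LIST offset normalization concrete, here is an illustrative plain-Scala version of the arithmetic for slicing rows [start, end) of a STRING column; the real code operates on cuDF host buffers rather than Scala arrays:

// Returns the rebased offsets for the slice, plus the byte position and length of the
// slice's character data within the original (shared) data buffer.
def sliceStringOffsets(offsets: Array[Int], start: Int, end: Int): (Array[Int], Int, Int) = {
  val base = offsets(start)
  // Rebase so the slice's first string starts at byte 0; only the offsets are copied,
  // the character data itself stays shared with the original column.
  val newOffsets = (start to end).map(offsets(_) - base).toArray
  (newOffsets, base, offsets(end) - base)
}

// Example: offsets [0, 3, 3, 8, 10] encode strings of lengths 3, 0, 5, 2. Slicing rows
// [2, 4) yields newOffsets [0, 5, 7] and a 7-byte data range starting at byte 3.
sliceStringOffsets(Array(0, 3, 3, 8, 10), 2, 4)  // -> (Array(0, 5, 7), 3, 7)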

Edge Case Handling: The implementation properly handles:

  • Empty strings (requires 1-byte dummy buffer, lines 186-194)
  • Zero columns/rows (early return, lines 67-69)
  • Single-row batches that cannot split (throws GpuSplitAndRetryOOM, lines 301-303)
  • RequireSingleBatch constraint (uses withRetryNoSplit, lines 696-706)

Iterator Pattern: The pendingBatchIter mechanism (lines 607-619) elegantly handles multiple output batches from a single input when splits occur, ensuring only one GPU allocation happens at a time.
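
A simplified skeleton of that hand-off is sketched below; rowIter and buildBatchesWithSplitRetry are placeholders for the real iterator's inputs and batch-building path, and the Arm.closeOnExcept helper is assumed:

import com.nvidia.spark.rapids.Arm.closeOnExcept
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

abstract class SplitAwareR2CIterator extends Iterator[ColumnarBatch] {
  protected def rowIter: Iterator[InternalRow]
  protected def buildBatchesWithSplitRetry(): Iterator[ColumnarBatch]

  private var pendingBatchIter: Iterator[ColumnarBatch] = Iterator.empty

  override def hasNext: Boolean = pendingBatchIter.hasNext || rowIter.hasNext

  override def next(): ColumnarBatch =
    if (pendingBatchIter.hasNext) {
      // A previous input batch was split: drain its remaining GPU batches one at a time,
      // so only one device allocation is in flight.
      pendingBatchIter.next()
    } else {
      // withRetry (inside buildBatchesWithSplitRetry) returns a lazy iterator, so further
      // splits are only copied to the GPU when requested.
      val it = buildBatchesWithSplitRetry()
      closeOnExcept(it.next()) { first =>
        pendingBatchIter = it  // stash any remaining splits for subsequent next() calls
        first
      }
    }
}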

Test Coverage: Comprehensive tests cover simple retry, split-and-retry, multiple consecutive splits, unsplittable edge cases, and complex nested data structures with nulls.

Confidence Score: 4/5

  • This PR is safe to merge with minimal risk, though thorough testing in production workloads is recommended due to the complexity of memory management
  • The implementation demonstrates solid software engineering with correct reference counting, proper resource cleanup, comprehensive edge case handling, and extensive test coverage. The split-and-retry logic integrates cleanly with the existing retry framework. Score is 4 (not 5) due to the inherent complexity of reference-counted memory management across split operations and the limited production testing of this specific code path (GPU OOM during transfer is relatively rare). The code quality is high with clear documentation and well-thought-out design.
  • No files require special attention - all three files show appropriate implementation quality and testing

Important Files Changed

File Analysis

Filename Score Overview
sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala (4/5): New file implementing zero-copy host column slicing with reference counting. Memory management is correctly implemented with proper incRefCount/close pairing. Edge cases for empty strings, null columns, and nested types are properly handled. Offset normalization for LIST/STRING types is mathematically correct.
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala (4/5): Modified to use HostColumnarBatchWithRowRange for split-and-retry during GPU transfer. Introduced pendingBatchIter to handle multiple output batches from splits. Reference counting and iterator lifecycle management appear correct. RequireSingleBatch constraint properly throws GpuSplitAndRetryOOM when split is needed.
tests/src/test/scala/com/nvidia/spark/rapids/RowToColumnarIteratorRetrySuite.scala (5/5): Comprehensive test coverage including simple retry, split-and-retry, multiple splits, edge cases (single row, empty schema), and complex nested types. Tests verify data integrity across splits and proper exception handling for unsplittable cases.

Sequence Diagram

sequenceDiagram
    participant Caller
    participant RowToColumnarIterator
    participant GpuColumnarBatchBuilder
    participant HostColumnarBatchWithRowRange
    participant RetryFramework
    participant GPU
    
    Caller->>RowToColumnarIterator: next()
    RowToColumnarIterator->>RowToColumnarIterator: buildBatch()
    RowToColumnarIterator->>GpuColumnarBatchBuilder: build rows on host
    GpuColumnarBatchBuilder-->>RowToColumnarIterator: hostColumns (refCount=1)
    RowToColumnarIterator->>HostColumnarBatchWithRowRange: create(hostColumns) [incRef to 2]
    Note over RowToColumnarIterator: withResource closes hostColumns [decRef to 1]
    RowToColumnarIterator->>RetryFramework: withRetry(hostBatch, splitPolicy)
    RetryFramework->>HostColumnarBatchWithRowRange: copyToGpu()
    HostColumnarBatchWithRowRange->>GPU: copy full batch
    GPU-->>HostColumnarBatchWithRowRange: GPU OOM!
    HostColumnarBatchWithRowRange-->>RetryFramework: throw GpuSplitAndRetryOOM
    RetryFramework->>RetryFramework: call splitPolicy(hostBatch)
    RetryFramework->>HostColumnarBatchWithRowRange: splitInHalf(hostBatch)
    Note over HostColumnarBatchWithRowRange: Create 2 splits [incRef to 3]<br/>Close original [decRef to 2]
    HostColumnarBatchWithRowRange-->>RetryFramework: [firstHalf, secondHalf]
    RetryFramework->>HostColumnarBatchWithRowRange: copyToGpu() on firstHalf
    HostColumnarBatchWithRowRange->>HostColumnarBatchWithRowRange: sliceHostColumn()
    Note over HostColumnarBatchWithRowRange: Create sliced view<br/>(shared data, copied offsets/validity)
    HostColumnarBatchWithRowRange->>GPU: copy sliced columns
    GPU-->>HostColumnarBatchWithRowRange: ColumnarBatch (success)
    HostColumnarBatchWithRowRange-->>RetryFramework: ColumnarBatch
    Note over RetryFramework: Close firstHalf [decRef to 1]
    RetryFramework-->>RowToColumnarIterator: Iterator[ColumnarBatch]
    RowToColumnarIterator->>RetryFramework: it.next() (get first)
    RetryFramework-->>RowToColumnarIterator: firstBatch
    RowToColumnarIterator->>RowToColumnarIterator: pendingBatchIter = remaining
    RowToColumnarIterator-->>Caller: firstBatch
    Caller->>RowToColumnarIterator: next() (second call)
    RowToColumnarIterator->>RetryFramework: pendingBatchIter.next()
    RetryFramework->>HostColumnarBatchWithRowRange: copyToGpu() on secondHalf
    HostColumnarBatchWithRowRange->>GPU: copy sliced columns
    GPU-->>HostColumnarBatchWithRowRange: ColumnarBatch (success)
    Note over RetryFramework: Close secondHalf [decRef to 0, free memory]
    RetryFramework-->>RowToColumnarIterator: secondBatch
    RowToColumnarIterator-->>Caller: secondBatch

greptile-apps bot (Contributor) left a comment

Additional Comments (1)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala, line 176-184 (link)

    logic: Workaround condition may not cover all cases. If dataLen == 0 but nullCount > 0 (empty strings with some null values), the code goes to the else branch and creates a 0-length slice, which could trigger the same copyToDevice issue mentioned in the comment.

4 files reviewed, 1 comment


Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven (Collaborator, Author)

build

Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven (Collaborator, Author)

build

Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.



@sameerz added the bug (Something isn't working) label Jan 4, 2026
Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven self-assigned this Jan 5, 2026
@thirtiseven marked this pull request as ready for review January 5, 2026 07:28
greptile-apps bot (Contributor) left a comment

Additional Comments (1)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala, line 178-188 (link)

    style: Check that this empty string workaround handles all edge cases correctly. The 1-byte allocation when dataLen == 0 && nullCount == 0 works around copyToDevice behavior, but verify it handles: (1) all strings in slice are empty (not null), (2) mixed empty strings at different positions. This edge case could cause issues with certain data distributions.

4 files reviewed, 1 comment


@wjxiz1992 changed the title from "Prevent potiential GPU OOM in R2C with split retry" to "Prevent potential GPU OOM in R2C with split retry" Jan 5, 2026
Signed-off-by: Haoyang Li <[email protected]>
greptile-apps bot (Contributor) left a comment

Additional Comments (1)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala, line 180 (link)

    syntax: typo: "Existsing" should be "Existing"

4 files reviewed, 1 comment


Signed-off-by: Haoyang Li <[email protected]>
greptile-apps bot (Contributor) left a comment

Additional Comments (1)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala, line 178-186 (link)

    style: Consider if allocating a 1-byte dummy buffer for empty strings when dataLen == 0 but numRows > nullCount is necessary, or if cudf would accept null here to save allocations.

    Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

4 files reviewed, 1 comment


@firestarman (Collaborator)

@greptileai can you review it again?

Signed-off-by: Haoyang Li <[email protected]>
greptile-apps bot (Contributor) left a comment

Additional Comments (1)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala, line 180-188 (link)

    style: Edge case handling for empty strings is correct: when dataLen == 0 but there are non-null rows (numRows > nullCount), allocates 1-byte buffer to satisfy cuDF requirements. However, consider validating that this 1-byte allocation doesn't cause issues if we're in a tight memory situation (though this is unlikely given the small size).

    Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

4 files reviewed, 1 comment


Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven (Collaborator, Author)

build

greptile-apps bot (Contributor) left a comment

Additional Comments (1)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarBatchWithRowRange.scala, line 271-282 (link)

    logic: Documentation claims split ownership transfer but implementation doesn't match - both splits increment reference count, so memory isn't transferred to last split as stated. Should the memory ownership transfer logic be implemented, or should the documentation be corrected to reflect the current reference counting approach?

3 files reviewed, 1 comment


Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven (Collaborator, Author)

build

Signed-off-by: Haoyang Li <[email protected]>
@firestarman (Collaborator)

Overall looks good to me, but it would be nice to have more eyes on this.

* @param numRows The number of rows in this range
* @param dataTypes The Spark data types for each column
*/
class HostColumnarBatchWithRowRange private (
Collaborator:

Should we add the slice functionality to cuDF instead of spark-rapids?

Collaborator (Author):

Yes, it looks more natural to put it in cuDF Java. I would prefer to do it as a follow-up PR, but happy to do it right now.

Collaborator:

@revans2 any preference?

Collaborator:

Discussed with Bobby that it would be nice to have this long term in cuDF, but it's ok to start here.

numInputRows += rowCount
ret
} else {
val it = RmmRapidsRetryIterator.withRetry(
Collaborator:

Nit: import RmmRapidsRetryIterator.withRetry, like it was done for withRetryNoSplit.

Collaborator (Author):

Done.

@abellina (Collaborator) commented Jan 8, 2026

@thirtiseven I should have time to look at this tomorrow, if it can wait until then.

totalOutputBytes += GpuColumnVector.getTotalDeviceMemoryUsed(batch)
totalOutputRows += batch.numRows()
if (totalOutputRows > 0 && totalOutputBytes > 0) {
targetRows =
Collaborator:

should we call this "nextBatchTargetRows"?

Collaborator (Author):

Done.

// Return the first batch now and keep the iterator for subsequent output batches.
// This ensures we only transfer one split at a time (avoid multiple device allocations).
closeOnExcept(it.next()) { first =>
pendingBatchIter = it
Collaborator:

should we check if it.hasNext before we set pendingBatchIter? e.g. set to None otherwise?

Collaborator (Author):

yes good catch

*
* Memory management uses reference counting: each instance increments the reference count
* of the host columns on construction and decrements it on close. The host columns are
* freed when the last reference is closed.
Collaborator:

We should add to this description that validity and offset buffers are copied, not logically sliced.

Collaborator (Author):

done.

Signed-off-by: Haoyang Li <[email protected]>
greptile-apps bot (Contributor) left a comment

No files reviewed, no comments


@thirtiseven (Collaborator, Author)

build

@thirtiseven (Collaborator, Author)

Hi @abellina could you take another look?

@thirtiseven requested a review from abellina January 23, 2026 01:13
@nvauto (Collaborator) commented Jan 26, 2026

NOTE: release/26.02 has been created from main. Please retarget your PR to release/26.02 if it should be included in the release.

@thirtiseven changed the base branch from main to release/26.02 January 26, 2026 06:52
@thirtiseven (Collaborator, Author)

build

if (totalOutputRows > 0 && totalOutputBytes > 0) {
targetRows =
GpuBatchUtils.estimateRowCount(targetSizeBytes, totalOutputBytes, totalOutputRows)
val dataTypes = localSchema.fields.map(_.dataType)
Collaborator:

it would be better if this was a class val.

HostColumnarBatchWithRowRange(hostColumns, rowCount, dataTypes)
}

if (localGoal.isInstanceOf[RequireSingleBatchLike]) {
Collaborator:

localGoal match {
  case _: RequireSingleBatchLike =>
    ...
  case _ => // other goals
    ...
}

// Return the first batch now and keep the iterator for subsequent output batches.
// This ensures we only transfer one split at a time (avoid multiple device allocations).
closeOnExcept(it.next()) { first =>
pendingBatchIter = if (it.hasNext) it else Iterator.empty
Collaborator:

Nit: could just set pendingBatchIter = it without checking, since all the other logic should already be gated on it.hasNext returning false.

@thirtiseven changed the base branch from release/26.02 to main February 3, 2026 02:09

Labels

bug Something isn't working

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Follow up] Fix potiential GPU OOM in R2C

5 participants