Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -73,6 +74,8 @@ public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria() {
}
Set<String> consumingSegments = tableSegments.getValue();
Set<String> caughtUpSegments = _caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new HashSet<>());
boolean skippedSegmentsLogged = false;

for (String segName : consumingSegments) {
if (caughtUpSegments.contains(segName)) {
continue;
Expand All @@ -95,7 +98,22 @@ public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria() {
continue;
}
RealtimeSegmentDataManager rtSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
if (isSegmentCaughtUp(segName, rtSegmentDataManager, (RealtimeTableDataManager) tableDataManager)) {
RealtimeTableDataManager realtimeTableDataManager = (RealtimeTableDataManager) tableDataManager;

StreamMetadataProvider streamMetadataProvider =
realtimeTableDataManager.getStreamMetadataProvider(rtSegmentDataManager);

if (!streamMetadataProvider.supportsOffsetLag()) {
// Cannot conclude if segment has caught up or not. Skip such segments.
if (!skippedSegmentsLogged) {
_logger.warn(
"Stream provider for table: {} does not support offset subtraction. Cannot conclude if the segment "
+ "has caught up. Skipping the segments.",
realtimeTableDataManager.getTableName());
skippedSegmentsLogged = true;
}
caughtUpSegments.add(segName);
} else if (isSegmentCaughtUp(segName, rtSegmentDataManager, realtimeTableDataManager)) {
caughtUpSegments.add(segName);
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,13 @@ public void regularCaseWithOffsetCatchup() {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
when(segMngrA0.getSegmentName()).thenReturn(segA0);
when(segMngrA1.getSegmentName()).thenReturn(segA1);
when(segMngrB0.getSegmentName()).thenReturn(segB0);

when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segA1Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
Expand Down Expand Up @@ -174,7 +180,9 @@ public void testWithDroppedTableAndSegment() {
StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA0.getSegmentName()).thenReturn(segA0);

when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));

when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
Expand Down Expand Up @@ -247,7 +255,13 @@ public void regularCaseWithFreshnessCatchup() {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
when(segMngrA0.getSegmentName()).thenReturn(segA0);
when(segMngrA1.getSegmentName()).thenReturn(segA1);
when(segMngrB0.getSegmentName()).thenReturn(segB0);

when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segA1Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
Expand Down Expand Up @@ -325,7 +339,13 @@ public void regularCaseWithIdleTimeout() {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
when(segMngrA0.getSegmentName()).thenReturn(segA0);
when(segMngrA1.getSegmentName()).thenReturn(segA1);
when(segMngrB0.getSegmentName()).thenReturn(segB0);

when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segA1Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(1, new LongMsgOffset(20)));
when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
Expand Down Expand Up @@ -414,7 +434,13 @@ public void testSegmentsNeverHealthyWhenIdleTimeoutZeroAndNoOtherCriteriaMet() {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
when(segMngrA0.getSegmentName()).thenReturn(segA0);
when(segMngrA1.getSegmentName()).thenReturn(segA1);
when(segMngrB0.getSegmentName()).thenReturn(segB0);

when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segA1Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(1, new LongMsgOffset(20)));
when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
Expand Down Expand Up @@ -479,7 +505,13 @@ public void segmentBeingCommitted() {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
when(segMngrA0.getSegmentName()).thenReturn(segA0);
when(segMngrA1.getSegmentName()).thenReturn(segA1);
when(segMngrB0.getSegmentName()).thenReturn(segB0);

when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segA1Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
Expand Down Expand Up @@ -555,7 +587,13 @@ public void testCannotGetOffsetsOrFreshness() {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
when(segMngrA0.getSegmentName()).thenReturn(segA0);
when(segMngrA1.getSegmentName()).thenReturn(segA1);
when(segMngrB0.getSegmentName()).thenReturn(segB0);

when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segA1Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of());
Expand All @@ -580,6 +618,7 @@ public void testCannotGetOffsetsOrFreshness() {
// segB0 0 0 100 0
setupLatestIngestionTimestamp(segMngrA0, 89L);
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segB0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenReturn(Map.of(0, new LongMsgOffset(0)));
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 0);
}
Expand Down Expand Up @@ -620,6 +659,8 @@ public void testTimeoutExceptionWhenFetchingLatestStreamOffset() {
when(segMngrA0.getSegmentName()).thenReturn(segA0);
when(segMngrA1.getSegmentName()).thenReturn(segA1);

when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
// segA0 provider throws RuntimeException - this should be caught and handled gracefully
// In practice, RealtimeSegmentMetadataUtils wraps TimeoutException in RuntimeException
when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenThrow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.testng.annotations.Test;

import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -64,6 +65,17 @@ public void regularCase() {
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);

StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
when(segB0Provider.supportsOffsetLag()).thenReturn(true);

when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15));
when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150));
when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(1500));
Expand Down Expand Up @@ -117,6 +129,13 @@ public void dataMangersBeingSetup() {
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);

StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);

// latest ingested offset latest stream offset
// segA0 10 15
// segA1 100 150
Expand All @@ -131,6 +150,10 @@ public void dataMangersBeingSetup() {
RealtimeSegmentDataManager segMngrB0 = mock(RealtimeSegmentDataManager.class);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);

StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
when(segB0Provider.supportsOffsetLag()).thenReturn(true);

// latest ingested offset latest stream offset
// segA0 20 15
// segA1 200 150
Expand Down Expand Up @@ -179,6 +202,16 @@ public void segmentsBeingCommitted() {
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);

StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
when(segB0Provider.supportsOffsetLag()).thenReturn(true);

// latest ingested offset latest stream offset
// segA0 10 15
// segA1 100 150
Expand Down Expand Up @@ -234,6 +267,16 @@ public void cannotGetLatestStreamOffset() {
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);

StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
when(segB0Provider.supportsOffsetLag()).thenReturn(true);

// latest ingested offset latest stream offset
// segA0 10 15
// segA1 100 150
Expand Down
Loading