Quick fix for SegmentLoadDropHandler bug (#14670)

This commit is contained in:
Kashif Faraz 2023-07-27 11:53:58 +05:30 committed by GitHub
parent dd204e596d
commit 7634ac896e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 23 deletions

View File

@ -773,7 +773,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
} }
// Future with cancel() implementation to remove it from "waitingFutures" list // Future with cancel() implementation to remove it from "waitingFutures" list
private static class CustomSettableFuture extends AbstractFuture<List<DataSegmentChangeRequestAndStatus>> private class CustomSettableFuture extends AbstractFuture<List<DataSegmentChangeRequestAndStatus>>
{ {
private final LinkedHashSet<CustomSettableFuture> waitingFutures; private final LinkedHashSet<CustomSettableFuture> waitingFutures;
private final Map<DataSegmentChangeRequest, AtomicReference<Status>> statusRefs; private final Map<DataSegmentChangeRequest, AtomicReference<Status>> statusRefs;
@ -789,15 +789,20 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
public void resolve() public void resolve()
{ {
synchronized (statusRefs) { synchronized (requestStatusesLock) {
if (isDone()) { if (isDone()) {
return; return;
} }
List<DataSegmentChangeRequestAndStatus> result = new ArrayList<>(statusRefs.size()); final List<DataSegmentChangeRequestAndStatus> result = new ArrayList<>(statusRefs.size());
statusRefs.forEach( statusRefs.forEach((request, statusRef) -> {
(request, statusRef) -> result.add(new DataSegmentChangeRequestAndStatus(request, statusRef.get())) // Remove complete statuses from the cache
); final Status status = statusRef.get();
if (status != null && status.getState() != Status.STATE.PENDING) {
requestStatuses.invalidate(request);
}
result.add(new DataSegmentChangeRequestAndStatus(request, status));
});
set(result); set(result);
} }

View File

@ -562,8 +562,12 @@ public class SegmentLoadDropHandlerTest
public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exception public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exception
{ {
final SegmentManager segmentManager = Mockito.mock(SegmentManager.class); final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any())) Mockito.doReturn(true).when(segmentManager).loadSegment(
.thenReturn(true); ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any()); Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any());
final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
jsonMapper, jsonMapper,
@ -578,11 +582,11 @@ public class SegmentLoadDropHandlerTest
segmentLoadDropHandler.start(); segmentLoadDropHandler.start();
DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); final DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));
List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
// load the segment // Request 1: Load the segment
ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler
.processBatch(batch); .processBatch(batch);
for (Runnable runnable : scheduledRunnable) { for (Runnable runnable : scheduledRunnable) {
@ -592,7 +596,7 @@ public class SegmentLoadDropHandlerTest
Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
scheduledRunnable.clear(); scheduledRunnable.clear();
// drop the segment // Request 2: Drop the segment
batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1)); batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1));
future = segmentLoadDropHandler.processBatch(batch); future = segmentLoadDropHandler.processBatch(batch);
for (Runnable runnable : scheduledRunnable) { for (Runnable runnable : scheduledRunnable) {
@ -603,23 +607,36 @@ public class SegmentLoadDropHandlerTest
scheduledRunnable.clear(); scheduledRunnable.clear();
// check invocations after a load-drop sequence // check invocations after a load-drop sequence
Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()); Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any()); ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
Mockito.verify(segmentManager, Mockito.times(1))
.dropSegment(ArgumentMatchers.any());
// try to reload the segment - this should be a no-op since it might be the case that this is the first load client // Request 3: Reload the segment
// with this request, we'll forget about the success of the load request
batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
future = segmentLoadDropHandler.processBatch(batch); future = segmentLoadDropHandler.processBatch(batch);
Assert.assertEquals(scheduledRunnable.size(), 0); for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
result = future.get(); result = future.get();
Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
scheduledRunnable.clear();
// check invocations - should stay the same // check invocations - 1 more load has happened
Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()); Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any()); ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
Mockito.verify(segmentManager, Mockito.times(1))
.dropSegment(ArgumentMatchers.any());
// try to reload the segment - this time the loader will know that is a fresh request to load // Request 4: Try to reload the segment - segment is loaded again
// so, the segment manager will be asked to load
batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
future = segmentLoadDropHandler.processBatch(batch); future = segmentLoadDropHandler.processBatch(batch);
for (Runnable runnable : scheduledRunnable) { for (Runnable runnable : scheduledRunnable) {
@ -630,8 +647,14 @@ public class SegmentLoadDropHandlerTest
scheduledRunnable.clear(); scheduledRunnable.clear();
// check invocations - the load segment counter should bump up // check invocations - the load segment counter should bump up
Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()); Mockito.verify(segmentManager, Mockito.times(3)).loadSegment(
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any()); ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
Mockito.verify(segmentManager, Mockito.times(1))
.dropSegment(ArgumentMatchers.any());
segmentLoadDropHandler.stop(); segmentLoadDropHandler.stop();
} }