mirror of https://github.com/apache/druid.git
Fix load-drop-load sequence for same segment and historical in http loadqueue peon (#11717)
Fixes an issue where a load-drop-load sequence for a segment and historical doesn't work correctly for http based load queue peon. The first cycle of load-drop works fine - the problem comes when there is an attempt to reload the segment. The historical caches load success for some recent segments and makes the reload as a no-op. But it doesn't consider that fact that the segment was also dropped in between the load requests. This change invalidates the cache after a client tries to fetch a success result.
This commit is contained in:
parent
fe8530dac4
commit
c4fa3ccfc4
|
@ -555,6 +555,11 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
|
|||
},
|
||||
this::resolveWaitingFutures
|
||||
);
|
||||
} else if (status.get().getState() == Status.STATE.SUCCESS) {
|
||||
// SUCCESS case, we'll clear up the cached success while serving it to this client
|
||||
// Not doing this can lead to an incorrect response to upcoming clients for a reload
|
||||
requestStatuses.invalidate(changeRequest);
|
||||
return status;
|
||||
}
|
||||
return requestStatuses.getIfPresent(changeRequest);
|
||||
}
|
||||
|
|
|
@ -87,6 +87,7 @@ public class SegmentLoadDropHandlerTest
|
|||
private SegmentManager segmentManager;
|
||||
private List<Runnable> scheduledRunnable;
|
||||
private SegmentLoaderConfig segmentLoaderConfig;
|
||||
private SegmentLoaderConfig noAnnouncerSegmentLoaderConfig;
|
||||
private SegmentLoaderConfig segmentLoaderConfigNoLocations;
|
||||
private ScheduledExecutorFactory scheduledExecutorFactory;
|
||||
private List<StorageLocationConfig> locations;
|
||||
|
@ -194,6 +195,39 @@ public class SegmentLoadDropHandlerTest
|
|||
}
|
||||
};
|
||||
|
||||
noAnnouncerSegmentLoaderConfig = new SegmentLoaderConfig()
|
||||
{
|
||||
@Override
|
||||
public File getInfoDir()
|
||||
{
|
||||
return testStorageLocation.getInfoDir();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumLoadingThreads()
|
||||
{
|
||||
return 5;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getAnnounceIntervalMillis()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<StorageLocationConfig> getLocations()
|
||||
{
|
||||
return locations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getDropSegmentDelayMillis()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
segmentLoaderConfigNoLocations = new SegmentLoaderConfig()
|
||||
{
|
||||
@Override
|
||||
|
@ -475,15 +509,8 @@ public class SegmentLoadDropHandlerTest
|
|||
runnable.run();
|
||||
}
|
||||
|
||||
result = segmentLoadDropHandler.processBatch(batch).get();
|
||||
result = segmentLoadDropHandler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1))).get();
|
||||
Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(0).getStatus());
|
||||
Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(1).getStatus());
|
||||
|
||||
|
||||
for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : segmentLoadDropHandler.processBatch(batch)
|
||||
.get()) {
|
||||
Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, e.getStatus());
|
||||
}
|
||||
|
||||
segmentLoadDropHandler.stop();
|
||||
}
|
||||
|
@ -530,4 +557,82 @@ public class SegmentLoadDropHandlerTest
|
|||
|
||||
segmentLoadDropHandler.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 60_000L)
|
||||
public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exception
|
||||
{
|
||||
final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
|
||||
Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any()))
|
||||
.thenReturn(true);
|
||||
Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any());
|
||||
final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
|
||||
jsonMapper,
|
||||
noAnnouncerSegmentLoaderConfig,
|
||||
announcer,
|
||||
Mockito.mock(DataSegmentServerAnnouncer.class),
|
||||
segmentManager,
|
||||
segmentCacheManager,
|
||||
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
|
||||
new ServerTypeConfig(ServerType.HISTORICAL)
|
||||
);
|
||||
|
||||
segmentLoadDropHandler.start();
|
||||
|
||||
DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));
|
||||
|
||||
List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
|
||||
|
||||
// load the segment
|
||||
ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler
|
||||
.processBatch(batch);
|
||||
for (Runnable runnable : scheduledRunnable) {
|
||||
runnable.run();
|
||||
}
|
||||
List<DataSegmentChangeRequestAndStatus> result = future.get();
|
||||
Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
|
||||
scheduledRunnable.clear();
|
||||
|
||||
// drop the segment
|
||||
batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1));
|
||||
future = segmentLoadDropHandler.processBatch(batch);
|
||||
for (Runnable runnable : scheduledRunnable) {
|
||||
runnable.run();
|
||||
}
|
||||
result = future.get();
|
||||
Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
|
||||
scheduledRunnable.clear();
|
||||
|
||||
// check invocations after a load-drop sequence
|
||||
Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
|
||||
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
|
||||
|
||||
// try to reload the segment - this should be a no-op since it might be the case that this is the first load client
|
||||
// with this request, we'll forget about the success of the load request
|
||||
batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
|
||||
future = segmentLoadDropHandler.processBatch(batch);
|
||||
Assert.assertEquals(scheduledRunnable.size(), 0);
|
||||
result = future.get();
|
||||
Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
|
||||
|
||||
// check invocations - should stay the same
|
||||
Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
|
||||
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
|
||||
|
||||
// try to reload the segment - this time the loader will know that is a fresh request to load
|
||||
// so, the segment manager will be asked to load
|
||||
batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
|
||||
future = segmentLoadDropHandler.processBatch(batch);
|
||||
for (Runnable runnable : scheduledRunnable) {
|
||||
runnable.run();
|
||||
}
|
||||
result = future.get();
|
||||
Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
|
||||
scheduledRunnable.clear();
|
||||
|
||||
// check invocations - the load segment counter should bump up
|
||||
Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
|
||||
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
|
||||
|
||||
segmentLoadDropHandler.stop();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue