From 7634ac896e793f5c6b51d0a421796d36871caa8b Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 27 Jul 2023 11:53:58 +0530 Subject: [PATCH] Quick fix for SegmentLoadDropHandler bug (#14670) --- .../coordination/SegmentLoadDropHandler.java | 17 ++++-- .../SegmentLoadDropHandlerTest.java | 57 +++++++++++++------ 2 files changed, 51 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 2ba5dc93304..636894d214f 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -773,7 +773,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler } // Future with cancel() implementation to remove it from "waitingFutures" list - private static class CustomSettableFuture extends AbstractFuture> + private class CustomSettableFuture extends AbstractFuture> { private final LinkedHashSet waitingFutures; private final Map> statusRefs; @@ -789,15 +789,20 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler public void resolve() { - synchronized (statusRefs) { + synchronized (requestStatusesLock) { if (isDone()) { return; } - List result = new ArrayList<>(statusRefs.size()); - statusRefs.forEach( - (request, statusRef) -> result.add(new DataSegmentChangeRequestAndStatus(request, statusRef.get())) - ); + final List result = new ArrayList<>(statusRefs.size()); + statusRefs.forEach((request, statusRef) -> { + // Remove complete statuses from the cache + final Status status = statusRef.get(); + if (status != null && status.getState() != Status.STATE.PENDING) { + requestStatuses.invalidate(request); + } + result.add(new DataSegmentChangeRequestAndStatus(request, status)); + }); set(result); } diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index d6d4d2374df..c734d7b8f8f 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -562,8 +562,12 @@ public class SegmentLoadDropHandlerTest public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exception { final SegmentManager segmentManager = Mockito.mock(SegmentManager.class); - Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any())) - .thenReturn(true); + Mockito.doReturn(true).when(segmentManager).loadSegment( + ArgumentMatchers.any(), + ArgumentMatchers.anyBoolean(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any()); final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( jsonMapper, @@ -578,11 +582,11 @@ public class SegmentLoadDropHandlerTest segmentLoadDropHandler.start(); - DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); + final DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); - // load the segment + // Request 1: Load the segment ListenableFuture> future = segmentLoadDropHandler .processBatch(batch); for (Runnable runnable : scheduledRunnable) { @@ -592,7 +596,7 @@ public class SegmentLoadDropHandlerTest Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); scheduledRunnable.clear(); - // drop the segment + // Request 2: Drop the segment batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1)); future = segmentLoadDropHandler.processBatch(batch); for (Runnable runnable : scheduledRunnable) { @@ -603,23 +607,36 @@ public class SegmentLoadDropHandlerTest scheduledRunnable.clear(); // check invocations after a load-drop sequence - Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()); - Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any()); + Mockito.verify(segmentManager, Mockito.times(1)).loadSegment( + ArgumentMatchers.any(), + ArgumentMatchers.anyBoolean(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); + Mockito.verify(segmentManager, Mockito.times(1)) + .dropSegment(ArgumentMatchers.any()); - // try to reload the segment - this should be a no-op since it might be the case that this is the first load client - // with this request, we'll forget about the success of the load request + // Request 3: Reload the segment batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); future = segmentLoadDropHandler.processBatch(batch); - Assert.assertEquals(scheduledRunnable.size(), 0); + for (Runnable runnable : scheduledRunnable) { + runnable.run(); + } result = future.get(); Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + scheduledRunnable.clear(); - // check invocations - should stay the same - Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()); - Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any()); + // check invocations - 1 more load has happened + Mockito.verify(segmentManager, Mockito.times(2)).loadSegment( + ArgumentMatchers.any(), + ArgumentMatchers.anyBoolean(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); + Mockito.verify(segmentManager, Mockito.times(1)) + .dropSegment(ArgumentMatchers.any()); - // try to reload the segment - this time the loader will know that is a fresh request to load - // so, the segment manager will be asked to load + // Request 4: Try to reload the segment - segment is loaded again batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); future = segmentLoadDropHandler.processBatch(batch); for (Runnable runnable : scheduledRunnable) { @@ -630,8 +647,14 @@ public class SegmentLoadDropHandlerTest scheduledRunnable.clear(); // check invocations - the load segment counter should bump up - Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()); - Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any()); + Mockito.verify(segmentManager, Mockito.times(3)).loadSegment( + ArgumentMatchers.any(), + ArgumentMatchers.anyBoolean(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); + Mockito.verify(segmentManager, Mockito.times(1)) + .dropSegment(ArgumentMatchers.any()); segmentLoadDropHandler.stop(); }