From 3984457e5bea1f73d4340edb010a4debab054596 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 11 Jan 2021 16:20:13 -0800 Subject: [PATCH] Add missing unit tests for segment loading in historicals (#10737) * Add missing unit tests for segment loading in historicals * unused import --- .../SegmentLoaderLocalCacheManager.java | 9 ++- .../SegmentLoaderLocalCacheManagerTest.java | 59 ++++++++++++------ .../SegmentLoadDropHandlerTest.java | 60 ++++++++++++++++++- 3 files changed, 104 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index de9bff15169..16bf499a407 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -31,6 +31,7 @@ import org.apache.druid.segment.Segment; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.Iterator; @@ -41,6 +42,9 @@ import java.util.concurrent.ConcurrentHashMap; */ public class SegmentLoaderLocalCacheManager implements SegmentLoader { + @VisibleForTesting + static final String DOWNLOAD_START_MARKER_FILE_NAME = "downloadStartMarker"; + private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class); private final IndexIO indexIO; @@ -132,6 +136,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader return findStorageLocationIfLoaded(segment) != null; } + @Nullable private StorageLocation findStorageLocationIfLoaded(final DataSegment segment) { for (StorageLocation location : locations) { @@ -167,7 +172,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader */ private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir) { - final File downloadStartMarker = new File(localStorageDir.getPath(), "downloadStartMarker"); + final File downloadStartMarker = new File(localStorageDir.getPath(), DOWNLOAD_START_MARKER_FILE_NAME); return downloadStartMarker.exists(); } @@ -270,7 +275,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader { // We use a marker to prevent the case where a segment is downloaded, but before the download completes, // the parent directories of the segment are removed - final File downloadStartMarker = new File(storageDir, "downloadStartMarker"); + final File downloadStartMarker = new File(storageDir, DOWNLOAD_START_MARKER_FILE_NAME); synchronized (directoryWriteRemoveLock) { if (!storageDir.mkdirs()) { log.debug("Unable to make parent file[%s]", storageDir); diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java index a01aef45540..75bbeff61c7 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java @@ -28,9 +28,6 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.TestHelper; -import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; -import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; -import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -39,36 +36,22 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import java.io.File; import java.util.ArrayList; -import java.util.Collection; import java.util.List; -@RunWith(Parameterized.class) public class SegmentLoaderLocalCacheManagerTest { - @Parameterized.Parameters - public static Collection constructorFeeder() - { - return ImmutableList.of( - new Object[] {TmpFileSegmentWriteOutMediumFactory.instance()}, - new Object[] {OffHeapMemorySegmentWriteOutMediumFactory.instance()} - ); - } - @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); private final ObjectMapper jsonMapper; - private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; private File localSegmentCacheFolder; private SegmentLoaderLocalCacheManager manager; - public SegmentLoaderLocalCacheManagerTest(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory) + public SegmentLoaderLocalCacheManagerTest() { jsonMapper = new DefaultObjectMapper(); jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local")); @@ -78,7 +61,6 @@ public class SegmentLoaderLocalCacheManagerTest new LocalDataSegmentPuller() ) ); - this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; } @Before @@ -750,4 +732,43 @@ public class SegmentLoaderLocalCacheManagerTest Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3)); } + @Test + public void testGetSegmentFilesWhenDownloadStartMarkerExists() throws Exception + { + final File localStorageFolder = tmpFolder.newFolder("local_storage_folder"); + + final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + localStorageFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + + // manually create a local segment under localStorageFolder + final File localSegmentFile = new File( + localStorageFolder, + "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0" + ); + Assert.assertTrue(localSegmentFile.mkdirs()); + final File indexZip = new File(localSegmentFile, "index.zip"); + Assert.assertTrue(indexZip.createNewFile()); + + final File cachedSegmentDir = manager.getSegmentFiles(segmentToDownload); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + + // Emulate a corrupted segment file + final File downloadMarker = new File( + cachedSegmentDir, + SegmentLoaderLocalCacheManager.DOWNLOAD_START_MARKER_FILE_NAME + ); + Assert.assertTrue(downloadMarker.createNewFile()); + + Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse(cachedSegmentDir.exists()); + } } diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 32368b700c6..a9797846e64 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -29,15 +29,18 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.loading.CacheTestSegmentLoader; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.coordination.SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus; +import org.apache.druid.server.coordination.SegmentLoadDropHandler.Status.STATE; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; -import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; @@ -45,6 +48,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; import java.io.File; import java.io.IOException; @@ -92,6 +97,11 @@ public class SegmentLoadDropHandlerTest @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + public SegmentLoadDropHandlerTest() + { + EmittingLogger.registerEmitter(new NoopServiceEmitter()); + } + @Before public void setUp() { @@ -234,7 +244,7 @@ public class SegmentLoadDropHandlerTest jsonMapper, segmentLoaderConfig, announcer, - EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), + Mockito.mock(DataSegmentServerAnnouncer.class), segmentManager, scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), new ServerTypeConfig(ServerType.HISTORICAL) @@ -449,7 +459,9 @@ public class SegmentLoadDropHandlerTest return 50; } }, - announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager, + announcer, + Mockito.mock(DataSegmentServerAnnouncer.class), + segmentManager, new ServerTypeConfig(ServerType.HISTORICAL) ); @@ -522,4 +534,46 @@ public class SegmentLoadDropHandlerTest segmentLoadDropHandler.stop(); } + + @Test(timeout = 60_000L) + public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequestShouldSucceed() throws Exception + { + final SegmentManager segmentManager = Mockito.mock(SegmentManager.class); + Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())) + .thenThrow(new RuntimeException("segment loading failure test")) + .thenReturn(true); + final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( + jsonMapper, + segmentLoaderConfig, + announcer, + Mockito.mock(DataSegmentServerAnnouncer.class), + segmentManager, + scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), + new ServerTypeConfig(ServerType.HISTORICAL) + ); + + segmentLoadDropHandler.start(); + + DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); + + List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); + + ListenableFuture> future = segmentLoadDropHandler + .processBatch(batch); + + for (Runnable runnable : scheduledRunnable) { + runnable.run(); + } + List result = future.get(); + Assert.assertEquals(STATE.FAILED, result.get(0).getStatus().getState()); + + future = segmentLoadDropHandler.processBatch(batch); + for (Runnable runnable : scheduledRunnable) { + runnable.run(); + } + result = future.get(); + Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + + segmentLoadDropHandler.stop(); + } }