Add missing unit tests for segment loading in historicals (#10737)

* Add missing unit tests for segment loading in historicals

* unused import
This commit is contained in:
Jihoon Son 2021-01-11 16:20:13 -08:00 committed by GitHub
parent fe0511b16a
commit 3984457e5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 104 additions and 24 deletions

View File

@ -31,6 +31,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
@ -41,6 +42,9 @@ import java.util.concurrent.ConcurrentHashMap;
*/ */
public class SegmentLoaderLocalCacheManager implements SegmentLoader public class SegmentLoaderLocalCacheManager implements SegmentLoader
{ {
@VisibleForTesting
static final String DOWNLOAD_START_MARKER_FILE_NAME = "downloadStartMarker";
private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class); private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
private final IndexIO indexIO; private final IndexIO indexIO;
@ -132,6 +136,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
return findStorageLocationIfLoaded(segment) != null; return findStorageLocationIfLoaded(segment) != null;
} }
@Nullable
private StorageLocation findStorageLocationIfLoaded(final DataSegment segment) private StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{ {
for (StorageLocation location : locations) { for (StorageLocation location : locations) {
@ -167,7 +172,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
*/ */
private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir) private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir)
{ {
final File downloadStartMarker = new File(localStorageDir.getPath(), "downloadStartMarker"); final File downloadStartMarker = new File(localStorageDir.getPath(), DOWNLOAD_START_MARKER_FILE_NAME);
return downloadStartMarker.exists(); return downloadStartMarker.exists();
} }
@ -270,7 +275,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
{ {
// We use a marker to prevent the case where a segment is downloaded, but before the download completes, // We use a marker to prevent the case where a segment is downloaded, but before the download completes,
// the parent directories of the segment are removed // the parent directories of the segment are removed
final File downloadStartMarker = new File(storageDir, "downloadStartMarker"); final File downloadStartMarker = new File(storageDir, DOWNLOAD_START_MARKER_FILE_NAME);
synchronized (directoryWriteRemoveLock) { synchronized (directoryWriteRemoveLock) {
if (!storageDir.mkdirs()) { if (!storageDir.mkdirs()) {
log.debug("Unable to make parent file[%s]", storageDir); log.debug("Unable to make parent file[%s]", storageDir);

View File

@ -28,9 +28,6 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NoneShardSpec;
@ -39,36 +36,22 @@ import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
@RunWith(Parameterized.class)
public class SegmentLoaderLocalCacheManagerTest public class SegmentLoaderLocalCacheManagerTest
{ {
@Parameterized.Parameters
public static Collection<?> constructorFeeder()
{
return ImmutableList.of(
new Object[] {TmpFileSegmentWriteOutMediumFactory.instance()},
new Object[] {OffHeapMemorySegmentWriteOutMediumFactory.instance()}
);
}
@Rule @Rule
public final TemporaryFolder tmpFolder = new TemporaryFolder(); public final TemporaryFolder tmpFolder = new TemporaryFolder();
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
private File localSegmentCacheFolder; private File localSegmentCacheFolder;
private SegmentLoaderLocalCacheManager manager; private SegmentLoaderLocalCacheManager manager;
public SegmentLoaderLocalCacheManagerTest(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory) public SegmentLoaderLocalCacheManagerTest()
{ {
jsonMapper = new DefaultObjectMapper(); jsonMapper = new DefaultObjectMapper();
jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local")); jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local"));
@ -78,7 +61,6 @@ public class SegmentLoaderLocalCacheManagerTest
new LocalDataSegmentPuller() new LocalDataSegmentPuller()
) )
); );
this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
} }
@Before @Before
@ -750,4 +732,43 @@ public class SegmentLoaderLocalCacheManagerTest
Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3)); Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3));
} }
@Test
public void testGetSegmentFilesWhenDownloadStartMarkerExists() throws Exception
{
final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
localStorageFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under localStorageFolder
final File localSegmentFile = new File(
localStorageFolder,
"test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"
);
Assert.assertTrue(localSegmentFile.mkdirs());
final File indexZip = new File(localSegmentFile, "index.zip");
Assert.assertTrue(indexZip.createNewFile());
final File cachedSegmentDir = manager.getSegmentFiles(segmentToDownload);
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
// Emulate a corrupted segment file
final File downloadMarker = new File(
cachedSegmentDir,
SegmentLoaderLocalCacheManager.DOWNLOAD_START_MARKER_FILE_NAME
);
Assert.assertTrue(downloadMarker.createNewFile());
Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentLoaded(segmentToDownload));
Assert.assertFalse(cachedSegmentDir.exists());
}
} }

View File

@ -29,15 +29,18 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.loading.CacheTestSegmentLoader; import org.apache.druid.segment.loading.CacheTestSegmentLoader;
import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.SegmentManager; import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus;
import org.apache.druid.server.coordination.SegmentLoadDropHandler.Status.STATE;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -45,6 +48,8 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -92,6 +97,11 @@ public class SegmentLoadDropHandlerTest
@Rule @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder(); public TemporaryFolder temporaryFolder = new TemporaryFolder();
public SegmentLoadDropHandlerTest()
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
}
@Before @Before
public void setUp() public void setUp()
{ {
@ -234,7 +244,7 @@ public class SegmentLoadDropHandlerTest
jsonMapper, jsonMapper,
segmentLoaderConfig, segmentLoaderConfig,
announcer, announcer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), Mockito.mock(DataSegmentServerAnnouncer.class),
segmentManager, segmentManager,
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
new ServerTypeConfig(ServerType.HISTORICAL) new ServerTypeConfig(ServerType.HISTORICAL)
@ -449,7 +459,9 @@ public class SegmentLoadDropHandlerTest
return 50; return 50;
} }
}, },
announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager, announcer,
Mockito.mock(DataSegmentServerAnnouncer.class),
segmentManager,
new ServerTypeConfig(ServerType.HISTORICAL) new ServerTypeConfig(ServerType.HISTORICAL)
); );
@ -522,4 +534,46 @@ public class SegmentLoadDropHandlerTest
segmentLoadDropHandler.stop(); segmentLoadDropHandler.stop();
} }
@Test(timeout = 60_000L)
public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequestShouldSucceed() throws Exception
{
final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean()))
.thenThrow(new RuntimeException("segment loading failure test"))
.thenReturn(true);
final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
jsonMapper,
segmentLoaderConfig,
announcer,
Mockito.mock(DataSegmentServerAnnouncer.class),
segmentManager,
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
new ServerTypeConfig(ServerType.HISTORICAL)
);
segmentLoadDropHandler.start();
DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));
List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler
.processBatch(batch);
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
List<DataSegmentChangeRequestAndStatus> result = future.get();
Assert.assertEquals(STATE.FAILED, result.get(0).getStatus().getState());
future = segmentLoadDropHandler.processBatch(batch);
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
result = future.get();
Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
segmentLoadDropHandler.stop();
}
} }