diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
index b2cf07580cf..bc0b79c39f0 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
@@ -20,6 +20,7 @@
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.druid.utils.JvmUtils;
@@ -128,9 +129,19 @@ public class SegmentLoaderConfig
return retVal;
}
+ @VisibleForTesting
+ public SegmentLoaderConfig withInfoDir(File infoDir)
+ {
+ SegmentLoaderConfig retVal = new SegmentLoaderConfig();
+ retVal.locations = this.locations;
+ retVal.deleteOnRemove = this.deleteOnRemove;
+ retVal.infoDir = infoDir;
+ return retVal;
+ }
+
/**
* Convert StorageLocationConfig objects to StorageLocation objects
- *
+ *
* Note: {@link #getLocations} is called instead of variable access because some testcases overrides this method
*/
public List toStorageLocations()
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index cd4d3fd86e5..eb32ea4c5f6 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -224,6 +224,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
if (loc == null) {
loc = loadSegmentWithRetry(segment, storageDir);
+ } else {
+ // If the segment is already downloaded on disk, we just update the current usage
+ loc.maybeReserve(storageDir, segment);
}
return new File(loc.getPath(), storageDir);
}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index 2d31c1bca15..ce36b786070 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -22,7 +22,7 @@ package org.apache.druid.segment.loading;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.commons.io.FileUtils;
-import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
@@ -41,7 +41,7 @@ import java.util.Set;
*/
public class StorageLocation
{
- private static final Logger log = new Logger(StorageLocation.class);
+ private static final EmittingLogger log = new EmittingLogger(StorageLocation.class);
private final File path;
private final long maxSizeBytes;
@@ -117,6 +117,33 @@ public class StorageLocation
return reserve(segmentDir, segment.getId().toString(), segment.getSize());
}
+ /**
+ * Reserves space to store the given segment, only if it has not been done already. This can be used
+ * when segment is already downloaded on the disk. Unlike {@link #reserve(String, DataSegment)}, this function
+ * skips the check on disk availability. We also account for segment usage even if available size dips below 0.
+ * Such a situation indicates a configuration problem or a bug and we don't let segment loading fail because
+ * of this.
+ */
+ public synchronized void maybeReserve(String segmentFilePathToAdd, DataSegment segment)
+ {
+ final File segmentFileToAdd = new File(path, segmentFilePathToAdd);
+ if (files.contains(segmentFileToAdd)) {
+ // Already reserved
+ return;
+ }
+ files.add(segmentFileToAdd);
+ currSizeBytes += segment.getSize();
+ if (availableSizeBytes() < 0) {
+ log.makeAlert(
+ "storage[%s:%,d] has more segments than it is allowed. Currently loading Segment[%s:%,d]. Please increase druid.segmentCache.locations maxSize param",
+ getPath(),
+ availableSizeBytes(),
+ segment.getId(),
+ segment.getSize()
+ ).emit();
+ }
+ }
+
/**
* Reserves space to store the given segment.
* If it succeeds, it returns a file for the given segmentFilePathToAdd in this storage location.
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index dcc03097b55..bc0da936df0 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -298,7 +298,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
}
@Override
- public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
+ public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
{
Status result = null;
try {
@@ -339,7 +339,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
}
finally {
updateRequestStatus(new SegmentChangeRequestLoad(segment), result);
- callback.execute();
+ if (null != callback) {
+ callback.execute();
+ }
}
}
@@ -417,14 +419,15 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
}
@Override
- public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
+ public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
{
removeSegment(segment, callback, true);
}
- private void removeSegment(
+ @VisibleForTesting
+ void removeSegment(
final DataSegment segment,
- final DataSegmentChangeCallback callback,
+ @Nullable final DataSegmentChangeCallback callback,
final boolean scheduleDrop
)
{
@@ -478,7 +481,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
}
finally {
updateRequestStatus(new SegmentChangeRequestDrop(segment), result);
- callback.execute();
+ if (null != callback) {
+ callback.execute();
+ }
}
}
diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
index 7be5328de5b..0edcebdf0c8 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
@@ -20,12 +20,19 @@
package org.apache.druid.segment.loading;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.AlertBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import java.io.File;
import java.util.Collections;
@@ -53,6 +60,18 @@ public class StorageLocationTest
Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013").toString(), 6_000));
}
+ @Test
+ public void testStorageLocationRealFileSystem()
+ {
+ File file = FileUtils.createTempDir();
+ file.deleteOnExit();
+ StorageLocation location = new StorageLocation(file, 10_000, 100.0d);
+ Assert.assertFalse(location.canHandle(newSegmentId("2012/2013").toString(), 5_000));
+
+ location = new StorageLocation(file, 10_000, 0.0001d);
+ Assert.assertTrue(location.canHandle(newSegmentId("2012/2013").toString(), 1));
+ }
+
private StorageLocation fakeLocation(long total, long free, long max, Double percent)
{
File file = EasyMock.mock(File.class);
@@ -95,6 +114,45 @@ public class StorageLocationTest
verifyLoc(expectedAvail, loc);
}
+ @Test
+ public void testMaybeReserve()
+ {
+ ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
+ ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(ServiceEventBuilder.class);
+ EmittingLogger.registerEmitter(emitter);
+ long expectedAvail = 1000L;
+ StorageLocation loc = new StorageLocation(new File("/tmp"), expectedAvail, null);
+
+ verifyLoc(expectedAvail, loc);
+
+ final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23);
+
+ loc.maybeReserve("test1", makeSegment("2012-01-01/2012-01-02", 10));
+ expectedAvail -= 10;
+ verifyLoc(expectedAvail, loc);
+
+ loc.maybeReserve("test1", makeSegment("2012-01-01/2012-01-02", 10));
+ verifyLoc(expectedAvail, loc);
+
+ loc.maybeReserve("test2", secondSegment);
+ expectedAvail -= 23;
+ verifyLoc(expectedAvail, loc);
+
+ loc.removeSegmentDir(new File("/tmp/test1"), makeSegment("2012-01-01/2012-01-02", 10));
+ expectedAvail += 10;
+ verifyLoc(expectedAvail, loc);
+
+ loc.maybeReserve("test3", makeSegment("2012-01-01/2012-01-02", 999));
+ expectedAvail -= 999;
+ verifyLoc(expectedAvail, loc);
+
+ Mockito.verify(emitter).emit(argumentCaptor.capture());
+ AlertBuilder alertBuilder = (AlertBuilder) argumentCaptor.getValue();
+ String description = alertBuilder.build(ImmutableMap.of()).getDescription();
+ Assert.assertNotNull(description);
+ Assert.assertTrue(description, description.contains("Please increase druid.segmentCache.locations maxSize param"));
+ }
+
private void verifyLoc(long maxSize, StorageLocation loc)
{
Assert.assertEquals(maxSize, loc.availableSizeBytes());
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
new file mode 100644
index 00000000000..37cc2606a63
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.LoadSpec;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentizerFactory;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.mockito.ArgumentMatchers.any;
+
+/**
+ * This class includes tests that cover the storage location layer as well.
+ */
+public class SegmentLoadDropHandlerCacheTest
+{
+ private static final long MAX_SIZE = 1000L;
+ private static final long SEGMENT_SIZE = 100L;
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ private SegmentLoadDropHandler loadDropHandler;
+ private TestStorageLocation storageLoc;
+ private ObjectMapper objectMapper;
+ private DataSegmentAnnouncer segmentAnnouncer;
+
+ @Before
+ public void setup() throws IOException
+ {
+ storageLoc = new TestStorageLocation(temporaryFolder);
+ SegmentLoaderConfig config = new SegmentLoaderConfig()
+ .withLocations(Collections.singletonList(storageLoc.toStorageLocationConfig(MAX_SIZE, null)))
+ .withInfoDir(storageLoc.getInfoDir());
+ objectMapper = TestHelper.makeJsonMapper();
+ objectMapper.registerSubtypes(TestLoadSpec.class);
+ objectMapper.registerSubtypes(TestSegmentizerFactory.class);
+ SegmentManager segmentManager = new SegmentManager(new SegmentLoaderLocalCacheManager(
+ TestIndex.INDEX_IO,
+ config,
+ objectMapper
+ ));
+ segmentAnnouncer = Mockito.mock(DataSegmentAnnouncer.class);
+ loadDropHandler = new SegmentLoadDropHandler(
+ objectMapper,
+ config,
+ segmentAnnouncer,
+ Mockito.mock(DataSegmentServerAnnouncer.class),
+ segmentManager,
+ new ServerTypeConfig(ServerType.HISTORICAL)
+ );
+ EmittingLogger.registerEmitter(new NoopServiceEmitter());
+ }
+
+ @Test
+ public void testLoadLocalCache() throws IOException, SegmentLoadingException
+ {
+ File cacheDir = storageLoc.getCacheDir();
+
+ // write some segments to file bypassing loadDropHandler
+ int numSegments = (int) (MAX_SIZE / SEGMENT_SIZE);
+ List expectedSegments = new ArrayList<>();
+ for (int i = 0; i < numSegments; i++) {
+ String name = "segment-" + i;
+ DataSegment segment = makeSegment("test", name);
+ storageLoc.writeSegmentInfoToCache(segment);
+ String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
+ File segmentDir = new File(cacheDir, storageDir);
+ new TestLoadSpec((int) SEGMENT_SIZE, name).loadSegment(segmentDir);
+ expectedSegments.add(segment);
+ }
+
+ // Start the load drop handler
+ loadDropHandler.start();
+
+ // Verify the expected announcements
+ ArgumentCaptor> argCaptor = ArgumentCaptor.forClass(Iterable.class);
+ Mockito.verify(segmentAnnouncer).announceSegments(argCaptor.capture());
+ List announcedSegments = new ArrayList<>();
+ argCaptor.getValue().forEach(announcedSegments::add);
+ announcedSegments.sort(Comparator.comparing(DataSegment::getVersion));
+ Assert.assertEquals(expectedSegments, announcedSegments);
+
+ // make sure adding segments beyond allowed size fails
+ Mockito.reset(segmentAnnouncer);
+ DataSegment newSegment = makeSegment("test", "new-segment");
+ loadDropHandler.addSegment(newSegment, null);
+ Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegment(any());
+ Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegments(any());
+
+ // clearing some segment should allow for new segments
+ loadDropHandler.removeSegment(expectedSegments.get(0), null, false);
+ loadDropHandler.addSegment(newSegment, null);
+ Mockito.verify(segmentAnnouncer).announceSegment(newSegment);
+ }
+
+ private DataSegment makeSegment(String dataSource, String name)
+ {
+ return new DataSegment(
+ dataSource,
+ Intervals.utc(System.currentTimeMillis() - 60 * 1000, System.currentTimeMillis()),
+ name,
+ ImmutableMap.of("type", "test", "name", name, "size", SEGMENT_SIZE),
+ Arrays.asList("dim1", "dim2", "dim3"),
+ Arrays.asList("metric1", "metric2"),
+ NoneShardSpec.instance(),
+ IndexIO.CURRENT_VERSION_ID,
+ SEGMENT_SIZE
+ );
+ }
+
+ @JsonTypeName("test")
+ public static class TestLoadSpec implements LoadSpec
+ {
+
+ private final int size;
+ private final String name;
+
+ @JsonCreator
+ public TestLoadSpec(
+ @JsonProperty("size") int size,
+ @JsonProperty("name") String name
+ )
+ {
+ this.size = size;
+ this.name = name;
+ }
+
+ @Override
+ public LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException
+ {
+ File segmentFile = new File(destDir, "segment");
+ File factoryJson = new File(destDir, "factory.json");
+ try {
+ destDir.mkdirs();
+ segmentFile.createNewFile();
+ factoryJson.createNewFile();
+ }
+ catch (IOException e) {
+ throw new SegmentLoadingException(
+ e,
+ "Failed to create files under dir '%s'",
+ destDir.getAbsolutePath()
+ );
+ }
+
+ try {
+ byte[] bytes = new byte[size];
+ ThreadLocalRandom.current().nextBytes(bytes);
+ Files.write(bytes, segmentFile);
+ Files.write("{\"type\":\"testSegmentFactory\"}".getBytes(StandardCharsets.UTF_8), factoryJson);
+ }
+ catch (IOException e) {
+ throw new SegmentLoadingException(
+ e,
+ "Failed to write data in directory %s",
+ destDir.getAbsolutePath()
+ );
+ }
+ return new LoadSpecResult(size);
+ }
+ }
+
+ @JsonTypeName("testSegmentFactory")
+ public static class TestSegmentizerFactory implements SegmentizerFactory
+ {
+
+ @Override
+ public Segment factorize(
+ DataSegment segment,
+ File parentDir,
+ boolean lazy,
+ SegmentLazyLoadFailCallback loadFailed
+ )
+ {
+ return Mockito.mock(Segment.class);
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 8aa9f367835..3930706658e 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -28,7 +28,6 @@ import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.TestHelper;
@@ -65,7 +64,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
/**
*/
@@ -73,14 +71,13 @@ public class SegmentLoadDropHandlerTest
{
public static final int COUNT = 50;
- private static final Logger log = new Logger(ZkCoordinatorTest.class);
-
private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
private SegmentLoadDropHandler segmentLoadDropHandler;
private DataSegmentAnnouncer announcer;
private File infoDir;
+ private TestStorageLocation testStorageLocation;
private AtomicInteger announceCount;
private ConcurrentSkipListSet segmentsAnnouncedByMe;
private CacheTestSegmentLoader segmentLoader;
@@ -106,19 +103,15 @@ public class SegmentLoadDropHandlerTest
public void setUp()
{
try {
- infoDir = temporaryFolder.newFolder();
- log.info("Creating tmp test files in [%s]", infoDir);
+ testStorageLocation = new TestStorageLocation(temporaryFolder);
+ infoDir = testStorageLocation.getInfoDir();
}
catch (IOException e) {
throw new RuntimeException(e);
}
locations = Collections.singletonList(
- new StorageLocationConfig(
- infoDir,
- 100L,
- 100d
- )
+ testStorageLocation.toStorageLocationConfig()
);
scheduledRunnable = new ArrayList<>();
@@ -169,7 +162,7 @@ public class SegmentLoadDropHandlerTest
@Override
public File getInfoDir()
{
- return infoDir;
+ return testStorageLocation.getInfoDir();
}
@Override
@@ -345,10 +338,10 @@ public class SegmentLoadDropHandlerTest
}
for (DataSegment segment : segments) {
- writeSegmentToCache(segment);
+ testStorageLocation.writeSegmentInfoToCache(segment);
}
- checkCache(segments);
+ testStorageLocation.checkInfoCache(segments);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
segmentLoadDropHandler.start();
Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty());
@@ -360,7 +353,7 @@ public class SegmentLoadDropHandlerTest
segmentLoadDropHandler.stop();
for (DataSegment segment : segments) {
- deleteSegmentFromCache(segment);
+ testStorageLocation.deleteSegmentInfoFromCache(segment);
}
Assert.assertEquals(0, infoDir.listFiles().length);
@@ -382,52 +375,6 @@ public class SegmentLoadDropHandlerTest
);
}
- private void writeSegmentToCache(final DataSegment segment)
- {
- if (!infoDir.exists()) {
- infoDir.mkdir();
- }
-
- File segmentInfoCacheFile = new File(infoDir, segment.getId().toString());
- try {
- jsonMapper.writeValue(segmentInfoCacheFile, segment);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- Assert.assertTrue(segmentInfoCacheFile.exists());
- }
-
- private void deleteSegmentFromCache(final DataSegment segment)
- {
- File segmentInfoCacheFile = new File(infoDir, segment.getId().toString());
- if (segmentInfoCacheFile.exists()) {
- segmentInfoCacheFile.delete();
- }
-
- Assert.assertTrue(!segmentInfoCacheFile.exists());
- }
-
- private void checkCache(Set expectedSegments)
- {
- Assert.assertTrue(infoDir.exists());
- File[] files = infoDir.listFiles();
-
- Set segmentsInFiles = Arrays
- .stream(files)
- .map(file -> {
- try {
- return jsonMapper.readValue(file, DataSegment.class);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .collect(Collectors.toSet());
- Assert.assertEquals(expectedSegments, segmentsInFiles);
- }
-
@Test
public void testStartStop() throws Exception
{
@@ -475,10 +422,10 @@ public class SegmentLoadDropHandlerTest
}
for (DataSegment segment : segments) {
- writeSegmentToCache(segment);
+ testStorageLocation.writeSegmentInfoToCache(segment);
}
- checkCache(segments);
+ testStorageLocation.checkInfoCache(segments);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
handler.start();
@@ -491,7 +438,7 @@ public class SegmentLoadDropHandlerTest
handler.stop();
for (DataSegment segment : segments) {
- deleteSegmentFromCache(segment);
+ testStorageLocation.deleteSegmentInfoFromCache(segment);
}
Assert.assertEquals(0, infoDir.listFiles().length);
diff --git a/server/src/test/java/org/apache/druid/server/coordination/TestStorageLocation.java b/server/src/test/java/org/apache/druid/server/coordination/TestStorageLocation.java
new file mode 100644
index 00000000000..d2b55f4b115
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordination/TestStorageLocation.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class TestStorageLocation
+{
+ private static final Logger log = new Logger(TestStorageLocation.class);
+ private final File cacheDir;
+ private final File infoDir;
+ private final ObjectMapper jsonMapper;
+
+ public TestStorageLocation(TemporaryFolder temporaryFolder) throws IOException
+ {
+ cacheDir = temporaryFolder.newFolder();
+ infoDir = temporaryFolder.newFolder();
+ log.info("Creating tmp test files in [%s]", infoDir);
+ jsonMapper = TestHelper.makeJsonMapper();
+ }
+
+ public File getInfoDir()
+ {
+ return infoDir;
+ }
+
+ public File getCacheDir()
+ {
+ return cacheDir;
+ }
+
+ public void writeSegmentInfoToCache(final DataSegment segment)
+ {
+ if (!infoDir.exists()) {
+ infoDir.mkdir();
+ }
+
+ File segmentInfoCacheFile = new File(infoDir, segment.getId().toString());
+ try {
+ jsonMapper.writeValue(segmentInfoCacheFile, segment);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ Assert.assertTrue(segmentInfoCacheFile.exists());
+ }
+
+ public void deleteSegmentInfoFromCache(final DataSegment segment)
+ {
+ File segmentInfoCacheFile = new File(infoDir, segment.getId().toString());
+ if (segmentInfoCacheFile.exists()) {
+ segmentInfoCacheFile.delete();
+ }
+
+ Assert.assertFalse(segmentInfoCacheFile.exists());
+ }
+
+ public void checkInfoCache(Set expectedSegments)
+ {
+ Assert.assertTrue(infoDir.exists());
+ File[] files = infoDir.listFiles();
+
+ Set segmentsInFiles = Arrays
+ .stream(files)
+ .map(file -> {
+ try {
+ return jsonMapper.readValue(file, DataSegment.class);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toSet());
+ Assert.assertEquals(expectedSegments, segmentsInFiles);
+ }
+
+ public StorageLocationConfig toStorageLocationConfig()
+ {
+ if (!cacheDir.exists()) {
+ cacheDir.mkdirs();
+ }
+ return new StorageLocationConfig(cacheDir, 100L, 100d);
+ }
+
+ public StorageLocationConfig toStorageLocationConfig(long maxSize, Double freeSpacePercent)
+ {
+ if (!cacheDir.exists()) {
+ cacheDir.mkdirs();
+ }
+ return new StorageLocationConfig(cacheDir, maxSize, freeSpacePercent);
+ }
+}