mirror of https://github.com/apache/druid.git
Reload segment usage when starting the process (#10884)
* Reload segment usage when starting the process * doc * Add more tests * remove forbidden method * Add alert
This commit is contained in:
parent
b7e9f5bc85
commit
3a0a0c033f
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.segment.loading;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.druid.utils.JvmUtils;
|
||||
|
||||
|
@ -128,9 +129,19 @@ public class SegmentLoaderConfig
|
|||
return retVal;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public SegmentLoaderConfig withInfoDir(File infoDir)
|
||||
{
|
||||
SegmentLoaderConfig retVal = new SegmentLoaderConfig();
|
||||
retVal.locations = this.locations;
|
||||
retVal.deleteOnRemove = this.deleteOnRemove;
|
||||
retVal.infoDir = infoDir;
|
||||
return retVal;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert StorageLocationConfig objects to StorageLocation objects
|
||||
*
|
||||
* <p>
|
||||
* Note: {@link #getLocations} is called instead of variable access because some testcases overrides this method
|
||||
*/
|
||||
public List<StorageLocation> toStorageLocations()
|
||||
|
|
|
@ -224,6 +224,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
|
||||
if (loc == null) {
|
||||
loc = loadSegmentWithRetry(segment, storageDir);
|
||||
} else {
|
||||
// If the segment is already downloaded on disk, we just update the current usage
|
||||
loc.maybeReserve(storageDir, segment);
|
||||
}
|
||||
return new File(loc.getPath(), storageDir);
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ package org.apache.druid.segment.loading;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.errorprone.annotations.concurrent.GuardedBy;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||
import org.apache.druid.timeline.DataSegment;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -41,7 +41,7 @@ import java.util.Set;
|
|||
*/
|
||||
public class StorageLocation
|
||||
{
|
||||
private static final Logger log = new Logger(StorageLocation.class);
|
||||
private static final EmittingLogger log = new EmittingLogger(StorageLocation.class);
|
||||
|
||||
private final File path;
|
||||
private final long maxSizeBytes;
|
||||
|
@ -117,6 +117,33 @@ public class StorageLocation
|
|||
return reserve(segmentDir, segment.getId().toString(), segment.getSize());
|
||||
}
|
||||
|
||||
/**
|
||||
* Reserves space to store the given segment, only if it has not been done already. This can be used
|
||||
* when segment is already downloaded on the disk. Unlike {@link #reserve(String, DataSegment)}, this function
|
||||
* skips the check on disk availability. We also account for segment usage even if available size dips below 0.
|
||||
* Such a situation indicates a configuration problem or a bug and we don't let segment loading fail because
|
||||
* of this.
|
||||
*/
|
||||
public synchronized void maybeReserve(String segmentFilePathToAdd, DataSegment segment)
|
||||
{
|
||||
final File segmentFileToAdd = new File(path, segmentFilePathToAdd);
|
||||
if (files.contains(segmentFileToAdd)) {
|
||||
// Already reserved
|
||||
return;
|
||||
}
|
||||
files.add(segmentFileToAdd);
|
||||
currSizeBytes += segment.getSize();
|
||||
if (availableSizeBytes() < 0) {
|
||||
log.makeAlert(
|
||||
"storage[%s:%,d] has more segments than it is allowed. Currently loading Segment[%s:%,d]. Please increase druid.segmentCache.locations maxSize param",
|
||||
getPath(),
|
||||
availableSizeBytes(),
|
||||
segment.getId(),
|
||||
segment.getSize()
|
||||
).emit();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reserves space to store the given segment.
|
||||
* If it succeeds, it returns a file for the given segmentFilePathToAdd in this storage location.
|
||||
|
|
|
@ -298,7 +298,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
|
||||
public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
|
||||
{
|
||||
Status result = null;
|
||||
try {
|
||||
|
@ -339,7 +339,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
|
|||
}
|
||||
finally {
|
||||
updateRequestStatus(new SegmentChangeRequestLoad(segment), result);
|
||||
callback.execute();
|
||||
if (null != callback) {
|
||||
callback.execute();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -417,14 +419,15 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
|
|||
}
|
||||
|
||||
@Override
|
||||
public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
|
||||
public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
|
||||
{
|
||||
removeSegment(segment, callback, true);
|
||||
}
|
||||
|
||||
private void removeSegment(
|
||||
@VisibleForTesting
|
||||
void removeSegment(
|
||||
final DataSegment segment,
|
||||
final DataSegmentChangeCallback callback,
|
||||
@Nullable final DataSegmentChangeCallback callback,
|
||||
final boolean scheduleDrop
|
||||
)
|
||||
{
|
||||
|
@ -478,7 +481,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
|
|||
}
|
||||
finally {
|
||||
updateRequestStatus(new SegmentChangeRequestDrop(segment), result);
|
||||
callback.execute();
|
||||
if (null != callback) {
|
||||
callback.execute();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,12 +20,19 @@
|
|||
package org.apache.druid.segment.loading;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.java.util.common.FileUtils;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||
import org.apache.druid.java.util.emitter.service.AlertBuilder;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
|
||||
import org.apache.druid.timeline.DataSegment;
|
||||
import org.apache.druid.timeline.SegmentId;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Collections;
|
||||
|
@ -53,6 +60,18 @@ public class StorageLocationTest
|
|||
Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013").toString(), 6_000));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStorageLocationRealFileSystem()
|
||||
{
|
||||
File file = FileUtils.createTempDir();
|
||||
file.deleteOnExit();
|
||||
StorageLocation location = new StorageLocation(file, 10_000, 100.0d);
|
||||
Assert.assertFalse(location.canHandle(newSegmentId("2012/2013").toString(), 5_000));
|
||||
|
||||
location = new StorageLocation(file, 10_000, 0.0001d);
|
||||
Assert.assertTrue(location.canHandle(newSegmentId("2012/2013").toString(), 1));
|
||||
}
|
||||
|
||||
private StorageLocation fakeLocation(long total, long free, long max, Double percent)
|
||||
{
|
||||
File file = EasyMock.mock(File.class);
|
||||
|
@ -95,6 +114,45 @@ public class StorageLocationTest
|
|||
verifyLoc(expectedAvail, loc);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeReserve()
|
||||
{
|
||||
ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
|
||||
ArgumentCaptor<ServiceEventBuilder> argumentCaptor = ArgumentCaptor.forClass(ServiceEventBuilder.class);
|
||||
EmittingLogger.registerEmitter(emitter);
|
||||
long expectedAvail = 1000L;
|
||||
StorageLocation loc = new StorageLocation(new File("/tmp"), expectedAvail, null);
|
||||
|
||||
verifyLoc(expectedAvail, loc);
|
||||
|
||||
final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23);
|
||||
|
||||
loc.maybeReserve("test1", makeSegment("2012-01-01/2012-01-02", 10));
|
||||
expectedAvail -= 10;
|
||||
verifyLoc(expectedAvail, loc);
|
||||
|
||||
loc.maybeReserve("test1", makeSegment("2012-01-01/2012-01-02", 10));
|
||||
verifyLoc(expectedAvail, loc);
|
||||
|
||||
loc.maybeReserve("test2", secondSegment);
|
||||
expectedAvail -= 23;
|
||||
verifyLoc(expectedAvail, loc);
|
||||
|
||||
loc.removeSegmentDir(new File("/tmp/test1"), makeSegment("2012-01-01/2012-01-02", 10));
|
||||
expectedAvail += 10;
|
||||
verifyLoc(expectedAvail, loc);
|
||||
|
||||
loc.maybeReserve("test3", makeSegment("2012-01-01/2012-01-02", 999));
|
||||
expectedAvail -= 999;
|
||||
verifyLoc(expectedAvail, loc);
|
||||
|
||||
Mockito.verify(emitter).emit(argumentCaptor.capture());
|
||||
AlertBuilder alertBuilder = (AlertBuilder) argumentCaptor.getValue();
|
||||
String description = alertBuilder.build(ImmutableMap.of()).getDescription();
|
||||
Assert.assertNotNull(description);
|
||||
Assert.assertTrue(description, description.contains("Please increase druid.segmentCache.locations maxSize param"));
|
||||
}
|
||||
|
||||
private void verifyLoc(long maxSize, StorageLocation loc)
|
||||
{
|
||||
Assert.assertEquals(maxSize, loc.availableSizeBytes());
|
||||
|
|
|
@ -0,0 +1,231 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.server.coordination;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.Files;
|
||||
import org.apache.druid.guice.ServerTypeConfig;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||
import org.apache.druid.segment.IndexIO;
|
||||
import org.apache.druid.segment.Segment;
|
||||
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
|
||||
import org.apache.druid.segment.TestHelper;
|
||||
import org.apache.druid.segment.TestIndex;
|
||||
import org.apache.druid.segment.loading.DataSegmentPusher;
|
||||
import org.apache.druid.segment.loading.LoadSpec;
|
||||
import org.apache.druid.segment.loading.SegmentLoaderConfig;
|
||||
import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
|
||||
import org.apache.druid.segment.loading.SegmentLoadingException;
|
||||
import org.apache.druid.segment.loading.SegmentizerFactory;
|
||||
import org.apache.druid.server.SegmentManager;
|
||||
import org.apache.druid.server.metrics.NoopServiceEmitter;
|
||||
import org.apache.druid.timeline.DataSegment;
|
||||
import org.apache.druid.timeline.partition.NoneShardSpec;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
|
||||
/**
|
||||
* This class includes tests that cover the storage location layer as well.
|
||||
*/
|
||||
public class SegmentLoadDropHandlerCacheTest
|
||||
{
|
||||
private static final long MAX_SIZE = 1000L;
|
||||
private static final long SEGMENT_SIZE = 100L;
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
private SegmentLoadDropHandler loadDropHandler;
|
||||
private TestStorageLocation storageLoc;
|
||||
private ObjectMapper objectMapper;
|
||||
private DataSegmentAnnouncer segmentAnnouncer;
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException
|
||||
{
|
||||
storageLoc = new TestStorageLocation(temporaryFolder);
|
||||
SegmentLoaderConfig config = new SegmentLoaderConfig()
|
||||
.withLocations(Collections.singletonList(storageLoc.toStorageLocationConfig(MAX_SIZE, null)))
|
||||
.withInfoDir(storageLoc.getInfoDir());
|
||||
objectMapper = TestHelper.makeJsonMapper();
|
||||
objectMapper.registerSubtypes(TestLoadSpec.class);
|
||||
objectMapper.registerSubtypes(TestSegmentizerFactory.class);
|
||||
SegmentManager segmentManager = new SegmentManager(new SegmentLoaderLocalCacheManager(
|
||||
TestIndex.INDEX_IO,
|
||||
config,
|
||||
objectMapper
|
||||
));
|
||||
segmentAnnouncer = Mockito.mock(DataSegmentAnnouncer.class);
|
||||
loadDropHandler = new SegmentLoadDropHandler(
|
||||
objectMapper,
|
||||
config,
|
||||
segmentAnnouncer,
|
||||
Mockito.mock(DataSegmentServerAnnouncer.class),
|
||||
segmentManager,
|
||||
new ServerTypeConfig(ServerType.HISTORICAL)
|
||||
);
|
||||
EmittingLogger.registerEmitter(new NoopServiceEmitter());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadLocalCache() throws IOException, SegmentLoadingException
|
||||
{
|
||||
File cacheDir = storageLoc.getCacheDir();
|
||||
|
||||
// write some segments to file bypassing loadDropHandler
|
||||
int numSegments = (int) (MAX_SIZE / SEGMENT_SIZE);
|
||||
List<DataSegment> expectedSegments = new ArrayList<>();
|
||||
for (int i = 0; i < numSegments; i++) {
|
||||
String name = "segment-" + i;
|
||||
DataSegment segment = makeSegment("test", name);
|
||||
storageLoc.writeSegmentInfoToCache(segment);
|
||||
String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
|
||||
File segmentDir = new File(cacheDir, storageDir);
|
||||
new TestLoadSpec((int) SEGMENT_SIZE, name).loadSegment(segmentDir);
|
||||
expectedSegments.add(segment);
|
||||
}
|
||||
|
||||
// Start the load drop handler
|
||||
loadDropHandler.start();
|
||||
|
||||
// Verify the expected announcements
|
||||
ArgumentCaptor<Iterable<DataSegment>> argCaptor = ArgumentCaptor.forClass(Iterable.class);
|
||||
Mockito.verify(segmentAnnouncer).announceSegments(argCaptor.capture());
|
||||
List<DataSegment> announcedSegments = new ArrayList<>();
|
||||
argCaptor.getValue().forEach(announcedSegments::add);
|
||||
announcedSegments.sort(Comparator.comparing(DataSegment::getVersion));
|
||||
Assert.assertEquals(expectedSegments, announcedSegments);
|
||||
|
||||
// make sure adding segments beyond allowed size fails
|
||||
Mockito.reset(segmentAnnouncer);
|
||||
DataSegment newSegment = makeSegment("test", "new-segment");
|
||||
loadDropHandler.addSegment(newSegment, null);
|
||||
Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegment(any());
|
||||
Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegments(any());
|
||||
|
||||
// clearing some segment should allow for new segments
|
||||
loadDropHandler.removeSegment(expectedSegments.get(0), null, false);
|
||||
loadDropHandler.addSegment(newSegment, null);
|
||||
Mockito.verify(segmentAnnouncer).announceSegment(newSegment);
|
||||
}
|
||||
|
||||
private DataSegment makeSegment(String dataSource, String name)
|
||||
{
|
||||
return new DataSegment(
|
||||
dataSource,
|
||||
Intervals.utc(System.currentTimeMillis() - 60 * 1000, System.currentTimeMillis()),
|
||||
name,
|
||||
ImmutableMap.of("type", "test", "name", name, "size", SEGMENT_SIZE),
|
||||
Arrays.asList("dim1", "dim2", "dim3"),
|
||||
Arrays.asList("metric1", "metric2"),
|
||||
NoneShardSpec.instance(),
|
||||
IndexIO.CURRENT_VERSION_ID,
|
||||
SEGMENT_SIZE
|
||||
);
|
||||
}
|
||||
|
||||
@JsonTypeName("test")
|
||||
public static class TestLoadSpec implements LoadSpec
|
||||
{
|
||||
|
||||
private final int size;
|
||||
private final String name;
|
||||
|
||||
@JsonCreator
|
||||
public TestLoadSpec(
|
||||
@JsonProperty("size") int size,
|
||||
@JsonProperty("name") String name
|
||||
)
|
||||
{
|
||||
this.size = size;
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException
|
||||
{
|
||||
File segmentFile = new File(destDir, "segment");
|
||||
File factoryJson = new File(destDir, "factory.json");
|
||||
try {
|
||||
destDir.mkdirs();
|
||||
segmentFile.createNewFile();
|
||||
factoryJson.createNewFile();
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new SegmentLoadingException(
|
||||
e,
|
||||
"Failed to create files under dir '%s'",
|
||||
destDir.getAbsolutePath()
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
byte[] bytes = new byte[size];
|
||||
ThreadLocalRandom.current().nextBytes(bytes);
|
||||
Files.write(bytes, segmentFile);
|
||||
Files.write("{\"type\":\"testSegmentFactory\"}".getBytes(StandardCharsets.UTF_8), factoryJson);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new SegmentLoadingException(
|
||||
e,
|
||||
"Failed to write data in directory %s",
|
||||
destDir.getAbsolutePath()
|
||||
);
|
||||
}
|
||||
return new LoadSpecResult(size);
|
||||
}
|
||||
}
|
||||
|
||||
@JsonTypeName("testSegmentFactory")
|
||||
public static class TestSegmentizerFactory implements SegmentizerFactory
|
||||
{
|
||||
|
||||
@Override
|
||||
public Segment factorize(
|
||||
DataSegment segment,
|
||||
File parentDir,
|
||||
boolean lazy,
|
||||
SegmentLazyLoadFailCallback loadFailed
|
||||
)
|
||||
{
|
||||
return Mockito.mock(Segment.class);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -28,7 +28,6 @@ import org.apache.druid.guice.ServerTypeConfig;
|
|||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||
import org.apache.druid.segment.IndexIO;
|
||||
import org.apache.druid.segment.TestHelper;
|
||||
|
@ -65,7 +64,6 @@ import java.util.concurrent.ScheduledFuture;
|
|||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -73,14 +71,13 @@ public class SegmentLoadDropHandlerTest
|
|||
{
|
||||
public static final int COUNT = 50;
|
||||
|
||||
private static final Logger log = new Logger(ZkCoordinatorTest.class);
|
||||
|
||||
private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
|
||||
|
||||
private SegmentLoadDropHandler segmentLoadDropHandler;
|
||||
|
||||
private DataSegmentAnnouncer announcer;
|
||||
private File infoDir;
|
||||
private TestStorageLocation testStorageLocation;
|
||||
private AtomicInteger announceCount;
|
||||
private ConcurrentSkipListSet<DataSegment> segmentsAnnouncedByMe;
|
||||
private CacheTestSegmentLoader segmentLoader;
|
||||
|
@ -106,19 +103,15 @@ public class SegmentLoadDropHandlerTest
|
|||
public void setUp()
|
||||
{
|
||||
try {
|
||||
infoDir = temporaryFolder.newFolder();
|
||||
log.info("Creating tmp test files in [%s]", infoDir);
|
||||
testStorageLocation = new TestStorageLocation(temporaryFolder);
|
||||
infoDir = testStorageLocation.getInfoDir();
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
locations = Collections.singletonList(
|
||||
new StorageLocationConfig(
|
||||
infoDir,
|
||||
100L,
|
||||
100d
|
||||
)
|
||||
testStorageLocation.toStorageLocationConfig()
|
||||
);
|
||||
|
||||
scheduledRunnable = new ArrayList<>();
|
||||
|
@ -169,7 +162,7 @@ public class SegmentLoadDropHandlerTest
|
|||
@Override
|
||||
public File getInfoDir()
|
||||
{
|
||||
return infoDir;
|
||||
return testStorageLocation.getInfoDir();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -345,10 +338,10 @@ public class SegmentLoadDropHandlerTest
|
|||
}
|
||||
|
||||
for (DataSegment segment : segments) {
|
||||
writeSegmentToCache(segment);
|
||||
testStorageLocation.writeSegmentInfoToCache(segment);
|
||||
}
|
||||
|
||||
checkCache(segments);
|
||||
testStorageLocation.checkInfoCache(segments);
|
||||
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
|
||||
segmentLoadDropHandler.start();
|
||||
Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty());
|
||||
|
@ -360,7 +353,7 @@ public class SegmentLoadDropHandlerTest
|
|||
segmentLoadDropHandler.stop();
|
||||
|
||||
for (DataSegment segment : segments) {
|
||||
deleteSegmentFromCache(segment);
|
||||
testStorageLocation.deleteSegmentInfoFromCache(segment);
|
||||
}
|
||||
|
||||
Assert.assertEquals(0, infoDir.listFiles().length);
|
||||
|
@ -382,52 +375,6 @@ public class SegmentLoadDropHandlerTest
|
|||
);
|
||||
}
|
||||
|
||||
private void writeSegmentToCache(final DataSegment segment)
|
||||
{
|
||||
if (!infoDir.exists()) {
|
||||
infoDir.mkdir();
|
||||
}
|
||||
|
||||
File segmentInfoCacheFile = new File(infoDir, segment.getId().toString());
|
||||
try {
|
||||
jsonMapper.writeValue(segmentInfoCacheFile, segment);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
Assert.assertTrue(segmentInfoCacheFile.exists());
|
||||
}
|
||||
|
||||
private void deleteSegmentFromCache(final DataSegment segment)
|
||||
{
|
||||
File segmentInfoCacheFile = new File(infoDir, segment.getId().toString());
|
||||
if (segmentInfoCacheFile.exists()) {
|
||||
segmentInfoCacheFile.delete();
|
||||
}
|
||||
|
||||
Assert.assertTrue(!segmentInfoCacheFile.exists());
|
||||
}
|
||||
|
||||
private void checkCache(Set<DataSegment> expectedSegments)
|
||||
{
|
||||
Assert.assertTrue(infoDir.exists());
|
||||
File[] files = infoDir.listFiles();
|
||||
|
||||
Set<DataSegment> segmentsInFiles = Arrays
|
||||
.stream(files)
|
||||
.map(file -> {
|
||||
try {
|
||||
return jsonMapper.readValue(file, DataSegment.class);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toSet());
|
||||
Assert.assertEquals(expectedSegments, segmentsInFiles);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStartStop() throws Exception
|
||||
{
|
||||
|
@ -475,10 +422,10 @@ public class SegmentLoadDropHandlerTest
|
|||
}
|
||||
|
||||
for (DataSegment segment : segments) {
|
||||
writeSegmentToCache(segment);
|
||||
testStorageLocation.writeSegmentInfoToCache(segment);
|
||||
}
|
||||
|
||||
checkCache(segments);
|
||||
testStorageLocation.checkInfoCache(segments);
|
||||
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
|
||||
|
||||
handler.start();
|
||||
|
@ -491,7 +438,7 @@ public class SegmentLoadDropHandlerTest
|
|||
handler.stop();
|
||||
|
||||
for (DataSegment segment : segments) {
|
||||
deleteSegmentFromCache(segment);
|
||||
testStorageLocation.deleteSegmentInfoFromCache(segment);
|
||||
}
|
||||
|
||||
Assert.assertEquals(0, infoDir.listFiles().length);
|
||||
|
|
|
@ -0,0 +1,122 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.server.coordination;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.segment.TestHelper;
|
||||
import org.apache.druid.segment.loading.StorageLocationConfig;
|
||||
import org.apache.druid.timeline.DataSegment;
|
||||
import org.junit.Assert;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class TestStorageLocation
|
||||
{
|
||||
private static final Logger log = new Logger(TestStorageLocation.class);
|
||||
private final File cacheDir;
|
||||
private final File infoDir;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
public TestStorageLocation(TemporaryFolder temporaryFolder) throws IOException
|
||||
{
|
||||
cacheDir = temporaryFolder.newFolder();
|
||||
infoDir = temporaryFolder.newFolder();
|
||||
log.info("Creating tmp test files in [%s]", infoDir);
|
||||
jsonMapper = TestHelper.makeJsonMapper();
|
||||
}
|
||||
|
||||
public File getInfoDir()
|
||||
{
|
||||
return infoDir;
|
||||
}
|
||||
|
||||
public File getCacheDir()
|
||||
{
|
||||
return cacheDir;
|
||||
}
|
||||
|
||||
public void writeSegmentInfoToCache(final DataSegment segment)
|
||||
{
|
||||
if (!infoDir.exists()) {
|
||||
infoDir.mkdir();
|
||||
}
|
||||
|
||||
File segmentInfoCacheFile = new File(infoDir, segment.getId().toString());
|
||||
try {
|
||||
jsonMapper.writeValue(segmentInfoCacheFile, segment);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
Assert.assertTrue(segmentInfoCacheFile.exists());
|
||||
}
|
||||
|
||||
public void deleteSegmentInfoFromCache(final DataSegment segment)
|
||||
{
|
||||
File segmentInfoCacheFile = new File(infoDir, segment.getId().toString());
|
||||
if (segmentInfoCacheFile.exists()) {
|
||||
segmentInfoCacheFile.delete();
|
||||
}
|
||||
|
||||
Assert.assertFalse(segmentInfoCacheFile.exists());
|
||||
}
|
||||
|
||||
public void checkInfoCache(Set<DataSegment> expectedSegments)
|
||||
{
|
||||
Assert.assertTrue(infoDir.exists());
|
||||
File[] files = infoDir.listFiles();
|
||||
|
||||
Set<DataSegment> segmentsInFiles = Arrays
|
||||
.stream(files)
|
||||
.map(file -> {
|
||||
try {
|
||||
return jsonMapper.readValue(file, DataSegment.class);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toSet());
|
||||
Assert.assertEquals(expectedSegments, segmentsInFiles);
|
||||
}
|
||||
|
||||
public StorageLocationConfig toStorageLocationConfig()
|
||||
{
|
||||
if (!cacheDir.exists()) {
|
||||
cacheDir.mkdirs();
|
||||
}
|
||||
return new StorageLocationConfig(cacheDir, 100L, 100d);
|
||||
}
|
||||
|
||||
public StorageLocationConfig toStorageLocationConfig(long maxSize, Double freeSpacePercent)
|
||||
{
|
||||
if (!cacheDir.exists()) {
|
||||
cacheDir.mkdirs();
|
||||
}
|
||||
return new StorageLocationConfig(cacheDir, maxSize, freeSpacePercent);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue