Add intermediary data server for shuffle (#8088)

* Add intermediary data server for shuffle

* javadoc

* adjust timeout

* resolved todo

* fix test

* style

* address comments

* rename to shuffleDataLocations

* Address comments

* bit adjustment StorageLocation

* fix test

* address comment & fix test

* handle interrupted exception
This commit is contained in:
Jihoon Son 2019-07-18 14:46:47 -07:00 committed by GitHub
parent 03e55d30eb
commit c7eb7cd018
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 990 additions and 110 deletions

View File

@ -2562,6 +2562,7 @@ public class KafkaIndexTaskTest
null,
true,
null,
null,
null
);
final TestDerbyConnector derbyConnector = derby.getConnector();

View File

@ -2747,6 +2747,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
null,
true,
null,
null,
null
);
final TestDerbyConnector derbyConnector = derby.getConnector();

View File

@ -54,7 +54,7 @@ public class SegmentLoaderFactory
return new SegmentLoaderLocalCacheManager(
indexIO,
new SegmentLoaderConfig().withLocations(
Collections.singletonList(new StorageLocationConfig().setPath(storageDir))),
Collections.singletonList(new StorageLocationConfig(storageDir, null, null))),
jsonMapper
);
}

View File

@ -22,10 +22,13 @@ package org.apache.druid.indexing.common.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
public class TaskConfig
@ -35,7 +38,6 @@ public class TaskConfig
);
private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M");
private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M");
@JsonProperty
@ -62,6 +64,9 @@ public class TaskConfig
@JsonProperty
private final Period directoryLockTimeout;
@JsonProperty
private final List<StorageLocationConfig> shuffleDataLocations;
@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@ -71,7 +76,8 @@ public class TaskConfig
@JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates,
@JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart,
@JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
@JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations
)
{
this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@ -89,6 +95,13 @@ public class TaskConfig
this.directoryLockTimeout = directoryLockTimeout == null
? DEFAULT_DIRECTORY_LOCK_TIMEOUT
: directoryLockTimeout;
if (shuffleDataLocations == null) {
this.shuffleDataLocations = Collections.singletonList(
new StorageLocationConfig(new File(defaultDir(null, "intermediary-segments")), null, null)
);
} else {
this.shuffleDataLocations = shuffleDataLocations;
}
}
@JsonProperty
@ -154,7 +167,13 @@ public class TaskConfig
return directoryLockTimeout;
}
private String defaultDir(String configParameter, final String defaultVal)
@JsonProperty
public List<StorageLocationConfig> getShuffleDataLocations()
{
return shuffleDataLocations;
}
private String defaultDir(@Nullable String configParameter, final String defaultVal)
{
if (configParameter == null) {
return Paths.get(getBaseDir(), defaultVal).toString();

View File

@ -372,9 +372,7 @@ public class OverlordResource
@Path("/taskStatus")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(StateResourceFilter.class)
public Response getMultipleTaskStatuses(
Set<String> taskIds
)
public Response getMultipleTaskStatuses(Set<String> taskIds)
{
if (taskIds == null || taskIds.size() == 0) {
return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds provided.").build();

View File

@ -0,0 +1,331 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker;
import com.google.common.collect.Iterators;
import com.google.common.io.Files;
import com.google.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskStatus;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* This class manages intermediary segments for data shuffle between native parallel index tasks.
* In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers
* and phase 2 tasks read those files via HTTP.
*
* The directory where segment files are placed is structured as
* {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/partitionIdOfSegment.
*
* This class provides interfaces to store, find, and remove segment files.
* It also has a self-cleanup mechanism to clean up stale segment files. It periodically checks the last access time
* per supervisorTask and removes its all segment files if the supervisorTask is not running anymore.
*/
@ManageLifecycle
public class IntermediaryDataManager
{
private static final Logger log = new Logger(IntermediaryDataManager.class);
private final long intermediaryPartitionDiscoveryPeriodSec;
private final long intermediaryPartitionCleanupPeriodSec;
private final Period intermediaryPartitionTimeout;
private final List<StorageLocation> shuffleDataLocations;
private final IndexingServiceClient indexingServiceClient;
// supervisorTaskId -> time to check supervisorTask status
// This time is initialized when a new supervisorTask is found and updated whenever a partition is accessed for
// the supervisor.
private final ConcurrentHashMap<String, DateTime> supervisorTaskCheckTimes = new ConcurrentHashMap<>();
// supervisorTaskId -> cyclic iterator of storage locations
private final Map<String, Iterator<StorageLocation>> locationIterators = new HashMap<>();
// The overlord is supposed to send a cleanup request as soon as the supervisorTask is finished in parallel indexing,
// but middleManager or indexer could miss the request. This executor is to automatically clean up unused intermediary
// partitions.
// This can be null until IntermediaryDataManager is started.
@Nullable
private ScheduledExecutorService supervisorTaskChecker;
@Inject
public IntermediaryDataManager(
WorkerConfig workerConfig,
TaskConfig taskConfig,
IndexingServiceClient indexingServiceClient
)
{
this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec();
this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec();
this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout();
this.shuffleDataLocations = taskConfig
.getShuffleDataLocations()
.stream()
.map(config -> new StorageLocation(config.getPath(), config.getMaxSize(), config.getFreeSpacePercent()))
.collect(Collectors.toList());
this.indexingServiceClient = indexingServiceClient;
}
@LifecycleStart
public void start()
{
supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d");
// Discover partitions for new supervisorTasks
supervisorTaskChecker.scheduleAtFixedRate(
() -> {
try {
discoverSupervisorTaskPartitions();
}
catch (Exception e) {
log.warn(e, "Error while discovering supervisorTasks");
}
},
intermediaryPartitionDiscoveryPeriodSec,
intermediaryPartitionDiscoveryPeriodSec,
TimeUnit.SECONDS
);
supervisorTaskChecker.scheduleAtFixedRate(
() -> {
try {
deleteExpiredSuprevisorTaskPartitionsIfNotRunning();
}
catch (InterruptedException e) {
log.error(e, "Error while cleaning up partitions for expired supervisors");
}
catch (Exception e) {
log.warn(e, "Error while cleaning up partitions for expired supervisors");
}
},
intermediaryPartitionCleanupPeriodSec,
intermediaryPartitionCleanupPeriodSec,
TimeUnit.SECONDS
);
}
@LifecycleStop
public void stop() throws InterruptedException
{
if (supervisorTaskChecker != null) {
supervisorTaskChecker.shutdownNow();
supervisorTaskChecker.awaitTermination(10, TimeUnit.SECONDS);
}
supervisorTaskCheckTimes.clear();
}
private void discoverSupervisorTaskPartitions()
{
for (StorageLocation location : shuffleDataLocations) {
final MutableInt numDiscovered = new MutableInt(0);
final File[] dirsPerSupervisorTask = location.getPath().listFiles();
if (dirsPerSupervisorTask != null) {
for (File supervisorTaskDir : dirsPerSupervisorTask) {
final String supervisorTaskId = supervisorTaskDir.getName();
supervisorTaskCheckTimes.computeIfAbsent(
supervisorTaskId,
k -> {
numDiscovered.increment();
return DateTimes.nowUtc().plus(intermediaryPartitionTimeout);
}
);
}
}
log.info("Discovered partitions for [%s] new supervisor tasks", numDiscovered.getValue());
}
}
/**
* Check supervisorTask status if its partitions have not been accessed in timeout and
* delete all partitions for the supervisorTask if it is already finished.
*
* Note that the overlord sends a cleanup request when a supervisorTask is finished. The below check is to trigger
* the self-cleanup for when the cleanup request is missing.
*/
private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws InterruptedException
{
final DateTime now = DateTimes.nowUtc();
final Set<String> expiredSupervisorTasks = new HashSet<>();
for (Entry<String, DateTime> entry : supervisorTaskCheckTimes.entrySet()) {
final String supervisorTaskId = entry.getKey();
final DateTime checkTime = entry.getValue();
if (checkTime.isAfter(now)) {
expiredSupervisorTasks.add(supervisorTaskId);
}
}
log.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());
final Map<String, TaskStatus> taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks);
for (Entry<String, TaskStatus> entry : taskStatuses.entrySet()) {
final String supervisorTaskId = entry.getKey();
final TaskStatus status = entry.getValue();
if (status.getStatusCode().isComplete()) {
// If it's finished, clean up all partitions for the supervisor task.
try {
deletePartitions(supervisorTaskId);
}
catch (IOException e) {
log.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
}
} else {
// If it's still running, update last access time.
supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc());
}
}
}
/**
* Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per
* supervisorTaskId.
*
* This method is only useful for the new Indexer model. Tasks running in the existing middleManager should the static
* addSegment method.
*/
public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile)
throws IOException
{
final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent(
supervisorTaskId,
k -> Iterators.cycle(shuffleDataLocations)
);
addSegment(iterator, shuffleDataLocations.size(), supervisorTaskId, subTaskId, segment, segmentFile);
}
public List<File> findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId)
{
for (StorageLocation location : shuffleDataLocations) {
final File partitionDir = getPartitionDir(location, supervisorTaskId, interval, partitionId);
if (partitionDir.exists()) {
supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc());
final File[] segmentFiles = partitionDir.listFiles();
return segmentFiles == null ? Collections.emptyList() : Arrays.asList(segmentFiles);
}
}
return Collections.emptyList();
}
public void deletePartitions(String supervisorTaskId) throws IOException
{
for (StorageLocation location : shuffleDataLocations) {
final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
if (supervisorTaskPath.exists()) {
log.info("Cleaning up [%s]", supervisorTaskPath);
for (File eachFile : FileUtils.listFiles(supervisorTaskPath, null, true)) {
location.removeFile(eachFile);
}
FileUtils.forceDelete(supervisorTaskPath);
}
}
supervisorTaskCheckTimes.remove(supervisorTaskId);
}
/**
* Iterate through the given storage locations to find one which can handle the given segment.
*/
public static void addSegment(
Iterator<StorageLocation> cyclicIterator,
int numLocations,
String supervisorTaskId,
String subTaskId,
DataSegment segment,
File segmentFile
) throws IOException
{
final StorageLocation location = findLocationForSegment(cyclicIterator, numLocations, segment);
final File destFile = new File(
getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()),
subTaskId
);
FileUtils.forceMkdirParent(destFile);
final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile));
if (copiedBytes == 0) {
throw new IOE(
"0 bytes copied after copying a segment file from [%s] to [%s]",
segmentFile.getAbsolutePath(),
destFile.getAbsolutePath()
);
}
location.addFile(destFile);
}
private static StorageLocation findLocationForSegment(
Iterator<StorageLocation> cyclicIterator,
int numLocations,
DataSegment segment
)
{
for (int i = 0; i < numLocations; i++) {
final StorageLocation location = cyclicIterator.next();
if (location.canHandle(segment)) {
return location;
}
}
throw new ISE("Can't find location to handle segment[%s]", segment);
}
private static File getPartitionDir(
StorageLocation location,
String supervisorTaskId,
Interval interval,
int partitionId
)
{
return FileUtils.getFile(
location.getPath(),
supervisorTaskId,
interval.getStart().toString(),
interval.getEnd().toString(),
String.valueOf(partitionId)
);
}
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexing.worker.config;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.server.DruidNode;
import org.apache.druid.utils.JvmUtils;
import org.joda.time.Period;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
@ -32,15 +33,24 @@ public class WorkerConfig
{
@JsonProperty
@NotNull
private String ip = DruidNode.getDefaultHost();
private final String ip = DruidNode.getDefaultHost();
@JsonProperty
@NotNull
private String version = "0";
private final String version = "0";
@JsonProperty
@Min(1)
private int capacity = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1);
private final int capacity = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1);
@JsonProperty
private final long intermediaryPartitionDiscoveryPeriodSec = 60L;
@JsonProperty
private final long intermediaryPartitionCleanupPeriodSec = 300L;
@JsonProperty
private final Period intermediaryPartitionTimeout = new Period("P1D");
public String getIp()
{
@ -56,4 +66,19 @@ public class WorkerConfig
{
return capacity;
}
public long getIntermediaryPartitionDiscoveryPeriodSec()
{
return intermediaryPartitionDiscoveryPeriodSec;
}
public long getIntermediaryPartitionCleanupPeriodSec()
{
return intermediaryPartitionCleanupPeriodSec;
}
public Period getIntermediaryPartitionTimeout()
{
return intermediaryPartitionTimeout;
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker.http;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.indexing.worker.IntermediaryDataManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.http.security.StateResourceFilter;
import org.joda.time.Interval;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
/**
* HTTP endpoints for shuffle system. The MiddleManager and Indexer use this resource to serve intermediary shuffle
* data.
*
* We use {@link StateResourceFilter} here because it performs an admin-like authorization and
* all endpoints here are supposed to be used for only internal communcation.
* Another possible alternate could be performing datasource-level authorization as in TaskResourceFilter.
* However, datasource information is not available in middleManagers or indexers yet which makes hard to use it.
* We could develop a new ResourceFileter in the future if needed.
*/
@Path("/druid/worker/v1/shuffle")
@ResourceFilters(StateResourceFilter.class)
public class ShuffleResource
{
private static final Logger log = new Logger(ShuffleResource.class);
private final IntermediaryDataManager intermediaryDataManager;
@Inject
public ShuffleResource(IntermediaryDataManager intermediaryDataManager)
{
this.intermediaryDataManager = intermediaryDataManager;
}
@GET
@Path("/task/{supervisorTaskId}/partition")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Response getPartition(
@PathParam("supervisorTaskId") String supervisorTaskId,
@QueryParam("startTime") String startTime,
@QueryParam("endTime") String endTime,
@QueryParam("partitionId") int partitionId
)
{
final Interval interval = new Interval(DateTimes.of(startTime), DateTimes.of(endTime));
final List<File> partitionFiles = intermediaryDataManager.findPartitionFiles(
supervisorTaskId,
interval,
partitionId
);
if (partitionFiles.isEmpty()) {
final String errorMessage = StringUtils.format(
"Can't find the partition for supervisor[%s], interval[%s], and partitionId[%s]",
supervisorTaskId,
interval,
partitionId
);
return Response.status(Status.NOT_FOUND).entity(errorMessage).build();
} else {
return Response.ok(
(StreamingOutput) output -> {
for (File partitionFile : partitionFiles) {
try (final FileInputStream fileInputStream = new FileInputStream(partitionFile)) {
ByteStreams.copy(fileInputStream, output);
}
}
}
).build();
}
}
@DELETE
@Path("/task/{supervisorTaskId}")
public Response deletePartitions(@PathParam("supervisorTaskId") String supervisorTaskId)
{
try {
intermediaryDataManager.deletePartitions(supervisorTaskId);
return Response.ok(supervisorTaskId).build();
}
catch (IOException e) {
log.error(e, "Error while deleting partitions of supervisorTask[%s]", supervisorTaskId);
return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
}
}
}

View File

@ -95,7 +95,7 @@ public class TaskToolboxTest
EasyMock.replay(task, mockHandoffNotifierFactory);
taskToolbox = new TaskToolboxFactory(
new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null),
new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null, null),
mockTaskActionClientFactory,
mockEmitter,
mockSegmentPusher,

View File

@ -1519,7 +1519,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
return result;
}
};
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null);
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null, null);
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,

View File

@ -346,16 +346,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
@Override
public List<StorageLocationConfig> getLocations()
{
return ImmutableList.of(
new StorageLocationConfig()
{
@Override
public File getPath()
{
return deepStorageDir;
}
}
);
return ImmutableList.of(new StorageLocationConfig(deepStorageDir, null, null));
}
},
objectMapper

View File

@ -78,6 +78,7 @@ public class HadoopTaskTest
ImmutableList.of("something:hadoop:1"),
false,
null,
null,
null
)).once();
EasyMock.replay(toolbox);

View File

@ -204,7 +204,7 @@ public class IndexTaskTest
public List<StorageLocationConfig> getLocations()
{
return Collections.singletonList(
new StorageLocationConfig().setPath(cacheDir)
new StorageLocationConfig(cacheDir, null, null)
);
}
},

View File

@ -881,7 +881,7 @@ public class RealtimeIndexTaskTest
final File directory
)
{
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null);
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null, null);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
try {
taskStorage.insert(task, TaskStatus.running(task.getId()));

View File

@ -84,10 +84,14 @@ import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -115,6 +119,9 @@ public class IngestSegmentFirehoseFactoryTest
private static final TaskLockbox TASK_LOCKBOX;
private static final Task TASK;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
static {
TestUtils testUtils = new TestUtils();
MAPPER = setupInjectablesInObjectMapper(TestHelper.makeJsonMapper());
@ -299,6 +306,7 @@ public class IngestSegmentFirehoseFactoryTest
private final FirehoseFactory<InputRowParser> factory;
private final InputRowParser rowParser;
private File tempDir;
private static final InputRowParser<Map<String, Object>> ROW_PARSER = new MapInputRowParser(
new TimeAndDimsParseSpec(
@ -376,6 +384,18 @@ public class IngestSegmentFirehoseFactoryTest
}
}
@Before
public void setup() throws IOException
{
tempDir = temporaryFolder.newFolder();
}
@After
public void teardown()
{
tempDir.delete();
}
@Test
public void sanityTest()
{
@ -402,7 +422,7 @@ public class IngestSegmentFirehoseFactoryTest
{
Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size());
Integer rowcount = 0;
try (final Firehose firehose = factory.connect(rowParser, null)) {
try (final Firehose firehose = factory.connect(rowParser, tmpDir)) {
while (firehose.hasMore()) {
InputRow row = firehose.nextRow();
Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray());
@ -432,7 +452,7 @@ public class IngestSegmentFirehoseFactoryTest
);
int skipped = 0;
try (final Firehose firehose =
factory.connect(transformSpec.decorate(rowParser), null)) {
factory.connect(transformSpec.decorate(rowParser), tmpDir)) {
while (firehose.hasMore()) {
InputRow row = firehose.nextRow();
if (row == null) {

View File

@ -148,7 +148,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
int count = 0;
long sum = 0;
try (final Firehose firehose = factory.connect(ROW_PARSER, null)) {
try (final Firehose firehose = factory.connect(ROW_PARSER, tmpDir)) {
while (firehose.hasMore()) {
final InputRow row = firehose.nextRow();
count++;
@ -176,7 +176,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
for (InputSplit<List<WindowedSegmentId>> split : splits) {
final FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> splitFactory =
factory.withSplit(split);
try (final Firehose firehose = splitFactory.connect(ROW_PARSER, null)) {
try (final Firehose firehose = splitFactory.connect(ROW_PARSER, tmpDir)) {
while (firehose.hasMore()) {
final InputRow row = firehose.nextRow();
count++;

View File

@ -76,6 +76,7 @@ public class SingleTaskBackgroundRunnerTest
null,
true,
null,
null,
null
);
final ServiceEmitter emitter = new NoopServiceEmitter();

View File

@ -540,7 +540,7 @@ public class TaskLifecycleTest
new TaskAuditLogConfig(true)
);
File tmpDir = temporaryFolder.newFolder();
taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);
taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null);
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
{

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker;
import com.amazonaws.util.StringUtils;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.client.indexing.TaskStatus;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class IntermediaryDataManagerAutoCleanupTest
{
@Rule
public TemporaryFolder tempDir = new TemporaryFolder();
private IntermediaryDataManager intermediaryDataManager;
@Before
public void setup() throws IOException
{
final WorkerConfig workerConfig = new WorkerConfig()
{
@Override
public long getIntermediaryPartitionDiscoveryPeriodSec()
{
return 1;
}
@Override
public long getIntermediaryPartitionCleanupPeriodSec()
{
return 2;
}
@Override
public Period getIntermediaryPartitionTimeout()
{
return new Period("PT2S");
}
};
final TaskConfig taskConfig = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null))
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
{
@Override
public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds)
{
final Map<String, TaskStatus> result = new HashMap<>();
for (String taskId : taskIds) {
result.put(taskId, new TaskStatus(taskId, TaskState.SUCCESS, 10));
}
return result;
}
};
intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
intermediaryDataManager.start();
}
@After
public void teardown() throws InterruptedException
{
intermediaryDataManager.stop();
}
@Test
public void testCleanup() throws IOException, InterruptedException
{
final String supervisorTaskId = "supervisorTaskId";
final Interval interval = Intervals.of("2018/2019");
final File segmentFile = generateSegmentFile();
final DataSegment segment = newSegment(interval, 0);
intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
Thread.sleep(8000);
Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, 0).isEmpty());
}
private File generateSegmentFile() throws IOException
{
final File segmentFile = tempDir.newFile();
FileUtils.write(segmentFile, "test data.", StringUtils.UTF8);
return segmentFile;
}
private DataSegment newSegment(Interval interval, int partitionId)
{
return new DataSegment(
"dataSource",
interval,
"version",
null,
null,
null,
new NumberedShardSpec(partitionId, 0),
9,
10
);
}
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker;
import com.amazonaws.util.StringUtils;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
public class IntermediaryDataManagerManualAddAndDeleteTest
{
@Rule
public TemporaryFolder tempDir = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
private IntermediaryDataManager intermediaryDataManager;
@Before
public void setup() throws IOException
{
final WorkerConfig workerConfig = new WorkerConfig();
final TaskConfig taskConfig = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 150L, null))
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
intermediaryDataManager.start();
}
@After
public void teardown() throws InterruptedException
{
intermediaryDataManager.stop();
}
@Test
public void testAddSegmentFailure() throws IOException
{
for (int i = 0; i < 15; i++) {
File segmentFile = generateSegmentFile();
DataSegment segment = newSegment(Intervals.of("2018/2019"), i);
intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
}
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Can't find location to handle segment");
File segmentFile = generateSegmentFile();
DataSegment segment = newSegment(Intervals.of("2018/2019"), 16);
intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
}
@Test
public void testFindPartitionFiles() throws IOException
{
final String supervisorTaskId = "supervisorTaskId";
final Interval interval = Intervals.of("2018/2019");
final int partitionId = 0;
for (int i = 0; i < 10; i++) {
final File segmentFile = generateSegmentFile();
final DataSegment segment = newSegment(interval, partitionId);
intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + i, segment, segmentFile);
}
final List<File> files = intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId);
Assert.assertEquals(10, files.size());
files.sort(Comparator.comparing(File::getName));
for (int i = 0; i < 10; i++) {
Assert.assertEquals("subTaskId_" + i, files.get(i).getName());
}
}
@Test
public void deletePartitions() throws IOException
{
final String supervisorTaskId = "supervisorTaskId";
final Interval interval = Intervals.of("2018/2019");
for (int partitionId = 0; partitionId < 5; partitionId++) {
for (int subTaskId = 0; subTaskId < 3; subTaskId++) {
final File segmentFile = generateSegmentFile();
final DataSegment segment = newSegment(interval, partitionId);
intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + subTaskId, segment, segmentFile);
}
}
intermediaryDataManager.deletePartitions(supervisorTaskId);
for (int partitionId = 0; partitionId < 5; partitionId++) {
Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId).isEmpty());
}
}
@Test
public void testAddRemoveAdd() throws IOException
{
final String supervisorTaskId = "supervisorTaskId";
final Interval interval = Intervals.of("2018/2019");
for (int i = 0; i < 15; i++) {
File segmentFile = generateSegmentFile();
DataSegment segment = newSegment(interval, i);
intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
}
intermediaryDataManager.deletePartitions(supervisorTaskId);
File segmentFile = generateSegmentFile();
DataSegment segment = newSegment(interval, 16);
intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
}
private File generateSegmentFile() throws IOException
{
final File segmentFile = tempDir.newFile();
FileUtils.write(segmentFile, "test data.", StringUtils.UTF8);
return segmentFile;
}
private DataSegment newSegment(Interval interval, int partitionId)
{
return new DataSegment(
"dataSource",
interval,
"version",
null,
null,
null,
new NumberedShardSpec(partitionId, 0),
9,
10
);
}
}

View File

@ -85,6 +85,7 @@ public class WorkerTaskManagerTest
null,
false,
null,
null,
null
);
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);

View File

@ -153,6 +153,7 @@ public class WorkerTaskMonitorTest
null,
false,
null,
null,
null
);
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);

View File

@ -270,6 +270,27 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
}
}
@Override
public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds) throws InterruptedException
{
try {
final FullResponseHolder responseHolder = druidLeaderClient.go(
druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/taskStatus")
.setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskIds))
);
return jsonMapper.readValue(
responseHolder.getContent(),
new TypeReference<Map<String, TaskStatus>>()
{
}
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
@Nullable
public TaskStatusPlus getLastCompleteTask()

View File

@ -27,6 +27,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface IndexingServiceClient
{
@ -55,6 +56,8 @@ public interface IndexingServiceClient
TaskStatusResponse getTaskStatus(String taskId);
Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds) throws InterruptedException;
@Nullable
TaskStatusPlus getLastCompleteTask();

View File

@ -33,7 +33,7 @@ public class TaskStatusResponse
{
private final String task; // Task ID, named "task" in the JSONification of this class.
@Nullable
private final TaskStatusPlus status;
private final TaskStatusPlus status; // null for unknown tasks
@JsonCreator
public TaskStatusResponse(

View File

@ -163,8 +163,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
if (loc == null) {
loc = loadSegmentWithRetry(segment, storageDir);
}
loc.addSegment(segment);
return new File(loc.getPath(), storageDir);
final File localStorageDir = new File(loc.getPath(), storageDir);
loc.addSegmentDir(localStorageDir, segment);
return localStorageDir;
}
finally {
unlock(segment, lock);
@ -270,7 +271,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
// Druid creates folders of the form dataSource/interval/version/partitionNum.
// We need to clean up all these directories if they are all empty.
cleanupCacheFiles(location.getPath(), localStorageDir);
location.removeSegment(segment);
location.removeSegmentDir(localStorageDir, segment);
}
}
}

View File

@ -19,6 +19,8 @@
package org.apache.druid.segment.loading;
import org.apache.commons.io.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
@ -29,18 +31,18 @@ import java.util.Set;
/**
*/
class StorageLocation
public class StorageLocation
{
private static final Logger log = new Logger(StorageLocation.class);
private final File path;
private final long maxSize;
private final long freeSpaceToKeep;
private final Set<DataSegment> segments;
private final Set<File> files = new HashSet<>();
private volatile long currSize = 0;
StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent)
public StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent)
{
this.path = path;
this.maxSize = maxSize;
@ -57,35 +59,62 @@ class StorageLocation
} else {
this.freeSpaceToKeep = 0;
}
this.segments = new HashSet<>();
}
File getPath()
public File getPath()
{
return path;
}
long getMaxSize()
public long getMaxSize()
{
return maxSize;
}
synchronized void addSegment(DataSegment segment)
/**
* Add a new file to this location. The given file argument must be a file rather than directory.
*/
public synchronized void addFile(File file)
{
if (segments.add(segment)) {
if (file.isDirectory()) {
throw new ISE("[%s] must be a file. Use a");
}
if (files.add(file)) {
currSize += FileUtils.sizeOf(file);
}
}
/**
* Add a new segment dir to this location. The segment size is added to currSize.
*/
public synchronized void addSegmentDir(File segmentDir, DataSegment segment)
{
if (files.add(segmentDir)) {
currSize += segment.getSize();
}
}
synchronized void removeSegment(DataSegment segment)
/**
* Remove a segment file from this location. The given file argument must be a file rather than directory.
*/
public synchronized void removeFile(File file)
{
if (segments.remove(segment)) {
if (files.remove(file)) {
currSize -= FileUtils.sizeOf(file);
}
}
/**
* Remove a segment dir from this location. The segment size is subtracted from currSize.
*/
public synchronized void removeSegmentDir(File segmentDir, DataSegment segment)
{
if (files.remove(segmentDir)) {
currSize -= segment.getSize();
}
}
boolean canHandle(DataSegment segment)
public boolean canHandle(DataSegment segment)
{
if (available() < segment.getSize()) {
log.warn(
@ -114,7 +143,7 @@ class StorageLocation
return true;
}
synchronized long available()
public synchronized long available()
{
return maxSize - currSize;
}

View File

@ -19,66 +19,66 @@
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import javax.annotation.Nullable;
import java.io.File;
/**
*/
public class StorageLocationConfig
{
@JsonProperty
@NotNull
private File path = null;
private final File path;
private final long maxSize;
@Nullable
private final Double freeSpacePercent;
@JsonCreator
public StorageLocationConfig(
@JsonProperty("path") File path,
@JsonProperty("maxSize") @Nullable Long maxSize,
@JsonProperty("freeSpacePercent") @Nullable Double freeSpacePercent
)
{
this.path = Preconditions.checkNotNull(path, "path");
this.maxSize = maxSize == null ? Long.MAX_VALUE : maxSize;
this.freeSpacePercent = freeSpacePercent;
Preconditions.checkArgument(this.maxSize > 0, "maxSize[%s] should be positive", this.maxSize);
Preconditions.checkArgument(
this.freeSpacePercent == null || this.freeSpacePercent >= 0,
"freeSpacePercent[%s] should be 0 or a positive double",
this.freeSpacePercent
);
}
@JsonProperty
@Min(1)
private long maxSize = Long.MAX_VALUE;
@JsonProperty
private Double freeSpacePercent;
public File getPath()
{
return path;
}
public StorageLocationConfig setPath(File path)
{
this.path = path;
return this;
}
@JsonProperty
public long getMaxSize()
{
return maxSize;
}
public StorageLocationConfig setMaxSize(long maxSize)
{
this.maxSize = maxSize;
return this;
}
@JsonProperty
@Nullable
public Double getFreeSpacePercent()
{
return freeSpacePercent;
}
public StorageLocationConfig setFreeSpacePercent(Double freeSpacePercent)
{
this.freeSpacePercent = freeSpacePercent;
return this;
}
@Override
public String toString()
{
return "StorageLocationConfig{" +
"path=" + path +
", maxSize=" + maxSize +
", freeSpacePercent=" + freeSpacePercent +
'}';
}
}

View File

@ -28,6 +28,7 @@ import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class NoopIndexingServiceClient implements IndexingServiceClient
{
@ -85,6 +86,12 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
return new TaskStatusResponse(taskId, null);
}
@Override
public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds)
{
return Collections.emptyMap();
}
@Nullable
@Override
public TaskStatusPlus getLastCompleteTask()

View File

@ -88,9 +88,7 @@ public class SegmentLoaderLocalCacheManagerTest
localSegmentCacheFolder = tmpFolder.newFolder("segment_cache_folder");
final List<StorageLocationConfig> locations = new ArrayList<>();
final StorageLocationConfig locationConfig = new StorageLocationConfig();
locationConfig.setPath(localSegmentCacheFolder);
locationConfig.setMaxSize(10000000000L);
final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 10000000000L, null);
locations.add(locationConfig);
manager = new SegmentLoaderLocalCacheManager(
@ -157,14 +155,10 @@ public class SegmentLoaderLocalCacheManagerTest
final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
final List<StorageLocationConfig> locations = new ArrayList<>();
final StorageLocationConfig locationConfig = new StorageLocationConfig();
locationConfig.setPath(localStorageFolder);
locationConfig.setMaxSize(10000000000L);
final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 10000000000L, null);
locations.add(locationConfig);
final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
locationConfig2.setPath(localStorageFolder2);
locationConfig2.setMaxSize(1000000000L);
final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 1000000000L, null);
locations.add(locationConfig2);
manager = new SegmentLoaderLocalCacheManager(
@ -207,17 +201,13 @@ public class SegmentLoaderLocalCacheManagerTest
public void testRetrySuccessAtSecondLocation() throws Exception
{
final List<StorageLocationConfig> locations = new ArrayList<>();
final StorageLocationConfig locationConfig = new StorageLocationConfig();
final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
// mock can't write in first location
localStorageFolder.setWritable(false);
locationConfig.setPath(localStorageFolder);
locationConfig.setMaxSize(1000000000L);
final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 1000000000L, null);
locations.add(locationConfig);
final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
locationConfig2.setPath(localStorageFolder2);
locationConfig2.setMaxSize(10000000L);
final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null);
locations.add(locationConfig2);
manager = new SegmentLoaderLocalCacheManager(
@ -260,19 +250,15 @@ public class SegmentLoaderLocalCacheManagerTest
public void testRetryAllFail() throws Exception
{
final List<StorageLocationConfig> locations = new ArrayList<>();
final StorageLocationConfig locationConfig = new StorageLocationConfig();
final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
// mock can't write in first location
localStorageFolder.setWritable(false);
locationConfig.setPath(localStorageFolder);
locationConfig.setMaxSize(1000000000L);
final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 1000000000L, null);
locations.add(locationConfig);
final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
// mock can't write in second location
localStorageFolder2.setWritable(false);
locationConfig2.setPath(localStorageFolder2);
locationConfig2.setMaxSize(10000000L);
final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null);
locations.add(locationConfig2);
manager = new SegmentLoaderLocalCacheManager(
@ -316,17 +302,13 @@ public class SegmentLoaderLocalCacheManagerTest
public void testEmptyToFullOrder() throws Exception
{
final List<StorageLocationConfig> locations = new ArrayList<>();
final StorageLocationConfig locationConfig = new StorageLocationConfig();
final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
localStorageFolder.setWritable(true);
locationConfig.setPath(localStorageFolder);
locationConfig.setMaxSize(10L);
final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 10L, null);
locations.add(locationConfig);
final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
localStorageFolder2.setWritable(true);
locationConfig2.setPath(localStorageFolder2);
locationConfig2.setMaxSize(10L);
final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10L, null);
locations.add(locationConfig2);
manager = new SegmentLoaderLocalCacheManager(

View File

@ -71,25 +71,25 @@ public class StorageLocationTest
final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23);
loc.addSegment(makeSegment("2012-01-01/2012-01-02", 10));
loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10));
expectedAvail -= 10;
verifyLoc(expectedAvail, loc);
loc.addSegment(makeSegment("2012-01-01/2012-01-02", 10));
loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10));
verifyLoc(expectedAvail, loc);
loc.addSegment(secondSegment);
loc.addSegmentDir(new File("test2"), secondSegment);
expectedAvail -= 23;
verifyLoc(expectedAvail, loc);
loc.removeSegment(makeSegment("2012-01-01/2012-01-02", 10));
loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10));
expectedAvail += 10;
verifyLoc(expectedAvail, loc);
loc.removeSegment(makeSegment("2012-01-01/2012-01-02", 10));
loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10));
verifyLoc(expectedAvail, loc);
loc.removeSegment(secondSegment);
loc.removeSegmentDir(new File("test2"), secondSegment);
expectedAvail += 23;
verifyLoc(expectedAvail, loc);
}

View File

@ -105,7 +105,7 @@ public class SegmentManagerThreadSafetyTest
public List<StorageLocationConfig> getLocations()
{
return Collections.singletonList(
new StorageLocationConfig().setPath(segmentCacheDir)
new StorageLocationConfig(segmentCacheDir, null, null)
);
}
},

View File

@ -29,6 +29,7 @@ import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Names;
import com.google.inject.util.Providers;
import io.airlift.airline.Command;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.discovery.WorkerNodeService;
@ -53,6 +54,7 @@ import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
import org.apache.druid.indexing.worker.WorkerTaskMonitor;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.indexing.worker.http.ShuffleResource;
import org.apache.druid.indexing.worker.http.TaskManagementResource;
import org.apache.druid.indexing.worker.http.WorkerResource;
import org.apache.druid.java.util.common.logger.Logger;
@ -101,7 +103,7 @@ public class CliMiddleManager extends ServerRunnable
binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
binder.bind(IndexingServiceClient.class).toProvider(Providers.of(null));
binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>() {})
.toProvider(Providers.of(null));
binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));
@ -129,6 +131,7 @@ public class CliMiddleManager extends ServerRunnable
.in(LazySingleton.class);
Jerseys.addResource(binder, WorkerResource.class);
Jerseys.addResource(binder, TaskManagementResource.class);
Jerseys.addResource(binder, ShuffleResource.class);
LifecycleModule.register(binder, Server.class);