From c7eb7cd01837c48914ba284d08b6096b47c957b0 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 18 Jul 2019 14:46:47 -0700 Subject: [PATCH] Add intermediary data server for shuffle (#8088) * Add intermediary data server for shuffle * javadoc * adjust timeout * resolved todo * fix test * style * address comments * rename to shuffleDataLocations * Address comments * bit adjustment StorageLocation * fix test * address comment & fix test * handle interrupted exception --- .../indexing/kafka/KafkaIndexTaskTest.java | 1 + .../kinesis/KinesisIndexTaskTest.java | 1 + .../indexing/common/SegmentLoaderFactory.java | 2 +- .../indexing/common/config/TaskConfig.java | 25 +- .../overlord/http/OverlordResource.java | 4 +- .../worker/IntermediaryDataManager.java | 331 ++++++++++++++++++ .../indexing/worker/config/WorkerConfig.java | 31 +- .../indexing/worker/http/ShuffleResource.java | 122 +++++++ .../indexing/common/TaskToolboxTest.java | 2 +- ...penderatorDriverRealtimeIndexTaskTest.java | 2 +- .../common/task/CompactionTaskRunTest.java | 11 +- .../indexing/common/task/HadoopTaskTest.java | 1 + .../indexing/common/task/IndexTaskTest.java | 2 +- .../common/task/RealtimeIndexTaskTest.java | 2 +- .../IngestSegmentFirehoseFactoryTest.java | 24 +- ...estSegmentFirehoseFactoryTimelineTest.java | 4 +- .../SingleTaskBackgroundRunnerTest.java | 1 + .../indexing/overlord/TaskLifecycleTest.java | 2 +- ...ntermediaryDataManagerAutoCleanupTest.java | 148 ++++++++ ...iaryDataManagerManualAddAndDeleteTest.java | 174 +++++++++ .../worker/WorkerTaskManagerTest.java | 1 + .../worker/WorkerTaskMonitorTest.java | 1 + .../indexing/HttpIndexingServiceClient.java | 21 ++ .../indexing/IndexingServiceClient.java | 3 + .../client/indexing/TaskStatusResponse.java | 2 +- .../SegmentLoaderLocalCacheManager.java | 7 +- .../segment/loading/StorageLocation.java | 55 ++- .../loading/StorageLocationConfig.java | 58 +-- .../indexing/NoopIndexingServiceClient.java | 7 + .../SegmentLoaderLocalCacheManagerTest.java | 36 +- .../segment/loading/StorageLocationTest.java | 12 +- .../SegmentManagerThreadSafetyTest.java | 2 +- .../apache/druid/cli/CliMiddleManager.java | 5 +- 33 files changed, 990 insertions(+), 110 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index edf4c22842a..3fd6511c752 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2562,6 +2562,7 @@ public class KafkaIndexTaskTest null, true, null, + null, null ); final TestDerbyConnector derbyConnector = derby.getConnector(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index bb83c0878ad..4fc7ba1b445 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2747,6 +2747,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport null, true, null, + null, null ); final TestDerbyConnector derbyConnector = derby.getConnector(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java index 83fa9dbb06b..17b8dc13164 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java @@ -54,7 +54,7 @@ public class SegmentLoaderFactory return new SegmentLoaderLocalCacheManager( indexIO, new SegmentLoaderConfig().withLocations( - Collections.singletonList(new StorageLocationConfig().setPath(storageDir))), + Collections.singletonList(new StorageLocationConfig(storageDir, null, null))), jsonMapper ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index 94a7c7d7572..31405fe4e7c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -22,10 +22,13 @@ package org.apache.druid.indexing.common.config; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import org.apache.druid.segment.loading.StorageLocationConfig; import org.joda.time.Period; +import javax.annotation.Nullable; import java.io.File; import java.nio.file.Paths; +import java.util.Collections; import java.util.List; public class TaskConfig { @@ -35,7 +38,6 @@ public class TaskConfig ); private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M"); - private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M"); @JsonProperty @@ -62,6 +64,9 @@ public class TaskConfig @JsonProperty private final Period directoryLockTimeout; + @JsonProperty + private final List<StorageLocationConfig> shuffleDataLocations; + @JsonCreator public TaskConfig( @JsonProperty("baseDir") String baseDir, @@ -71,7 +76,8 @@ public class TaskConfig @JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates, @JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart, @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout, - @JsonProperty("directoryLockTimeout") Period directoryLockTimeout + @JsonProperty("directoryLockTimeout") Period directoryLockTimeout, + @JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations ) { this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir; @@ -89,6 +95,13 @@ public class TaskConfig this.directoryLockTimeout = directoryLockTimeout == null ?
DEFAULT_DIRECTORY_LOCK_TIMEOUT : directoryLockTimeout; + if (shuffleDataLocations == null) { + this.shuffleDataLocations = Collections.singletonList( + new StorageLocationConfig(new File(defaultDir(null, "intermediary-segments")), null, null) + ); + } else { + this.shuffleDataLocations = shuffleDataLocations; + } } @JsonProperty @@ -154,7 +167,13 @@ public class TaskConfig return directoryLockTimeout; } - private String defaultDir(String configParameter, final String defaultVal) + @JsonProperty + public List<StorageLocationConfig> getShuffleDataLocations() + { + return shuffleDataLocations; + } + + private String defaultDir(@Nullable String configParameter, final String defaultVal) { if (configParameter == null) { return Paths.get(getBaseDir(), defaultVal).toString();
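
The new `shuffleDataLocations` parameter is easiest to see in use. A minimal sketch, assuming the constructor above and mirroring the unit tests added later in this patch (the path and the 150-byte maxSize are illustrative; imports of ImmutableList and File are elided):

```java
// Sketch only: a TaskConfig with a single explicit shuffle location, as the new tests do.
// Passing null for shuffleDataLocations instead falls back to <baseDir>/intermediary-segments.
final TaskConfig taskConfig = new TaskConfig(
    null, null, null, null, null, false, null, null,
    ImmutableList.of(new StorageLocationConfig(new File("/tmp/druid/shuffle"), 150L, null))
);
```
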
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 7ea5c8ef717..b1539589ec9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -372,9 +372,7 @@ public class OverlordResource @Path("/taskStatus") @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(StateResourceFilter.class) - public Response getMultipleTaskStatuses( - Set<String> taskIds - ) + public Response getMultipleTaskStatuses(Set<String> taskIds) { if (taskIds == null || taskIds.size() == 0) { return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds provided.").build(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java new file mode 100644 index 00000000000..bfd202e8835 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.worker; + +import com.google.common.collect.Iterators; +import com.google.common.io.Files; +import com.google.inject.Inject; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.TaskStatus; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.IOE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.loading.StorageLocation; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * This class manages intermediary segments for data shuffle between native parallel index tasks. + * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers + * and phase 2 tasks read those files via HTTP. + * + * The directory where segment files are placed is structured as + * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/partitionIdOfSegment. + * + * This class provides interfaces to store, find, and remove segment files. + * It also has a self-cleanup mechanism to clean up stale segment files. It periodically checks the last access time + * per supervisorTask and removes all its segment files if the supervisorTask is not running anymore. + */ +@ManageLifecycle +public class IntermediaryDataManager +{ + private static final Logger log = new Logger(IntermediaryDataManager.class); + + private final long intermediaryPartitionDiscoveryPeriodSec; + private final long intermediaryPartitionCleanupPeriodSec; + private final Period intermediaryPartitionTimeout; + private final List<StorageLocation> shuffleDataLocations; + private final IndexingServiceClient indexingServiceClient; + + // supervisorTaskId -> time to check supervisorTask status + // This time is initialized when a new supervisorTask is found and updated whenever a partition is accessed for + // the supervisor. + private final ConcurrentHashMap<String, DateTime> supervisorTaskCheckTimes = new ConcurrentHashMap<>(); + + // supervisorTaskId -> cyclic iterator of storage locations + private final Map<String, Iterator<StorageLocation>> locationIterators = new HashMap<>(); + + // The overlord is supposed to send a cleanup request as soon as the supervisorTask is finished in parallel indexing, + // but the middleManager or indexer could miss the request. This executor is to automatically clean up unused + // intermediary partitions. + // This can be null until IntermediaryDataManager is started.
+ @Nullable + private ScheduledExecutorService supervisorTaskChecker; + + @Inject + public IntermediaryDataManager( + WorkerConfig workerConfig, + TaskConfig taskConfig, + IndexingServiceClient indexingServiceClient + ) + { + this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec(); + this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec(); + this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout(); + this.shuffleDataLocations = taskConfig + .getShuffleDataLocations() + .stream() + .map(config -> new StorageLocation(config.getPath(), config.getMaxSize(), config.getFreeSpacePercent())) + .collect(Collectors.toList()); + this.indexingServiceClient = indexingServiceClient; + } + + @LifecycleStart + public void start() + { + supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d"); + // Discover partitions for new supervisorTasks + supervisorTaskChecker.scheduleAtFixedRate( + () -> { + try { + discoverSupervisorTaskPartitions(); + } + catch (Exception e) { + log.warn(e, "Error while discovering supervisorTasks"); + } + }, + intermediaryPartitionDiscoveryPeriodSec, + intermediaryPartitionDiscoveryPeriodSec, + TimeUnit.SECONDS + ); + + supervisorTaskChecker.scheduleAtFixedRate( + () -> { + try { + deleteExpiredSupervisorTaskPartitionsIfNotRunning(); + } + catch (InterruptedException e) { + log.error(e, "Error while cleaning up partitions for expired supervisors"); + } + catch (Exception e) { + log.warn(e, "Error while cleaning up partitions for expired supervisors"); + } + }, + intermediaryPartitionCleanupPeriodSec, + intermediaryPartitionCleanupPeriodSec, + TimeUnit.SECONDS + ); + } + + @LifecycleStop + public void stop() throws InterruptedException + { + if (supervisorTaskChecker != null) { + supervisorTaskChecker.shutdownNow(); + supervisorTaskChecker.awaitTermination(10, TimeUnit.SECONDS); + } + supervisorTaskCheckTimes.clear(); + } + + private void discoverSupervisorTaskPartitions() + { + for (StorageLocation location : shuffleDataLocations) { + final MutableInt numDiscovered = new MutableInt(0); + final File[] dirsPerSupervisorTask = location.getPath().listFiles(); + if (dirsPerSupervisorTask != null) { + for (File supervisorTaskDir : dirsPerSupervisorTask) { + final String supervisorTaskId = supervisorTaskDir.getName(); + supervisorTaskCheckTimes.computeIfAbsent( + supervisorTaskId, + k -> { + numDiscovered.increment(); + return DateTimes.nowUtc().plus(intermediaryPartitionTimeout); + } + ); + } + } + log.info("Discovered partitions for [%s] new supervisor tasks", numDiscovered.getValue()); + } + } + + /** + * Check the supervisorTask status if its partitions have not been accessed within the timeout and + * delete all partitions for the supervisorTask if it has already finished. + * + * Note that the overlord sends a cleanup request when a supervisorTask is finished. The check below triggers + * the self-cleanup when that cleanup request is missed.
+ */ + private void deleteExpiredSupervisorTaskPartitionsIfNotRunning() throws InterruptedException + { + final DateTime now = DateTimes.nowUtc(); + final Set<String> expiredSupervisorTasks = new HashSet<>(); + for (Entry<String, DateTime> entry : supervisorTaskCheckTimes.entrySet()) { + final String supervisorTaskId = entry.getKey(); + final DateTime checkTime = entry.getValue(); + if (checkTime.isBefore(now)) { + expiredSupervisorTasks.add(supervisorTaskId); + } + } + + log.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size()); + + final Map<String, TaskStatus> taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks); + for (Entry<String, TaskStatus> entry : taskStatuses.entrySet()) { + final String supervisorTaskId = entry.getKey(); + final TaskStatus status = entry.getValue(); + if (status.getStatusCode().isComplete()) { + // If it's finished, clean up all partitions for the supervisor task. + try { + deletePartitions(supervisorTaskId); + } + catch (IOException e) { + log.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId); + } + } else { + // If it's still running, push the check time out by the timeout. + supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc().plus(intermediaryPartitionTimeout)); + } + } + } + + /** + * Write a segment into one of the configured locations. The location to write is chosen in a round-robin manner per + * supervisorTaskId. + * + * This method is only useful for the new Indexer model. Tasks running in the existing middleManager should use the + * static addSegment method. + */ + public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile) + throws IOException + { + final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent( + supervisorTaskId, + k -> Iterators.cycle(shuffleDataLocations) + ); + addSegment(iterator, shuffleDataLocations.size(), supervisorTaskId, subTaskId, segment, segmentFile); + } + + public List<File> findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId) + { + for (StorageLocation location : shuffleDataLocations) { + final File partitionDir = getPartitionDir(location, supervisorTaskId, interval, partitionId); + if (partitionDir.exists()) { + supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc().plus(intermediaryPartitionTimeout)); + final File[] segmentFiles = partitionDir.listFiles(); + return segmentFiles == null ? Collections.emptyList() : Arrays.asList(segmentFiles); + } + } + + return Collections.emptyList(); + } + + public void deletePartitions(String supervisorTaskId) throws IOException + { + for (StorageLocation location : shuffleDataLocations) { + final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId); + if (supervisorTaskPath.exists()) { + log.info("Cleaning up [%s]", supervisorTaskPath); + for (File eachFile : FileUtils.listFiles(supervisorTaskPath, null, true)) { + location.removeFile(eachFile); + } + FileUtils.forceDelete(supervisorTaskPath); + } + } + supervisorTaskCheckTimes.remove(supervisorTaskId); + } + + /** + * Iterate through the given storage locations to find one which can handle the given segment.
+ */ + public static void addSegment( + Iterator<StorageLocation> cyclicIterator, + int numLocations, + String supervisorTaskId, + String subTaskId, + DataSegment segment, + File segmentFile + ) throws IOException + { + final StorageLocation location = findLocationForSegment(cyclicIterator, numLocations, segment); + final File destFile = new File( + getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()), + subTaskId + ); + FileUtils.forceMkdirParent(destFile); + final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile)); + if (copiedBytes == 0) { + throw new IOE( + "0 bytes copied after copying a segment file from [%s] to [%s]", + segmentFile.getAbsolutePath(), + destFile.getAbsolutePath() + ); + } + location.addFile(destFile); + } + + private static StorageLocation findLocationForSegment( + Iterator<StorageLocation> cyclicIterator, + int numLocations, + DataSegment segment + ) + { + for (int i = 0; i < numLocations; i++) { + final StorageLocation location = cyclicIterator.next(); + if (location.canHandle(segment)) { + return location; + } + } + throw new ISE("Can't find location to handle segment[%s]", segment); + } + + private static File getPartitionDir( + StorageLocation location, + String supervisorTaskId, + Interval interval, + int partitionId + ) + { + return FileUtils.getFile( + location.getPath(), + supervisorTaskId, + interval.getStart().toString(), + interval.getEnd().toString(), + String.valueOf(partitionId) + ); + } +}
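
To make the lifecycle concrete, here is a hedged usage sketch of the manager's three public operations, mirroring the unit tests added later in this patch. The task ids are illustrative and `manager` stands for an injected IntermediaryDataManager:

```java
// Hedged sketch (not part of the patch): the shuffle round trip as the tests exercise it.
static void shuffleRoundTrip(IntermediaryDataManager manager, DataSegment segment, File segmentFile)
    throws IOException
{
  // Phase 1: a subtask hands its segment file over; the manager copies it into a
  // StorageLocation chosen round-robin per supervisorTaskId.
  manager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);

  // Phase 2: ShuffleResource (below) resolves every file written for one partition.
  final List<File> partitionFiles = manager.findPartitionFiles(
      "supervisorTaskId",
      segment.getInterval(),
      segment.getShardSpec().getPartitionNum()
  );
  // partitionFiles would then be streamed back to the requesting task over HTTP.

  // Cleanup: normally requested by the overlord when the supervisor task finishes;
  // the periodic self-cleanup above acts as a safety net.
  manager.deletePartitions("supervisorTaskId");
}
```
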
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java index e574a0d151c..65d09e7521b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java @@ -22,6 +22,7 @@ package org.apache.druid.indexing.worker.config; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.server.DruidNode; import org.apache.druid.utils.JvmUtils; +import org.joda.time.Period; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; @@ -32,15 +33,24 @@ public class WorkerConfig { @JsonProperty @NotNull - private String ip = DruidNode.getDefaultHost(); + private final String ip = DruidNode.getDefaultHost(); @JsonProperty @NotNull - private String version = "0"; + private final String version = "0"; @JsonProperty @Min(1) - private int capacity = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1); + private final int capacity = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1); + + @JsonProperty + private final long intermediaryPartitionDiscoveryPeriodSec = 60L; + + @JsonProperty + private final long intermediaryPartitionCleanupPeriodSec = 300L; + + @JsonProperty + private final Period intermediaryPartitionTimeout = new Period("P1D"); public String getIp() { @@ -56,4 +66,19 @@ { return capacity; } + + public long getIntermediaryPartitionDiscoveryPeriodSec() + { + return intermediaryPartitionDiscoveryPeriodSec; + } + + public long getIntermediaryPartitionCleanupPeriodSec() + { + return intermediaryPartitionCleanupPeriodSec; + } + + public Period getIntermediaryPartitionTimeout() + { + return intermediaryPartitionTimeout; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java new file mode 100644 index 00000000000..9bf197c20f5 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.worker.http; + +import com.google.common.io.ByteStreams; +import com.google.inject.Inject; +import com.sun.jersey.spi.container.ResourceFilters; +import org.apache.druid.indexing.worker.IntermediaryDataManager; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.http.security.StateResourceFilter; +import org.joda.time.Interval; + +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.List; + +/** + * HTTP endpoints for shuffle system. The MiddleManager and Indexer use this resource to serve intermediary shuffle + * data. + * + * We use {@link StateResourceFilter} here because it performs an admin-like authorization and + * all endpoints here are supposed to be used only for internal communication. + * Another possible alternative would be performing datasource-level authorization as in TaskResourceFilter. + * However, datasource information is not available in middleManagers or indexers yet, which makes it hard to use. + * We could develop a new ResourceFilter in the future if needed. + */ +@Path("/druid/worker/v1/shuffle") +@ResourceFilters(StateResourceFilter.class) +public class ShuffleResource +{ + private static final Logger log = new Logger(ShuffleResource.class); + + private final IntermediaryDataManager intermediaryDataManager; + + @Inject + public ShuffleResource(IntermediaryDataManager intermediaryDataManager) + { + this.intermediaryDataManager = intermediaryDataManager; + } + + @GET + @Path("/task/{supervisorTaskId}/partition") + @Produces(MediaType.APPLICATION_OCTET_STREAM) + public Response getPartition( + @PathParam("supervisorTaskId") String supervisorTaskId, + @QueryParam("startTime") String startTime, + @QueryParam("endTime") String endTime, + @QueryParam("partitionId") int partitionId + ) + { + final Interval interval = new Interval(DateTimes.of(startTime), DateTimes.of(endTime)); + final List<File> partitionFiles = intermediaryDataManager.findPartitionFiles( + supervisorTaskId, + interval, + partitionId + ); + + if (partitionFiles.isEmpty()) { + final String errorMessage = StringUtils.format( + "Can't find the partition for supervisor[%s], interval[%s], and partitionId[%s]", + supervisorTaskId, + interval, + partitionId + ); + return Response.status(Status.NOT_FOUND).entity(errorMessage).build(); + } else { + return Response.ok( + (StreamingOutput) output -> { + for (File partitionFile : partitionFiles) { + try (final FileInputStream fileInputStream = new FileInputStream(partitionFile)) { + ByteStreams.copy(fileInputStream, output); + } + } + } + ).build(); + } + } + + @DELETE + @Path("/task/{supervisorTaskId}") + public Response deletePartitions(@PathParam("supervisorTaskId") String supervisorTaskId) + { + try { + intermediaryDataManager.deletePartitions(supervisorTaskId); + return Response.ok(supervisorTaskId).build(); + } + catch (IOException e) { + log.error(e, "Error while deleting partitions of supervisorTask[%s]", supervisorTaskId); + return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build(); + } + } +}
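
For illustration only, a phase-2 task could pull a partition from this endpoint roughly as follows. This client code is not part of the patch: the host, port, and output path are hypothetical, and a real caller would likely use Druid's HTTP client rather than HttpURLConnection:

```java
import com.google.common.io.ByteStreams;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

class ShuffleFetchExample
{
  // Hedged sketch: stream one partition from a middleManager to a local file.
  static void fetchPartition() throws IOException
  {
    final URL url = new URL(
        "http://middlemanager.example.com:8091/druid/worker/v1/shuffle/task/supervisorTaskId/partition"
        + "?startTime=2018-01-01T00:00:00.000Z&endTime=2019-01-01T00:00:00.000Z&partitionId=0"
    );
    final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
    if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
      try (InputStream in = connection.getInputStream();
           OutputStream out = new FileOutputStream("/tmp/partition0")) {
        ByteStreams.copy(in, out); // the same Guava utility the resource uses to write the stream
      }
    }
    // A 404 means this middleManager does not hold the partition; try another worker.
  }
}
```
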
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index 0966d1b84c9..0575afe585e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -95,7 +95,7 @@ public class TaskToolboxTest EasyMock.replay(task, mockHandoffNotifierFactory); taskToolbox = new TaskToolboxFactory( - new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null), + new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null, null), mockTaskActionClientFactory, mockEmitter, mockSegmentPusher, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index d675069b102..7628dea7536 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1519,7 +1519,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest return result; } }; - final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null); + final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null, null); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index f157513d709..9e5ab4c1216 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -346,16 +346,7 @@ public class CompactionTaskRunTest extends IngestionTestBase @Override public List<StorageLocationConfig> getLocations() { - return ImmutableList.of( - new StorageLocationConfig() - { - @Override - public File getPath() - { - return deepStorageDir; - } - } - ); + return ImmutableList.of(new StorageLocationConfig(deepStorageDir, null, null)); } }, objectMapper diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java index c77f063228f..64d3dd80113 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java @@ -78,6 +78,7 @@ public class HadoopTaskTest ImmutableList.of("something:hadoop:1"), false, null, + null, null )).once(); EasyMock.replay(toolbox); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 4bae4c8ed75..a8f02e603ca 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -204,7 +204,7 @@ public class IndexTaskTest public List<StorageLocationConfig> getLocations() { return Collections.singletonList( - new StorageLocationConfig().setPath(cacheDir) + new StorageLocationConfig(cacheDir, null, null) ); } }, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index daeeaab63bb..316327b7fb7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -881,7 +881,7 @@ public class RealtimeIndexTaskTest final File directory ) { - final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null); + final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null, null); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); try { taskStorage.insert(task, TaskStatus.running(task.getId())); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 70e5544b00e..f17274a5ade 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++
b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -84,10 +84,14 @@ import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionHolder; import org.easymock.EasyMock; import org.joda.time.Interval; +import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -115,6 +119,9 @@ public class IngestSegmentFirehoseFactoryTest private static final TaskLockbox TASK_LOCKBOX; private static final Task TASK; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + static { TestUtils testUtils = new TestUtils(); MAPPER = setupInjectablesInObjectMapper(TestHelper.makeJsonMapper()); @@ -299,6 +306,7 @@ public class IngestSegmentFirehoseFactoryTest private final FirehoseFactory factory; private final InputRowParser rowParser; + private File tempDir; private static final InputRowParser<Map<String, Object>> ROW_PARSER = new MapInputRowParser( new TimeAndDimsParseSpec( @@ -376,6 +384,18 @@ public class IngestSegmentFirehoseFactoryTest } } + @Before + public void setup() throws IOException + { + tempDir = temporaryFolder.newFolder(); + } + + @After + public void teardown() + { + tempDir.delete(); + } + @Test public void sanityTest() { @@ -402,7 +422,7 @@ public class IngestSegmentFirehoseFactoryTest { Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size()); Integer rowcount = 0; - try (final Firehose firehose = factory.connect(rowParser, null)) { + try (final Firehose firehose = factory.connect(rowParser, tempDir)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray()); @@ -432,7 +452,7 @@ public class IngestSegmentFirehoseFactoryTest ); int skipped = 0; try (final Firehose firehose = - factory.connect(transformSpec.decorate(rowParser), null)) { + factory.connect(transformSpec.decorate(rowParser), tempDir)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); if (row == null) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 119097b10e4..4944d276522 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -148,7 +148,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest int count = 0; long sum = 0; - try (final Firehose firehose = factory.connect(ROW_PARSER, null)) { + try (final Firehose firehose = factory.connect(ROW_PARSER, tmpDir)) { while (firehose.hasMore()) { final InputRow row = firehose.nextRow(); count++; @@ -176,7 +176,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest for (InputSplit<List<WindowedSegmentId>> split : splits) { final FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> splitFactory = factory.withSplit(split); - try (final Firehose firehose = splitFactory.connect(ROW_PARSER, null)) { + try (final Firehose firehose = splitFactory.connect(ROW_PARSER, tmpDir)) { while (firehose.hasMore()) { final InputRow row = firehose.nextRow(); count++; diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 49315d3973d..a16a80a244d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -76,6 +76,7 @@ public class SingleTaskBackgroundRunnerTest null, true, null, + null, null ); final ServiceEmitter emitter = new NoopServiceEmitter(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index d8c0af320a9..8139f8dcae1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -540,7 +540,7 @@ public class TaskLifecycleTest new TaskAuditLogConfig(true) ); File tmpDir = temporaryFolder.newFolder(); - taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null); + taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null); SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java new file mode 100644 index 00000000000..c158c0e33a5 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.worker; + +import com.amazonaws.util.StringUtils; +import com.google.common.collect.ImmutableList; +import org.apache.commons.io.FileUtils; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.NoopIndexingServiceClient; +import org.apache.druid.client.indexing.TaskStatus; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.loading.StorageLocationConfig; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class IntermediaryDataManagerAutoCleanupTest +{ + @Rule + public TemporaryFolder tempDir = new TemporaryFolder(); + + private IntermediaryDataManager intermediaryDataManager; + + @Before + public void setup() throws IOException + { + final WorkerConfig workerConfig = new WorkerConfig() + { + @Override + public long getIntermediaryPartitionDiscoveryPeriodSec() + { + return 1; + } + + @Override + public long getIntermediaryPartitionCleanupPeriodSec() + { + return 2; + } + + @Override + public Period getIntermediaryPartitionTimeout() + { + return new Period("PT2S"); + } + + }; + final TaskConfig taskConfig = new TaskConfig( + null, + null, + null, + null, + null, + false, + null, + null, + ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)) + ); + final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient() + { + @Override + public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds) + { + final Map<String, TaskStatus> result = new HashMap<>(); + for (String taskId : taskIds) { + result.put(taskId, new TaskStatus(taskId, TaskState.SUCCESS, 10)); + } + return result; + } + }; + intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); + intermediaryDataManager.start(); + } + + @After + public void teardown() throws InterruptedException + { + intermediaryDataManager.stop(); + } + + @Test + public void testCleanup() throws IOException, InterruptedException + { + final String supervisorTaskId = "supervisorTaskId"; + final Interval interval = Intervals.of("2018/2019"); + final File segmentFile = generateSegmentFile(); + final DataSegment segment = newSegment(interval, 0); + intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile); + + Thread.sleep(8000); + Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, 0).isEmpty()); + } + + private File generateSegmentFile() throws IOException + { + final File segmentFile = tempDir.newFile(); + FileUtils.write(segmentFile, "test data.", StringUtils.UTF8); + return segmentFile; + } + + private DataSegment newSegment(Interval interval, int partitionId) + { + return new DataSegment( + "dataSource", + interval, + "version", + null, + null, + null, + new NumberedShardSpec(partitionId, 0), + 9, + 10 + ); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java new file mode 100644 index 00000000000..7bd9e906a96 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.worker; + +import com.amazonaws.util.StringUtils; +import com.google.common.collect.ImmutableList; +import org.apache.commons.io.FileUtils; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.NoopIndexingServiceClient; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.loading.StorageLocationConfig; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.Comparator; +import java.util.List; + +public class IntermediaryDataManagerManualAddAndDeleteTest +{ + @Rule + public TemporaryFolder tempDir = new TemporaryFolder(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private IntermediaryDataManager intermediaryDataManager; + + @Before + public void setup() throws IOException + { + final WorkerConfig workerConfig = new WorkerConfig(); + final TaskConfig taskConfig = new TaskConfig( + null, + null, + null, + null, + null, + false, + null, + null, + ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 150L, null)) + ); + final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); + intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); + intermediaryDataManager.start(); + } + + @After + public void teardown() throws InterruptedException + { + intermediaryDataManager.stop(); + } + + @Test + public void testAddSegmentFailure() throws IOException + { + for (int i = 0; i < 15; i++) { + File segmentFile = generateSegmentFile(); + DataSegment segment = newSegment(Intervals.of("2018/2019"), i); + intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile); + } + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Can't find location to handle segment"); + File segmentFile = 
generateSegmentFile(); + DataSegment segment = newSegment(Intervals.of("2018/2019"), 16); + intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile); + } + + @Test + public void testFindPartitionFiles() throws IOException + { + final String supervisorTaskId = "supervisorTaskId"; + final Interval interval = Intervals.of("2018/2019"); + final int partitionId = 0; + for (int i = 0; i < 10; i++) { + final File segmentFile = generateSegmentFile(); + final DataSegment segment = newSegment(interval, partitionId); + intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + i, segment, segmentFile); + } + final List<File> files = intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId); + Assert.assertEquals(10, files.size()); + files.sort(Comparator.comparing(File::getName)); + for (int i = 0; i < 10; i++) { + Assert.assertEquals("subTaskId_" + i, files.get(i).getName()); + } + } + + @Test + public void deletePartitions() throws IOException + { + final String supervisorTaskId = "supervisorTaskId"; + final Interval interval = Intervals.of("2018/2019"); + for (int partitionId = 0; partitionId < 5; partitionId++) { + for (int subTaskId = 0; subTaskId < 3; subTaskId++) { + final File segmentFile = generateSegmentFile(); + final DataSegment segment = newSegment(interval, partitionId); + intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + subTaskId, segment, segmentFile); + } + } + + intermediaryDataManager.deletePartitions(supervisorTaskId); + + for (int partitionId = 0; partitionId < 5; partitionId++) { + Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId).isEmpty()); + } + } + + @Test + public void testAddRemoveAdd() throws IOException + { + final String supervisorTaskId = "supervisorTaskId"; + final Interval interval = Intervals.of("2018/2019"); + for (int i = 0; i < 15; i++) { + File segmentFile = generateSegmentFile(); + DataSegment segment = newSegment(interval, i); + intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile); + } + intermediaryDataManager.deletePartitions(supervisorTaskId); + File segmentFile = generateSegmentFile(); + DataSegment segment = newSegment(interval, 16); + intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile); + } + + private File generateSegmentFile() throws IOException + { + final File segmentFile = tempDir.newFile(); + FileUtils.write(segmentFile, "test data.", StringUtils.UTF8); + return segmentFile; + } + + private DataSegment newSegment(Interval interval, int partitionId) + { + return new DataSegment( + "dataSource", + interval, + "version", + null, + null, + null, + new NumberedShardSpec(partitionId, 0), + 9, + 10 + ); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index b86b654847a..635bf4ad6b5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -85,6 +85,7 @@ public class WorkerTaskManagerTest null, false, null, + null, null ); TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index 4afdd3cf9d5..8c22aaa85a2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -153,6 +153,7 @@ public class WorkerTaskMonitorTest null, false, null, + null, null ); TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index 7588da780fb..2b609c20355 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -270,6 +270,27 @@ public class HttpIndexingServiceClient implements IndexingServiceClient } } + @Override + public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds) throws InterruptedException + { + try { + final FullResponseHolder responseHolder = druidLeaderClient.go( + druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/taskStatus") + .setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskIds)) + ); + + return jsonMapper.readValue( + responseHolder.getContent(), + new TypeReference<Map<String, TaskStatus>>() + { + } + ); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + @Override @Nullable public TaskStatusPlus getLastCompleteTask() diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java index f2299c9d2ee..39fd93fa772 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java @@ -27,6 +27,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import java.util.Set; public interface IndexingServiceClient { @@ -55,6 +56,8 @@ public interface IndexingServiceClient TaskStatusResponse getTaskStatus(String taskId); + Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds) throws InterruptedException; + @Nullable TaskStatusPlus getLastCompleteTask();
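
A hedged sketch of how a caller, such as the cleanup task in IntermediaryDataManager, might consume the new bulk-status API; the task ids are illustrative and imports of ImmutableSet and Map are elided:

```java
// Sketch only: look up many task statuses in one POST to /druid/indexer/v1/taskStatus.
static void checkFinishedTasks(IndexingServiceClient client) throws InterruptedException
{
  final Map<String, TaskStatus> statuses =
      client.getTaskStatuses(ImmutableSet.of("supervisorTask1", "supervisorTask2"));
  for (Map.Entry<String, TaskStatus> entry : statuses.entrySet()) {
    if (entry.getValue().getStatusCode().isComplete()) {
      // This supervisor task is done; its intermediary partitions can be deleted.
    }
  }
}
```
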
diff --git a/server/src/main/java/org/apache/druid/client/indexing/TaskStatusResponse.java b/server/src/main/java/org/apache/druid/client/indexing/TaskStatusResponse.java index 08a85193f2d..247ecabd53f 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/TaskStatusResponse.java +++ b/server/src/main/java/org/apache/druid/client/indexing/TaskStatusResponse.java @@ -33,7 +33,7 @@ public class TaskStatusResponse { private final String task; // Task ID, named "task" in the JSONification of this class. @Nullable - private final TaskStatusPlus status; + private final TaskStatusPlus status; // null for unknown tasks @JsonCreator public TaskStatusResponse( diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 327db5d2cdd..b25c2163f71 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -163,8 +163,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader if (loc == null) { loc = loadSegmentWithRetry(segment, storageDir); } - loc.addSegment(segment); - return new File(loc.getPath(), storageDir); + final File localStorageDir = new File(loc.getPath(), storageDir); + loc.addSegmentDir(localStorageDir, segment); + return localStorageDir; } finally { unlock(segment, lock); @@ -270,7 +271,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader // Druid creates folders of the form dataSource/interval/version/partitionNum. // We need to clean up all these directories if they are all empty. cleanupCacheFiles(location.getPath(), localStorageDir); - location.removeSegment(segment); + location.removeSegmentDir(localStorageDir, segment); } } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index e91ffee25cc..4fb4c4ea0bc 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -19,6 +19,8 @@ package org.apache.druid.segment.loading; +import org.apache.commons.io.FileUtils; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.DataSegment; @@ -29,18 +31,18 @@ import java.util.Set; /** */ -class StorageLocation +public class StorageLocation { private static final Logger log = new Logger(StorageLocation.class); private final File path; private final long maxSize; private final long freeSpaceToKeep; - private final Set<DataSegment> segments; + private final Set<File> files = new HashSet<>(); private volatile long currSize = 0; - StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent) + public StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent) { this.path = path; this.maxSize = maxSize; @@ -57,35 +59,62 @@ } else { this.freeSpaceToKeep = 0; } - - this.segments = new HashSet<>(); } - File getPath() + public File getPath() { return path; } - long getMaxSize() + public long getMaxSize() { return maxSize; } - synchronized void addSegment(DataSegment segment) + /** + * Add a new file to this location. The given file argument must be a file rather than a directory. + */ + public synchronized void addFile(File file) { - if (segments.add(segment)) { + if (file.isDirectory()) { + throw new ISE("[%s] must be a file. Use addSegmentDir() for directories.", file); + } + if (files.add(file)) { + currSize += FileUtils.sizeOf(file); + } + } + + /** + * Add a new segment dir to this location. The segment size is added to currSize.
+ */ + public synchronized void addSegmentDir(File segmentDir, DataSegment segment) + { + if (files.add(segmentDir)) { currSize += segment.getSize(); } } - synchronized void removeSegment(DataSegment segment) + /** + * Remove a segment file from this location. The given file argument must be a file rather than a directory. + */ + public synchronized void removeFile(File file) { - if (segments.remove(segment)) { + if (files.remove(file)) { + currSize -= FileUtils.sizeOf(file); + } + } + + /** + * Remove a segment dir from this location. The segment size is subtracted from currSize. + */ + public synchronized void removeSegmentDir(File segmentDir, DataSegment segment) + { + if (files.remove(segmentDir)) { currSize -= segment.getSize(); } } - boolean canHandle(DataSegment segment) + public boolean canHandle(DataSegment segment) { if (available() < segment.getSize()) { log.warn( @@ -114,7 +143,7 @@ return true; } - synchronized long available() + public synchronized long available() { return maxSize - currSize; } }
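
A short sketch of the capacity-accounting contract the reworked StorageLocation now exposes: plain files are measured on disk via FileUtils.sizeOf, while segment directories are accounted at the segment's declared size. The 150-byte maxSize mirrors the unit tests in this patch; the path is illustrative, and `segment` and `subTaskFile` are assumed in scope:

```java
// Hedged sketch of StorageLocation bookkeeping, not part of the patch.
final StorageLocation location = new StorageLocation(new File("/tmp/druid/shuffle"), 150L, null);
if (location.canHandle(segment)) {           // room for segment.getSize() more bytes?
  location.addFile(subTaskFile);             // currSize += FileUtils.sizeOf(subTaskFile)
}
final long remaining = location.available(); // maxSize - currSize, after the add
location.removeFile(subTaskFile);            // currSize -= FileUtils.sizeOf(subTaskFile)
```
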
diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationConfig.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationConfig.java index 88779406dab..bbfd7ed9d91 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationConfig.java @@ -19,66 +19,66 @@ package org.apache.druid.segment.loading; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; -import javax.validation.constraints.Min; -import javax.validation.constraints.NotNull; +import javax.annotation.Nullable; import java.io.File; /** */ public class StorageLocationConfig { - @JsonProperty - @NotNull - private File path = null; + private final File path; + private final long maxSize; + @Nullable + private final Double freeSpacePercent; + + @JsonCreator + public StorageLocationConfig( + @JsonProperty("path") File path, + @JsonProperty("maxSize") @Nullable Long maxSize, + @JsonProperty("freeSpacePercent") @Nullable Double freeSpacePercent + ) + { + this.path = Preconditions.checkNotNull(path, "path"); + this.maxSize = maxSize == null ? Long.MAX_VALUE : maxSize; + this.freeSpacePercent = freeSpacePercent; + Preconditions.checkArgument(this.maxSize > 0, "maxSize[%s] should be positive", this.maxSize); + Preconditions.checkArgument( + this.freeSpacePercent == null || this.freeSpacePercent >= 0, + "freeSpacePercent[%s] should be 0 or a positive double", + this.freeSpacePercent + ); + } @JsonProperty - @Min(1) - private long maxSize = Long.MAX_VALUE; - - @JsonProperty - private Double freeSpacePercent; - public File getPath() { return path; } - public StorageLocationConfig setPath(File path) - { - this.path = path; - return this; - } - + @JsonProperty public long getMaxSize() { return maxSize; } - public StorageLocationConfig setMaxSize(long maxSize) - { - this.maxSize = maxSize; - return this; - } - + @JsonProperty + @Nullable public Double getFreeSpacePercent() { return freeSpacePercent; } - public StorageLocationConfig setFreeSpacePercent(Double freeSpacePercent) - { - this.freeSpacePercent = freeSpacePercent; - return this; - } - @Override public String toString() { return "StorageLocationConfig{" + "path=" + path + ", maxSize=" + maxSize + + ", freeSpacePercent=" + freeSpacePercent + '}'; } }
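A short sketch of how the now-immutable config behaves (values illustrative; since the JSON property names are unchanged, existing location entries keep deserializing through the @JsonCreator constructor):

    import java.io.File;
    import org.apache.druid.segment.loading.StorageLocationConfig;

    public class StorageLocationConfigSketch
    {
      public static void main(String[] args)
      {
        // maxSize and freeSpacePercent are optional; maxSize defaults to Long.MAX_VALUE.
        StorageLocationConfig config = new StorageLocationConfig(new File("/tmp/segment-cache"), null, null);
        System.out.println(config.getMaxSize()); // 9223372036854775807

        // Arguments are validated eagerly:
        //   new StorageLocationConfig(new File("/tmp"), 0L, null) -> IllegalArgumentException (maxSize must be positive)
        //   new StorageLocationConfig(null, null, null)           -> NullPointerException (path is required)
      }
    }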
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java index 8908173fd7d..794ea08ad05 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; public class NoopIndexingServiceClient implements IndexingServiceClient { @@ -85,6 +86,12 @@ public class NoopIndexingServiceClient implements IndexingServiceClient return new TaskStatusResponse(taskId, null); } + @Override + public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds) + { + return Collections.emptyMap(); + } + @Nullable @Override public TaskStatusPlus getLastCompleteTask()
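For illustration, the new bulk-status lookup can be exercised as below (a sketch; the Map<String, TaskStatus> signature is inferred from the override above, and the no-op client simply returns an empty map):

    import com.google.common.collect.ImmutableSet;
    import java.util.Map;
    import org.apache.druid.client.indexing.NoopIndexingServiceClient;
    import org.apache.druid.indexer.TaskStatus;

    public class TaskStatusesSketch
    {
      public static void main(String[] args)
      {
        // One call resolves several task ids at once instead of one request per task.
        NoopIndexingServiceClient client = new NoopIndexingServiceClient();
        Map<String, TaskStatus> statuses = client.getTaskStatuses(ImmutableSet.of("task1", "task2"));
        System.out.println(statuses); // {} for the no-op client
      }
    }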
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java index 5a54f8ad4d6..cb38d7b1dc9 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java @@ -88,9 +88,7 @@ public class SegmentLoaderLocalCacheManagerTest localSegmentCacheFolder = tmpFolder.newFolder("segment_cache_folder"); final List<StorageLocationConfig> locations = new ArrayList<>(); - final StorageLocationConfig locationConfig = new StorageLocationConfig(); - locationConfig.setPath(localSegmentCacheFolder); - locationConfig.setMaxSize(10000000000L); + final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 10000000000L, null); locations.add(locationConfig); manager = new SegmentLoaderLocalCacheManager( @@ -157,14 +155,10 @@ public class SegmentLoaderLocalCacheManagerTest final File localStorageFolder = tmpFolder.newFolder("local_storage_folder"); final List<StorageLocationConfig> locations = new ArrayList<>(); - final StorageLocationConfig locationConfig = new StorageLocationConfig(); - locationConfig.setPath(localStorageFolder); - locationConfig.setMaxSize(10000000000L); + final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 10000000000L, null); locations.add(locationConfig); - final StorageLocationConfig locationConfig2 = new StorageLocationConfig(); final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2"); - locationConfig2.setPath(localStorageFolder2); - locationConfig2.setMaxSize(1000000000L); + final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 1000000000L, null); locations.add(locationConfig2); manager = new SegmentLoaderLocalCacheManager( @@ -207,17 +201,13 @@ public class SegmentLoaderLocalCacheManagerTest public void testRetrySuccessAtSecondLocation() throws Exception { final List<StorageLocationConfig> locations = new ArrayList<>(); - final StorageLocationConfig locationConfig = new StorageLocationConfig(); final File localStorageFolder = tmpFolder.newFolder("local_storage_folder"); // mock can't write in first location localStorageFolder.setWritable(false); - locationConfig.setPath(localStorageFolder); - locationConfig.setMaxSize(1000000000L); + final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 1000000000L, null); locations.add(locationConfig); - final StorageLocationConfig locationConfig2 = new StorageLocationConfig(); final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2"); - locationConfig2.setPath(localStorageFolder2); - locationConfig2.setMaxSize(10000000L); + final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null); locations.add(locationConfig2); manager = new SegmentLoaderLocalCacheManager( @@ -260,19 +250,15 @@ public class SegmentLoaderLocalCacheManagerTest public void testRetryAllFail() throws Exception { final List<StorageLocationConfig> locations = new ArrayList<>(); - final StorageLocationConfig locationConfig = new StorageLocationConfig(); final File localStorageFolder = tmpFolder.newFolder("local_storage_folder"); // mock can't write in first location localStorageFolder.setWritable(false); - locationConfig.setPath(localStorageFolder); - locationConfig.setMaxSize(1000000000L); + final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 1000000000L, null); locations.add(locationConfig); - final StorageLocationConfig locationConfig2 = new StorageLocationConfig(); final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2"); // mock can't write in second location localStorageFolder2.setWritable(false); - locationConfig2.setPath(localStorageFolder2); - locationConfig2.setMaxSize(10000000L); + final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null); locations.add(locationConfig2); manager = new SegmentLoaderLocalCacheManager( @@ -316,17 +302,13 @@ public class SegmentLoaderLocalCacheManagerTest public void testEmptyToFullOrder() throws Exception { final List<StorageLocationConfig> locations = new ArrayList<>(); - final StorageLocationConfig locationConfig = new StorageLocationConfig(); final File localStorageFolder = tmpFolder.newFolder("local_storage_folder"); localStorageFolder.setWritable(true); - locationConfig.setPath(localStorageFolder); - locationConfig.setMaxSize(10L); + final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 10L, null); locations.add(locationConfig); - final StorageLocationConfig locationConfig2 = new StorageLocationConfig(); final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2"); localStorageFolder2.setWritable(true); - locationConfig2.setPath(localStorageFolder2); - locationConfig2.setMaxSize(10L); + final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10L, null); locations.add(locationConfig2); manager = new SegmentLoaderLocalCacheManager(
diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java index cc6b6fe960a..cdfcd47ce8d 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java @@ -71,25 +71,25 @@ public class StorageLocationTest final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23); - loc.addSegment(makeSegment("2012-01-01/2012-01-02", 10)); + loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); expectedAvail -= 10; verifyLoc(expectedAvail, loc); - loc.addSegment(makeSegment("2012-01-01/2012-01-02", 10)); + loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); verifyLoc(expectedAvail, loc); - loc.addSegment(secondSegment); + loc.addSegmentDir(new File("test2"), secondSegment); expectedAvail -= 23; verifyLoc(expectedAvail, loc); - loc.removeSegment(makeSegment("2012-01-01/2012-01-02", 10)); + loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); expectedAvail += 10; verifyLoc(expectedAvail, loc); - loc.removeSegment(makeSegment("2012-01-01/2012-01-02", 10)); + loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); verifyLoc(expectedAvail, loc); - loc.removeSegment(secondSegment); + loc.removeSegmentDir(new File("test2"), secondSegment); expectedAvail += 23; verifyLoc(expectedAvail, loc); }
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java index 649c8b23692..fdd89d1962c 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java @@ -105,7 +105,7 @@ public class SegmentManagerThreadSafetyTest public List<StorageLocationConfig> getLocations() { return Collections.singletonList( - new StorageLocationConfig().setPath(segmentCacheDir) + new StorageLocationConfig(segmentCacheDir, null, null) ); } },
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index eeb8ae1f0fa..68d374c2b01 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -29,6 +29,7 @@ import com.google.inject.multibindings.MapBinder; import com.google.inject.name.Names; import com.google.inject.util.Providers; import io.airlift.airline.Command; +import org.apache.druid.client.indexing.HttpIndexingServiceClient; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.discovery.NodeType; import org.apache.druid.discovery.WorkerNodeService; @@ -53,6 +54,7 @@ import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.WorkerCuratorCoordinator; import org.apache.druid.indexing.worker.WorkerTaskMonitor; import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.indexing.worker.http.ShuffleResource; import org.apache.druid.indexing.worker.http.TaskManagementResource; import org.apache.druid.indexing.worker.http.WorkerResource; import org.apache.druid.java.util.common.logger.Logger; @@ -101,7 +103,7 @@ public class CliMiddleManager extends ServerRunnable binder.bind(TaskRunner.class).to(ForkingTaskRunner.class); binder.bind(ForkingTaskRunner.class).in(LazySingleton.class); - binder.bind(IndexingServiceClient.class).toProvider(Providers.of(null)); + binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class); binder.bind(new TypeLiteral>() {}) .toProvider(Providers.of(null)); binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null)); @@ -129,6 +131,7 @@ public class CliMiddleManager extends ServerRunnable .in(LazySingleton.class); Jerseys.addResource(binder, WorkerResource.class); Jerseys.addResource(binder, TaskManagementResource.class); + Jerseys.addResource(binder, ShuffleResource.class); LifecycleModule.register(binder, Server.class);
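With the MiddleManager's IndexingServiceClient no longer bound to a null provider, worker-side components can reach the Overlord over HTTP. A minimal sketch of an injection site (the class here is hypothetical; ShuffleResource itself is registered via Jerseys.addResource above):

    import com.google.inject.Inject;
    import org.apache.druid.client.indexing.IndexingServiceClient;

    // Hypothetical component: with the new binding, Guice supplies an
    // HttpIndexingServiceClient here at runtime instead of null.
    public class ExampleShuffleComponent
    {
      private final IndexingServiceClient overlordClient;

      @Inject
      public ExampleShuffleComponent(IndexingServiceClient overlordClient)
      {
        this.overlordClient = overlordClient;
      }
    }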