diff --git a/core/pom.xml b/core/pom.xml
index efec90a7b05..e0ccfa1a04b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -331,6 +331,12 @@
com.google.errorprone
error_prone_annotations
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
index ac926bee749..2ccd5db3ca6 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
@@ -29,5 +29,10 @@ public interface Monitor
void stop();
+ /**
+ * Emit metrics using the given emitter.
+ *
+ * @return true if this monitor needs to continue monitoring. False otherwise.
+ */
boolean monitor(ServiceEmitter emitter);
}
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
index 118f283ac2c..2adbe9510a3 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
@@ -93,6 +94,17 @@ public class MonitorScheduler
}
}
+ /**
+ * Returns a {@link Monitor} instance of the given class if any. Note that this method searches for the monitor
+ * from the current snapshot of {@link #monitors}.
+ */
+ public Optional findMonitor(Class monitorClass)
+ {
+ synchronized (lock) {
+ return (Optional) monitors.stream().filter(m -> m.getClass() == monitorClass).findFirst();
+ }
+ }
+
@LifecycleStop
public void stop()
{
diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
new file mode 100644
index 00000000000..76671968b91
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.metrics;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Optional;
+
+public class MonitorSchedulerTest
+{
+ @Test
+ public void testFindMonitor()
+ {
+ class Monitor1 extends NoopMonitor
+ {
+ }
+ class Monitor2 extends NoopMonitor
+ {
+ }
+ class Monitor3 extends NoopMonitor
+ {
+ }
+
+ final Monitor1 monitor1 = new Monitor1();
+ final Monitor2 monitor2 = new Monitor2();
+
+ final MonitorScheduler scheduler = new MonitorScheduler(
+ Mockito.mock(MonitorSchedulerConfig.class),
+ Execs.scheduledSingleThreaded("monitor-scheduler-test"),
+ Mockito.mock(ServiceEmitter.class),
+ ImmutableList.of(monitor1, monitor2)
+ );
+
+ final Optional maybeFound1 = scheduler.findMonitor(Monitor1.class);
+ final Optional maybeFound2 = scheduler.findMonitor(Monitor2.class);
+ Assert.assertTrue(maybeFound1.isPresent());
+ Assert.assertTrue(maybeFound2.isPresent());
+ Assert.assertSame(monitor1, maybeFound1.get());
+ Assert.assertSame(monitor2, maybeFound2.get());
+
+ Assert.assertFalse(scheduler.findMonitor(Monitor3.class).isPresent());
+ }
+
+ private static class NoopMonitor implements Monitor
+ {
+ @Override
+ public void start()
+ {
+
+ }
+
+ @Override
+ public void stop()
+ {
+
+ }
+
+ @Override
+ public boolean monitor(ServiceEmitter emitter)
+ {
+ return true;
+ }
+ }
+}
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 1b4ed7fd135..95bc3453c82 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -181,7 +181,7 @@ These metrics are only available if the RealtimeMetricsMonitor is included in th
Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.
-### Indexing service
+## Indexing service
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
@@ -202,6 +202,16 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
+## Shuffle metrics (Native parallel task)
+
+The shuffle metrics can be enabled by adding `org.apache.druid.indexing.worker.shuffle.ShuffleMonitor` in `druid.monitoring.monitors`
+See [Enabling Metrics](../configuration/index.md#enabling-metrics) for more details.
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`ingest/shuffle/bytes`|Number of bytes shuffled per emission period.|supervisorTaskId|Varies|
+|`ingest/shuffle/requests`|Number of shuffle requests per emission period.|supervisorTaskId|Varies|
+
## Coordination
These metrics are for the Druid Coordinator and are reset each time the Coordinator runs the coordination logic.
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index cea66e770c3..d00de06aa8d 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -278,6 +278,12 @@
system-rules
test
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index 5f6f392f563..cbdfc68975d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -42,7 +42,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.MonitorScheduler;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index 8c883a4e1bc..2ef1f88ac0a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -41,7 +41,7 @@ import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index acac2792ab8..d5d8a8f13e7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -62,6 +62,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRun
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistributionMerger;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketchMerger;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
@@ -517,7 +518,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
* - In the first phase, each task partitions input data and stores those partitions in local storage.
* - The partition is created based on the segment granularity (primary partition key) and the partition dimension
* values in {@link PartitionsSpec} (secondary partition key).
- * - Partitioned data is maintained by {@link org.apache.druid.indexing.worker.IntermediaryDataManager}.
+ * - Partitioned data is maintained by {@link IntermediaryDataManager}.
* - In the second phase, each task reads partitioned data from the intermediary data server (middleManager
* or indexer) and merges them to create the final segments.
*/
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index ff0090c27f3..3b1f7ba4cd0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -29,6 +29,7 @@ import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
@@ -46,7 +47,7 @@ import java.util.stream.Collectors;
/**
* The worker task of {@link PartialHashSegmentGenerateParallelIndexTaskRunner}. This task partitions input data by
* hashing the segment granularity and partition dimensions in {@link HashedPartitionsSpec}. Partitioned segments are
- * stored in local storage using {@link org.apache.druid.indexing.worker.ShuffleDataSegmentPusher}.
+ * stored in local storage using {@link ShuffleDataSegmentPusher}.
*/
public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask
{
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 89b9f80fcf4..57978f4b8af 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -31,7 +31,7 @@ import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis;
-import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index 27160c96303..ec8530bcf20 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -30,7 +30,7 @@ import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SequenceNameFunction;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
-import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java
index 4395c7c6f75..b6ea7aad6cb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java
@@ -19,6 +19,8 @@
package org.apache.druid.indexing.common.task.batch.parallel;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
+
import java.io.File;
import java.io.IOException;
@@ -27,7 +29,7 @@ import java.io.IOException;
* The only available implementation for production code is {@link HttpShuffleClient} and
* this interface is more for easier testing.
*
- * @see org.apache.druid.indexing.worker.IntermediaryDataManager
+ * @see IntermediaryDataManager
* @see PartialSegmentMergeTask
*/
public interface ShuffleClient
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
similarity index 98%
rename from indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
rename to indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
index 408fba5fa6e..9afa1c03d3e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
import com.google.common.collect.Iterators;
import com.google.common.io.Files;
@@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CompressionUtils;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -67,7 +68,7 @@ import java.util.stream.IntStream;
/**
* This class manages intermediary segments for data shuffle between native parallel index tasks.
* In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers (or indexer)
- * and phase 2 tasks read those files via HTTP.
+ * and phase 2 tasks read those files over HTTP.
*
* The directory where segment files are placed is structured as
* {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/bucketIdOfSegment.
@@ -100,7 +101,7 @@ public class IntermediaryDataManager
// but middleManager or indexer could miss the request. This executor is to automatically clean up unused intermediary
// partitions.
// This can be null until IntermediaryDataManager is started.
- @Nullable
+ @MonotonicNonNull
private ScheduledExecutorService supervisorTaskChecker;
@Inject
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java
similarity index 97%
rename from indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java
rename to indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java
index fcbdf9d6c3f..6bc83ba17ba 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.loading.DataSegmentPusher;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java
new file mode 100644
index 00000000000..500d8e96a8d
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Shuffle metrcis for middleManagers and indexers. This class is thread-safe because shuffle can be performed by
+ * multiple HTTP threads while a monitoring thread periodically emits the snapshot of metrics.
+ *
+ * @see ShuffleResource
+ * @see org.apache.druid.java.util.metrics.MonitorScheduler
+ */
+public class ShuffleMetrics
+{
+ /**
+ * This lock is used to synchronize accesses to the reference to {@link #datasourceMetrics} and the
+ * {@link PerDatasourceShuffleMetrics} values of the map. This means,
+ *
+ * - Any updates on PerDatasourceShuffleMetrics in the map (and thus its key as well) should be synchronized
+ * under this lock.
+ * - Any updates on the reference to datasourceMetrics should be synchronized under this lock.
+ */
+ private final Object lock = new Object();
+
+ /**
+ * A map of (datasource name) -> {@link PerDatasourceShuffleMetrics}. This map is replaced with an empty map
+ * whenever a snapshot is taken since the map can keep growing over time otherwise. For concurrent access pattern,
+ * see {@link #shuffleRequested} and {@link #snapshotAndReset()}.
+ */
+ @GuardedBy("lock")
+ private Map datasourceMetrics = new HashMap<>();
+
+ /**
+ * This method is called whenever a new shuffle is requested. Multiple tasks can request shuffle at the same time,
+ * while the monitoring thread takes a snapshot of the metrics. There is a happens-before relationship between
+ * shuffleRequested and {@link #snapshotAndReset()}.
+ */
+ public void shuffleRequested(String supervisorTaskId, long fileLength)
+ {
+ synchronized (lock) {
+ datasourceMetrics.computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics())
+ .accumulate(fileLength);
+ }
+ }
+
+ /**
+ * This method is called whenever the monitoring thread takes a snapshot of the current metrics.
+ * {@link #datasourceMetrics} will be reset to an empty map after this call. This is to return the snapshot
+ * metrics collected during the monitornig period. There is a happens-before relationship between snapshotAndReset()
+ * and {@link #shuffleRequested}.
+ */
+ public Map snapshotAndReset()
+ {
+ synchronized (lock) {
+ final Map snapshot = Collections.unmodifiableMap(datasourceMetrics);
+ datasourceMetrics = new HashMap<>();
+ return snapshot;
+ }
+ }
+
+ /**
+ * This method is visible only for testing. Use {@link #snapshotAndReset()} instead to get the current snapshot.
+ */
+ @VisibleForTesting
+ Map getDatasourceMetrics()
+ {
+ synchronized (lock) {
+ return datasourceMetrics;
+ }
+ }
+
+ /**
+ * This class represents shuffle metrics of one datasource. This class is not thread-safe and should never be accessed
+ * by multiple threads at the same time.
+ */
+ public static class PerDatasourceShuffleMetrics
+ {
+ private long shuffleBytes;
+ private int shuffleRequests;
+
+ @VisibleForTesting
+ void accumulate(long shuffleBytes)
+ {
+ this.shuffleBytes += shuffleBytes;
+ this.shuffleRequests++;
+ }
+
+ public long getShuffleBytes()
+ {
+ return shuffleBytes;
+ }
+
+ public int getShuffleRequests()
+ {
+ return shuffleRequests;
+ }
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java
new file mode 100644
index 00000000000..1c2afb15e6d
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import org.apache.druid.guice.Jerseys;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.metrics.MonitorScheduler;
+
+import java.util.Optional;
+
+public class ShuffleModule implements Module
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ Jerseys.addResource(binder, ShuffleResource.class);
+ }
+
+ /**
+ * {@link ShuffleMetrics} is used in {@link ShuffleResource} and {@link ShuffleMonitor} to collect metrics
+ * and report them, respectively. Unlike ShuffleResource, ShuffleMonitor can be created via a user config
+ * ({@link org.apache.druid.server.metrics.MonitorsConfig}) in potentially any node types, where it is not
+ * possible to create ShuffleMetrics. This method checks the {@link MonitorScheduler} if ShuffleMonitor is
+ * registered on it, and sets the proper ShuffleMetrics.
+ */
+ @Provides
+ @LazySingleton
+ public Optional getShuffleMetrics(MonitorScheduler monitorScheduler)
+ {
+ // ShuffleMonitor cannot be registered dynamically, but can only via the static configuration (MonitorsConfig).
+ // As a result, it is safe to check only one time if it is registered in MonitorScheduler.
+ final Optional maybeMonitor = monitorScheduler.findMonitor(ShuffleMonitor.class);
+ if (maybeMonitor.isPresent()) {
+ final ShuffleMetrics metrics = new ShuffleMetrics();
+ maybeMonitor.get().setShuffleMetrics(metrics);
+ return Optional.of(metrics);
+ }
+ return Optional.empty();
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java
new file mode 100644
index 00000000000..6157698d627
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+import java.util.Map;
+
+public class ShuffleMonitor extends AbstractMonitor
+{
+ static final String SUPERVISOR_TASK_ID_DIMENSION = "supervisorTaskId";
+ static final String SHUFFLE_BYTES_KEY = "ingest/shuffle/bytes";
+ static final String SHUFFLE_REQUESTS_KEY = "ingest/shuffle/requests";
+
+ /**
+ * ShuffleMonitor can be instantiated in any node types if it is defined in
+ * {@link org.apache.druid.server.metrics.MonitorsConfig}. Since {@link ShuffleMetrics} is defined
+ * in the `indexing-service` module, some node types (such as broker) would fail to create it
+ * if they don't have required dependencies. To avoid this problem, this variable is lazily initialized
+ * only in the node types which has the {@link ShuffleModule}.
+ */
+ @MonotonicNonNull
+ private ShuffleMetrics shuffleMetrics;
+
+ public void setShuffleMetrics(ShuffleMetrics shuffleMetrics)
+ {
+ this.shuffleMetrics = shuffleMetrics;
+ }
+
+ @Override
+ public boolean doMonitor(ServiceEmitter emitter)
+ {
+ if (shuffleMetrics != null) {
+ final Map snapshot = shuffleMetrics.snapshotAndReset();
+ snapshot.forEach((supervisorTaskId, perDatasourceShuffleMetrics) -> {
+ final Builder metricBuilder = ServiceMetricEvent
+ .builder()
+ .setDimension(SUPERVISOR_TASK_ID_DIMENSION, supervisorTaskId);
+ emitter.emit(metricBuilder.build(SHUFFLE_BYTES_KEY, perDatasourceShuffleMetrics.getShuffleBytes()));
+ emitter.emit(metricBuilder.build(SHUFFLE_REQUESTS_KEY, perDatasourceShuffleMetrics.getShuffleRequests()));
+ });
+ }
+ return true;
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java
similarity index 92%
rename from indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java
rename to indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java
index 0e0e9364e21..dd885a2ab0d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java
@@ -17,12 +17,11 @@
* under the License.
*/
-package org.apache.druid.indexing.worker.http;
+package org.apache.druid.indexing.worker.shuffle;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -42,6 +41,7 @@ import javax.ws.rs.core.StreamingOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.util.Optional;
/**
* HTTP endpoints for shuffle system. The MiddleManager and Indexer use this resource to serve intermediary shuffle
@@ -60,11 +60,13 @@ public class ShuffleResource
private static final Logger log = new Logger(ShuffleResource.class);
private final IntermediaryDataManager intermediaryDataManager;
+ private final Optional shuffleMetrics;
@Inject
- public ShuffleResource(IntermediaryDataManager intermediaryDataManager)
+ public ShuffleResource(IntermediaryDataManager intermediaryDataManager, Optional shuffleMetrics)
{
this.intermediaryDataManager = intermediaryDataManager;
+ this.shuffleMetrics = shuffleMetrics;
}
@GET
@@ -96,6 +98,7 @@ public class ShuffleResource
);
return Response.status(Status.NOT_FOUND).entity(errorMessage).build();
} else {
+ shuffleMetrics.ifPresent(metrics -> metrics.shuffleRequested(supervisorTaskId, partitionFile.length()));
return Response.ok(
(StreamingOutput) output -> {
try (final FileInputStream fileInputStream = new FileInputStream(partitionFile)) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index af221602aa8..d87d1f09804 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -64,8 +64,8 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.Segments;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
similarity index 98%
rename from indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
rename to indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
index 7d0233b6b16..dcce4809c4a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
similarity index 99%
rename from indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
rename to indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
index 15aad92b6a3..4db1b39b809 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
similarity index 98%
rename from indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java
rename to indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
index 15319263396..0604742f100 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java
new file mode 100644
index 00000000000..d4f625893b4
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class ShuffleMetricsTest
+{
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testShuffleRequested()
+ {
+ final ShuffleMetrics metrics = new ShuffleMetrics();
+ final String supervisorTask1 = "supervisor1";
+ final String supervisorTask2 = "supervisor2";
+ final String supervisorTask3 = "supervisor3";
+ metrics.shuffleRequested(supervisorTask1, 1024);
+ metrics.shuffleRequested(supervisorTask2, 10);
+ metrics.shuffleRequested(supervisorTask1, 512);
+ metrics.shuffleRequested(supervisorTask3, 10000);
+ metrics.shuffleRequested(supervisorTask2, 30);
+
+ final Map snapshot = metrics.snapshotAndReset();
+ Assert.assertEquals(ImmutableSet.of(supervisorTask1, supervisorTask2, supervisorTask3), snapshot.keySet());
+
+ PerDatasourceShuffleMetrics perDatasourceShuffleMetrics = snapshot.get(supervisorTask1);
+ Assert.assertEquals(2, perDatasourceShuffleMetrics.getShuffleRequests());
+ Assert.assertEquals(1536, perDatasourceShuffleMetrics.getShuffleBytes());
+
+ perDatasourceShuffleMetrics = snapshot.get(supervisorTask2);
+ Assert.assertEquals(2, perDatasourceShuffleMetrics.getShuffleRequests());
+ Assert.assertEquals(40, perDatasourceShuffleMetrics.getShuffleBytes());
+
+ perDatasourceShuffleMetrics = snapshot.get(supervisorTask3);
+ Assert.assertEquals(1, perDatasourceShuffleMetrics.getShuffleRequests());
+ Assert.assertEquals(10000, perDatasourceShuffleMetrics.getShuffleBytes());
+ }
+
+ @Test
+ public void testSnapshotUnmodifiable()
+ {
+ expectedException.expect(UnsupportedOperationException.class);
+ new ShuffleMetrics().snapshotAndReset().put("k", new PerDatasourceShuffleMetrics());
+ }
+
+ @Test
+ public void testResetDatasourceMetricsAfterSnapshot()
+ {
+ final ShuffleMetrics shuffleMetrics = new ShuffleMetrics();
+ shuffleMetrics.shuffleRequested("supervisor", 10);
+ shuffleMetrics.shuffleRequested("supervisor", 10);
+ shuffleMetrics.shuffleRequested("supervisor2", 10);
+ shuffleMetrics.snapshotAndReset();
+
+ Assert.assertEquals(Collections.emptyMap(), shuffleMetrics.getDatasourceMetrics());
+ }
+
+ @Test(timeout = 5000L)
+ public void testConcurrency() throws ExecutionException, InterruptedException
+ {
+ final ExecutorService exec = Execs.multiThreaded(3, "shuffle-metrics-test-%d"); // 2 for write, 1 for read
+
+ try {
+ final ShuffleMetrics metrics = new ShuffleMetrics();
+ final String supervisorTask1 = "supervisor1";
+ final String supervisorTask2 = "supervisor2";
+
+ final CountDownLatch firstUpdatelatch = new CountDownLatch(2);
+ final List> futures = new ArrayList<>();
+
+ futures.add(
+ exec.submit(() -> {
+ metrics.shuffleRequested(supervisorTask1, 1024);
+ metrics.shuffleRequested(supervisorTask2, 30);
+ firstUpdatelatch.countDown();
+ Thread.sleep(ThreadLocalRandom.current().nextInt(10));
+ metrics.shuffleRequested(supervisorTask2, 10);
+ return null;
+ })
+ );
+ futures.add(
+ exec.submit(() -> {
+ metrics.shuffleRequested(supervisorTask2, 30);
+ metrics.shuffleRequested(supervisorTask1, 1024);
+ firstUpdatelatch.countDown();
+ Thread.sleep(ThreadLocalRandom.current().nextInt(10));
+ metrics.shuffleRequested(supervisorTask1, 32);
+ return null;
+ })
+ );
+ final Map firstSnapshot = exec.submit(() -> {
+ firstUpdatelatch.await();
+ Thread.sleep(ThreadLocalRandom.current().nextInt(10));
+ return metrics.snapshotAndReset();
+ }).get();
+
+ int expectedSecondSnapshotSize = 0;
+ boolean task1ShouldBeInSecondSnapshot = false;
+ boolean task2ShouldBeInSecondSnapshot = false;
+
+ Assert.assertEquals(2, firstSnapshot.size());
+ Assert.assertNotNull(firstSnapshot.get(supervisorTask1));
+ Assert.assertTrue(
+ 2048 == firstSnapshot.get(supervisorTask1).getShuffleBytes()
+ || 2080 == firstSnapshot.get(supervisorTask1).getShuffleBytes()
+ );
+ Assert.assertTrue(
+ 2 == firstSnapshot.get(supervisorTask1).getShuffleRequests()
+ || 3 == firstSnapshot.get(supervisorTask1).getShuffleRequests()
+ );
+ if (firstSnapshot.get(supervisorTask1).getShuffleRequests() == 2) {
+ expectedSecondSnapshotSize++;
+ task1ShouldBeInSecondSnapshot = true;
+ }
+ Assert.assertNotNull(firstSnapshot.get(supervisorTask2));
+ Assert.assertTrue(
+ 60 == firstSnapshot.get(supervisorTask2).getShuffleBytes()
+ || 70 == firstSnapshot.get(supervisorTask2).getShuffleBytes()
+ );
+ Assert.assertTrue(
+ 2 == firstSnapshot.get(supervisorTask2).getShuffleRequests()
+ || 3 == firstSnapshot.get(supervisorTask2).getShuffleRequests()
+ );
+ if (firstSnapshot.get(supervisorTask2).getShuffleRequests() == 2) {
+ expectedSecondSnapshotSize++;
+ task2ShouldBeInSecondSnapshot = true;
+ }
+
+ for (Future future : futures) {
+ future.get();
+ }
+ final Map secondSnapshot = metrics.snapshotAndReset();
+
+ Assert.assertEquals(expectedSecondSnapshotSize, secondSnapshot.size());
+ Assert.assertEquals(task1ShouldBeInSecondSnapshot, secondSnapshot.containsKey(supervisorTask1));
+ if (task1ShouldBeInSecondSnapshot) {
+ Assert.assertEquals(32, secondSnapshot.get(supervisorTask1).getShuffleBytes());
+ Assert.assertEquals(1, secondSnapshot.get(supervisorTask1).getShuffleRequests());
+ }
+ Assert.assertEquals(task2ShouldBeInSecondSnapshot, secondSnapshot.containsKey(supervisorTask2));
+ if (task2ShouldBeInSecondSnapshot) {
+ Assert.assertEquals(10, secondSnapshot.get(supervisorTask2).getShuffleBytes());
+ Assert.assertEquals(1, secondSnapshot.get(supervisorTask2).getShuffleRequests());
+ }
+
+ }
+ finally {
+ exec.shutdown();
+ }
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java
new file mode 100644
index 00000000000..fec860ab502
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Scopes;
+import com.google.inject.TypeLiteral;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.util.Optional;
+
+public class ShuffleModuleTest
+{
+ private ShuffleModule shuffleModule;
+
+ @Before
+ public void setup()
+ {
+ shuffleModule = new ShuffleModule();
+ }
+
+ @Test
+ public void testGetShuffleMetricsWhenShuffleMonitorExists()
+ {
+ final ShuffleMonitor shuffleMonitor = new ShuffleMonitor();
+ final MonitorScheduler monitorScheduler = Mockito.mock(MonitorScheduler.class);
+ Mockito.when(monitorScheduler.findMonitor(ShuffleMonitor.class))
+ .thenReturn(Optional.of(shuffleMonitor));
+ final Injector injector = createInjector(monitorScheduler);
+ final Optional optional = injector.getInstance(
+ Key.get(new TypeLiteral>() {})
+ );
+ Assert.assertTrue(optional.isPresent());
+ }
+
+ @Test
+ public void testGetShuffleMetricsWithNoShuffleMonitor()
+ {
+ final MonitorScheduler monitorScheduler = Mockito.mock(MonitorScheduler.class);
+ Mockito.when(monitorScheduler.findMonitor(ArgumentMatchers.eq(ShuffleMonitor.class)))
+ .thenReturn(Optional.empty());
+ final Injector injector = createInjector(monitorScheduler);
+ final Optional optional = injector.getInstance(
+ Key.get(new TypeLiteral>() {})
+ );
+ Assert.assertFalse(optional.isPresent());
+ }
+
+ private Injector createInjector(MonitorScheduler monitorScheduler)
+ {
+ return Guice.createInjector(
+ binder -> {
+ binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
+ binder.bind(MonitorScheduler.class).toInstance(monitorScheduler);
+ binder.bind(IntermediaryDataManager.class).toInstance(Mockito.mock(IntermediaryDataManager.class));
+ },
+ shuffleModule
+ );
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java
new file mode 100644
index 00000000000..1174bc842ed
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+public class ShuffleMonitorTest
+{
+ @Test
+ public void testDoMonitor()
+ {
+ final ShuffleMetrics shuffleMetrics = Mockito.mock(ShuffleMetrics.class);
+ final PerDatasourceShuffleMetrics perDatasourceShuffleMetrics = new PerDatasourceShuffleMetrics();
+ perDatasourceShuffleMetrics.accumulate(100);
+ perDatasourceShuffleMetrics.accumulate(200);
+ perDatasourceShuffleMetrics.accumulate(10);
+ Mockito.when(shuffleMetrics.snapshotAndReset())
+ .thenReturn(ImmutableMap.of("supervisor", perDatasourceShuffleMetrics));
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ final ShuffleMonitor monitor = new ShuffleMonitor();
+ monitor.setShuffleMetrics(shuffleMetrics);
+ Assert.assertTrue(monitor.doMonitor(emitter));
+ final List events = emitter.getEvents();
+ Assert.assertEquals(2, events.size());
+ Assert.assertSame(ServiceMetricEvent.class, events.get(0).getClass());
+ ServiceMetricEvent event = (ServiceMetricEvent) events.get(0);
+ Assert.assertEquals(ShuffleMonitor.SHUFFLE_BYTES_KEY, event.getMetric());
+ Assert.assertEquals(310L, event.getValue());
+ Assert.assertEquals(
+ ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
+ event.getUserDims()
+ );
+ Assert.assertSame(ServiceMetricEvent.class, events.get(1).getClass());
+ event = (ServiceMetricEvent) events.get(1);
+ Assert.assertEquals(ShuffleMonitor.SHUFFLE_REQUESTS_KEY, event.getMetric());
+ Assert.assertEquals(3, event.getValue());
+ Assert.assertEquals(
+ ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
+ event.getUserDims()
+ );
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
new file mode 100644
index 00000000000..bd1b2117042
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.client.indexing.TaskStatus;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.easymock.EasyMock;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class ShuffleResourceTest
+{
+ private static final String DATASOURCE = "datasource";
+
+ @Rule
+ public TemporaryFolder tempDir = new TemporaryFolder();
+
+ private IntermediaryDataManager intermediaryDataManager;
+ private ShuffleMetrics shuffleMetrics;
+ private ShuffleResource shuffleResource;
+
+ @Before
+ public void setup() throws IOException
+ {
+ final WorkerConfig workerConfig = new WorkerConfig()
+ {
+ @Override
+ public long getIntermediaryPartitionDiscoveryPeriodSec()
+ {
+ return 1;
+ }
+
+ @Override
+ public long getIntermediaryPartitionCleanupPeriodSec()
+ {
+ return 2;
+ }
+
+ @Override
+ public Period getIntermediaryPartitionTimeout()
+ {
+ return new Period("PT2S");
+ }
+
+ };
+ final TaskConfig taskConfig = new TaskConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ false,
+ null,
+ null,
+ ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null))
+ );
+ final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
+ {
+ @Override
+ public Map getTaskStatuses(Set taskIds)
+ {
+ final Map result = new HashMap<>();
+ for (String taskId : taskIds) {
+ result.put(taskId, new TaskStatus(taskId, TaskState.SUCCESS, 10));
+ }
+ return result;
+ }
+ };
+ intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
+ shuffleMetrics = new ShuffleMetrics();
+ shuffleResource = new ShuffleResource(intermediaryDataManager, Optional.of(shuffleMetrics));
+ }
+
+ @Test
+ public void testGetUnknownPartitionReturnNotFound()
+ {
+ final Response response = shuffleResource.getPartition(
+ "unknownSupervisorTask",
+ "unknownSubtask",
+ "2020-01-01",
+ "2020-01-02",
+ 0
+ );
+ Assert.assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
+ Assert.assertNotNull(response.getEntity());
+ final String errorMessage = (String) response.getEntity();
+ Assert.assertTrue(errorMessage.contains("Can't find the partition for supervisorTask"));
+ }
+
+ @Test
+ public void testGetPartitionWithValidParamsReturnOk() throws IOException
+ {
+ final String supervisorTaskId = "supervisorTask";
+ final String subtaskId = "subtaskId";
+ final Interval interval = Intervals.of("2020-01-01/P1D");
+ final DataSegment segment = newSegment(interval);
+ final File segmentDir = generateSegmentDir("test");
+ intermediaryDataManager.addSegment(supervisorTaskId, subtaskId, segment, segmentDir);
+
+ final Response response = shuffleResource.getPartition(
+ supervisorTaskId,
+ subtaskId,
+ interval.getStart().toString(),
+ interval.getEnd().toString(),
+ segment.getId().getPartitionNum()
+ );
+ final Map snapshot = shuffleMetrics.snapshotAndReset();
+ Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus());
+ Assert.assertEquals(1, snapshot.get(supervisorTaskId).getShuffleRequests());
+ Assert.assertEquals(134, snapshot.get(supervisorTaskId).getShuffleBytes());
+ }
+
+ @Test
+ public void testDeleteUnknownPartitionReturnOk()
+ {
+ final Response response = shuffleResource.deletePartitions("unknownSupervisorTask");
+ Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus());
+ }
+
+ @Test
+ public void testDeletePartitionWithValidParamsReturnOk() throws IOException
+ {
+ final String supervisorTaskId = "supervisorTask";
+ final String subtaskId = "subtaskId";
+ final Interval interval = Intervals.of("2020-01-01/P1D");
+ final DataSegment segment = newSegment(interval);
+ final File segmentDir = generateSegmentDir("test");
+ intermediaryDataManager.addSegment(supervisorTaskId, subtaskId, segment, segmentDir);
+
+ final Response response = shuffleResource.deletePartitions(supervisorTaskId);
+ Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus());
+ }
+
+ @Test
+ public void testDeletePartitionThrowingExceptionReturnIntervalServerError() throws IOException
+ {
+ final IntermediaryDataManager exceptionThrowingManager = EasyMock.niceMock(IntermediaryDataManager.class);
+ exceptionThrowingManager.deletePartitions(EasyMock.anyString());
+ EasyMock.expectLastCall().andThrow(new IOException("test"));
+ EasyMock.replay(exceptionThrowingManager);
+ final ShuffleResource shuffleResource = new ShuffleResource(exceptionThrowingManager, Optional.of(shuffleMetrics));
+
+ final Response response = shuffleResource.deletePartitions("supervisorTask");
+ Assert.assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
+ }
+
+ private static DataSegment newSegment(Interval interval)
+ {
+ return new DataSegment(
+ DATASOURCE,
+ interval,
+ "version",
+ null,
+ null,
+ null,
+ new NumberedShardSpec(0, 0),
+ 0,
+ 10
+ );
+ }
+
+ private File generateSegmentDir(String fileName) throws IOException
+ {
+ // Each file size is 138 bytes after compression
+ final File segmentDir = tempDir.newFolder();
+ FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8);
+ return segmentDir;
+ }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index 953c03fb055..10a70145331 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -57,7 +57,7 @@ import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.ThreadingTaskRunner;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.indexing.worker.http.ShuffleResource;
+import org.apache.druid.indexing.worker.shuffle.ShuffleModule;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.QuerySegmentWalker;
@@ -143,7 +143,6 @@ public class CliIndexer extends ServerRunnable
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
Jerseys.addResource(binder, SegmentListerResource.class);
- Jerseys.addResource(binder, ShuffleResource.class);
LifecycleModule.register(binder, Server.class, RemoteChatHandler.class);
@@ -201,6 +200,7 @@ public class CliIndexer extends ServerRunnable
);
}
},
+ new ShuffleModule(),
new IndexingServiceFirehoseModule(),
new IndexingServiceInputSourceModule(),
new IndexingServiceTaskLogsModule(),
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index 531746f804f..4438a07b4e5 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -56,9 +56,9 @@ import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
import org.apache.druid.indexing.worker.WorkerTaskMonitor;
import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.indexing.worker.http.ShuffleResource;
import org.apache.druid.indexing.worker.http.TaskManagementResource;
import org.apache.druid.indexing.worker.http.WorkerResource;
+import org.apache.druid.indexing.worker.shuffle.ShuffleModule;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.lookup.LookupSerdeModule;
@@ -142,8 +142,6 @@ public class CliMiddleManager extends ServerRunnable
.to(DummyForInjectionAppenderatorsManager.class)
.in(LazySingleton.class);
- Jerseys.addResource(binder, ShuffleResource.class);
-
LifecycleModule.register(binder, Server.class);
bindNodeRoleAndAnnouncer(
@@ -184,6 +182,7 @@ public class CliMiddleManager extends ServerRunnable
);
}
},
+ new ShuffleModule(),
new IndexingServiceFirehoseModule(),
new IndexingServiceInputSourceModule(),
new IndexingServiceTaskLogsModule(),
diff --git a/website/.spelling b/website/.spelling
index 5a4453ed3dd..343be4d17b5 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -382,6 +382,7 @@ subsecond
substring
subtask
subtasks
+supervisorTaskId
symlink
tiering
timeseries