Add shuffle metrics for parallel indexing (#10359)

* Add shuffle metrics for parallel indexing

* javadoc and concurrency test

* concurrency

* fix javadoc

* Feature flag

* doc

* fix doc and add a test

* checkstyle

* add tests

* fix build and address comments
Jihoon Son 2020-10-10 19:35:17 -07:00 committed by GitHub
parent 4d2a92f46a
commit ad437dd655
30 changed files with 956 additions and 24 deletions

View File

@@ -331,6 +331,12 @@
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</dependency>
+<dependency>
+  <groupId>org.mockito</groupId>
+  <artifactId>mockito-core</artifactId>
+  <version>${mockito.version}</version>
+  <scope>test</scope>
+</dependency>
</dependencies>
<build>

View File

@@ -29,5 +29,10 @@ public interface Monitor
void stop();
+/**
+ * Emit metrics using the given emitter.
+ *
+ * @return true if this monitor needs to continue monitoring. False otherwise.
+ */
boolean monitor(ServiceEmitter emitter);
}

View File

@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
@@ -93,6 +94,17 @@ public class MonitorScheduler
}
}
+/**
+ * Returns a {@link Monitor} instance of the given class if any. Note that this method searches for the monitor
+ * from the current snapshot of {@link #monitors}.
+ */
+public <T extends Monitor> Optional<T> findMonitor(Class<T> monitorClass)
+{
+  synchronized (lock) {
+    return (Optional<T>) monitors.stream().filter(m -> m.getClass() == monitorClass).findFirst();
+  }
+}
+
@LifecycleStop
public void stop()
{
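
A minimal usage sketch of the new findMonitor method (mirroring what ShuffleModule later in this commit does; the ShuffleMonitor and setShuffleMetrics names come from that code):

// Look up a statically configured monitor and wire metrics into it, if present.
final Optional<ShuffleMonitor> maybeMonitor = monitorScheduler.findMonitor(ShuffleMonitor.class);
maybeMonitor.ifPresent(monitor -> monitor.setShuffleMetrics(new ShuffleMetrics()));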

View File

@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.Optional;
public class MonitorSchedulerTest
{
@Test
public void testFindMonitor()
{
class Monitor1 extends NoopMonitor
{
}
class Monitor2 extends NoopMonitor
{
}
class Monitor3 extends NoopMonitor
{
}
final Monitor1 monitor1 = new Monitor1();
final Monitor2 monitor2 = new Monitor2();
final MonitorScheduler scheduler = new MonitorScheduler(
Mockito.mock(MonitorSchedulerConfig.class),
Execs.scheduledSingleThreaded("monitor-scheduler-test"),
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor1, monitor2)
);
final Optional<Monitor1> maybeFound1 = scheduler.findMonitor(Monitor1.class);
final Optional<Monitor2> maybeFound2 = scheduler.findMonitor(Monitor2.class);
Assert.assertTrue(maybeFound1.isPresent());
Assert.assertTrue(maybeFound2.isPresent());
Assert.assertSame(monitor1, maybeFound1.get());
Assert.assertSame(monitor2, maybeFound2.get());
Assert.assertFalse(scheduler.findMonitor(Monitor3.class).isPresent());
}
private static class NoopMonitor implements Monitor
{
@Override
public void start()
{
}
@Override
public void stop()
{
}
@Override
public boolean monitor(ServiceEmitter emitter)
{
return true;
}
}
}

View File

@@ -181,7 +181,7 @@ These metrics are only available if the RealtimeMetricsMonitor is included in th
Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.
-### Indexing service
+## Indexing service
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
@@ -202,6 +202,16 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
+## Shuffle metrics (Native parallel task)
+
+Shuffle metrics can be enabled by adding `org.apache.druid.indexing.worker.shuffle.ShuffleMonitor` to `druid.monitoring.monitors`.
+See [Enabling Metrics](../configuration/index.md#enabling-metrics) for more details.
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`ingest/shuffle/bytes`|Number of bytes shuffled per emission period.|supervisorTaskId|Varies|
+|`ingest/shuffle/requests`|Number of shuffle requests per emission period.|supervisorTaskId|Varies|
+
## Coordination
These metrics are for the Druid Coordinator and are reset each time the Coordinator runs the coordination logic.
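
As a usage note for the shuffle metrics table above: a minimal sketch of enabling the monitor in runtime.properties, assuming the usual JSON-array form of the monitors list:

druid.monitoring.monitors=["org.apache.druid.indexing.worker.shuffle.ShuffleMonitor"]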

View File

@@ -278,6 +278,12 @@
<artifactId>system-rules</artifactId>
<scope>test</scope>
</dependency>
+<dependency>
+  <groupId>org.mockito</groupId>
+  <artifactId>mockito-core</artifactId>
+  <version>${mockito.version}</version>
+  <scope>test</scope>
+</dependency>
</dependencies>
<build>

View File

@@ -42,7 +42,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.MonitorScheduler;

View File

@@ -41,7 +41,7 @@ import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;

View File

@@ -62,6 +62,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRun
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistributionMerger;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketchMerger;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
@@ -517,7 +518,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
 * - In the first phase, each task partitions input data and stores those partitions in local storage.
 * - The partition is created based on the segment granularity (primary partition key) and the partition dimension
 *   values in {@link PartitionsSpec} (secondary partition key).
- * - Partitioned data is maintained by {@link org.apache.druid.indexing.worker.IntermediaryDataManager}.
+ * - Partitioned data is maintained by {@link IntermediaryDataManager}.
 * - In the second phase, each task reads partitioned data from the intermediary data server (middleManager
 *   or indexer) and merges them to create the final segments.
 */

View File

@@ -29,6 +29,7 @@ import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
@@ -46,7 +47,7 @@ import java.util.stream.Collectors;
/**
 * The worker task of {@link PartialHashSegmentGenerateParallelIndexTaskRunner}. This task partitions input data by
 * hashing the segment granularity and partition dimensions in {@link HashedPartitionsSpec}. Partitioned segments are
- * stored in local storage using {@link org.apache.druid.indexing.worker.ShuffleDataSegmentPusher}.
+ * stored in local storage using {@link ShuffleDataSegmentPusher}.
 */
public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<GeneratedPartitionsMetadataReport>
{

View File

@@ -31,7 +31,7 @@ import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis;
-import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;

View File

@@ -30,7 +30,7 @@ import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SequenceNameFunction;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
-import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;

View File

@@ -19,6 +19,8 @@
package org.apache.druid.indexing.common.task.batch.parallel;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
+
import java.io.File;
import java.io.IOException;
@@ -27,7 +29,7 @@ import java.io.IOException;
 * The only available implementation for production code is {@link HttpShuffleClient} and
 * this interface is more for easier testing.
 *
- * @see org.apache.druid.indexing.worker.IntermediaryDataManager
+ * @see IntermediaryDataManager
 * @see PartialSegmentMergeTask
 */
public interface ShuffleClient

View File

@@ -17,7 +17,7 @@
 * under the License.
 */
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
import com.google.common.collect.Iterators;
import com.google.common.io.Files;
@@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CompressionUtils;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -67,7 +68,7 @@ import java.util.stream.IntStream;
/**
 * This class manages intermediary segments for data shuffle between native parallel index tasks.
 * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers (or indexer)
- * and phase 2 tasks read those files via HTTP.
+ * and phase 2 tasks read those files over HTTP.
 *
 * The directory where segment files are placed is structured as
 * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/bucketIdOfSegment.
@@ -100,7 +101,7 @@ public class IntermediaryDataManager
// but middleManager or indexer could miss the request. This executor is to automatically clean up unused intermediary
// partitions.
// This can be null until IntermediaryDataManager is started.
-@Nullable
+@MonotonicNonNull
private ScheduledExecutorService supervisorTaskChecker;
@Inject
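
To illustrate the directory layout described in the javadoc above, a partition for a hypothetical supervisor task (all names below are made up) would live at a path like:

Template: {StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/bucketIdOfSegment
Example:  var/druid/intermediary-segments/index_parallel_wikipedia/2020-01-01T00:00:00.000Z/2020-01-02T00:00:00.000Z/0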

View File

@@ -17,7 +17,7 @@
 * under the License.
 */
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.loading.DataSegmentPusher;

View File

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker.shuffle;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
 * Shuffle metrics for middleManagers and indexers. This class is thread-safe because shuffles can be performed by
 * multiple HTTP threads while a monitoring thread periodically emits a snapshot of the metrics.
*
* @see ShuffleResource
* @see org.apache.druid.java.util.metrics.MonitorScheduler
*/
public class ShuffleMetrics
{
/**
* This lock is used to synchronize accesses to the reference to {@link #datasourceMetrics} and the
* {@link PerDatasourceShuffleMetrics} values of the map. This means,
*
* - Any updates on PerDatasourceShuffleMetrics in the map (and thus its key as well) should be synchronized
* under this lock.
* - Any updates on the reference to datasourceMetrics should be synchronized under this lock.
*/
private final Object lock = new Object();
/**
* A map of (datasource name) -> {@link PerDatasourceShuffleMetrics}. This map is replaced with an empty map
* whenever a snapshot is taken since the map can keep growing over time otherwise. For concurrent access pattern,
* see {@link #shuffleRequested} and {@link #snapshotAndReset()}.
*/
@GuardedBy("lock")
private Map<String, PerDatasourceShuffleMetrics> datasourceMetrics = new HashMap<>();
/**
* This method is called whenever a new shuffle is requested. Multiple tasks can request shuffle at the same time,
* while the monitoring thread takes a snapshot of the metrics. There is a happens-before relationship between
* shuffleRequested and {@link #snapshotAndReset()}.
*/
public void shuffleRequested(String supervisorTaskId, long fileLength)
{
synchronized (lock) {
datasourceMetrics.computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics())
.accumulate(fileLength);
}
}
/**
* This method is called whenever the monitoring thread takes a snapshot of the current metrics.
 * {@link #datasourceMetrics} is reset to an empty map after this call, so that each snapshot contains only the
 * metrics collected during the monitoring period. There is a happens-before relationship between snapshotAndReset()
* and {@link #shuffleRequested}.
*/
public Map<String, PerDatasourceShuffleMetrics> snapshotAndReset()
{
synchronized (lock) {
final Map<String, PerDatasourceShuffleMetrics> snapshot = Collections.unmodifiableMap(datasourceMetrics);
datasourceMetrics = new HashMap<>();
return snapshot;
}
}
/**
* This method is visible only for testing. Use {@link #snapshotAndReset()} instead to get the current snapshot.
*/
@VisibleForTesting
Map<String, PerDatasourceShuffleMetrics> getDatasourceMetrics()
{
synchronized (lock) {
return datasourceMetrics;
}
}
/**
* This class represents shuffle metrics of one datasource. This class is not thread-safe and should never be accessed
* by multiple threads at the same time.
*/
public static class PerDatasourceShuffleMetrics
{
private long shuffleBytes;
private int shuffleRequests;
@VisibleForTesting
void accumulate(long shuffleBytes)
{
this.shuffleBytes += shuffleBytes;
this.shuffleRequests++;
}
public long getShuffleBytes()
{
return shuffleBytes;
}
public int getShuffleRequests()
{
return shuffleRequests;
}
}
}
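
A minimal sketch of the intended interaction between the HTTP threads and the monitoring thread (values are illustrative; ShuffleMetricsTest below exercises this for real):

final ShuffleMetrics metrics = new ShuffleMetrics();
// HTTP threads record each served partition file under its supervisor task id.
metrics.shuffleRequested("supervisorTask1", 1024);
metrics.shuffleRequested("supervisorTask1", 512);
// The monitoring thread periodically drains the accumulated state.
final Map<String, PerDatasourceShuffleMetrics> snapshot = metrics.snapshotAndReset();
// snapshot.get("supervisorTask1") now reports 2 requests and 1536 bytes; the internal map starts empty again.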

View File

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker.shuffle;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import java.util.Optional;
public class ShuffleModule implements Module
{
@Override
public void configure(Binder binder)
{
Jerseys.addResource(binder, ShuffleResource.class);
}
/**
* {@link ShuffleMetrics} is used in {@link ShuffleResource} and {@link ShuffleMonitor} to collect metrics
* and report them, respectively. Unlike ShuffleResource, ShuffleMonitor can be created via a user config
 * ({@link org.apache.druid.server.metrics.MonitorsConfig}) on potentially any node type, including those where it
 * is not possible to create ShuffleMetrics. This method checks whether a ShuffleMonitor is registered on the
 * {@link MonitorScheduler} and, if so, sets the proper ShuffleMetrics on it.
*/
@Provides
@LazySingleton
public Optional<ShuffleMetrics> getShuffleMetrics(MonitorScheduler monitorScheduler)
{
// ShuffleMonitor cannot be registered dynamically, but only via the static configuration (MonitorsConfig).
// As a result, it is safe to check only once whether it is registered in the MonitorScheduler.
final Optional<ShuffleMonitor> maybeMonitor = monitorScheduler.findMonitor(ShuffleMonitor.class);
if (maybeMonitor.isPresent()) {
final ShuffleMetrics metrics = new ShuffleMetrics();
maybeMonitor.get().setShuffleMetrics(metrics);
return Optional.of(metrics);
}
return Optional.empty();
}
}

View File

@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker.shuffle;
import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import java.util.Map;
public class ShuffleMonitor extends AbstractMonitor
{
static final String SUPERVISOR_TASK_ID_DIMENSION = "supervisorTaskId";
static final String SHUFFLE_BYTES_KEY = "ingest/shuffle/bytes";
static final String SHUFFLE_REQUESTS_KEY = "ingest/shuffle/requests";
/**
 * ShuffleMonitor can be instantiated on any node type if it is defined in
 * {@link org.apache.druid.server.metrics.MonitorsConfig}. Since {@link ShuffleMetrics} is defined
 * in the `indexing-service` module, node types that lack the required dependencies (such as the Broker)
 * would fail to create it. To avoid this problem, this variable is lazily initialized
 * only on the node types which have the {@link ShuffleModule}.
*/
@MonotonicNonNull
private ShuffleMetrics shuffleMetrics;
public void setShuffleMetrics(ShuffleMetrics shuffleMetrics)
{
this.shuffleMetrics = shuffleMetrics;
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
if (shuffleMetrics != null) {
final Map<String, PerDatasourceShuffleMetrics> snapshot = shuffleMetrics.snapshotAndReset();
snapshot.forEach((supervisorTaskId, perDatasourceShuffleMetrics) -> {
final Builder metricBuilder = ServiceMetricEvent
.builder()
.setDimension(SUPERVISOR_TASK_ID_DIMENSION, supervisorTaskId);
emitter.emit(metricBuilder.build(SHUFFLE_BYTES_KEY, perDatasourceShuffleMetrics.getShuffleBytes()));
emitter.emit(metricBuilder.build(SHUFFLE_REQUESTS_KEY, perDatasourceShuffleMetrics.getShuffleRequests()));
});
}
return true;
}
}

View File

@@ -17,12 +17,11 @@
 * under the License.
 */
-package org.apache.druid.indexing.worker.http;
+package org.apache.druid.indexing.worker.shuffle;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -42,6 +41,7 @@ import javax.ws.rs.core.StreamingOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.util.Optional;
/**
 * HTTP endpoints for shuffle system. The MiddleManager and Indexer use this resource to serve intermediary shuffle
@@ -60,11 +60,13 @@ public class ShuffleResource
private static final Logger log = new Logger(ShuffleResource.class);
private final IntermediaryDataManager intermediaryDataManager;
+private final Optional<ShuffleMetrics> shuffleMetrics;
@Inject
-public ShuffleResource(IntermediaryDataManager intermediaryDataManager)
+public ShuffleResource(IntermediaryDataManager intermediaryDataManager, Optional<ShuffleMetrics> shuffleMetrics)
{
  this.intermediaryDataManager = intermediaryDataManager;
+  this.shuffleMetrics = shuffleMetrics;
}
@GET
@@ -96,6 +98,7 @@ public class ShuffleResource
);
return Response.status(Status.NOT_FOUND).entity(errorMessage).build();
} else {
+  shuffleMetrics.ifPresent(metrics -> metrics.shuffleRequested(supervisorTaskId, partitionFile.length()));
  return Response.ok(
    (StreamingOutput) output -> {
      try (final FileInputStream fileInputStream = new FileInputStream(partitionFile)) {

View File

@@ -64,8 +64,8 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.Segments;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;

View File

@@ -17,7 +17,7 @@
 * under the License.
 */
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
View File

@@ -17,7 +17,7 @@
 * under the License.
 */
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;

View File

@@ -17,7 +17,7 @@
 * under the License.
 */
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;

View File

@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker.shuffle;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
public class ShuffleMetricsTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testShuffleRequested()
{
final ShuffleMetrics metrics = new ShuffleMetrics();
final String supervisorTask1 = "supervisor1";
final String supervisorTask2 = "supervisor2";
final String supervisorTask3 = "supervisor3";
metrics.shuffleRequested(supervisorTask1, 1024);
metrics.shuffleRequested(supervisorTask2, 10);
metrics.shuffleRequested(supervisorTask1, 512);
metrics.shuffleRequested(supervisorTask3, 10000);
metrics.shuffleRequested(supervisorTask2, 30);
final Map<String, PerDatasourceShuffleMetrics> snapshot = metrics.snapshotAndReset();
Assert.assertEquals(ImmutableSet.of(supervisorTask1, supervisorTask2, supervisorTask3), snapshot.keySet());
PerDatasourceShuffleMetrics perDatasourceShuffleMetrics = snapshot.get(supervisorTask1);
Assert.assertEquals(2, perDatasourceShuffleMetrics.getShuffleRequests());
Assert.assertEquals(1536, perDatasourceShuffleMetrics.getShuffleBytes());
perDatasourceShuffleMetrics = snapshot.get(supervisorTask2);
Assert.assertEquals(2, perDatasourceShuffleMetrics.getShuffleRequests());
Assert.assertEquals(40, perDatasourceShuffleMetrics.getShuffleBytes());
perDatasourceShuffleMetrics = snapshot.get(supervisorTask3);
Assert.assertEquals(1, perDatasourceShuffleMetrics.getShuffleRequests());
Assert.assertEquals(10000, perDatasourceShuffleMetrics.getShuffleBytes());
}
@Test
public void testSnapshotUnmodifiable()
{
expectedException.expect(UnsupportedOperationException.class);
new ShuffleMetrics().snapshotAndReset().put("k", new PerDatasourceShuffleMetrics());
}
@Test
public void testResetDatasourceMetricsAfterSnapshot()
{
final ShuffleMetrics shuffleMetrics = new ShuffleMetrics();
shuffleMetrics.shuffleRequested("supervisor", 10);
shuffleMetrics.shuffleRequested("supervisor", 10);
shuffleMetrics.shuffleRequested("supervisor2", 10);
shuffleMetrics.snapshotAndReset();
Assert.assertEquals(Collections.emptyMap(), shuffleMetrics.getDatasourceMetrics());
}
@Test(timeout = 5000L)
public void testConcurrency() throws ExecutionException, InterruptedException
{
final ExecutorService exec = Execs.multiThreaded(3, "shuffle-metrics-test-%d"); // 2 for write, 1 for read
try {
final ShuffleMetrics metrics = new ShuffleMetrics();
final String supervisorTask1 = "supervisor1";
final String supervisorTask2 = "supervisor2";
final CountDownLatch firstUpdatelatch = new CountDownLatch(2);
final List<Future<Void>> futures = new ArrayList<>();
futures.add(
exec.submit(() -> {
metrics.shuffleRequested(supervisorTask1, 1024);
metrics.shuffleRequested(supervisorTask2, 30);
firstUpdatelatch.countDown();
Thread.sleep(ThreadLocalRandom.current().nextInt(10));
metrics.shuffleRequested(supervisorTask2, 10);
return null;
})
);
futures.add(
exec.submit(() -> {
metrics.shuffleRequested(supervisorTask2, 30);
metrics.shuffleRequested(supervisorTask1, 1024);
firstUpdatelatch.countDown();
Thread.sleep(ThreadLocalRandom.current().nextInt(10));
metrics.shuffleRequested(supervisorTask1, 32);
return null;
})
);
final Map<String, PerDatasourceShuffleMetrics> firstSnapshot = exec.submit(() -> {
firstUpdatelatch.await();
Thread.sleep(ThreadLocalRandom.current().nextInt(10));
return metrics.snapshotAndReset();
}).get();
int expectedSecondSnapshotSize = 0;
boolean task1ShouldBeInSecondSnapshot = false;
boolean task2ShouldBeInSecondSnapshot = false;
Assert.assertEquals(2, firstSnapshot.size());
Assert.assertNotNull(firstSnapshot.get(supervisorTask1));
Assert.assertTrue(
2048 == firstSnapshot.get(supervisorTask1).getShuffleBytes()
|| 2080 == firstSnapshot.get(supervisorTask1).getShuffleBytes()
);
Assert.assertTrue(
2 == firstSnapshot.get(supervisorTask1).getShuffleRequests()
|| 3 == firstSnapshot.get(supervisorTask1).getShuffleRequests()
);
if (firstSnapshot.get(supervisorTask1).getShuffleRequests() == 2) {
expectedSecondSnapshotSize++;
task1ShouldBeInSecondSnapshot = true;
}
Assert.assertNotNull(firstSnapshot.get(supervisorTask2));
Assert.assertTrue(
60 == firstSnapshot.get(supervisorTask2).getShuffleBytes()
|| 70 == firstSnapshot.get(supervisorTask2).getShuffleBytes()
);
Assert.assertTrue(
2 == firstSnapshot.get(supervisorTask2).getShuffleRequests()
|| 3 == firstSnapshot.get(supervisorTask2).getShuffleRequests()
);
if (firstSnapshot.get(supervisorTask2).getShuffleRequests() == 2) {
expectedSecondSnapshotSize++;
task2ShouldBeInSecondSnapshot = true;
}
for (Future<Void> future : futures) {
future.get();
}
final Map<String, PerDatasourceShuffleMetrics> secondSnapshot = metrics.snapshotAndReset();
Assert.assertEquals(expectedSecondSnapshotSize, secondSnapshot.size());
Assert.assertEquals(task1ShouldBeInSecondSnapshot, secondSnapshot.containsKey(supervisorTask1));
if (task1ShouldBeInSecondSnapshot) {
Assert.assertEquals(32, secondSnapshot.get(supervisorTask1).getShuffleBytes());
Assert.assertEquals(1, secondSnapshot.get(supervisorTask1).getShuffleRequests());
}
Assert.assertEquals(task2ShouldBeInSecondSnapshot, secondSnapshot.containsKey(supervisorTask2));
if (task2ShouldBeInSecondSnapshot) {
Assert.assertEquals(10, secondSnapshot.get(supervisorTask2).getShuffleBytes());
Assert.assertEquals(1, secondSnapshot.get(supervisorTask2).getShuffleRequests());
}
}
finally {
exec.shutdown();
}
}
}

View File

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker.shuffle;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.util.Optional;
public class ShuffleModuleTest
{
private ShuffleModule shuffleModule;
@Before
public void setup()
{
shuffleModule = new ShuffleModule();
}
@Test
public void testGetShuffleMetricsWhenShuffleMonitorExists()
{
final ShuffleMonitor shuffleMonitor = new ShuffleMonitor();
final MonitorScheduler monitorScheduler = Mockito.mock(MonitorScheduler.class);
Mockito.when(monitorScheduler.findMonitor(ShuffleMonitor.class))
.thenReturn(Optional.of(shuffleMonitor));
final Injector injector = createInjector(monitorScheduler);
final Optional<ShuffleMetrics> optional = injector.getInstance(
Key.get(new TypeLiteral<Optional<ShuffleMetrics>>() {})
);
Assert.assertTrue(optional.isPresent());
}
@Test
public void testGetShuffleMetricsWithNoShuffleMonitor()
{
final MonitorScheduler monitorScheduler = Mockito.mock(MonitorScheduler.class);
Mockito.when(monitorScheduler.findMonitor(ArgumentMatchers.eq(ShuffleMonitor.class)))
.thenReturn(Optional.empty());
final Injector injector = createInjector(monitorScheduler);
final Optional<ShuffleMetrics> optional = injector.getInstance(
Key.get(new TypeLiteral<Optional<ShuffleMetrics>>() {})
);
Assert.assertFalse(optional.isPresent());
}
private Injector createInjector(MonitorScheduler monitorScheduler)
{
return Guice.createInjector(
binder -> {
binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
binder.bind(MonitorScheduler.class).toInstance(monitorScheduler);
binder.bind(IntermediaryDataManager.class).toInstance(Mockito.mock(IntermediaryDataManager.class));
},
shuffleModule
);
}
}

View File

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker.shuffle;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.List;
public class ShuffleMonitorTest
{
@Test
public void testDoMonitor()
{
final ShuffleMetrics shuffleMetrics = Mockito.mock(ShuffleMetrics.class);
final PerDatasourceShuffleMetrics perDatasourceShuffleMetrics = new PerDatasourceShuffleMetrics();
perDatasourceShuffleMetrics.accumulate(100);
perDatasourceShuffleMetrics.accumulate(200);
perDatasourceShuffleMetrics.accumulate(10);
Mockito.when(shuffleMetrics.snapshotAndReset())
.thenReturn(ImmutableMap.of("supervisor", perDatasourceShuffleMetrics));
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
final ShuffleMonitor monitor = new ShuffleMonitor();
monitor.setShuffleMetrics(shuffleMetrics);
Assert.assertTrue(monitor.doMonitor(emitter));
final List<Event> events = emitter.getEvents();
Assert.assertEquals(2, events.size());
Assert.assertSame(ServiceMetricEvent.class, events.get(0).getClass());
ServiceMetricEvent event = (ServiceMetricEvent) events.get(0);
Assert.assertEquals(ShuffleMonitor.SHUFFLE_BYTES_KEY, event.getMetric());
Assert.assertEquals(310L, event.getValue());
Assert.assertEquals(
ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
event.getUserDims()
);
Assert.assertSame(ServiceMetricEvent.class, events.get(1).getClass());
event = (ServiceMetricEvent) events.get(1);
Assert.assertEquals(ShuffleMonitor.SHUFFLE_REQUESTS_KEY, event.getMetric());
Assert.assertEquals(3, event.getValue());
Assert.assertEquals(
ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
event.getUserDims()
);
}
}

View File

@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker.shuffle;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.client.indexing.TaskStatus;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
public class ShuffleResourceTest
{
private static final String DATASOURCE = "datasource";
@Rule
public TemporaryFolder tempDir = new TemporaryFolder();
private IntermediaryDataManager intermediaryDataManager;
private ShuffleMetrics shuffleMetrics;
private ShuffleResource shuffleResource;
@Before
public void setup() throws IOException
{
final WorkerConfig workerConfig = new WorkerConfig()
{
@Override
public long getIntermediaryPartitionDiscoveryPeriodSec()
{
return 1;
}
@Override
public long getIntermediaryPartitionCleanupPeriodSec()
{
return 2;
}
@Override
public Period getIntermediaryPartitionTimeout()
{
return new Period("PT2S");
}
};
final TaskConfig taskConfig = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null))
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
{
@Override
public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds)
{
final Map<String, TaskStatus> result = new HashMap<>();
for (String taskId : taskIds) {
result.put(taskId, new TaskStatus(taskId, TaskState.SUCCESS, 10));
}
return result;
}
};
intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
shuffleMetrics = new ShuffleMetrics();
shuffleResource = new ShuffleResource(intermediaryDataManager, Optional.of(shuffleMetrics));
}
@Test
public void testGetUnknownPartitionReturnNotFound()
{
final Response response = shuffleResource.getPartition(
"unknownSupervisorTask",
"unknownSubtask",
"2020-01-01",
"2020-01-02",
0
);
Assert.assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
Assert.assertNotNull(response.getEntity());
final String errorMessage = (String) response.getEntity();
Assert.assertTrue(errorMessage.contains("Can't find the partition for supervisorTask"));
}
@Test
public void testGetPartitionWithValidParamsReturnOk() throws IOException
{
final String supervisorTaskId = "supervisorTask";
final String subtaskId = "subtaskId";
final Interval interval = Intervals.of("2020-01-01/P1D");
final DataSegment segment = newSegment(interval);
final File segmentDir = generateSegmentDir("test");
intermediaryDataManager.addSegment(supervisorTaskId, subtaskId, segment, segmentDir);
final Response response = shuffleResource.getPartition(
supervisorTaskId,
subtaskId,
interval.getStart().toString(),
interval.getEnd().toString(),
segment.getId().getPartitionNum()
);
final Map<String, PerDatasourceShuffleMetrics> snapshot = shuffleMetrics.snapshotAndReset();
Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus());
Assert.assertEquals(1, snapshot.get(supervisorTaskId).getShuffleRequests());
Assert.assertEquals(134, snapshot.get(supervisorTaskId).getShuffleBytes());
}
@Test
public void testDeleteUnknownPartitionReturnOk()
{
final Response response = shuffleResource.deletePartitions("unknownSupervisorTask");
Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus());
}
@Test
public void testDeletePartitionWithValidParamsReturnOk() throws IOException
{
final String supervisorTaskId = "supervisorTask";
final String subtaskId = "subtaskId";
final Interval interval = Intervals.of("2020-01-01/P1D");
final DataSegment segment = newSegment(interval);
final File segmentDir = generateSegmentDir("test");
intermediaryDataManager.addSegment(supervisorTaskId, subtaskId, segment, segmentDir);
final Response response = shuffleResource.deletePartitions(supervisorTaskId);
Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus());
}
@Test
public void testDeletePartitionThrowingExceptionReturnIntervalServerError() throws IOException
{
final IntermediaryDataManager exceptionThrowingManager = EasyMock.niceMock(IntermediaryDataManager.class);
exceptionThrowingManager.deletePartitions(EasyMock.anyString());
EasyMock.expectLastCall().andThrow(new IOException("test"));
EasyMock.replay(exceptionThrowingManager);
final ShuffleResource shuffleResource = new ShuffleResource(exceptionThrowingManager, Optional.of(shuffleMetrics));
final Response response = shuffleResource.deletePartitions("supervisorTask");
Assert.assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
}
private static DataSegment newSegment(Interval interval)
{
return new DataSegment(
DATASOURCE,
interval,
"version",
null,
null,
null,
new NumberedShardSpec(0, 0),
0,
10
);
}
private File generateSegmentDir(String fileName) throws IOException
{
// Each file size is 138 bytes after compression
final File segmentDir = tempDir.newFolder();
FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8);
return segmentDir;
}
}

View File

@@ -57,7 +57,7 @@ import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.ThreadingTaskRunner;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.indexing.worker.http.ShuffleResource;
+import org.apache.druid.indexing.worker.shuffle.ShuffleModule;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.QuerySegmentWalker;
@@ -143,7 +143,6 @@ public class CliIndexer extends ServerRunnable
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
Jerseys.addResource(binder, SegmentListerResource.class);
-Jerseys.addResource(binder, ShuffleResource.class);
LifecycleModule.register(binder, Server.class, RemoteChatHandler.class);
@@ -201,6 +200,7 @@ public class CliIndexer extends ServerRunnable
  );
}
},
+new ShuffleModule(),
new IndexingServiceFirehoseModule(),
new IndexingServiceInputSourceModule(),
new IndexingServiceTaskLogsModule(),

View File

@@ -56,9 +56,9 @@ import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
import org.apache.druid.indexing.worker.WorkerTaskMonitor;
import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.indexing.worker.http.ShuffleResource;
import org.apache.druid.indexing.worker.http.TaskManagementResource;
import org.apache.druid.indexing.worker.http.WorkerResource;
+import org.apache.druid.indexing.worker.shuffle.ShuffleModule;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.lookup.LookupSerdeModule;
@@ -142,8 +142,6 @@ public class CliMiddleManager extends ServerRunnable
  .to(DummyForInjectionAppenderatorsManager.class)
  .in(LazySingleton.class);
-Jerseys.addResource(binder, ShuffleResource.class);
LifecycleModule.register(binder, Server.class);
bindNodeRoleAndAnnouncer(
@@ -184,6 +182,7 @@ public class CliMiddleManager extends ServerRunnable
  );
}
},
+new ShuffleModule(),
new IndexingServiceFirehoseModule(),
new IndexingServiceInputSourceModule(),
new IndexingServiceTaskLogsModule(),

View File

@@ -382,6 +382,7 @@ subsecond
substring
subtask
subtasks
+supervisorTaskId
symlink
tiering
timeseries