Configure maxBytesPerWorker directly instead of using StageDefinition (#14257)

Adarsh Sanjeev 2023-05-15 16:51:57 +05:30 committed by GitHub
parent e9913abbbf
commit 10bce22e68
16 changed files with 345 additions and 71 deletions
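
Note: in summary, this commit removes the maxInputBytesPerWorker field from StageDefinition and instead reads the limit once from the query context in ControllerImpl, threading it through the stage-creation path. A minimal sketch of the new flow, using only names that appear in this diff (signatures simplified):

// ControllerImpl reads the limit once per query from the context ...
final long maxInputBytesPerWorker =
    MultiStageQueryContext.getMaxInputBytesPerWorker(task.getQuerySpec().getQuery().context());

// ... and passes it down the controller call chain instead of storing it per stage:
// createAndGetNewStageIds(slicerFactory, assignmentStrategy, maxInputBytesPerWorker)
//   -> createNewKernels(..., maxInputBytesPerWorker)
//   -> ControllerStageTracker.create(..., maxInputBytesPerWorker)
//   -> WorkerInputs.create(..., maxInputBytesPerWorker)
//   -> WorkerAssignmentStrategy.assign(stageDef, inputSpec, stageWorkerCountMap, slicer, maxInputBytesPerWorker)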

ControllerImpl.java

@@ -1599,14 +1599,10 @@ public class ControllerImpl implements Controller
final DataSchema dataSchema =
generateDataSchema(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper);
final long maxInputBytesPerWorker =
MultiStageQueryContext.getMaxInputBytesPerWorker(querySpec.getQuery().context());
builder.add(
StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
.maxWorkerCount(tuningConfig.getMaxNumWorkers())
.maxInputBytesPerWorker(maxInputBytesPerWorker)
.processorFactory(
new SegmentGeneratorFrameProcessorFactory(
dataSchema,
@@ -2406,10 +2402,14 @@ public class ControllerImpl implements Controller
*/
private void startStages() throws IOException, InterruptedException
{
final long maxInputBytesPerWorker =
MultiStageQueryContext.getMaxInputBytesPerWorker(task.getQuerySpec().getQuery().context());
logKernelStatus(queryDef.getQueryId(), queryKernel);
final List<StageId> newStageIds = queryKernel.createAndGetNewStageIds(
inputSpecSlicerFactory,
task.getQuerySpec().getAssignmentStrategy()
task.getQuerySpec().getAssignmentStrategy(),
maxInputBytesPerWorker
);
for (final StageId stageId : newStageIds) {

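Note: the limit itself comes from the multi-stage query context. A hedged sketch of setting it from caller code (the key constant CTX_MAX_INPUT_BYTES_PER_WORKER is used by the new tests below; baseContext and the chosen value are illustrative):

// Illustrative only: supply the per-worker input byte limit via the query context.
Map<String, Object> queryContext = new HashMap<>(baseContext); // baseContext: hypothetical existing context map
queryContext.put(MultiStageQueryContext.CTX_MAX_INPUT_BYTES_PER_WORKER, 10_000_000L);
// The controller then resolves it once, as in the hunk above:
// MultiStageQueryContext.getMaxInputBytesPerWorker(querySpec.getQuery().context())
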
StageDefinition.java

@@ -40,7 +40,6 @@ import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecs;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
@@ -93,7 +92,6 @@ public class StageDefinition
private final FrameProcessorFactory processorFactory;
private final RowSignature signature;
private final int maxWorkerCount;
private final long maxInputBytesPerWorker;
private final boolean shuffleCheckHasMultipleValues;
@Nullable
@@ -111,8 +109,7 @@ public class StageDefinition
@JsonProperty("signature") final RowSignature signature,
@Nullable @JsonProperty("shuffleSpec") final ShuffleSpec shuffleSpec,
@JsonProperty("maxWorkerCount") final int maxWorkerCount,
@JsonProperty("shuffleCheckHasMultipleValues") final boolean shuffleCheckHasMultipleValues,
@JsonProperty("maxInputBytesPerWorker") final Long maxInputBytesPerWorker
@JsonProperty("shuffleCheckHasMultipleValues") final boolean shuffleCheckHasMultipleValues
)
{
this.id = Preconditions.checkNotNull(id, "id");
@@ -132,8 +129,6 @@ public class StageDefinition
this.maxWorkerCount = maxWorkerCount;
this.shuffleCheckHasMultipleValues = shuffleCheckHasMultipleValues;
this.frameReader = Suppliers.memoize(() -> FrameReader.create(signature))::get;
this.maxInputBytesPerWorker = maxInputBytesPerWorker == null ?
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER : maxInputBytesPerWorker;
if (mustGatherResultKeyStatistics() && shuffleSpec.clusterBy().getColumns().isEmpty()) {
throw new IAE("Cannot shuffle with spec [%s] and nil clusterBy", shuffleSpec);
@@ -280,12 +275,6 @@ public class StageDefinition
return maxWorkerCount;
}
@JsonProperty
public long getMaxInputBytesPerWorker()
{
return maxInputBytesPerWorker;
}
@JsonProperty("shuffleCheckHasMultipleValues")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
boolean getShuffleCheckHasMultipleValues()
@@ -412,8 +401,7 @@ public class StageDefinition
&& Objects.equals(broadcastInputNumbers, that.broadcastInputNumbers)
&& Objects.equals(processorFactory, that.processorFactory)
&& Objects.equals(signature, that.signature)
&& Objects.equals(shuffleSpec, that.shuffleSpec)
&& Objects.equals(maxInputBytesPerWorker, that.maxInputBytesPerWorker);
&& Objects.equals(shuffleSpec, that.shuffleSpec);
}
@Override
@@ -427,8 +415,7 @@ public class StageDefinition
signature,
maxWorkerCount,
shuffleCheckHasMultipleValues,
shuffleSpec,
maxInputBytesPerWorker
shuffleSpec
);
}
@@ -444,7 +431,6 @@ public class StageDefinition
", maxWorkerCount=" + maxWorkerCount +
", shuffleSpec=" + shuffleSpec +
(shuffleCheckHasMultipleValues ? ", shuffleCheckHasMultipleValues=" + shuffleCheckHasMultipleValues : "") +
", maxInputBytesPerWorker=" + maxInputBytesPerWorker +
'}';
}
}

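Note: the net effect on stage construction, sketched from the builder calls visible elsewhere in this diff (variable names are placeholders):

// Before this patch (sketch):
StageDefinition.builder(stageNumber)
               .inputs(inputSpec)
               .maxWorkerCount(maxNumWorkers)
               .maxInputBytesPerWorker(maxInputBytesPerWorker) // call removed by this patch
               .processorFactory(processorFactory)
               .build(queryId);
// After: the same chain minus maxInputBytesPerWorker(...); the limit is now supplied
// at worker-assignment time rather than serialized as part of the stage definition.
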
StageDefinitionBuilder.java

@@ -21,7 +21,6 @@ package org.apache.druid.msq.kernel;
import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.segment.column.RowSignature;
@@ -43,7 +42,6 @@ public class StageDefinitionBuilder
private int maxWorkerCount = 1;
private ShuffleSpec shuffleSpec = null;
private boolean shuffleCheckHasMultipleValues = false;
private long maxInputBytesPerWorker = Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER;
/**
* Package-private: callers should prefer {@link StageDefinition#builder(int)} rather than this constructor.
@@ -107,12 +105,6 @@ public class StageDefinitionBuilder
return this;
}
public StageDefinitionBuilder maxInputBytesPerWorker(final long maxInputBytesPerWorker)
{
this.maxInputBytesPerWorker = maxInputBytesPerWorker;
return this;
}
int getStageNumber()
{
return stageNumber;
@@ -133,8 +125,7 @@ public class StageDefinitionBuilder
signature,
shuffleSpec,
maxWorkerCount,
shuffleCheckHasMultipleValues,
maxInputBytesPerWorker
shuffleCheckHasMultipleValues
);
}
}

WorkerAssignmentStrategy.java

@@ -45,10 +45,11 @@ public enum WorkerAssignmentStrategy
MAX {
@Override
public List<InputSlice> assign(
StageDefinition stageDef,
InputSpec inputSpec,
Int2IntMap stageWorkerCountMap,
InputSpecSlicer slicer
final StageDefinition stageDef,
final InputSpec inputSpec,
final Int2IntMap stageWorkerCountMap,
final InputSpecSlicer slicer,
final long maxInputBytesPerSlice
)
{
return slicer.sliceStatic(inputSpec, stageDef.getMaxWorkerCount());
@@ -57,7 +58,7 @@ public enum WorkerAssignmentStrategy
/**
* Use the lowest possible number of tasks, while keeping each task's workload under
* {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@link StageDefinition#getMaxInputBytesPerWorker()} bytes.
* {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@code maxInputBytesPerWorker} bytes.
*
* Implemented using {@link InputSpecSlicer#sliceDynamic} whenever possible.
*/
@@ -67,7 +68,8 @@ public enum WorkerAssignmentStrategy
final StageDefinition stageDef,
final InputSpec inputSpec,
final Int2IntMap stageWorkerCountMap,
final InputSpecSlicer slicer
final InputSpecSlicer slicer,
final long maxInputBytesPerSlice
)
{
if (slicer.canSliceDynamic(inputSpec)) {
@@ -75,7 +77,7 @@ public enum WorkerAssignmentStrategy
inputSpec,
stageDef.getMaxWorkerCount(),
Limits.MAX_INPUT_FILES_PER_WORKER,
stageDef.getMaxInputBytesPerWorker()
maxInputBytesPerSlice
);
} else {
// In auto mode, if we can't slice inputs dynamically, we instead carry forwards the number of workers from
@@ -110,10 +112,19 @@ public enum WorkerAssignmentStrategy
return StringUtils.toLowerCase(name());
}
/**
* @param stageDef current stage definition. Contains information on max workers, input stage numbers
* @param inputSpec inputSpec containing information on where the input is read from
* @param stageWorkerCountMap map of past stage number vs number of worker inputs
* @param slicer creates slices of input spec based on other parameters
* @param maxInputBytesPerSlice maximum suggested bytes per input slice
* @return list containing input slices
*/
public abstract List<InputSlice> assign(
StageDefinition stageDef,
InputSpec inputSpec,
Int2IntMap stageWorkerCountMap,
InputSpecSlicer slicer
InputSpecSlicer slicer,
long maxInputBytesPerSlice
);
}

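Note: with the widened signature, callers pass the limit explicitly. A minimal sketch mirroring the WorkerInputs call site changed below:

List<InputSlice> slices = assignmentStrategy.assign(
    stageDef,
    inputSpec,
    stageWorkerCountMap,
    slicer,
    maxInputBytesPerWorker // previously read via stageDef.getMaxInputBytesPerWorker()
);
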
ControllerQueryKernel.java

@@ -162,7 +162,8 @@ public class ControllerQueryKernel
*/
public List<StageId> createAndGetNewStageIds(
final InputSpecSlicerFactory slicerFactory,
final WorkerAssignmentStrategy assignmentStrategy
final WorkerAssignmentStrategy assignmentStrategy,
final long maxInputBytesPerWorker
)
{
final Int2IntMap stageWorkerCountMap = new Int2IntAVLTreeMap();
@@ -177,7 +178,7 @@ public class ControllerQueryKernel
}
}
createNewKernels(stageWorkerCountMap, slicerFactory.makeSlicer(stagePartitionsMap), assignmentStrategy);
createNewKernels(stageWorkerCountMap, slicerFactory.makeSlicer(stagePartitionsMap), assignmentStrategy, maxInputBytesPerWorker);
return stageTracker.values()
.stream()
.filter(controllerStageTracker -> controllerStageTracker.getPhase() == ControllerStagePhase.NEW)
@@ -292,7 +293,8 @@ public class ControllerQueryKernel
private void createNewKernels(
final Int2IntMap stageWorkerCountMap,
final InputSpecSlicer slicer,
final WorkerAssignmentStrategy assignmentStrategy
final WorkerAssignmentStrategy assignmentStrategy,
final long maxInputBytesPerWorker
)
{
for (final StageId nextStage : readyToRunStages) {
@@ -303,7 +305,8 @@ public class ControllerQueryKernel
stageWorkerCountMap,
slicer,
assignmentStrategy,
maxRetainedPartitionSketchBytes
maxRetainedPartitionSketchBytes,
maxInputBytesPerWorker
);
stageTracker.put(nextStage, stageKernel);
}

ControllerStageTracker.java

@@ -167,10 +167,11 @@ class ControllerStageTracker
final Int2IntMap stageWorkerCountMap,
final InputSpecSlicer slicer,
final WorkerAssignmentStrategy assignmentStrategy,
final int maxRetainedPartitionSketchBytes
final int maxRetainedPartitionSketchBytes,
final long maxInputBytesPerWorker
)
{
final WorkerInputs workerInputs = WorkerInputs.create(stageDef, stageWorkerCountMap, slicer, assignmentStrategy);
final WorkerInputs workerInputs = WorkerInputs.create(stageDef, stageWorkerCountMap, slicer, assignmentStrategy, maxInputBytesPerWorker);
return new ControllerStageTracker(
stageDef,
workerInputs,

WorkerInputs.java

@@ -59,7 +59,8 @@ public class WorkerInputs
final StageDefinition stageDef,
final Int2IntMap stageWorkerCountMap,
final InputSpecSlicer slicer,
final WorkerAssignmentStrategy assignmentStrategy
final WorkerAssignmentStrategy assignmentStrategy,
final long maxInputBytesPerWorker
)
{
// Split each inputSpec and assign to workers. This list maps worker number -> input number -> input slice.
@@ -91,7 +92,13 @@ public class WorkerInputs
}
} else {
// Non-broadcast case: split slices across workers.
List<InputSlice> slices = assignmentStrategy.assign(stageDef, inputSpec, stageWorkerCountMap, slicer);
List<InputSlice> slices = assignmentStrategy.assign(
stageDef,
inputSpec,
stageWorkerCountMap,
slicer,
maxInputBytesPerWorker
);
if (slices.isEmpty()) {
// Need at least one slice, so we can have at least one worker. It's OK if it has nothing to read.

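Note: under AUTO assignment the limit now feeds sliceDynamic directly, so it controls worker fan-out: per the new tests, three ~1 GB inputs fit on a single worker at the default limit, while a 10-byte limit spreads three input files across three workers (bounded by maxWorkerCount). The call shape, as exercised in WorkerInputsTest below:

final WorkerInputs inputs = WorkerInputs.create(
    stageDef,
    Int2IntMaps.EMPTY_MAP,
    slicer,
    WorkerAssignmentStrategy.AUTO,
    Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
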
MSQInsertTest.java

@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
@@ -983,10 +984,9 @@ public class MSQInsertTest extends MSQTestBase
@Test
public void testInsertQueryWithInvalidSubtaskCount()
{
Map<String, Object> localContext = ImmutableMap.<String, Object>builder()
.putAll(context)
.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 1)
.build();
Map<String, Object> localContext = new HashMap<>(context);
localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 1);
testIngestQuery().setSql(
"insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered by dim1")
.setQueryContext(localContext)
@@ -1065,6 +1065,106 @@
.verifyPlanningErrors();
}
@Test
public void testCorrectNumberOfWorkersUsedAutoModeWithoutBytesLimit() throws IOException
{
Map<String, Object> localContext = new HashMap<>(context);
localContext.put(MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY, WorkerAssignmentStrategy.AUTO.name());
localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 4);
final File toRead1 = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/multipleFiles/wikipedia-sampled-1.json");
final String toReadFileNameAsJson1 = queryFramework().queryJsonMapper().writeValueAsString(toRead1.getAbsolutePath());
final File toRead2 = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/multipleFiles/wikipedia-sampled-2.json");
final String toReadFileNameAsJson2 = queryFramework().queryJsonMapper().writeValueAsString(toRead2.getAbsolutePath());
final File toRead3 = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/multipleFiles/wikipedia-sampled-3.json");
final String toReadFileNameAsJson3 = queryFramework().queryJsonMapper().writeValueAsString(toRead3.getAbsolutePath());
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("cnt", ColumnType.LONG)
.build();
testIngestQuery().setSql(
"insert into foo1 select "
+ " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n"
+ " count(*) as cnt\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + toReadFileNameAsJson1 + "," + toReadFileNameAsJson2 + "," + toReadFileNameAsJson3 + "],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+ " )\n"
+ ") group by 1 PARTITIONED by day ")
.setExpectedDataSource("foo1")
.setQueryContext(localContext)
.setExpectedRowSignature(rowSignature)
.setExpectedSegment(ImmutableSet.of(SegmentId.of(
"foo1",
Intervals.of("2016-06-27/P1D"),
"test",
0
)))
.setExpectedResultRows(ImmutableList.of(new Object[]{1466985600000L, 20L}))
.setExpectedWorkerCount(
ImmutableMap.of(
0, 1
))
.verifyResults();
}
@Test
public void testCorrectNumberOfWorkersUsedAutoModeWithBytesLimit() throws IOException
{
Map<String, Object> localContext = new HashMap<>(context);
localContext.put(MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY, WorkerAssignmentStrategy.AUTO.name());
localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 4);
localContext.put(MultiStageQueryContext.CTX_MAX_INPUT_BYTES_PER_WORKER, 10);
final File toRead1 = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/multipleFiles/wikipedia-sampled-1.json");
final String toReadFileNameAsJson1 = queryFramework().queryJsonMapper().writeValueAsString(toRead1.getAbsolutePath());
final File toRead2 = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/multipleFiles/wikipedia-sampled-2.json");
final String toReadFileNameAsJson2 = queryFramework().queryJsonMapper().writeValueAsString(toRead2.getAbsolutePath());
final File toRead3 = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/multipleFiles/wikipedia-sampled-3.json");
final String toReadFileNameAsJson3 = queryFramework().queryJsonMapper().writeValueAsString(toRead3.getAbsolutePath());
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("cnt", ColumnType.LONG)
.build();
testIngestQuery().setSql(
"insert into foo1 select "
+ " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n"
+ " count(*) as cnt\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + toReadFileNameAsJson1 + "," + toReadFileNameAsJson2 + "," + toReadFileNameAsJson3 + "],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+ " )\n"
+ ") group by 1 PARTITIONED by day ")
.setExpectedDataSource("foo1")
.setQueryContext(localContext)
.setExpectedRowSignature(rowSignature)
.setExpectedSegment(ImmutableSet.of(SegmentId.of(
"foo1",
Intervals.of("2016-06-27/P1D"),
"test",
0
)))
.setExpectedResultRows(ImmutableList.of(new Object[]{1466985600000L, 20L}))
.setExpectedWorkerCount(
ImmutableMap.of(
0, 3
))
.verifyResults();
}
@Test
public void testInsertArraysAutoType() throws IOException
{
@@ -1120,7 +1220,6 @@
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.verifyResults();
}
@Nonnull

StageDefinitionTest.java

@@ -26,7 +26,6 @@ import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
@@ -57,8 +56,7 @@ public class StageDefinitionTest
RowSignature.empty(),
null,
0,
false,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
false
);
Assert.assertThrows(ISE.class, () -> stageDefinition.generatePartitionBoundariesForShuffle(null));
@@ -79,8 +77,7 @@
false
),
1,
false,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
false
);
Assert.assertThrows(ISE.class, () -> stageDefinition.generatePartitionBoundariesForShuffle(null));
@@ -101,8 +98,7 @@
false
),
1,
false,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
false
);
Assert.assertThrows(

BaseControllerQueryKernelTest.java

@@ -27,6 +27,7 @@ import org.apache.druid.frame.key.RowKey;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.input.InputSpecSlicerFactory;
@@ -232,7 +233,8 @@ public class BaseControllerQueryKernelTest extends InitializedNullHandlingTest
return mapStageIdsToStageNumbers(
controllerQueryKernel.createAndGetNewStageIds(
inputSlicerFactory,
WorkerAssignmentStrategy.MAX
WorkerAssignmentStrategy.MAX,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
)
);
}

WorkerInputsTest.java

@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
@@ -35,12 +36,19 @@ import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
public class WorkerInputsTest
{
private static final String QUERY_ID = "myQuery";
@@ -59,7 +67,8 @@ public class WorkerInputsTest
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.MAX
WorkerAssignmentStrategy.MAX,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -87,7 +96,8 @@
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.MAX
WorkerAssignmentStrategy.MAX,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -115,7 +125,8 @@
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -140,7 +151,8 @@
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -166,7 +178,8 @@
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -191,7 +204,8 @@
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -216,7 +230,8 @@
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -242,7 +257,8 @@
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -274,7 +290,8 @@
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -288,6 +305,127 @@
);
}
@Test
public void test_max_shouldAlwaysSplitStatic()
{
TestInputSpec inputSpecToSplit = new TestInputSpec(4_000_000_000L);
final StageDefinition stageDef =
StageDefinition.builder(0)
.inputs(inputSpecToSplit)
.maxWorkerCount(3)
.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
TestInputSpecSlicer testInputSpecSlicer = spy(new TestInputSpecSlicer(true));
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
testInputSpecSlicer,
WorkerAssignmentStrategy.MAX,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Mockito.verify(testInputSpecSlicer, times(0)).canSliceDynamic(inputSpecToSplit);
Mockito.verify(testInputSpecSlicer, times(1)).sliceStatic(any(), anyInt());
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
.put(
0,
Collections.singletonList(new TestInputSlice(4_000_000_000L))
)
.put(
1,
Collections.singletonList(new TestInputSlice())
)
.put(
2,
Collections.singletonList(new TestInputSlice())
)
.build(),
inputs.assignmentsMap()
);
}
@Test
public void test_auto_shouldSplitDynamicIfPossible()
{
TestInputSpec inputSpecToSplit = new TestInputSpec(1_000_000_000L, 1_000_000_000L, 1_000_000_000L);
final StageDefinition stageDef =
StageDefinition.builder(0)
.inputs(inputSpecToSplit)
.maxWorkerCount(3)
.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
TestInputSpecSlicer testInputSpecSlicer = spy(new TestInputSpecSlicer(true));
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
testInputSpecSlicer,
WorkerAssignmentStrategy.AUTO,
100
);
Mockito.verify(testInputSpecSlicer, times(1)).canSliceDynamic(inputSpecToSplit);
Mockito.verify(testInputSpecSlicer, times(1)).sliceDynamic(any(), anyInt(), anyInt(), anyLong());
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
.put(
0,
Collections.singletonList(new TestInputSlice(1_000_000_000L))
)
.put(
1,
Collections.singletonList(new TestInputSlice(1_000_000_000L))
)
.put(
2,
Collections.singletonList(new TestInputSlice(1_000_000_000L))
)
.build(),
inputs.assignmentsMap()
);
}
@Test
public void test_auto_shouldUseLeastWorkersPossible()
{
TestInputSpec inputSpecToSplit = new TestInputSpec(1_000_000_000L, 1_000_000_000L, 1_000_000_000L);
final StageDefinition stageDef =
StageDefinition.builder(0)
.inputs(inputSpecToSplit)
.maxWorkerCount(3)
.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
TestInputSpecSlicer testInputSpecSlicer = spy(new TestInputSpecSlicer(true));
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
testInputSpecSlicer,
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
.put(
0,
Collections.singletonList(new TestInputSlice(1_000_000_000L, 1_000_000_000L, 1_000_000_000L))
)
.build(),
inputs.assignmentsMap()
);
Mockito.verify(testInputSpecSlicer, times(1)).canSliceDynamic(inputSpecToSplit);
}
@Test
public void testEquals()
{

MSQTestBase.java

@@ -225,6 +225,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
.put(QueryContexts.CTX_SQL_QUERY_ID, "test-query")
.put(QueryContexts.FINALIZE_KEY, true)
.put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false)
.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2)
.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0)
.build();
@@ -799,6 +800,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
protected MSQFault expectedMSQFault = null;
protected Class<? extends MSQFault> expectedMSQFaultClass = null;
protected Map<Integer, Integer> expectedStageVsWorkerCount = new HashMap<>();
protected final Map<Integer, Map<Integer, Map<String, CounterSnapshotMatcher>>>
expectedStageWorkerChannelToCounters = new HashMap<>();
@@ -893,6 +895,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
return asBuilder();
}
public Builder setExpectedWorkerCount(Map<Integer, Integer> stageVsWorkerCount)
{
this.expectedStageVsWorkerCount = stageVsWorkerCount;
return asBuilder();
}
public Builder setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher counterSnapshot,
int stage,
@@ -925,6 +933,14 @@ public class MSQTestBase extends BaseCalciteQueryTest
MatcherAssert.assertThat(e, expectedValidationErrorMatcher);
}
protected void verifyWorkerCount(CounterSnapshotsTree counterSnapshotsTree)
{
Map<Integer, Map<Integer, CounterSnapshots>> counterMap = counterSnapshotsTree.copyMap();
for (Map.Entry<Integer, Integer> stageWorkerCount : expectedStageVsWorkerCount.entrySet()) {
Assert.assertEquals(stageWorkerCount.getValue().intValue(), counterMap.get(stageWorkerCount.getKey()).size());
}
}
protected void verifyCounters(CounterSnapshotsTree counterSnapshotsTree)
{
Assert.assertNotNull(counterSnapshotsTree);
@@ -1061,6 +1077,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
return;
}
MSQTaskReportPayload reportPayload = getPayloadOrThrow(controllerId);
verifyWorkerCount(reportPayload.getCounters());
verifyCounters(reportPayload.getCounters());
MSQSpec foundSpec = indexingServiceClient.getQuerySpecForTask(controllerId);
@@ -1270,6 +1287,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
MSQTaskReportPayload payload = getPayloadOrThrow(controllerId);
verifyCounters(payload.getCounters());
verifyWorkerCount(payload.getCounters());
if (payload.getStatus().getErrorReport() != null) {
throw new ISE("Query %s failed due to %s", sql, payload.getStatus().getErrorReport().toString());

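Note: the harness addition pairs setExpectedWorkerCount (a stage-number-to-expected-worker-count map) with verifyWorkerCount, which compares each entry against the number of per-worker counter snapshots reported for that stage. A trimmed usage sketch, as in the new MSQInsertTest cases above (sql and localContext stand in for the test's own values):

testIngestQuery()
    .setSql(sql)
    .setQueryContext(localContext)
    .setExpectedWorkerCount(ImmutableMap.of(0, 3)) // expect stage 0 to run three workers
    .verifyResults();
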
MSQTestControllerContext.java

@@ -63,10 +63,12 @@ import java.util.stream.Collectors;
public class MSQTestControllerContext implements ControllerContext
{
private static final Logger log = new Logger(MSQTestControllerContext.class);
private static final int NUM_WORKERS = 4;
private final TaskActionClient taskActionClient;
private final Map<String, Worker> inMemoryWorkers = new HashMap<>();
private final ConcurrentMap<String, TaskStatus> statusMap = new ConcurrentHashMap<>();
private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Execs.singleThreaded(
private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Execs.multiThreaded(
NUM_WORKERS,
"MultiStageQuery-test-controller-client"));
private final CoordinatorClient coordinatorClient;
private final DruidNode node = new DruidNode(

multipleFiles/wikipedia-sampled-1.json (new file)

@@ -0,0 +1,7 @@
{"isRobot":true,"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","flags":"NB","isUnpatrolled":false,"page":"Salo Toraut","diffUrl":"https://sv.wikipedia.org/w/index.php?oldid=36099284&rcid=89369918","added":31,"comment":"Botskapande Indonesien omdirigering","commentLength":35,"isNew":true,"isMinor":false,"delta":31,"isAnonymous":false,"user":"Lsjbot","deltaBucket":0.0,"deleted":0,"namespace":"Main"}
{"isRobot":false,"channel":"#en.wikipedia","cityName":"Buenos Aires","timestamp":"2016-06-27T00:00:34.959Z","flags":"","isUnpatrolled":false,"page":"Bailando 2015","countryName":"Argentina","regionIsoCode":"C","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144213&oldid=727144184","added":2,"metroCode":null,"comment":"/* Scores */","commentLength":12,"isNew":false,"isMinor":false,"delta":2,"countryIsoCode":"AR","isAnonymous":true,"user":"181.230.118.178","regionName":"Buenos Aires F.D.","deltaBucket":0.0,"deleted":0,"namespace":"Main"}
{"isRobot":false,"channel":"#en.wikipedia","timestamp":"2016-06-27T00:00:36.027Z","flags":"M","isUnpatrolled":false,"page":"Richie Rich's Christmas Wish","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144214&oldid=716794477","added":0,"comment":"standard term is [[title character]]","commentLength":36,"isNew":false,"isMinor":true,"delta":-2,"isAnonymous":false,"user":"JasonAQuest","deltaBucket":-100.0,"deleted":2,"namespace":"Main"}
{"isRobot":true,"channel":"#pl.wikipedia","timestamp":"2016-06-27T00:00:58.599Z","flags":"NB","isUnpatrolled":false,"page":"Kategoria:Dyskusje nad usunięciem artykułu zakończone bez konsensusu lipiec 2016","diffUrl":"https://pl.wikipedia.org/w/index.php?oldid=46204477&rcid=68522573","added":270,"comment":"utworzenie kategorii","commentLength":20,"isNew":true,"isMinor":false,"delta":270,"isAnonymous":false,"user":"Beau.bot","deltaBucket":200.0,"deleted":0,"namespace":"Kategoria"}
{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T00:01:03.685Z","flags":"MB","isUnpatrolled":false,"page":"El Terco, Bachíniva","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388566&oldid=29282572&rcid=34712923","added":0,"comment":"Bot: Automatska zamjena teksta (-[[Administrativna podjela Meksika|Admin]] +[[Administrativna podjela Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBucket":-100.0,"deleted":1,"namespace":"Main"}
{"isRobot":true,"channel":"#ceb.wikipedia","timestamp":"2016-06-27T00:01:07.347Z","flags":"NB","isUnpatrolled":false,"page":"Neqerssuaq","diffUrl":"https://ceb.wikipedia.org/w/index.php?oldid=9563239&rcid=36193146","added":4150,"comment":"Paghimo ni bot Greenland","commentLength":24,"isNew":true,"isMinor":false,"delta":4150,"isAnonymous":false,"user":"Lsjbot","deltaBucket":4100.0,"deleted":0,"namespace":"Main"}
{"isRobot":false,"channel":"#es.wikipedia","cityName":null,"timestamp":"2016-06-27T00:01:14.343Z","flags":"","isUnpatrolled":false,"page":"Sumo (banda)","countryName":"Argentina","regionIsoCode":null,"diffUrl":"https://es.wikipedia.org/w/index.php?diff=91937261&oldid=91937229","added":0,"metroCode":null,"comment":"/* Línea de tiempo */","commentLength":21,"isNew":false,"isMinor":false,"delta":-173,"countryIsoCode":"AR","isAnonymous":true,"user":"181.110.165.189","regionName":null,"deltaBucket":-200.0,"deleted":173,"namespace":"Main"}

multipleFiles/wikipedia-sampled-2.json (new file)

@@ -0,0 +1,7 @@
{"isRobot":false,"channel":"#en.wikipedia","timestamp":"2016-06-27T00:01:59.599Z","flags":"","isUnpatrolled":false,"page":"Panama Canal","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144327&oldid=727144068","added":496,"comment":"expanding lead + missing RS","commentLength":27,"isNew":false,"isMinor":false,"delta":496,"isAnonymous":false,"user":"Mariordo","deltaBucket":400.0,"deleted":0,"namespace":"Main"}
{"isRobot":false,"channel":"#ru.wikipedia","timestamp":"2016-06-27T00:02:09.238Z","flags":"","isUnpatrolled":false,"page":"Википедия:Опросы/Унификация шаблонов «Не переведено»","diffUrl":"https://ru.wikipedia.org/w/index.php?diff=79213888&oldid=79213880","added":196,"comment":"/* Нет */","commentLength":9,"isNew":false,"isMinor":false,"delta":196,"isAnonymous":false,"user":"Wanderer777","deltaBucket":100.0,"deleted":0,"namespace":"Википедия"}
{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T00:02:10.857Z","flags":"MB","isUnpatrolled":false,"page":"Hermanos Díaz, Ascensión","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388624&oldid=29282630&rcid=34712981","added":0,"comment":"Bot: Automatska zamjena teksta (-[[Administrativna podjela Meksika|Admin]] +[[Administrativna podjela Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBucket":-100.0,"deleted":1,"namespace":"Main"}
{"isRobot":false,"channel":"#es.wikipedia","timestamp":"2016-06-27T01:02:13.153Z","flags":"","isUnpatrolled":false,"page":"Clasificación para la Eurocopa Sub-21 de 2017","diffUrl":"https://es.wikipedia.org/w/index.php?diff=91937277&oldid=91937272","added":4,"comment":"/* Máximos Asistentes */","commentLength":24,"isNew":false,"isMinor":false,"delta":4,"isAnonymous":false,"user":"Guly600","deltaBucket":0.0,"deleted":0,"namespace":"Main"}
{"isRobot":false,"channel":"#zh.wikipedia","timestamp":"2016-06-27T01:02:13.815Z","flags":"","isUnpatrolled":false,"page":"中共十八大以来的反腐败工作","diffUrl":"https://zh.wikipedia.org/w/index.php?diff=40605390&oldid=40605381","added":18,"comment":"/* 违反中共中央八项规定官员(副部级及以上) */","commentLength":26,"isNew":false,"isMinor":false,"delta":18,"isAnonymous":false,"user":"2001:DA8:207:E132:94DC:BA03:DFDF:8F9F","deltaBucket":0.0,"deleted":0,"namespace":"Main"}
{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T01:02:15.952Z","flags":"MB","isUnpatrolled":false,"page":"El Sicomoro, Ascensión","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388628&oldid=29282634&rcid=34712985","added":0,"comment":"Bot: Automatska zamjena teksta (-[[Administrativna podjela Meksika|Admin]] +[[Administrativna podjela Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBucket":-100.0,"deleted":1,"namespace":"Main"}
{"isRobot":false,"channel":"#id.wikipedia","timestamp":"2016-06-27T01:02:20.008Z","flags":"!","isUnpatrolled":true,"page":"Ibnu Sina","diffUrl":"https://id.wikipedia.org/w/index.php?diff=11687177&oldid=11444059&rcid=20812462","added":106,"comment":"gambar","commentLength":6,"isNew":false,"isMinor":false,"delta":106,"isAnonymous":false,"user":"Ftihikam","deltaBucket":100.0,"deleted":0,"namespace":"Main"}

multipleFiles/wikipedia-sampled-3.json (new file)

@@ -0,0 +1,6 @@
{"isRobot":false,"channel":"#pt.wikipedia","timestamp":"2016-06-27T02:02:22.868Z","flags":"N","isUnpatrolled":false,"page":"Dobromir Zhechev","diffUrl":"https://pt.wikipedia.org/w/index.php?oldid=46012430&rcid=67145794","added":1926,"comment":"Novo","commentLength":4,"isNew":true,"isMinor":false,"delta":1926,"isAnonymous":false,"user":"Ceresta","deltaBucket":1900.0,"deleted":0,"namespace":"Main"}
{"isRobot":true,"channel":"#de.wikipedia","timestamp":"2016-06-27T02:02:29.228Z","flags":"B","isUnpatrolled":false,"page":"Benutzer Diskussion:Squasher/Archiv/2016","diffUrl":"https://de.wikipedia.org/w/index.php?diff=155658493&oldid=155085489","added":2560,"comment":"1 Abschnitt aus [[Benutzer Diskussion:Squasher]] archiviert","commentLength":60,"isNew":false,"isMinor":false,"delta":2560,"isAnonymous":false,"user":"TaxonBot","deltaBucket":2500.0,"deleted":0,"namespace":"Benutzer Diskussion"}
{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T02:02:50.958Z","flags":"MB","isUnpatrolled":false,"page":"Trinidad Jiménez G., Benemérito de las Américas","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388657&oldid=29282663&rcid=34713014","added":0,"comment":"Bot: Automatska zamjena teksta (-[[Administrativna podjela Meksika|Admin]] +[[Administrativna podjela Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBucket":-100.0,"deleted":1,"namespace":"Main"}
{"isRobot":false,"channel":"#zh.wikipedia","timestamp":"2016-06-27T02:02:59.521Z","flags":"N","isUnpatrolled":false,"page":"Wikipedia:頁面存廢討論/記錄/2016/06/27","diffUrl":"https://zh.wikipedia.org/w/index.php?oldid=40605393&rcid=62748523","added":1986,"comment":"添加[[李洛能八大弟子]]","commentLength":13,"isNew":true,"isMinor":false,"delta":1986,"isAnonymous":false,"user":"Tigerzeng","deltaBucket":1900.0,"deleted":0,"namespace":"Wikipedia"}
{"isRobot":false,"channel":"#en.wikipedia","timestamp":"2016-06-27T02:03:01.090Z","flags":"","isUnpatrolled":false,"page":"File:Paint.net 4.0.6 screenshot.png","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144426&oldid=713167833","added":0,"comment":"/* Summary */","commentLength":13,"isNew":false,"isMinor":false,"delta":-463,"isAnonymous":false,"user":"Calvin Hogg","deltaBucket":-500.0,"deleted":463,"namespace":"File"}
{"isRobot":true,"channel":"#de.wikipedia","timestamp":"2016-06-27T02:03:05.240Z","flags":"B","isUnpatrolled":false,"page":"Benutzer Diskussion:HerrSonderbar","diffUrl":"https://de.wikipedia.org/w/index.php?diff=155658496&oldid=155657380","added":364,"comment":"Neuer Abschnitt /* Benachrichtigung über inaktive Mentees am 27. 6. 2016 */","commentLength":75,"isNew":false,"isMinor":false,"delta":364,"isAnonymous":false,"user":"GiftBot","deltaBucket":300.0,"deleted":0,"namespace":"Benutzer Diskussion"}