diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index a7aeffe0b17..b39249d8111 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1599,14 +1599,10 @@ public class ControllerImpl implements Controller final DataSchema dataSchema = generateDataSchema(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper); - final long maxInputBytesPerWorker = - MultiStageQueryContext.getMaxInputBytesPerWorker(querySpec.getQuery().context()); - builder.add( StageDefinition.builder(queryDef.getNextStageNumber()) .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) .maxWorkerCount(tuningConfig.getMaxNumWorkers()) - .maxInputBytesPerWorker(maxInputBytesPerWorker) .processorFactory( new SegmentGeneratorFrameProcessorFactory( dataSchema, @@ -2406,10 +2402,14 @@ public class ControllerImpl implements Controller */ private void startStages() throws IOException, InterruptedException { + final long maxInputBytesPerWorker = + MultiStageQueryContext.getMaxInputBytesPerWorker(task.getQuerySpec().getQuery().context()); + logKernelStatus(queryDef.getQueryId(), queryKernel); final List newStageIds = queryKernel.createAndGetNewStageIds( inputSpecSlicerFactory, - task.getQuerySpec().getAssignmentStrategy() + task.getQuerySpec().getAssignmentStrategy(), + maxInputBytesPerWorker ); for (final StageId stageId : newStageIds) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java index 9892eae8241..d333edb7d2d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java @@ -40,7 +40,6 @@ import org.apache.druid.frame.write.FrameWriters; import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.input.InputSpecs; import org.apache.druid.msq.statistics.ClusterByStatisticsCollector; @@ -93,7 +92,6 @@ public class StageDefinition private final FrameProcessorFactory processorFactory; private final RowSignature signature; private final int maxWorkerCount; - private final long maxInputBytesPerWorker; private final boolean shuffleCheckHasMultipleValues; @Nullable @@ -111,8 +109,7 @@ public class StageDefinition @JsonProperty("signature") final RowSignature signature, @Nullable @JsonProperty("shuffleSpec") final ShuffleSpec shuffleSpec, @JsonProperty("maxWorkerCount") final int maxWorkerCount, - @JsonProperty("shuffleCheckHasMultipleValues") final boolean shuffleCheckHasMultipleValues, - @JsonProperty("maxInputBytesPerWorker") final Long maxInputBytesPerWorker + @JsonProperty("shuffleCheckHasMultipleValues") final boolean shuffleCheckHasMultipleValues ) { this.id = Preconditions.checkNotNull(id, "id"); @@ -132,8 +129,6 @@ public class StageDefinition this.maxWorkerCount = maxWorkerCount; this.shuffleCheckHasMultipleValues = shuffleCheckHasMultipleValues; this.frameReader = Suppliers.memoize(() -> FrameReader.create(signature))::get; - this.maxInputBytesPerWorker = maxInputBytesPerWorker == null ? - Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER : maxInputBytesPerWorker; if (mustGatherResultKeyStatistics() && shuffleSpec.clusterBy().getColumns().isEmpty()) { throw new IAE("Cannot shuffle with spec [%s] and nil clusterBy", shuffleSpec); @@ -280,12 +275,6 @@ public class StageDefinition return maxWorkerCount; } - @JsonProperty - public long getMaxInputBytesPerWorker() - { - return maxInputBytesPerWorker; - } - @JsonProperty("shuffleCheckHasMultipleValues") @JsonInclude(JsonInclude.Include.NON_DEFAULT) boolean getShuffleCheckHasMultipleValues() @@ -412,8 +401,7 @@ public class StageDefinition && Objects.equals(broadcastInputNumbers, that.broadcastInputNumbers) && Objects.equals(processorFactory, that.processorFactory) && Objects.equals(signature, that.signature) - && Objects.equals(shuffleSpec, that.shuffleSpec) - && Objects.equals(maxInputBytesPerWorker, that.maxInputBytesPerWorker); + && Objects.equals(shuffleSpec, that.shuffleSpec); } @Override @@ -427,8 +415,7 @@ public class StageDefinition signature, maxWorkerCount, shuffleCheckHasMultipleValues, - shuffleSpec, - maxInputBytesPerWorker + shuffleSpec ); } @@ -444,7 +431,6 @@ public class StageDefinition ", maxWorkerCount=" + maxWorkerCount + ", shuffleSpec=" + shuffleSpec + (shuffleCheckHasMultipleValues ? ", shuffleCheckHasMultipleValues=" + shuffleCheckHasMultipleValues : "") + - ", maxInputBytesPerWorker=" + maxInputBytesPerWorker + '}'; } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java index 3ff9594123e..ebf5cfd7095 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java @@ -21,7 +21,6 @@ package org.apache.druid.msq.kernel; import it.unimi.dsi.fastutil.ints.IntRBTreeSet; import it.unimi.dsi.fastutil.ints.IntSet; -import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.input.InputSpec; import org.apache.druid.segment.column.RowSignature; @@ -43,7 +42,6 @@ public class StageDefinitionBuilder private int maxWorkerCount = 1; private ShuffleSpec shuffleSpec = null; private boolean shuffleCheckHasMultipleValues = false; - private long maxInputBytesPerWorker = Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER; /** * Package-private: callers should prefer {@link StageDefinition#builder(int)} rather than this constructor. @@ -107,12 +105,6 @@ public class StageDefinitionBuilder return this; } - public StageDefinitionBuilder maxInputBytesPerWorker(final long maxInputBytesPerWorker) - { - this.maxInputBytesPerWorker = maxInputBytesPerWorker; - return this; - } - int getStageNumber() { return stageNumber; @@ -133,8 +125,7 @@ public class StageDefinitionBuilder signature, shuffleSpec, maxWorkerCount, - shuffleCheckHasMultipleValues, - maxInputBytesPerWorker + shuffleCheckHasMultipleValues ); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java index c813778f459..8d1832be17f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java @@ -45,10 +45,11 @@ public enum WorkerAssignmentStrategy MAX { @Override public List assign( - StageDefinition stageDef, - InputSpec inputSpec, - Int2IntMap stageWorkerCountMap, - InputSpecSlicer slicer + final StageDefinition stageDef, + final InputSpec inputSpec, + final Int2IntMap stageWorkerCountMap, + final InputSpecSlicer slicer, + final long maxInputBytesPerSlice ) { return slicer.sliceStatic(inputSpec, stageDef.getMaxWorkerCount()); @@ -57,7 +58,7 @@ public enum WorkerAssignmentStrategy /** * Use the lowest possible number of tasks, while keeping each task's workload under - * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@link StageDefinition#getMaxInputBytesPerWorker()} bytes. + * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@code maxInputBytesPerWorker} bytes. * * Implemented using {@link InputSpecSlicer#sliceDynamic} whenever possible. */ @@ -67,7 +68,8 @@ public enum WorkerAssignmentStrategy final StageDefinition stageDef, final InputSpec inputSpec, final Int2IntMap stageWorkerCountMap, - final InputSpecSlicer slicer + final InputSpecSlicer slicer, + final long maxInputBytesPerSlice ) { if (slicer.canSliceDynamic(inputSpec)) { @@ -75,7 +77,7 @@ public enum WorkerAssignmentStrategy inputSpec, stageDef.getMaxWorkerCount(), Limits.MAX_INPUT_FILES_PER_WORKER, - stageDef.getMaxInputBytesPerWorker() + maxInputBytesPerSlice ); } else { // In auto mode, if we can't slice inputs dynamically, we instead carry forwards the number of workers from @@ -110,10 +112,19 @@ public enum WorkerAssignmentStrategy return StringUtils.toLowerCase(name()); } + /** + * @param stageDef current stage definition. Contains information on max workers, input stage numbers + * @param inputSpec inputSpec containing information on where the input is read from + * @param stageWorkerCountMap map of past stage number vs number of worker inputs + * @param slicer creates slices of input spec based on other parameters + * @param maxInputBytesPerSlice maximum suggested bytes per input slice + * @return list containing input slices + */ public abstract List assign( StageDefinition stageDef, InputSpec inputSpec, Int2IntMap stageWorkerCountMap, - InputSpecSlicer slicer + InputSpecSlicer slicer, + long maxInputBytesPerSlice ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java index ddd7b633b39..62a45c4eaa8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java @@ -162,7 +162,8 @@ public class ControllerQueryKernel */ public List createAndGetNewStageIds( final InputSpecSlicerFactory slicerFactory, - final WorkerAssignmentStrategy assignmentStrategy + final WorkerAssignmentStrategy assignmentStrategy, + final long maxInputBytesPerWorker ) { final Int2IntMap stageWorkerCountMap = new Int2IntAVLTreeMap(); @@ -177,7 +178,7 @@ public class ControllerQueryKernel } } - createNewKernels(stageWorkerCountMap, slicerFactory.makeSlicer(stagePartitionsMap), assignmentStrategy); + createNewKernels(stageWorkerCountMap, slicerFactory.makeSlicer(stagePartitionsMap), assignmentStrategy, maxInputBytesPerWorker); return stageTracker.values() .stream() .filter(controllerStageTracker -> controllerStageTracker.getPhase() == ControllerStagePhase.NEW) @@ -292,7 +293,8 @@ public class ControllerQueryKernel private void createNewKernels( final Int2IntMap stageWorkerCountMap, final InputSpecSlicer slicer, - final WorkerAssignmentStrategy assignmentStrategy + final WorkerAssignmentStrategy assignmentStrategy, + final long maxInputBytesPerWorker ) { for (final StageId nextStage : readyToRunStages) { @@ -303,7 +305,8 @@ public class ControllerQueryKernel stageWorkerCountMap, slicer, assignmentStrategy, - maxRetainedPartitionSketchBytes + maxRetainedPartitionSketchBytes, + maxInputBytesPerWorker ); stageTracker.put(nextStage, stageKernel); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java index 6619f389f60..e0190bfacb3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java @@ -167,10 +167,11 @@ class ControllerStageTracker final Int2IntMap stageWorkerCountMap, final InputSpecSlicer slicer, final WorkerAssignmentStrategy assignmentStrategy, - final int maxRetainedPartitionSketchBytes + final int maxRetainedPartitionSketchBytes, + final long maxInputBytesPerWorker ) { - final WorkerInputs workerInputs = WorkerInputs.create(stageDef, stageWorkerCountMap, slicer, assignmentStrategy); + final WorkerInputs workerInputs = WorkerInputs.create(stageDef, stageWorkerCountMap, slicer, assignmentStrategy, maxInputBytesPerWorker); return new ControllerStageTracker( stageDef, workerInputs, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java index da27ceba650..83d7a602bc1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java @@ -59,7 +59,8 @@ public class WorkerInputs final StageDefinition stageDef, final Int2IntMap stageWorkerCountMap, final InputSpecSlicer slicer, - final WorkerAssignmentStrategy assignmentStrategy + final WorkerAssignmentStrategy assignmentStrategy, + final long maxInputBytesPerWorker ) { // Split each inputSpec and assign to workers. This list maps worker number -> input number -> input slice. @@ -91,7 +92,13 @@ public class WorkerInputs } } else { // Non-broadcast case: split slices across workers. - List slices = assignmentStrategy.assign(stageDef, inputSpec, stageWorkerCountMap, slicer); + List slices = assignmentStrategy.assign( + stageDef, + inputSpec, + stageWorkerCountMap, + slicer, + maxInputBytesPerWorker + ); if (slices.isEmpty()) { // Need at least one slice, so we can have at least one worker. It's OK if it has nothing to read. diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index 62960657626..479bc4de0d2 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault; import org.apache.druid.msq.indexing.error.RowTooLargeFault; +import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; import org.apache.druid.msq.test.CounterSnapshotMatcher; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; @@ -983,10 +984,9 @@ public class MSQInsertTest extends MSQTestBase @Test public void testInsertQueryWithInvalidSubtaskCount() { - Map localContext = ImmutableMap.builder() - .putAll(context) - .put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 1) - .build(); + Map localContext = new HashMap<>(context); + localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 1); + testIngestQuery().setSql( "insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered by dim1") .setQueryContext(localContext) @@ -1065,6 +1065,106 @@ public class MSQInsertTest extends MSQTestBase .verifyPlanningErrors(); } + @Test + public void testCorrectNumberOfWorkersUsedAutoModeWithoutBytesLimit() throws IOException + { + Map localContext = new HashMap<>(context); + localContext.put(MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY, WorkerAssignmentStrategy.AUTO.name()); + localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 4); + + final File toRead1 = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/multipleFiles/wikipedia-sampled-1.json"); + final String toReadFileNameAsJson1 = queryFramework().queryJsonMapper().writeValueAsString(toRead1.getAbsolutePath()); + + final File toRead2 = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/multipleFiles/wikipedia-sampled-2.json"); + final String toReadFileNameAsJson2 = queryFramework().queryJsonMapper().writeValueAsString(toRead2.getAbsolutePath()); + + final File toRead3 = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/multipleFiles/wikipedia-sampled-3.json"); + final String toReadFileNameAsJson3 = queryFramework().queryJsonMapper().writeValueAsString(toRead3.getAbsolutePath()); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("cnt", ColumnType.LONG) + .build(); + + testIngestQuery().setSql( + "insert into foo1 select " + + " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n" + + " count(*) as cnt\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{ \"files\": [" + toReadFileNameAsJson1 + "," + toReadFileNameAsJson2 + "," + toReadFileNameAsJson3 + "],\"type\":\"local\"}',\n" + + " '{\"type\": \"json\"}',\n" + + " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n" + + " )\n" + + ") group by 1 PARTITIONED by day ") + .setExpectedDataSource("foo1") + .setQueryContext(localContext) + .setExpectedRowSignature(rowSignature) + .setExpectedSegment(ImmutableSet.of(SegmentId.of( + "foo1", + Intervals.of("2016-06-27/P1D"), + "test", + 0 + ))) + .setExpectedResultRows(ImmutableList.of(new Object[]{1466985600000L, 20L})) + .setExpectedWorkerCount( + ImmutableMap.of( + 0, 1 + )) + .verifyResults(); + + } + + @Test + public void testCorrectNumberOfWorkersUsedAutoModeWithBytesLimit() throws IOException + { + Map localContext = new HashMap<>(context); + localContext.put(MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY, WorkerAssignmentStrategy.AUTO.name()); + localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 4); + localContext.put(MultiStageQueryContext.CTX_MAX_INPUT_BYTES_PER_WORKER, 10); + + final File toRead1 = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/multipleFiles/wikipedia-sampled-1.json"); + final String toReadFileNameAsJson1 = queryFramework().queryJsonMapper().writeValueAsString(toRead1.getAbsolutePath()); + + final File toRead2 = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/multipleFiles/wikipedia-sampled-2.json"); + final String toReadFileNameAsJson2 = queryFramework().queryJsonMapper().writeValueAsString(toRead2.getAbsolutePath()); + + final File toRead3 = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/multipleFiles/wikipedia-sampled-3.json"); + final String toReadFileNameAsJson3 = queryFramework().queryJsonMapper().writeValueAsString(toRead3.getAbsolutePath()); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("cnt", ColumnType.LONG) + .build(); + + testIngestQuery().setSql( + "insert into foo1 select " + + " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n" + + " count(*) as cnt\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{ \"files\": [" + toReadFileNameAsJson1 + "," + toReadFileNameAsJson2 + "," + toReadFileNameAsJson3 + "],\"type\":\"local\"}',\n" + + " '{\"type\": \"json\"}',\n" + + " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n" + + " )\n" + + ") group by 1 PARTITIONED by day ") + .setExpectedDataSource("foo1") + .setQueryContext(localContext) + .setExpectedRowSignature(rowSignature) + .setExpectedSegment(ImmutableSet.of(SegmentId.of( + "foo1", + Intervals.of("2016-06-27/P1D"), + "test", + 0 + ))) + .setExpectedResultRows(ImmutableList.of(new Object[]{1466985600000L, 20L})) + .setExpectedWorkerCount( + ImmutableMap.of( + 0, 3 + )) + .verifyResults(); + } + @Test public void testInsertArraysAutoType() throws IOException { @@ -1120,7 +1220,6 @@ public class MSQInsertTest extends MSQTestBase .setExpectedDataSource("foo1") .setExpectedRowSignature(rowSignature) .verifyResults(); - } @Nonnull diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java index 793c4293f11..20110b63329 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java @@ -26,7 +26,6 @@ import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.KeyColumn; import org.apache.druid.frame.key.KeyOrder; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.input.stage.StageInputSpec; import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory; import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl; @@ -57,8 +56,7 @@ public class StageDefinitionTest RowSignature.empty(), null, 0, - false, - Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER + false ); Assert.assertThrows(ISE.class, () -> stageDefinition.generatePartitionBoundariesForShuffle(null)); @@ -79,8 +77,7 @@ public class StageDefinitionTest false ), 1, - false, - Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER + false ); Assert.assertThrows(ISE.class, () -> stageDefinition.generatePartitionBoundariesForShuffle(null)); @@ -101,8 +98,7 @@ public class StageDefinitionTest false ), 1, - false, - Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER + false ); Assert.assertThrows( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java index 1e0177ae120..6ae18dda1e1 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java @@ -27,6 +27,7 @@ import org.apache.druid.frame.key.RowKey; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.indexing.error.MSQFault; import org.apache.druid.msq.indexing.error.UnknownFault; import org.apache.druid.msq.input.InputSpecSlicerFactory; @@ -232,7 +233,8 @@ public class BaseControllerQueryKernelTest extends InitializedNullHandlingTest return mapStageIdsToStageNumbers( controllerQueryKernel.createAndGetNewStageIds( inputSlicerFactory, - WorkerAssignmentStrategy.MAX + WorkerAssignmentStrategy.MAX, + Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ) ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java index 6a97e0b0bcb..f59ae121d4b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java @@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.IntSet; import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.longs.LongList; import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.input.InputSlice; import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.input.InputSpecSlicer; @@ -35,12 +36,19 @@ import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; + public class WorkerInputsTest { private static final String QUERY_ID = "myQuery"; @@ -59,7 +67,8 @@ public class WorkerInputsTest stageDef, Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(true), - WorkerAssignmentStrategy.MAX + WorkerAssignmentStrategy.MAX, + Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); Assert.assertEquals( @@ -87,7 +96,8 @@ public class WorkerInputsTest stageDef, Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(true), - WorkerAssignmentStrategy.MAX + WorkerAssignmentStrategy.MAX, + Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); Assert.assertEquals( @@ -115,7 +125,8 @@ public class WorkerInputsTest stageDef, Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(true), - WorkerAssignmentStrategy.AUTO + WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); Assert.assertEquals( @@ -140,7 +151,8 @@ public class WorkerInputsTest stageDef, Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(true), - WorkerAssignmentStrategy.AUTO + WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); Assert.assertEquals( @@ -166,7 +178,8 @@ public class WorkerInputsTest stageDef, Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(true), - WorkerAssignmentStrategy.AUTO + WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); Assert.assertEquals( @@ -191,7 +204,8 @@ public class WorkerInputsTest stageDef, Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(true), - WorkerAssignmentStrategy.AUTO + WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); Assert.assertEquals( @@ -216,7 +230,8 @@ public class WorkerInputsTest stageDef, Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(true), - WorkerAssignmentStrategy.AUTO + WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); Assert.assertEquals( @@ -242,7 +257,8 @@ public class WorkerInputsTest stageDef, Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(true), - WorkerAssignmentStrategy.AUTO + WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); Assert.assertEquals( @@ -274,7 +290,8 @@ public class WorkerInputsTest stageDef, Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(true), - WorkerAssignmentStrategy.AUTO + WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); Assert.assertEquals( @@ -288,6 +305,127 @@ public class WorkerInputsTest ); } + @Test + public void test_max_shouldAlwaysSplitStatic() + { + TestInputSpec inputSpecToSplit = new TestInputSpec(4_000_000_000L); + final StageDefinition stageDef = + StageDefinition.builder(0) + .inputs(inputSpecToSplit) + .maxWorkerCount(3) + .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L)) + .build(QUERY_ID); + + TestInputSpecSlicer testInputSpecSlicer = spy(new TestInputSpecSlicer(true)); + + final WorkerInputs inputs = WorkerInputs.create( + stageDef, + Int2IntMaps.EMPTY_MAP, + testInputSpecSlicer, + WorkerAssignmentStrategy.MAX, + Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER + ); + + Mockito.verify(testInputSpecSlicer, times(0)).canSliceDynamic(inputSpecToSplit); + Mockito.verify(testInputSpecSlicer, times(1)).sliceStatic(any(), anyInt()); + + Assert.assertEquals( + ImmutableMap.>builder() + .put( + 0, + Collections.singletonList(new TestInputSlice(4_000_000_000L)) + ) + .put( + 1, + Collections.singletonList(new TestInputSlice()) + ) + .put( + 2, + Collections.singletonList(new TestInputSlice()) + ) + .build(), + inputs.assignmentsMap() + ); + + } + + @Test + public void test_auto_shouldSplitDynamicIfPossible() + { + TestInputSpec inputSpecToSplit = new TestInputSpec(1_000_000_000L, 1_000_000_000L, 1_000_000_000L); + final StageDefinition stageDef = + StageDefinition.builder(0) + .inputs(inputSpecToSplit) + .maxWorkerCount(3) + .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L)) + .build(QUERY_ID); + + TestInputSpecSlicer testInputSpecSlicer = spy(new TestInputSpecSlicer(true)); + + final WorkerInputs inputs = WorkerInputs.create( + stageDef, + Int2IntMaps.EMPTY_MAP, + testInputSpecSlicer, + WorkerAssignmentStrategy.AUTO, + 100 + ); + + Mockito.verify(testInputSpecSlicer, times(1)).canSliceDynamic(inputSpecToSplit); + Mockito.verify(testInputSpecSlicer, times(1)).sliceDynamic(any(), anyInt(), anyInt(), anyLong()); + + Assert.assertEquals( + ImmutableMap.>builder() + .put( + 0, + Collections.singletonList(new TestInputSlice(1_000_000_000L)) + ) + .put( + 1, + Collections.singletonList(new TestInputSlice(1_000_000_000L)) + ) + .put( + 2, + Collections.singletonList(new TestInputSlice(1_000_000_000L)) + ) + .build(), + inputs.assignmentsMap() + ); + } + + @Test + public void test_auto_shouldUseLeastWorkersPossible() + { + TestInputSpec inputSpecToSplit = new TestInputSpec(1_000_000_000L, 1_000_000_000L, 1_000_000_000L); + final StageDefinition stageDef = + StageDefinition.builder(0) + .inputs(inputSpecToSplit) + .maxWorkerCount(3) + .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L)) + .build(QUERY_ID); + + TestInputSpecSlicer testInputSpecSlicer = spy(new TestInputSpecSlicer(true)); + + final WorkerInputs inputs = WorkerInputs.create( + stageDef, + Int2IntMaps.EMPTY_MAP, + testInputSpecSlicer, + WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER + ); + + Assert.assertEquals( + ImmutableMap.>builder() + .put( + 0, + Collections.singletonList(new TestInputSlice(1_000_000_000L, 1_000_000_000L, 1_000_000_000L)) + ) + .build(), + inputs.assignmentsMap() + ); + + Mockito.verify(testInputSpecSlicer, times(1)).canSliceDynamic(inputSpecToSplit); + } + @Test public void testEquals() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index d768c73d80d..e44d1974b8a 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -225,6 +225,7 @@ public class MSQTestBase extends BaseCalciteQueryTest .put(QueryContexts.CTX_SQL_QUERY_ID, "test-query") .put(QueryContexts.FINALIZE_KEY, true) .put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false) + .put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2) .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0) .build(); @@ -799,6 +800,7 @@ public class MSQTestBase extends BaseCalciteQueryTest protected Matcher expectedExecutionErrorMatcher = null; protected MSQFault expectedMSQFault = null; protected Class expectedMSQFaultClass = null; + protected Map expectedStageVsWorkerCount = new HashMap<>(); protected final Map>> expectedStageWorkerChannelToCounters = new HashMap<>(); @@ -893,6 +895,12 @@ public class MSQTestBase extends BaseCalciteQueryTest return asBuilder(); } + public Builder setExpectedWorkerCount(Map stageVsWorkerCount) + { + this.expectedStageVsWorkerCount = stageVsWorkerCount; + return asBuilder(); + } + public Builder setExpectedSegmentGenerationProgressCountersForStageWorker( CounterSnapshotMatcher counterSnapshot, int stage, @@ -925,6 +933,14 @@ public class MSQTestBase extends BaseCalciteQueryTest MatcherAssert.assertThat(e, expectedValidationErrorMatcher); } + protected void verifyWorkerCount(CounterSnapshotsTree counterSnapshotsTree) + { + Map> counterMap = counterSnapshotsTree.copyMap(); + for (Map.Entry stageWorkerCount : expectedStageVsWorkerCount.entrySet()) { + Assert.assertEquals(stageWorkerCount.getValue().intValue(), counterMap.get(stageWorkerCount.getKey()).size()); + } + } + protected void verifyCounters(CounterSnapshotsTree counterSnapshotsTree) { Assert.assertNotNull(counterSnapshotsTree); @@ -1061,6 +1077,7 @@ public class MSQTestBase extends BaseCalciteQueryTest return; } MSQTaskReportPayload reportPayload = getPayloadOrThrow(controllerId); + verifyWorkerCount(reportPayload.getCounters()); verifyCounters(reportPayload.getCounters()); MSQSpec foundSpec = indexingServiceClient.getQuerySpecForTask(controllerId); @@ -1270,6 +1287,7 @@ public class MSQTestBase extends BaseCalciteQueryTest MSQTaskReportPayload payload = getPayloadOrThrow(controllerId); verifyCounters(payload.getCounters()); + verifyWorkerCount(payload.getCounters()); if (payload.getStatus().getErrorReport() != null) { throw new ISE("Query %s failed due to %s", sql, payload.getStatus().getErrorReport().toString()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index 73cd900a5be..58ee01f0087 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -63,10 +63,12 @@ import java.util.stream.Collectors; public class MSQTestControllerContext implements ControllerContext { private static final Logger log = new Logger(MSQTestControllerContext.class); + private static final int NUM_WORKERS = 4; private final TaskActionClient taskActionClient; private final Map inMemoryWorkers = new HashMap<>(); private final ConcurrentMap statusMap = new ConcurrentHashMap<>(); - private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Execs.singleThreaded( + private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Execs.multiThreaded( + NUM_WORKERS, "MultiStageQuery-test-controller-client")); private final CoordinatorClient coordinatorClient; private final DruidNode node = new DruidNode( diff --git a/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-1.json b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-1.json new file mode 100644 index 00000000000..26002ac687a --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-1.json @@ -0,0 +1,7 @@ +{"isRobot":true,"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","flags":"NB","isUnpatrolled":false,"page":"Salo Toraut","diffUrl":"https://sv.wikipedia.org/w/index.php?oldid=36099284&rcid=89369918","added":31,"comment":"Botskapande Indonesien omdirigering","commentLength":35,"isNew":true,"isMinor":false,"delta":31,"isAnonymous":false,"user":"Lsjbot","deltaBucket":0.0,"deleted":0,"namespace":"Main"} +{"isRobot":false,"channel":"#en.wikipedia","cityName":"Buenos Aires","timestamp":"2016-06-27T00:00:34.959Z","flags":"","isUnpatrolled":false,"page":"Bailando 2015","countryName":"Argentina","regionIsoCode":"C","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144213&oldid=727144184","added":2,"metroCode":null,"comment":"/* Scores */","commentLength":12,"isNew":false,"isMinor":false,"delta":2,"countryIsoCode":"AR","isAnonymous":true,"user":"181.230.118.178","regionName":"Buenos Aires F.D.","deltaBucket":0.0,"deleted":0,"namespace":"Main"} +{"isRobot":false,"channel":"#en.wikipedia","timestamp":"2016-06-27T00:00:36.027Z","flags":"M","isUnpatrolled":false,"page":"Richie Rich's Christmas Wish","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144214&oldid=716794477","added":0,"comment":"standard term is [[title character]]","commentLength":36,"isNew":false,"isMinor":true,"delta":-2,"isAnonymous":false,"user":"JasonAQuest","deltaBucket":-100.0,"deleted":2,"namespace":"Main"} +{"isRobot":true,"channel":"#pl.wikipedia","timestamp":"2016-06-27T00:00:58.599Z","flags":"NB","isUnpatrolled":false,"page":"Kategoria:Dyskusje nad usunięciem artykułu zakończone bez konsensusu − lipiec 2016","diffUrl":"https://pl.wikipedia.org/w/index.php?oldid=46204477&rcid=68522573","added":270,"comment":"utworzenie kategorii","commentLength":20,"isNew":true,"isMinor":false,"delta":270,"isAnonymous":false,"user":"Beau.bot","deltaBucket":200.0,"deleted":0,"namespace":"Kategoria"} +{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T00:01:03.685Z","flags":"MB","isUnpatrolled":false,"page":"El Terco, Bachíniva","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388566&oldid=29282572&rcid=34712923","added":0,"comment":"Bot: Automatska zamjena teksta (-[[Administrativna podjela Meksika|Admin]] +[[Administrativna podjela Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBucket":-100.0,"deleted":1,"namespace":"Main"} +{"isRobot":true,"channel":"#ceb.wikipedia","timestamp":"2016-06-27T00:01:07.347Z","flags":"NB","isUnpatrolled":false,"page":"Neqerssuaq","diffUrl":"https://ceb.wikipedia.org/w/index.php?oldid=9563239&rcid=36193146","added":4150,"comment":"Paghimo ni bot Greenland","commentLength":24,"isNew":true,"isMinor":false,"delta":4150,"isAnonymous":false,"user":"Lsjbot","deltaBucket":4100.0,"deleted":0,"namespace":"Main"} +{"isRobot":false,"channel":"#es.wikipedia","cityName":null,"timestamp":"2016-06-27T00:01:14.343Z","flags":"","isUnpatrolled":false,"page":"Sumo (banda)","countryName":"Argentina","regionIsoCode":null,"diffUrl":"https://es.wikipedia.org/w/index.php?diff=91937261&oldid=91937229","added":0,"metroCode":null,"comment":"/* Línea de tiempo */","commentLength":21,"isNew":false,"isMinor":false,"delta":-173,"countryIsoCode":"AR","isAnonymous":true,"user":"181.110.165.189","regionName":null,"deltaBucket":-200.0,"deleted":173,"namespace":"Main"} diff --git a/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-2.json b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-2.json new file mode 100644 index 00000000000..ae560bf8f20 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-2.json @@ -0,0 +1,7 @@ +{"isRobot":false,"channel":"#en.wikipedia","timestamp":"2016-06-27T00:01:59.599Z","flags":"","isUnpatrolled":false,"page":"Panama Canal","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144327&oldid=727144068","added":496,"comment":"expanding lead + missing RS","commentLength":27,"isNew":false,"isMinor":false,"delta":496,"isAnonymous":false,"user":"Mariordo","deltaBucket":400.0,"deleted":0,"namespace":"Main"} +{"isRobot":false,"channel":"#ru.wikipedia","timestamp":"2016-06-27T00:02:09.238Z","flags":"","isUnpatrolled":false,"page":"Википедия:Опросы/Унификация шаблонов «Не переведено»","diffUrl":"https://ru.wikipedia.org/w/index.php?diff=79213888&oldid=79213880","added":196,"comment":"/* Нет */","commentLength":9,"isNew":false,"isMinor":false,"delta":196,"isAnonymous":false,"user":"Wanderer777","deltaBucket":100.0,"deleted":0,"namespace":"Википедия"} +{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T00:02:10.857Z","flags":"MB","isUnpatrolled":false,"page":"Hermanos Díaz, Ascensión","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388624&oldid=29282630&rcid=34712981","added":0,"comment":"Bot: Automatska zamjena teksta (-[[Administrativna podjela Meksika|Admin]] +[[Administrativna podjela Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBucket":-100.0,"deleted":1,"namespace":"Main"} +{"isRobot":false,"channel":"#es.wikipedia","timestamp":"2016-06-27T01:02:13.153Z","flags":"","isUnpatrolled":false,"page":"Clasificación para la Eurocopa Sub-21 de 2017","diffUrl":"https://es.wikipedia.org/w/index.php?diff=91937277&oldid=91937272","added":4,"comment":"/* Máximos Asistentes */","commentLength":24,"isNew":false,"isMinor":false,"delta":4,"isAnonymous":false,"user":"Guly600","deltaBucket":0.0,"deleted":0,"namespace":"Main"} +{"isRobot":false,"channel":"#zh.wikipedia","timestamp":"2016-06-27T01:02:13.815Z","flags":"","isUnpatrolled":false,"page":"中共十八大以来的反腐败工作","diffUrl":"https://zh.wikipedia.org/w/index.php?diff=40605390&oldid=40605381","added":18,"comment":"/* 违反中共中央八项规定官员(副部级及以上) */","commentLength":26,"isNew":false,"isMinor":false,"delta":18,"isAnonymous":false,"user":"2001:DA8:207:E132:94DC:BA03:DFDF:8F9F","deltaBucket":0.0,"deleted":0,"namespace":"Main"} +{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T01:02:15.952Z","flags":"MB","isUnpatrolled":false,"page":"El Sicomoro, Ascensión","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388628&oldid=29282634&rcid=34712985","added":0,"comment":"Bot: Automatska zamjena teksta (-[[Administrativna podjela Meksika|Admin]] +[[Administrativna podjela Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBucket":-100.0,"deleted":1,"namespace":"Main"} +{"isRobot":false,"channel":"#id.wikipedia","timestamp":"2016-06-27T01:02:20.008Z","flags":"!","isUnpatrolled":true,"page":"Ibnu Sina","diffUrl":"https://id.wikipedia.org/w/index.php?diff=11687177&oldid=11444059&rcid=20812462","added":106,"comment":"gambar","commentLength":6,"isNew":false,"isMinor":false,"delta":106,"isAnonymous":false,"user":"Ftihikam","deltaBucket":100.0,"deleted":0,"namespace":"Main"} diff --git a/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-3.json b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-3.json new file mode 100644 index 00000000000..6604b51fb1c --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-3.json @@ -0,0 +1,6 @@ +{"isRobot":false,"channel":"#pt.wikipedia","timestamp":"2016-06-27T02:02:22.868Z","flags":"N","isUnpatrolled":false,"page":"Dobromir Zhechev","diffUrl":"https://pt.wikipedia.org/w/index.php?oldid=46012430&rcid=67145794","added":1926,"comment":"Novo","commentLength":4,"isNew":true,"isMinor":false,"delta":1926,"isAnonymous":false,"user":"Ceresta","deltaBucket":1900.0,"deleted":0,"namespace":"Main"} +{"isRobot":true,"channel":"#de.wikipedia","timestamp":"2016-06-27T02:02:29.228Z","flags":"B","isUnpatrolled":false,"page":"Benutzer Diskussion:Squasher/Archiv/2016","diffUrl":"https://de.wikipedia.org/w/index.php?diff=155658493&oldid=155085489","added":2560,"comment":"1 Abschnitt aus [[Benutzer Diskussion:Squasher]] archiviert","commentLength":60,"isNew":false,"isMinor":false,"delta":2560,"isAnonymous":false,"user":"TaxonBot","deltaBucket":2500.0,"deleted":0,"namespace":"Benutzer Diskussion"} +{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T02:02:50.958Z","flags":"MB","isUnpatrolled":false,"page":"Trinidad Jiménez G., Benemérito de las Américas","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388657&oldid=29282663&rcid=34713014","added":0,"comment":"Bot: Automatska zamjena teksta (-[[Administrativna podjela Meksika|Admin]] +[[Administrativna podjela Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBucket":-100.0,"deleted":1,"namespace":"Main"} +{"isRobot":false,"channel":"#zh.wikipedia","timestamp":"2016-06-27T02:02:59.521Z","flags":"N","isUnpatrolled":false,"page":"Wikipedia:頁面存廢討論/記錄/2016/06/27","diffUrl":"https://zh.wikipedia.org/w/index.php?oldid=40605393&rcid=62748523","added":1986,"comment":"添加[[李洛能八大弟子]]","commentLength":13,"isNew":true,"isMinor":false,"delta":1986,"isAnonymous":false,"user":"Tigerzeng","deltaBucket":1900.0,"deleted":0,"namespace":"Wikipedia"} +{"isRobot":false,"channel":"#en.wikipedia","timestamp":"2016-06-27T02:03:01.090Z","flags":"","isUnpatrolled":false,"page":"File:Paint.net 4.0.6 screenshot.png","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144426&oldid=713167833","added":0,"comment":"/* Summary */","commentLength":13,"isNew":false,"isMinor":false,"delta":-463,"isAnonymous":false,"user":"Calvin Hogg","deltaBucket":-500.0,"deleted":463,"namespace":"File"} +{"isRobot":true,"channel":"#de.wikipedia","timestamp":"2016-06-27T02:03:05.240Z","flags":"B","isUnpatrolled":false,"page":"Benutzer Diskussion:HerrSonderbar","diffUrl":"https://de.wikipedia.org/w/index.php?diff=155658496&oldid=155657380","added":364,"comment":"Neuer Abschnitt /* Benachrichtigung über inaktive Mentees am 27. 6. 2016 */","commentLength":75,"isNew":false,"isMinor":false,"delta":364,"isAnonymous":false,"user":"GiftBot","deltaBucket":300.0,"deleted":0,"namespace":"Benutzer Diskussion"}