mirror of https://github.com/apache/druid.git
* MSQ: Use leaf worker count for stages that have any leaf inputs. (#17312) (cherry picked from commitb27712933e
) * MSQ: Call "onQueryComplete" after the query is closed. (#17313) (cherry picked from commit4092f3fe47
) * Dart: Only use historicals as workers. (#17319) (cherry picked from commit074944e02c
) --------- Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
This commit is contained in:
parent
f72dbfd9cd
commit
06c1a6a31e
|
@ -48,6 +48,7 @@ import org.apache.druid.query.Query;
|
||||||
import org.apache.druid.query.QueryContext;
|
import org.apache.druid.query.QueryContext;
|
||||||
import org.apache.druid.server.DruidNode;
|
import org.apache.druid.server.DruidNode;
|
||||||
import org.apache.druid.server.coordination.DruidServerMetadata;
|
import org.apache.druid.server.coordination.DruidServerMetadata;
|
||||||
|
import org.apache.druid.server.coordination.ServerType;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -122,8 +123,10 @@ public class DartControllerContext implements ControllerContext
|
||||||
// since the serverView is referenced shortly after the worker list is created.
|
// since the serverView is referenced shortly after the worker list is created.
|
||||||
final List<String> workerIds = new ArrayList<>(servers.size());
|
final List<String> workerIds = new ArrayList<>(servers.size());
|
||||||
for (final DruidServerMetadata server : servers) {
|
for (final DruidServerMetadata server : servers) {
|
||||||
|
if (server.getType() == ServerType.HISTORICAL) {
|
||||||
workerIds.add(WorkerId.fromDruidServerMetadata(server, queryId).toString());
|
workerIds.add(WorkerId.fromDruidServerMetadata(server, queryId).toString());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Shuffle workerIds, so we don't bias towards specific servers when running multiple queries concurrently. For any
|
// Shuffle workerIds, so we don't bias towards specific servers when running multiple queries concurrently. For any
|
||||||
// given query, lower-numbered workers tend to do more work, because the controller prefers using lower-numbered
|
// given query, lower-numbered workers tend to do more work, because the controller prefers using lower-numbered
|
||||||
|
|
|
@ -323,9 +323,12 @@ public class ControllerImpl implements Controller
|
||||||
@Override
|
@Override
|
||||||
public void run(final QueryListener queryListener) throws Exception
|
public void run(final QueryListener queryListener) throws Exception
|
||||||
{
|
{
|
||||||
|
final MSQTaskReportPayload reportPayload;
|
||||||
try (final Closer closer = Closer.create()) {
|
try (final Closer closer = Closer.create()) {
|
||||||
runInternal(queryListener, closer);
|
reportPayload = runInternal(queryListener, closer);
|
||||||
}
|
}
|
||||||
|
// Call onQueryComplete after Closer is fully closed, ensuring no controller-related processing is ongoing.
|
||||||
|
queryListener.onQueryComplete(reportPayload);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -348,7 +351,7 @@ public class ControllerImpl implements Controller
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runInternal(final QueryListener queryListener, final Closer closer)
|
private MSQTaskReportPayload runInternal(final QueryListener queryListener, final Closer closer)
|
||||||
{
|
{
|
||||||
QueryDefinition queryDef = null;
|
QueryDefinition queryDef = null;
|
||||||
ControllerQueryKernel queryKernel = null;
|
ControllerQueryKernel queryKernel = null;
|
||||||
|
@ -511,7 +514,7 @@ public class ControllerImpl implements Controller
|
||||||
stagesReport = null;
|
stagesReport = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final MSQTaskReportPayload taskReportPayload = new MSQTaskReportPayload(
|
return new MSQTaskReportPayload(
|
||||||
makeStatusReport(
|
makeStatusReport(
|
||||||
taskStateForReport,
|
taskStateForReport,
|
||||||
errorForReport,
|
errorForReport,
|
||||||
|
@ -526,8 +529,6 @@ public class ControllerImpl implements Controller
|
||||||
countersSnapshot,
|
countersSnapshot,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
queryListener.onQueryComplete(taskReportPayload);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.druid.msq.input;
|
||||||
import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
|
import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
|
||||||
import it.unimi.dsi.fastutil.ints.IntSet;
|
import it.unimi.dsi.fastutil.ints.IntSet;
|
||||||
import org.apache.druid.msq.input.stage.StageInputSpec;
|
import org.apache.druid.msq.input.stage.StageInputSpec;
|
||||||
|
import org.apache.druid.msq.kernel.StageDefinition;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -50,4 +51,23 @@ public class InputSpecs
|
||||||
|
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns whether any of the provided input specs are leafs. Leafs are anything that is not broadcast and not another
|
||||||
|
* stage. For example, regular tables and external files are leafs.
|
||||||
|
*
|
||||||
|
* @param inputSpecs list of input specs, corresponds to {@link StageDefinition#getInputSpecs()}
|
||||||
|
* @param broadcastInputs positions in "inputSpecs" which are broadcast specs, corresponds to
|
||||||
|
* {@link StageDefinition#getBroadcastInputNumbers()}
|
||||||
|
*/
|
||||||
|
public static boolean hasLeafInputs(final List<InputSpec> inputSpecs, final IntSet broadcastInputs)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < inputSpecs.size(); i++) {
|
||||||
|
final InputSpec spec = inputSpecs.get(i);
|
||||||
|
if (!broadcastInputs.contains(i) && !(spec instanceof StageInputSpec)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.Intervals;
|
||||||
import org.apache.druid.java.util.common.UOE;
|
import org.apache.druid.java.util.common.UOE;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
import org.apache.druid.msq.input.InputSpec;
|
import org.apache.druid.msq.input.InputSpec;
|
||||||
|
import org.apache.druid.msq.input.InputSpecs;
|
||||||
import org.apache.druid.msq.input.external.ExternalInputSpec;
|
import org.apache.druid.msq.input.external.ExternalInputSpec;
|
||||||
import org.apache.druid.msq.input.inline.InlineInputSpec;
|
import org.apache.druid.msq.input.inline.InlineInputSpec;
|
||||||
import org.apache.druid.msq.input.lookup.LookupInputSpec;
|
import org.apache.druid.msq.input.lookup.LookupInputSpec;
|
||||||
|
@ -134,7 +135,6 @@ public class DataSourcePlan
|
||||||
* @param minStageNumber starting stage number for subqueries
|
* @param minStageNumber starting stage number for subqueries
|
||||||
* @param broadcast whether the plan should broadcast data for this datasource
|
* @param broadcast whether the plan should broadcast data for this datasource
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("rawtypes")
|
|
||||||
public static DataSourcePlan forDataSource(
|
public static DataSourcePlan forDataSource(
|
||||||
final QueryKitSpec queryKitSpec,
|
final QueryKitSpec queryKitSpec,
|
||||||
final QueryContext queryContext,
|
final QueryContext queryContext,
|
||||||
|
@ -274,6 +274,20 @@ public class DataSourcePlan
|
||||||
return broadcastInputs;
|
return broadcastInputs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Figure for {@link StageDefinition#getMaxWorkerCount()} that should be used when processing.
|
||||||
|
*/
|
||||||
|
public int getMaxWorkerCount(final QueryKitSpec queryKitSpec)
|
||||||
|
{
|
||||||
|
if (isSingleWorker()) {
|
||||||
|
return 1;
|
||||||
|
} else if (InputSpecs.hasLeafInputs(inputSpecs, broadcastInputs)) {
|
||||||
|
return queryKitSpec.getMaxLeafWorkerCount();
|
||||||
|
} else {
|
||||||
|
return queryKitSpec.getMaxNonLeafWorkerCount();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a {@link QueryDefinitionBuilder} that includes any {@link StageInputSpec} from {@link #getInputSpecs()}.
|
* Returns a {@link QueryDefinitionBuilder} that includes any {@link StageInputSpec} from {@link #getInputSpecs()}.
|
||||||
* Absent if this plan does not involve reading from prior stages.
|
* Absent if this plan does not involve reading from prior stages.
|
||||||
|
|
|
@ -19,13 +19,11 @@
|
||||||
|
|
||||||
package org.apache.druid.msq.querykit;
|
package org.apache.druid.msq.querykit;
|
||||||
|
|
||||||
import org.apache.druid.msq.input.InputSpec;
|
|
||||||
import org.apache.druid.msq.input.InputSpecs;
|
import org.apache.druid.msq.input.InputSpecs;
|
||||||
import org.apache.druid.msq.kernel.QueryDefinition;
|
import org.apache.druid.msq.kernel.QueryDefinition;
|
||||||
|
import org.apache.druid.msq.kernel.StageDefinition;
|
||||||
import org.apache.druid.query.Query;
|
import org.apache.druid.query.Query;
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Collection of parameters for {@link QueryKit#makeQueryDefinition}.
|
* Collection of parameters for {@link QueryKit#makeQueryDefinition}.
|
||||||
*/
|
*/
|
||||||
|
@ -42,9 +40,9 @@ public class QueryKitSpec
|
||||||
* {@link org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}.
|
* {@link org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}.
|
||||||
* @param queryId queryId of the resulting {@link QueryDefinition}
|
* @param queryId queryId of the resulting {@link QueryDefinition}
|
||||||
* @param maxLeafWorkerCount maximum number of workers for leaf stages: becomes
|
* @param maxLeafWorkerCount maximum number of workers for leaf stages: becomes
|
||||||
* {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()}
|
* {@link StageDefinition#getMaxWorkerCount()}
|
||||||
* @param maxNonLeafWorkerCount maximum number of workers for non-leaf stages: becomes
|
* @param maxNonLeafWorkerCount maximum number of workers for non-leaf stages: becomes
|
||||||
* {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()}
|
* {@link StageDefinition#getMaxWorkerCount()}
|
||||||
* @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries
|
* @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries
|
||||||
*/
|
*/
|
||||||
public QueryKitSpec(
|
public QueryKitSpec(
|
||||||
|
@ -79,20 +77,15 @@ public class QueryKitSpec
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Maximum worker count for a stage with the given inputs. Will use {@link #maxNonLeafWorkerCount} if there are
|
* Maximum number of workers for leaf stages. See {@link InputSpecs#hasLeafInputs}.
|
||||||
* any stage inputs, {@link #maxLeafWorkerCount} otherwise.
|
|
||||||
*/
|
*/
|
||||||
public int getMaxWorkerCount(final List<InputSpec> inputSpecs)
|
public int getMaxLeafWorkerCount()
|
||||||
{
|
{
|
||||||
if (InputSpecs.getStageNumbers(inputSpecs).isEmpty()) {
|
|
||||||
return maxLeafWorkerCount;
|
return maxLeafWorkerCount;
|
||||||
} else {
|
|
||||||
return maxNonLeafWorkerCount;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Maximum number of workers for non-leaf stages (where there are some stage inputs).
|
* Maximum number of workers for non-leaf stages. See {@link InputSpecs#hasLeafInputs}.
|
||||||
*/
|
*/
|
||||||
public int getMaxNonLeafWorkerCount()
|
public int getMaxNonLeafWorkerCount()
|
||||||
{
|
{
|
||||||
|
|
|
@ -163,10 +163,7 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
|
||||||
.broadcastInputs(dataSourcePlan.getBroadcastInputs())
|
.broadcastInputs(dataSourcePlan.getBroadcastInputs())
|
||||||
.signature(intermediateSignature)
|
.signature(intermediateSignature)
|
||||||
.shuffleSpec(shuffleSpecFactoryPreAggregation.build(intermediateClusterBy, true))
|
.shuffleSpec(shuffleSpecFactoryPreAggregation.build(intermediateClusterBy, true))
|
||||||
.maxWorkerCount(
|
.maxWorkerCount(dataSourcePlan.getMaxWorkerCount(queryKitSpec))
|
||||||
dataSourcePlan.isSingleWorker()
|
|
||||||
? 1
|
|
||||||
: queryKitSpec.getMaxWorkerCount(dataSourcePlan.getInputSpecs()))
|
|
||||||
.processorFactory(new GroupByPreShuffleFrameProcessorFactory(queryToRun))
|
.processorFactory(new GroupByPreShuffleFrameProcessorFactory(queryToRun))
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -173,10 +173,7 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
|
||||||
.broadcastInputs(dataSourcePlan.getBroadcastInputs())
|
.broadcastInputs(dataSourcePlan.getBroadcastInputs())
|
||||||
.shuffleSpec(scanShuffleSpec)
|
.shuffleSpec(scanShuffleSpec)
|
||||||
.signature(signatureToUse)
|
.signature(signatureToUse)
|
||||||
.maxWorkerCount(
|
.maxWorkerCount(dataSourcePlan.getMaxWorkerCount(queryKitSpec))
|
||||||
dataSourcePlan.isSingleWorker()
|
|
||||||
? 1
|
|
||||||
: queryKitSpec.getMaxWorkerCount(dataSourcePlan.getInputSpecs()))
|
|
||||||
.processorFactory(new ScanQueryFrameProcessorFactory(queryToRun))
|
.processorFactory(new ScanQueryFrameProcessorFactory(queryToRun))
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,124 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.msq.dart.controller;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import org.apache.druid.client.BrokerServerView;
|
||||||
|
import org.apache.druid.msq.dart.worker.WorkerId;
|
||||||
|
import org.apache.druid.msq.exec.MemoryIntrospector;
|
||||||
|
import org.apache.druid.msq.exec.MemoryIntrospectorImpl;
|
||||||
|
import org.apache.druid.msq.indexing.MSQSpec;
|
||||||
|
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
|
||||||
|
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
|
||||||
|
import org.apache.druid.msq.util.MultiStageQueryContext;
|
||||||
|
import org.apache.druid.query.Query;
|
||||||
|
import org.apache.druid.query.QueryContext;
|
||||||
|
import org.apache.druid.server.DruidNode;
|
||||||
|
import org.apache.druid.server.coordination.DruidServerMetadata;
|
||||||
|
import org.apache.druid.server.coordination.ServerType;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.MockitoAnnotations;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class DartControllerContextTest
|
||||||
|
{
|
||||||
|
private static final List<DruidServerMetadata> SERVERS = ImmutableList.of(
|
||||||
|
new DruidServerMetadata("no", "localhost:1001", null, 1, ServerType.HISTORICAL, "__default", 2), // plaintext
|
||||||
|
new DruidServerMetadata("no", null, "localhost:1002", 1, ServerType.HISTORICAL, "__default", 1), // TLS
|
||||||
|
new DruidServerMetadata("no", "localhost:1003", null, 1, ServerType.REALTIME, "__default", 0)
|
||||||
|
);
|
||||||
|
private static final DruidNode SELF_NODE = new DruidNode("none", "localhost", false, 8080, -1, true, false);
|
||||||
|
private static final String QUERY_ID = "abc";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Context returned by {@link #query}. Overrides "maxConcurrentStages".
|
||||||
|
*/
|
||||||
|
private QueryContext queryContext =
|
||||||
|
QueryContext.of(ImmutableMap.of(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, 3));
|
||||||
|
private MemoryIntrospector memoryIntrospector;
|
||||||
|
private AutoCloseable mockCloser;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Server view that returns {@link #SERVERS}.
|
||||||
|
*/
|
||||||
|
@Mock
|
||||||
|
private BrokerServerView serverView;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Query spec that exists mainly to test {@link DartControllerContext#queryKernelConfig}.
|
||||||
|
*/
|
||||||
|
@Mock
|
||||||
|
private MSQSpec querySpec;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Query returned by {@link #querySpec}.
|
||||||
|
*/
|
||||||
|
@Mock
|
||||||
|
private Query query;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp()
|
||||||
|
{
|
||||||
|
mockCloser = MockitoAnnotations.openMocks(this);
|
||||||
|
memoryIntrospector = new MemoryIntrospectorImpl(100_000_000, 0.75, 1, 1, null);
|
||||||
|
Mockito.when(serverView.getDruidServerMetadatas()).thenReturn(SERVERS);
|
||||||
|
Mockito.when(querySpec.getQuery()).thenReturn(query);
|
||||||
|
Mockito.when(querySpec.getDestination()).thenReturn(TaskReportMSQDestination.instance());
|
||||||
|
Mockito.when(query.context()).thenReturn(queryContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
public void tearDown() throws Exception
|
||||||
|
{
|
||||||
|
mockCloser.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_queryKernelConfig()
|
||||||
|
{
|
||||||
|
final DartControllerContext controllerContext =
|
||||||
|
new DartControllerContext(null, null, SELF_NODE, null, memoryIntrospector, serverView, null);
|
||||||
|
final ControllerQueryKernelConfig queryKernelConfig = controllerContext.queryKernelConfig(QUERY_ID, querySpec);
|
||||||
|
|
||||||
|
Assertions.assertFalse(queryKernelConfig.isFaultTolerant());
|
||||||
|
Assertions.assertFalse(queryKernelConfig.isDurableStorage());
|
||||||
|
Assertions.assertEquals(3, queryKernelConfig.getMaxConcurrentStages());
|
||||||
|
Assertions.assertEquals(TaskReportMSQDestination.instance(), queryKernelConfig.getDestination());
|
||||||
|
Assertions.assertTrue(queryKernelConfig.isPipeline());
|
||||||
|
|
||||||
|
// Check workerIds after sorting, because they've been shuffled.
|
||||||
|
Assertions.assertEquals(
|
||||||
|
ImmutableList.of(
|
||||||
|
// Only the HISTORICAL servers
|
||||||
|
WorkerId.fromDruidServerMetadata(SERVERS.get(0), QUERY_ID).toString(),
|
||||||
|
WorkerId.fromDruidServerMetadata(SERVERS.get(1), QUERY_ID).toString()
|
||||||
|
),
|
||||||
|
queryKernelConfig.getWorkerIds().stream().sorted().collect(Collectors.toList())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,7 +21,10 @@ package org.apache.druid.msq.input;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import it.unimi.dsi.fastutil.ints.IntSet;
|
||||||
|
import it.unimi.dsi.fastutil.ints.IntSets;
|
||||||
import org.apache.druid.msq.input.stage.StageInputSpec;
|
import org.apache.druid.msq.input.stage.StageInputSpec;
|
||||||
|
import org.apache.druid.msq.input.table.TableInputSpec;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -40,4 +43,85 @@ public class InputSpecsTest
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_getHasLeafInputs_allStages()
|
||||||
|
{
|
||||||
|
Assert.assertFalse(
|
||||||
|
InputSpecs.hasLeafInputs(
|
||||||
|
ImmutableList.of(
|
||||||
|
new StageInputSpec(1),
|
||||||
|
new StageInputSpec(2)
|
||||||
|
),
|
||||||
|
IntSets.emptySet()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_getHasLeafInputs_broadcastTable()
|
||||||
|
{
|
||||||
|
Assert.assertFalse(
|
||||||
|
InputSpecs.hasLeafInputs(
|
||||||
|
ImmutableList.of(new TableInputSpec("tbl", null, null, null)),
|
||||||
|
IntSet.of(0)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_getHasLeafInputs_oneTableOneStage()
|
||||||
|
{
|
||||||
|
Assert.assertTrue(
|
||||||
|
InputSpecs.hasLeafInputs(
|
||||||
|
ImmutableList.of(
|
||||||
|
new TableInputSpec("tbl", null, null, null),
|
||||||
|
new StageInputSpec(0)
|
||||||
|
),
|
||||||
|
IntSets.emptySet()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_getHasLeafInputs_oneTableOneBroadcastStage()
|
||||||
|
{
|
||||||
|
Assert.assertTrue(
|
||||||
|
InputSpecs.hasLeafInputs(
|
||||||
|
ImmutableList.of(
|
||||||
|
new TableInputSpec("tbl", null, null, null),
|
||||||
|
new StageInputSpec(0)
|
||||||
|
),
|
||||||
|
IntSet.of(1)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_getHasLeafInputs_oneBroadcastTableOneStage()
|
||||||
|
{
|
||||||
|
Assert.assertFalse(
|
||||||
|
InputSpecs.hasLeafInputs(
|
||||||
|
ImmutableList.of(
|
||||||
|
new TableInputSpec("tbl", null, null, null),
|
||||||
|
new StageInputSpec(0)
|
||||||
|
),
|
||||||
|
IntSet.of(0)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_getHasLeafInputs_oneTableOneBroadcastTable()
|
||||||
|
{
|
||||||
|
Assert.assertTrue(
|
||||||
|
InputSpecs.hasLeafInputs(
|
||||||
|
ImmutableList.of(
|
||||||
|
new TableInputSpec("tbl", null, null, null),
|
||||||
|
new TableInputSpec("tbl2", null, null, null)
|
||||||
|
),
|
||||||
|
IntSet.of(1)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue