MSQ: Change default clusterStatisticsMergeMode to SEQUENTIAL. (#14310)

* MSQ: Change default clusterStatisticsMergeMode to SEQUENTIAL.

This is an undocumented parameter that controls how cluster-by statistics
are merged. In PARALLEL mode, statistics are gathered from workers all
at once. In SEQUENTIAL mode, statistics are gathered time chunk by time
chunk. This improves accuracy for jobs with many time chunks, and reduces
memory usage.

The main downside of SEQUENTIAL is that it can take longer, but in most
situations I've seen, PARALLEL is only really usable in cases where the
sketches are small enough that SEQUENTIAL would also run relatively
quickly. So it seems like SEQUENTIAL is a better default.

* Switch off-test from SEQUENTIAL to PARALLEL.

* Fix sequential merge for situations where there are no time chunks at all.

* Add a couple more tests.
This commit is contained in:
Gian Merlino 2023-06-26 10:54:28 -07:00 committed by GitHub
parent b7434be99e
commit 8211379de6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 89 additions and 8 deletions

View File

@ -244,6 +244,24 @@ public class WorkerSketchFetcher implements AutoCloseable
throw new ISE("All worker partial key information not received for stage[%d]", stageId.getStageNumber());
}
if (completeKeyStatisticsInformation.getTimeSegmentVsWorkerMap().isEmpty()) {
// No time chunks at all: skip fetching.
kernelActions.accept(
kernel -> {
for (final String taskId : tasks) {
final int workerNumber = MSQTasks.workerFromTaskId(taskId);
kernel.mergeClusterByStatisticsCollectorForAllTimeChunks(
stageId,
workerNumber,
ClusterByStatisticsSnapshot.empty()
);
}
}
);
return;
}
completeKeyStatisticsInformation.getTimeSegmentVsWorkerMap().forEach((timeChunk, wks) -> {
for (String taskId : tasks) {

View File

@ -62,7 +62,7 @@ import java.util.stream.Collectors;
*
* <li><b>clusterStatisticsMergeMode</b>: Whether to use parallel or sequential mode for merging of the worker sketches.
* Can be <b>PARALLEL</b>, <b>SEQUENTIAL</b> or <b>AUTO</b>. See {@link ClusterStatisticsMergeMode} for more information on each mode.
* Default value is <b>PARALLEL</b></li>
* Default value is <b>SEQUENTIAL</b></li>
*
* <li><b>useAutoColumnSchemas</b>: Temporary flag to allow experimentation using
* {@link org.apache.druid.segment.AutoTypeColumnSchema} for all 'standard' type columns during segment generation,
@ -93,7 +93,7 @@ public class MultiStageQueryContext
public static final String CTX_MAX_INPUT_BYTES_PER_WORKER = "maxInputBytesPerWorker";
public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = "clusterStatisticsMergeMode";
public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.PARALLEL.toString();
public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.SEQUENTIAL.toString();
public static final String CTX_DESTINATION = "destination";
private static final String DEFAULT_DESTINATION = null;

View File

@ -81,7 +81,7 @@ public class MSQInsertTest extends MSQTestBase
{DEFAULT, DEFAULT_MSQ_CONTEXT},
{DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT},
{FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
{SEQUENTIAL_MERGE, SEQUENTIAL_MERGE_MSQ_CONTEXT}
{PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}
};
return Arrays.asList(data);
}

View File

@ -61,7 +61,7 @@ public class MSQReplaceTest extends MSQTestBase
{DEFAULT, DEFAULT_MSQ_CONTEXT},
{DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT},
{FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
{SEQUENTIAL_MERGE, SEQUENTIAL_MERGE_MSQ_CONTEXT}
{PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}
};
return Arrays.asList(data);
}

View File

@ -101,7 +101,7 @@ public class MSQSelectTest extends MSQTestBase
{DEFAULT, DEFAULT_MSQ_CONTEXT},
{DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT},
{FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
{SEQUENTIAL_MERGE, SEQUENTIAL_MERGE_MSQ_CONTEXT}
{PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}
};
return Arrays.asList(data);
}
@ -354,6 +354,69 @@ public class MSQSelectTest extends MSQTestBase
.verifyResults();
}
@Test
public void testSelectOnFooWhereMatchesNoData()
{
RowSignature resultSignature = RowSignature.builder()
.add("cnt", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.build();
testSelectQuery()
.setSql("select cnt,dim1 from foo where dim2 = 'nonexistent'")
.setExpectedMSQSpec(
MSQSpec.builder()
.query(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Intervals.ETERNITY))
.columns("cnt", "dim1")
.filters(selector("dim2", "nonexistent", null))
.context(defaultScanQueryContext(context, resultSignature))
.build()
)
.columnMappings(ColumnMappings.identity(resultSignature))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build()
)
.setQueryContext(context)
.setExpectedRowSignature(resultSignature)
.setExpectedResultRows(ImmutableList.of())
.verifyResults();
}
@Test
public void testSelectAndOrderByOnFooWhereMatchesNoData()
{
RowSignature resultSignature = RowSignature.builder()
.add("cnt", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.build();
testSelectQuery()
.setSql("select cnt,dim1 from foo where dim2 = 'nonexistent' order by dim1")
.setExpectedMSQSpec(
MSQSpec.builder()
.query(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Intervals.ETERNITY))
.columns("cnt", "dim1")
.filters(selector("dim2", "nonexistent", null))
.context(defaultScanQueryContext(context, resultSignature))
.orderBy(ImmutableList.of(new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING)))
.build()
)
.columnMappings(ColumnMappings.identity(resultSignature))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build()
)
.setQueryContext(context)
.setExpectedRowSignature(resultSignature)
.setExpectedResultRows(ImmutableList.of())
.verifyResults();
}
@Test
public void testGroupByOnFoo()
{

View File

@ -241,12 +241,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
.put(MultiStageQueryContext.CTX_FAULT_TOLERANCE, true)
.build();
public static final Map<String, Object> SEQUENTIAL_MERGE_MSQ_CONTEXT =
public static final Map<String, Object> PARALLEL_MERGE_MSQ_CONTEXT =
ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
ClusterStatisticsMergeMode.SEQUENTIAL.toString()
ClusterStatisticsMergeMode.PARALLEL.toString()
)
.build();
@ -259,7 +259,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
public static final String FAULT_TOLERANCE = "fault_tolerance";
public static final String DURABLE_STORAGE = "durable_storage";
public static final String DEFAULT = "default";
public static final String SEQUENTIAL_MERGE = "sequential_merge";
public static final String PARALLEL_MERGE = "parallel_merge";
public final boolean useDefault = NullHandling.replaceWithDefault();