diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
new file mode 100644
index 00000000000..920bceb952b
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.discovery.DataServerClient;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
+import org.apache.druid.msq.input.table.DataServerSelector;
+import org.apache.druid.msq.input.table.RichSegmentDescriptor;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Queries;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.QueryToolChestWarehouse;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.aggregation.MetricManipulationFn;
+import org.apache.druid.query.aggregation.MetricManipulatorFns;
+import org.apache.druid.query.context.DefaultResponseContext;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.rpc.RpcException;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocation;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Class responsible for querying dataservers and retrieving results for a given query. Also queries the coordinator
+ * to check if a segment has been handed off.
+ */
+public class DataServerQueryHandler
+{
+ private static final Logger log = new Logger(DataServerQueryHandler.class);
+ private static final int DEFAULT_NUM_TRIES = 3;
+ private static final int PER_SERVER_QUERY_NUM_TRIES = 5;
+ private final String dataSource;
+ private final ChannelCounters channelCounters;
+ private final ServiceClientFactory serviceClientFactory;
+ private final CoordinatorClient coordinatorClient;
+ private final ObjectMapper objectMapper;
+ private final QueryToolChestWarehouse warehouse;
+ private final ScheduledExecutorService queryCancellationExecutor;
+ private final DataServerRequestDescriptor dataServerRequestDescriptor;
+
+ public DataServerQueryHandler(
+ String dataSource,
+ ChannelCounters channelCounters,
+ ServiceClientFactory serviceClientFactory,
+ CoordinatorClient coordinatorClient,
+ ObjectMapper objectMapper,
+ QueryToolChestWarehouse warehouse,
+ ScheduledExecutorService queryCancellationExecutor,
+ DataServerRequestDescriptor dataServerRequestDescriptor
+ )
+ {
+ this.dataSource = dataSource;
+ this.channelCounters = channelCounters;
+ this.serviceClientFactory = serviceClientFactory;
+ this.coordinatorClient = coordinatorClient;
+ this.objectMapper = objectMapper;
+ this.warehouse = warehouse;
+ this.queryCancellationExecutor = queryCancellationExecutor;
+ this.dataServerRequestDescriptor = dataServerRequestDescriptor;
+ }
+
+ @VisibleForTesting
+ DataServerClient makeDataServerClient(ServiceLocation serviceLocation)
+ {
+ return new DataServerClient(serviceClientFactory, serviceLocation, objectMapper, queryCancellationExecutor);
+ }
+
+ /**
+ * Performs some necessary transforms on the query first, so that the data server is able to understand it:
+ * - Changing the datasource to a {@link TableDataSource}
+ * - Limiting the query to the required segments with {@link Queries#withSpecificSegments(Query, List)}
+ *
+ * Then queries the data servers and returns {@link Yielder}s for the results, retrying if needed. If a data server
+ * indicates that some segments were not found, checks with the coordinator to see if they were handed off.
+ * - If all the missing segments were handed off, returns a {@link DataServerQueryResult} with the yielders and the
+ * list of handed-off segments.
+ * - If some segments were not handed off, checks with the coordinator to fetch an updated list of servers. This step
+ * is repeated up to the retry limit (default {@link #DEFAULT_NUM_TRIES}).
+ * - If the servers could not be found, checks if the segments were handed off. If they were, returns a
+ * {@link DataServerQueryResult} with the yielders and the list of handed-off segments. Otherwise, throws an exception.
+ *
+ * Also applies {@link QueryToolChest#makePreComputeManipulatorFn(Query, MetricManipulationFn)} and reports channel
+ * metrics on the returned results.
+ *
+ * @param <QueryType> result return type for the query from the data server
+ * @param <RowType> type of the result rows after parsing from QueryType object
+ */
+ public <QueryType, RowType> DataServerQueryResult<RowType> fetchRowsFromDataServer(
+ Query<QueryType> query,
+ Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
+ Closer closer
+ )
+ {
+ // MSQ changes the datasource to an input-number datasource. This needs to be changed back to the table datasource for data servers to understand the query.
+ final Query<QueryType> preparedQuery = query.withDataSource(new TableDataSource(dataSource));
+ final List<Yielder<RowType>> yielders = new ArrayList<>();
+ final List<RichSegmentDescriptor> handedOffSegments = new ArrayList<>();
+
+ List<DataServerRequestDescriptor> pendingRequests = ImmutableList.of(dataServerRequestDescriptor);
+
+ final int maxRetries = preparedQuery.context().getNumRetriesOnMissingSegments(DEFAULT_NUM_TRIES);
+ int retryCount = 0;
+
+ while (!pendingRequests.isEmpty()) {
+ final ResponseContext responseContext = new DefaultResponseContext();
+ final Set<RichSegmentDescriptor> processedSegments = new HashSet<>();
+ for (DataServerRequestDescriptor descriptor : pendingRequests) {
+ log.info("Querying server [%s] for segments[%s]", descriptor.getServerMetadata(), descriptor.getSegments());
+ processedSegments.addAll(descriptor.getSegments());
+ Yielder<RowType> yielder = fetchRowsFromDataServerInternal(descriptor, responseContext, closer, preparedQuery, mappingFunction);
+
+ // Add results
+ if (yielder != null && !yielder.isDone()) {
+ yielders.add(yielder);
+ }
+ }
+
+ // Check for missing segments
+ List<SegmentDescriptor> missingSegments = getMissingSegments(responseContext);
+ if (missingSegments.isEmpty()) {
+ // No segments remaining.
+ break;
+ }
+
+ final List<SegmentDescriptor> handedOffSegmentDescriptors = checkSegmentHandoff(missingSegments);
+
+ Set<RichSegmentDescriptor> missingRichSegmentDescriptors = new HashSet<>();
+ for (RichSegmentDescriptor richSegmentDescriptor : processedSegments) {
+ SegmentDescriptor segmentDescriptor = toSegmentDescriptorWithFullInterval(richSegmentDescriptor);
+ if (missingSegments.contains(segmentDescriptor)) {
+ if (handedOffSegmentDescriptors.contains(segmentDescriptor)) {
+ handedOffSegments.add(richSegmentDescriptor);
+ } else {
+ missingRichSegmentDescriptors.add(richSegmentDescriptor);
+ }
+ }
+ }
+
+ pendingRequests = createNextPendingRequests(
+ missingRichSegmentDescriptors,
+ MultiStageQueryContext.getSegmentSources(query.context()),
+ DataServerSelector.RANDOM
+ );
+
+ if (!pendingRequests.isEmpty()) {
+ retryCount++;
+ if (retryCount > maxRetries) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build("Unable to fetch results from dataservers in [%d] retries.", retryCount);
+ }
+ }
+ }
+
+ return new DataServerQueryResult<>(yielders, handedOffSegments, dataSource);
+ }
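+
+ // Usage sketch (illustrative, not part of this patch): "handler", "scanQuery", "mappingFunction"
+ // and "closer" are hypothetical names. A caller would drain each yielder, then hand the
+ // handed-off segments back for a deep-storage read:
+ //
+ //   DataServerQueryResult<Object[]> queryResult =
+ //       handler.fetchRowsFromDataServer(scanQuery, mappingFunction, closer);
+ //   for (Yielder<Object[]> rowYielder : queryResult.getResultsYielders()) {
+ //     while (!rowYielder.isDone()) {
+ //       Object[] row = rowYielder.get();   // consume the row
+ //       rowYielder = rowYielder.next(null);
+ //     }
+ //   }
+ //   SegmentsInputSlice handedOff = queryResult.getHandedOffSegments();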
+
+ private <QueryType, RowType> Yielder<RowType> fetchRowsFromDataServerInternal(
+ final DataServerRequestDescriptor requestDescriptor,
+ final ResponseContext responseContext,
+ final Closer closer,
+ final Query<QueryType> query,
+ final Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction
+ )
+ {
+ final ServiceLocation serviceLocation = ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata());
+ final DataServerClient dataServerClient = makeDataServerClient(serviceLocation);
+ final QueryToolChest<QueryType, Query<QueryType>> toolChest = warehouse.getToolChest(query);
+ final Function<QueryType, QueryType> preComputeManipulatorFn =
+ toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing());
+ final JavaType queryResultType = toolChest.getBaseResultType();
+ final List<SegmentDescriptor> segmentDescriptors = requestDescriptor.getSegments()
+ .stream()
+ .map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval)
+ .collect(Collectors.toList());
+
+ try {
+ return RetryUtils.retry(
+ () -> closer.register(createYielder(
+ dataServerClient.run(
+ Queries.withSpecificSegments(query, segmentDescriptors),
+ responseContext,
+ queryResultType,
+ closer
+ ).map(preComputeManipulatorFn),
+ mappingFunction
+ )),
+ throwable -> !(throwable instanceof QueryInterruptedException
+ && throwable.getCause() instanceof InterruptedException),
+ PER_SERVER_QUERY_NUM_TRIES
+ );
+ }
+ catch (QueryInterruptedException e) {
+ if (e.getCause() instanceof RpcException) {
+ // In the case that all the realtime servers for a segment are gone (for example, if they were scaled down),
+ // we would also be unable to fetch the segment.
+ responseContext.addMissingSegments(segmentDescriptors);
+ return Yielders.each(Sequences.empty());
+ } else {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation);
+ }
+ }
+ catch (Exception e) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation);
+ }
+ }
+
+ private <QueryType, RowType> Yielder<RowType> createYielder(
+ final Sequence<QueryType> sequence,
+ final Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction
+ )
+ {
+ return Yielders.each(
+ mappingFunction.apply(sequence)
+ .map(row -> {
+ channelCounters.incrementRowCount();
+ return row;
+ })
+ );
+ }
+
+ private List<DataServerRequestDescriptor> createNextPendingRequests(
+ final Set<RichSegmentDescriptor> richSegmentDescriptors,
+ final SegmentSource includeSegmentSource,
+ final DataServerSelector dataServerSelector
+ )
+ {
+ final Map<DruidServerMetadata, Set<RichSegmentDescriptor>> serverVsSegmentsMap = new HashMap<>();
+
+ Iterable<ImmutableSegmentLoadInfo> immutableSegmentLoadInfos =
+ coordinatorClient.fetchServerViewSegments(
+ dataSource,
+ richSegmentDescriptors.stream().map(RichSegmentDescriptor::getFullInterval).collect(Collectors.toList())
+ );
+
+ Map<SegmentDescriptor, ImmutableSegmentLoadInfo> segmentVsServerMap = new HashMap<>();
+ immutableSegmentLoadInfos.forEach(immutableSegmentLoadInfo -> {
+ segmentVsServerMap.put(immutableSegmentLoadInfo.getSegment().toDescriptor(), immutableSegmentLoadInfo);
+ });
+
+ for (RichSegmentDescriptor richSegmentDescriptor : richSegmentDescriptors) {
+ SegmentDescriptor segmentDescriptorWithFullInterval = toSegmentDescriptorWithFullInterval(richSegmentDescriptor);
+ if (!segmentVsServerMap.containsKey(segmentDescriptorWithFullInterval)) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build("Could not find a server for segment[%s]", richSegmentDescriptor);
+ }
+
+ ImmutableSegmentLoadInfo segmentLoadInfo = segmentVsServerMap.get(segmentDescriptorWithFullInterval);
+ if (segmentLoadInfo.getSegment().toDescriptor().equals(segmentDescriptorWithFullInterval)) {
+ Set<DruidServerMetadata> servers = segmentLoadInfo.getServers()
+ .stream()
+ .filter(druidServerMetadata -> includeSegmentSource.getUsedServerTypes()
+ .contains(druidServerMetadata.getType()))
+ .collect(Collectors.toSet());
+ if (servers.isEmpty()) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build("Could not find a server matching includeSegmentSource[%s] for segment[%s]. Only found servers [%s]", includeSegmentSource, richSegmentDescriptor, servers);
+ }
+
+ DruidServerMetadata druidServerMetadata = dataServerSelector.getSelectServerFunction().apply(servers);
+ SegmentDescriptor descriptor = segmentLoadInfo.getSegment().toDescriptor();
+ serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new HashSet<>())
+ .add(new RichSegmentDescriptor(richSegmentDescriptor.getFullInterval(), richSegmentDescriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber()));
+ }
+ }
+
+ final List<DataServerRequestDescriptor> requestDescriptors = new ArrayList<>();
+ for (Map.Entry<DruidServerMetadata, Set<RichSegmentDescriptor>> druidServerMetadataSetEntry : serverVsSegmentsMap.entrySet()) {
+ DataServerRequestDescriptor dataServerRequest = new DataServerRequestDescriptor(
+ druidServerMetadataSetEntry.getKey(),
+ ImmutableList.copyOf(druidServerMetadataSetEntry.getValue())
+ );
+ requestDescriptors.add(dataServerRequest);
+ }
+
+ return requestDescriptors;
+ }
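+
+ // Shape sketch (illustrative): if segments s1 and s3 are now served by server A and s2 by
+ // server B, serverVsSegmentsMap groups them as {A -> [s1, s3], B -> [s2]}, so each server is
+ // queried exactly once per retry pass.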
+
+ /**
+ * Retrieves the list of missing segments from the response context.
+ */
+ private static List<SegmentDescriptor> getMissingSegments(final ResponseContext responseContext)
+ {
+ List<SegmentDescriptor> missingSegments = responseContext.getMissingSegments();
+ if (missingSegments == null) {
+ return ImmutableList.of();
+ }
+ return missingSegments;
+ }
+
+ /**
+ * Queries the coordinator to check if a list of segments has been handed off.
+ * Returns a list of segments which have been handed off.
+ *
+ * See {@link org.apache.druid.server.http.DataSourcesResource#isHandOffComplete(String, String, int, String)}
+ */
+ private List<SegmentDescriptor> checkSegmentHandoff(List<SegmentDescriptor> segmentDescriptors)
+ {
+ try {
+ List<SegmentDescriptor> handedOffSegments = new ArrayList<>();
+
+ for (SegmentDescriptor segmentDescriptor : segmentDescriptors) {
+ Boolean wasHandedOff = FutureUtils.get(
+ coordinatorClient.isHandoffComplete(dataSource, segmentDescriptor),
+ true
+ );
+ if (Boolean.TRUE.equals(wasHandedOff)) {
+ handedOffSegments.add(segmentDescriptor);
+ }
+ }
+ return handedOffSegments;
+ }
+ catch (Exception e) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(e, "Could not contact coordinator");
+ }
+ }
+
+ static SegmentDescriptor toSegmentDescriptorWithFullInterval(RichSegmentDescriptor richSegmentDescriptor)
+ {
+ return new SegmentDescriptor(
+ richSegmentDescriptor.getFullInterval(),
+ richSegmentDescriptor.getVersion(),
+ richSegmentDescriptor.getPartitionNumber()
+ );
+ }
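+
+ // Example (illustrative): a segment covering 2000-01-01/2000-02-01 that the query clipped to
+ // 2000-01-15/2000-02-01 is addressed on data servers by its full interval, since a server
+ // identifies a segment by its complete interval, version and partition number.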
+}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProviderFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java
similarity index 82%
rename from extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProviderFactory.java
rename to extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java
index 48ed57be870..1caed919ef0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProviderFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java
@@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.rpc.ServiceClientFactory;
@@ -33,11 +34,11 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
- * Creates new instances of {@link LoadedSegmentDataProvider} and manages the cancellation threadpool.
+ * Creates new instances of {@link DataServerQueryHandler} and manages the cancellation threadpool.
*/
-public class LoadedSegmentDataProviderFactory implements Closeable
+public class DataServerQueryHandlerFactory implements Closeable
{
- private static final Logger log = new Logger(LoadedSegmentDataProviderFactory.class);
+ private static final Logger log = new Logger(DataServerQueryHandlerFactory.class);
private static final int DEFAULT_THREAD_COUNT = 4;
private final CoordinatorClient coordinatorClient;
private final ServiceClientFactory serviceClientFactory;
@@ -45,7 +46,7 @@ public class LoadedSegmentDataProviderFactory implements Closeable
private final QueryToolChestWarehouse warehouse;
private final ScheduledExecutorService queryCancellationExecutor;
- public LoadedSegmentDataProviderFactory(
+ public DataServerQueryHandlerFactory(
CoordinatorClient coordinatorClient,
ServiceClientFactory serviceClientFactory,
ObjectMapper objectMapper,
@@ -59,19 +60,21 @@ public class LoadedSegmentDataProviderFactory implements Closeable
this.queryCancellationExecutor = ScheduledExecutors.fixed(DEFAULT_THREAD_COUNT, "query-cancellation-executor");
}
- public LoadedSegmentDataProvider createLoadedSegmentDataProvider(
+ public DataServerQueryHandler createDataServerQueryHandler(
String dataSource,
- ChannelCounters channelCounters
+ ChannelCounters channelCounters,
+ DataServerRequestDescriptor dataServerRequestDescriptor
)
{
- return new LoadedSegmentDataProvider(
+ return new DataServerQueryHandler(
dataSource,
channelCounters,
serviceClientFactory,
coordinatorClient,
objectMapper,
warehouse,
- queryCancellationExecutor
+ queryCancellationExecutor,
+ dataServerRequestDescriptor
);
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryResult.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryResult.java
new file mode 100644
index 00000000000..44d2fbaf799
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryResult.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.msq.input.table.RichSegmentDescriptor;
+import org.apache.druid.msq.input.table.SegmentsInputSlice;
+
+import java.util.List;
+
+/**
+ * Contains the results for a query to a dataserver. {@link #resultsYielders} contains the results fetched, and
+ * {@link #segmentsInputSlice} is a {@link SegmentsInputSlice} containing the segments which have already been handed
+ * off, so that they can be fetched from deep storage.
+ */
+public class DataServerQueryResult<RowType>
+{
+
+ private final List<Yielder<RowType>> resultsYielders;
+
+ private final SegmentsInputSlice segmentsInputSlice;
+
+ public DataServerQueryResult(
+ List<Yielder<RowType>> resultsYielders,
+ List<RichSegmentDescriptor> handedOffSegments,
+ String dataSource
+ )
+ {
+ this.resultsYielders = resultsYielders;
+ this.segmentsInputSlice = new SegmentsInputSlice(dataSource, handedOffSegments, ImmutableList.of());
+ }
+
+ public List<Yielder<RowType>> getResultsYielders()
+ {
+ return resultsYielders;
+ }
+
+ public SegmentsInputSlice getHandedOffSegments()
+ {
+ return segmentsInputSlice;
+ }
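+
+ // Usage sketch (illustrative; "queryResult" is a hypothetical instance): once the yielders are
+ // drained, any handed-off segments can be read from deep storage instead:
+ //
+ //   SegmentsInputSlice handedOff = queryResult.getHandedOffSegments();
+ //   if (!handedOff.getDescriptors().isEmpty()) {
+ //     // plan a standard segment read for these descriptors
+ //   }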
+}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java
deleted file mode 100644
index d9d789e3d2b..00000000000
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.exec;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.discovery.DataServerClient;
-import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.IOE;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.RetryUtils;
-import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.guava.Yielders;
-import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.msq.counters.ChannelCounters;
-import org.apache.druid.msq.input.table.RichSegmentDescriptor;
-import org.apache.druid.query.Queries;
-import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryInterruptedException;
-import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.QueryToolChestWarehouse;
-import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.TableDataSource;
-import org.apache.druid.query.aggregation.MetricManipulationFn;
-import org.apache.druid.query.aggregation.MetricManipulatorFns;
-import org.apache.druid.query.context.DefaultResponseContext;
-import org.apache.druid.query.context.ResponseContext;
-import org.apache.druid.rpc.FixedSetServiceLocator;
-import org.apache.druid.rpc.RpcException;
-import org.apache.druid.rpc.ServiceClientFactory;
-import org.apache.druid.rpc.ServiceLocation;
-import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.utils.CollectionUtils;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Function;
-
-/**
- * Class responsible for querying dataservers and retriving results for a given query. Also queries the coordinator
- * to check if a segment has been handed off.
- */
-public class LoadedSegmentDataProvider
-{
- private static final Logger log = new Logger(LoadedSegmentDataProvider.class);
- private static final int DEFAULT_NUM_TRIES = 5;
- private final String dataSource;
- private final ChannelCounters channelCounters;
- private final ServiceClientFactory serviceClientFactory;
- private final CoordinatorClient coordinatorClient;
- private final ObjectMapper objectMapper;
- private final QueryToolChestWarehouse warehouse;
- private final ScheduledExecutorService queryCancellationExecutor;
-
- public LoadedSegmentDataProvider(
- String dataSource,
- ChannelCounters channelCounters,
- ServiceClientFactory serviceClientFactory,
- CoordinatorClient coordinatorClient,
- ObjectMapper objectMapper,
- QueryToolChestWarehouse warehouse,
- ScheduledExecutorService queryCancellationExecutor
- )
- {
- this.dataSource = dataSource;
- this.channelCounters = channelCounters;
- this.serviceClientFactory = serviceClientFactory;
- this.coordinatorClient = coordinatorClient;
- this.objectMapper = objectMapper;
- this.warehouse = warehouse;
- this.queryCancellationExecutor = queryCancellationExecutor;
- }
-
- @VisibleForTesting
- DataServerClient makeDataServerClient(ServiceLocation serviceLocation)
- {
- return new DataServerClient(serviceClientFactory, serviceLocation, objectMapper, queryCancellationExecutor);
- }
-
- /**
- * Performs some necessary transforms to the query, so that the dataserver is able to understand it first.
- * - Changing the datasource to a {@link TableDataSource}
- * - Limiting the query to a single required segment with {@link Queries#withSpecificSegments(Query, List)}
- *
- * Then queries a data server and returns a {@link Yielder} for the results, retrying if needed. If a dataserver
- * indicates that the segment was not found, checks with the coordinator to see if the segment was handed off.
- * - If the segment was handed off, returns with a {@link DataServerQueryStatus#HANDOFF} status.
- * - If the segment was not handed off, retries with the known list of servers and throws an exception if the retry
- * count is exceeded.
- * - If the servers could not be found, checks if the segment was handed-off. If it was, returns with a
- * {@link DataServerQueryStatus#HANDOFF} status. Otherwise, throws an exception.
- *
- * Also applies {@link QueryToolChest#makePreComputeManipulatorFn(Query, MetricManipulationFn)} and reports channel
- * metrics on the returned results.
- *
- * @param <QueryType> result return type for the query from the data server
- * @param <RowType> type of the result rows after parsing from QueryType object
- */
- public <QueryType, RowType> Pair<DataServerQueryStatus, Yielder<RowType>> fetchRowsFromDataServer(
- Query<QueryType> query,
- RichSegmentDescriptor segmentDescriptor,
- Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
- Closer closer
- ) throws IOException
- {
- final Query<QueryType> preparedQuery = Queries.withSpecificSegments(
- query.withDataSource(new TableDataSource(dataSource)),
- ImmutableList.of(segmentDescriptor)
- );
-
- final Set<DruidServerMetadata> servers = segmentDescriptor.getServers();
- final FixedSetServiceLocator fixedSetServiceLocator = FixedSetServiceLocator.forDruidServerMetadata(servers);
- final QueryToolChest<QueryType, Query<QueryType>> toolChest = warehouse.getToolChest(query);
- final Function<QueryType, QueryType> preComputeManipulatorFn =
- toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing());
-
- final JavaType queryResultType = toolChest.getBaseResultType();
- final int numRetriesOnMissingSegments = preparedQuery.context().getNumRetriesOnMissingSegments(DEFAULT_NUM_TRIES);
-
- log.debug("Querying severs[%s] for segment[%s], retries:[%d]", servers, segmentDescriptor, numRetriesOnMissingSegments);
- final ResponseContext responseContext = new DefaultResponseContext();
-
- Pair<DataServerQueryStatus, Yielder<RowType>> statusSequencePair;
- try {
- // We need to check for handoff to decide if we need to retry. Therefore, we handle it here instead of inside
- // the client.
- statusSequencePair = RetryUtils.retry(
- () -> {
- ServiceLocation serviceLocation = CollectionUtils.getOnlyElement(
- fixedSetServiceLocator.locate().get().getLocations(),
- serviceLocations -> {
- throw DruidException.defensive("Should only have one location");
- }
- );
- DataServerClient dataServerClient = makeDataServerClient(serviceLocation);
- Sequence<QueryType> sequence = dataServerClient.run(preparedQuery, responseContext, queryResultType, closer)
- .map(preComputeManipulatorFn);
- final List<SegmentDescriptor> missingSegments = getMissingSegments(responseContext);
- // Only one segment is fetched, so this should be empty if it was fetched
- if (missingSegments.isEmpty()) {
- log.debug("Successfully fetched rows from server for segment[%s]", segmentDescriptor);
- // Segment was found
- Yielder<RowType> yielder = closer.register(
- Yielders.each(mappingFunction.apply(sequence)
- .map(row -> {
- channelCounters.incrementRowCount();
- return row;
- }))
- );
- return Pair.of(DataServerQueryStatus.SUCCESS, yielder);
- } else {
- Boolean wasHandedOff = checkSegmentHandoff(coordinatorClient, dataSource, segmentDescriptor);
- if (Boolean.TRUE.equals(wasHandedOff)) {
- log.debug("Segment[%s] was handed off.", segmentDescriptor);
- return Pair.of(DataServerQueryStatus.HANDOFF, null);
- } else {
- log.error("Segment[%s] could not be found on data server, but segment was not handed off.", segmentDescriptor);
- throw new IOE(
- "Segment[%s] could not be found on data server, but segment was not handed off.",
- segmentDescriptor
- );
- }
- }
- },
- throwable -> !(throwable instanceof QueryInterruptedException && throwable.getCause() instanceof InterruptedException),
- numRetriesOnMissingSegments
- );
-
- return statusSequencePair;
- }
- catch (QueryInterruptedException e) {
- if (e.getCause() instanceof RpcException) {
- // In the case that all the realtime servers for a segment are gone (for example, if they were scaled down),
- // we would also be unable to fetch the segment. Check if the segment was handed off, just in case, instead of
- // failing the query.
- boolean wasHandedOff = checkSegmentHandoff(coordinatorClient, dataSource, segmentDescriptor);
- if (wasHandedOff) {
- log.debug("Segment[%s] was handed off.", segmentDescriptor);
- return Pair.of(DataServerQueryStatus.HANDOFF, null);
- }
- }
- throw DruidException.forPersona(DruidException.Persona.OPERATOR)
- .ofCategory(DruidException.Category.RUNTIME_FAILURE)
- .build(e, "Exception while fetching rows for query from dataservers[%s]", servers);
- }
- catch (Exception e) {
- Throwables.propagateIfPossible(e, IOE.class);
- throw DruidException.forPersona(DruidException.Persona.OPERATOR)
- .ofCategory(DruidException.Category.RUNTIME_FAILURE)
- .build(e, "Exception while fetching rows for query from dataservers[%s]", servers);
- }
- }
-
- /**
- * Retreives the list of missing segments from the response context.
- */
- private static List<SegmentDescriptor> getMissingSegments(final ResponseContext responseContext)
- {
- List<SegmentDescriptor> missingSegments = responseContext.getMissingSegments();
- if (missingSegments == null) {
- return ImmutableList.of();
- }
- return missingSegments;
- }
-
- /**
- * Queries the coordinator to check if a segment has been handed off.
- *
- * See {@link org.apache.druid.server.http.DataSourcesResource#isHandOffComplete(String, String, int, String)}
- */
- private static boolean checkSegmentHandoff(
- CoordinatorClient coordinatorClient,
- String dataSource,
- SegmentDescriptor segmentDescriptor
- ) throws IOE
- {
- Boolean wasHandedOff;
- try {
- wasHandedOff = FutureUtils.get(coordinatorClient.isHandoffComplete(dataSource, segmentDescriptor), true);
- }
- catch (Exception e) {
- throw new IOE(e, "Could not contact coordinator for segment[%s]", segmentDescriptor);
- }
- return Boolean.TRUE.equals(wasHandedOff);
- }
-
- /**
- * Represents the status of fetching a segment from a data server
- */
- public enum DataServerQueryStatus
- {
- /**
- * Segment was found on the data server and fetched successfully.
- */
- SUCCESS,
- /**
- * Segment was not found on the realtime server as it has been handed off to a historical. Only returned while
- * querying a realtime server.
- */
- HANDOFF
- }
-}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
index a3d4fde6c1a..f5e86039c23 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
@@ -74,7 +74,7 @@ public interface WorkerContext
DruidNode selfNode();
Bouncer processorBouncer();
- LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory();
+ DataServerQueryHandlerFactory dataServerQueryHandlerFactory();
default File tempDir(int stageNumber, String id)
{
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 6ee45bc158e..3f2ef39b5bf 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -295,7 +295,7 @@ public class WorkerImpl implements Worker
{
this.controllerClient = context.makeControllerClient(task.getControllerTaskId());
closer.register(controllerClient::close);
- closer.register(context.loadedSegmentDataProviderFactory());
+ closer.register(context.dataServerQueryHandlerFactory());
context.registerWorker(this, closer); // Uses controllerClient, so must be called after that is initialized
this.workerClient = new ExceptionWrappingWorkerClient(context.makeWorkerClient());
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
index d522a8a7d88..e0de5bdc27e 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
@@ -20,7 +20,7 @@
package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.msq.exec.LoadedSegmentDataProviderFactory;
+import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.querykit.DataSegmentProvider;
@@ -39,20 +39,20 @@ public class IndexerFrameContext implements FrameContext
private final IndexIO indexIO;
private final DataSegmentProvider dataSegmentProvider;
private final WorkerMemoryParameters memoryParameters;
- private final LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory;
+ private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
public IndexerFrameContext(
IndexerWorkerContext context,
IndexIO indexIO,
DataSegmentProvider dataSegmentProvider,
- LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory,
+ DataServerQueryHandlerFactory dataServerQueryHandlerFactory,
WorkerMemoryParameters memoryParameters
)
{
this.context = context;
this.indexIO = indexIO;
this.dataSegmentProvider = dataSegmentProvider;
- this.loadedSegmentDataProviderFactory = loadedSegmentDataProviderFactory;
+ this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
this.memoryParameters = memoryParameters;
}
@@ -81,9 +81,9 @@ public class IndexerFrameContext implements FrameContext
}
@Override
- public LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory()
+ public DataServerQueryHandlerFactory dataServerQueryHandlerFactory()
{
- return loadedSegmentDataProviderFactory;
+ return dataServerQueryHandlerFactory;
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 709b019891f..53cd6e942ea 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -34,7 +34,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.ControllerClient;
-import org.apache.druid.msq.exec.LoadedSegmentDataProviderFactory;
+import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.TaskDataSegmentProvider;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerClient;
@@ -71,7 +71,7 @@ public class IndexerWorkerContext implements WorkerContext
private final Injector injector;
private final IndexIO indexIO;
private final TaskDataSegmentProvider dataSegmentProvider;
- private final LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory;
+ private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
private final ServiceClientFactory clientFactory;
@GuardedBy("this")
@@ -85,7 +85,7 @@ public class IndexerWorkerContext implements WorkerContext
final Injector injector,
final IndexIO indexIO,
final TaskDataSegmentProvider dataSegmentProvider,
- final LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory,
+ final DataServerQueryHandlerFactory dataServerQueryHandlerFactory,
final ServiceClientFactory clientFactory
)
{
@@ -93,7 +93,7 @@ public class IndexerWorkerContext implements WorkerContext
this.injector = injector;
this.indexIO = indexIO;
this.dataSegmentProvider = dataSegmentProvider;
- this.loadedSegmentDataProviderFactory = loadedSegmentDataProviderFactory;
+ this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
this.clientFactory = clientFactory;
}
@@ -117,7 +117,7 @@ public class IndexerWorkerContext implements WorkerContext
segmentCacheManager,
indexIO
),
- new LoadedSegmentDataProviderFactory(
+ new DataServerQueryHandlerFactory(
toolbox.getCoordinatorClient(),
serviceClientFactory,
smileMapper,
@@ -245,7 +245,7 @@ public class IndexerWorkerContext implements WorkerContext
this,
indexIO,
dataSegmentProvider,
- loadedSegmentDataProviderFactory,
+ dataServerQueryHandlerFactory,
WorkerMemoryParameters.createProductionInstanceForWorker(injector, queryDef, stageNumber)
);
}
@@ -269,9 +269,9 @@ public class IndexerWorkerContext implements WorkerContext
}
@Override
- public LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory()
+ public DataServerQueryHandlerFactory dataServerQueryHandlerFactory()
{
- return loadedSegmentDataProviderFactory;
+ return dataServerQueryHandlerFactory;
}
private synchronized OverlordClient makeOverlordClient()
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ReadableInput.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ReadableInput.java
index b125dcfe8fd..2e6975acbf2 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ReadableInput.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ReadableInput.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.kernel.StagePartition;
@@ -30,7 +31,7 @@ import javax.annotation.Nullable;
/**
* A single item of readable input. Generated by {@link InputSliceReader#attach} from an {@link InputSlice}.
- *
+ *
* Each item is either readable as a {@link org.apache.druid.segment.Segment} or as a {@link ReadableFrameChannel},
* but not both. Check {@link #hasSegment()} and {@link #hasChannel()} to see which one you have.
*/
@@ -39,6 +40,9 @@ public class ReadableInput
@Nullable
private final SegmentWithDescriptor segment;
+ @Nullable
+ private final DataServerQueryHandler dataServerQuery;
+
@Nullable
private final ReadableFrameChannel channel;
@@ -50,18 +54,20 @@ public class ReadableInput
private ReadableInput(
@Nullable SegmentWithDescriptor segment,
+ @Nullable DataServerQueryHandler dataServerQuery,
@Nullable ReadableFrameChannel channel,
@Nullable FrameReader frameReader,
@Nullable StagePartition stagePartition
)
{
this.segment = segment;
+ this.dataServerQuery = dataServerQuery;
this.channel = channel;
this.frameReader = frameReader;
this.stagePartition = stagePartition;
- if ((segment == null) == (channel == null)) {
- throw new ISE("Provide either 'segment' or 'channel'");
+ if ((segment == null) && (channel == null) && (dataServerQuery == null)) {
+ throw new ISE("Provide 'segment', 'dataServerQuery' or 'channel'");
}
}
@@ -72,7 +78,17 @@ public class ReadableInput
*/
public static ReadableInput segment(final SegmentWithDescriptor segment)
{
- return new ReadableInput(Preconditions.checkNotNull(segment, "segment"), null, null, null);
+ return new ReadableInput(Preconditions.checkNotNull(segment, "segment"), null, null, null, null);
+ }
+
+ /**
+ * Create an input associated with a query to a data server
+ *
+ * @param dataServerQueryHandler the data server query handler
+ */
+ public static ReadableInput dataServerQuery(final DataServerQueryHandler dataServerQueryHandler)
+ {
+ return new ReadableInput(null, Preconditions.checkNotNull(dataServerQueryHandler, "dataServerQuery"), null, null, null);
}
/**
@@ -90,6 +106,7 @@ public class ReadableInput
)
{
return new ReadableInput(
+ null,
null,
Preconditions.checkNotNull(channel, "channel"),
Preconditions.checkNotNull(frameReader, "frameReader"),
@@ -98,13 +115,21 @@ public class ReadableInput
}
/**
- * Whether this input is a segment (from {@link #segment(SegmentWithDescriptor)}.
+ * Whether this input is a segment (from {@link #segment(SegmentWithDescriptor)}).
*/
public boolean hasSegment()
{
return segment != null;
}
+ /**
+ * Whether this input is a dataserver query (from {@link #dataServerQuery(DataServerQueryHandler)}).
+ */
+ public boolean hasDataServerQuery()
+ {
+ return dataServerQuery != null;
+ }
+
/**
* Whether this input is a channel (from {@link #channel(ReadableFrameChannel, FrameReader, StagePartition)}.
*/
@@ -122,6 +147,15 @@ public class ReadableInput
return segment;
}
+ /**
+ * The data server query for this input. Only valid if {@link #hasDataServerQuery()}.
+ */
+ public DataServerQueryHandler getDataServerQuery()
+ {
+ checkIsDataServerQuery();
+ return dataServerQuery;
+ }
+
/**
* The channel for this input. Only valid if {@link #hasChannel()}.
*/
@@ -158,7 +192,14 @@ public class ReadableInput
private void checkIsSegment()
{
if (!hasSegment()) {
- throw new ISE("Not a channel input; cannot call this method");
+ throw new ISE("Not a segment; cannot call this method");
+ }
+ }
+
+ private void checkIsDataServerQuery()
+ {
+ if (!hasDataServerQuery()) {
+ throw new ISE("Not a data server query; cannot call this method");
}
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
index 714e8dc3a63..03aa7cd0fe4 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
@@ -159,8 +159,7 @@ public class ExternalInputSliceReader implements InputSliceReader
);
return new SegmentWithDescriptor(
() -> ResourceHolder.fromCloseable(segment),
- null,
- new RichSegmentDescriptor(segmentId.toDescriptor(), null, null)
+ new RichSegmentDescriptor(segmentId.toDescriptor(), null)
);
}
);
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java
index 25f06c7cd40..ef58c7723b3 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java
@@ -44,7 +44,7 @@ public class InlineInputSliceReader implements InputSliceReader
{
public static final String SEGMENT_ID = "__inline";
private static final RichSegmentDescriptor DUMMY_SEGMENT_DESCRIPTOR
- = new RichSegmentDescriptor(SegmentId.dummy(SEGMENT_ID).toDescriptor(), null, null);
+ = new RichSegmentDescriptor(SegmentId.dummy(SEGMENT_ID).toDescriptor(), null);
private final SegmentWrangler segmentWrangler;
@@ -75,7 +75,6 @@ public class InlineInputSliceReader implements InputSliceReader
segment -> ReadableInput.segment(
new SegmentWithDescriptor(
() -> ResourceHolder.fromCloseable(segment),
- null,
DUMMY_SEGMENT_DESCRIPTOR
)
)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java
index b601b043ac1..2b327f216f7 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java
@@ -100,8 +100,7 @@ public class LookupInputSliceReader implements InputSliceReader
return ResourceHolder.fromCloseable(segment);
},
- null,
- new RichSegmentDescriptor(SegmentId.dummy(lookupName).toDescriptor(), null, null)
+ new RichSegmentDescriptor(SegmentId.dummy(lookupName).toDescriptor(), null)
)
)
)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataServerRequestDescriptor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataServerRequestDescriptor.java
new file mode 100644
index 00000000000..12901cc9ae8
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataServerRequestDescriptor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.input.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Contains information on a set of segments, and the {@link DruidServerMetadata} of a data server serving
+ * those segments. Used by MSQ to query dataservers directly.
+ */
+public class DataServerRequestDescriptor
+{
+ private final DruidServerMetadata serverMetadata;
+ private final List<RichSegmentDescriptor> segments;
+
+ @JsonCreator
+ public DataServerRequestDescriptor(
+ @JsonProperty("serverMetadata") DruidServerMetadata serverMetadata,
+ @JsonProperty("segments") List segments
+ )
+ {
+ this.segments = segments;
+ this.serverMetadata = serverMetadata;
+ }
+
+ @JsonProperty("serverMetadata")
+ public DruidServerMetadata getServerMetadata()
+ {
+ return serverMetadata;
+ }
+
+ @JsonProperty("segments")
+ public List<RichSegmentDescriptor> getSegments()
+ {
+ return segments;
+ }
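+
+ // Serialized form sketch (property names from the annotations above; values illustrative):
+ //
+ //   {
+ //     "serverMetadata": { /* DruidServerMetadata of the data server */ },
+ //     "segments": [ { "itvl": "2000/2001", "ver": "v1", "part": 0 } ]
+ //   }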
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DataServerRequestDescriptor that = (DataServerRequestDescriptor) o;
+ return Objects.equals(serverMetadata, that.serverMetadata) && Objects.equals(
+ segments,
+ that.segments
+ );
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(serverMetadata, segments);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DataServerRequestDescriptor{" +
+ "serverMetadata=" + serverMetadata +
+ ", segments=" + segments +
+ '}';
+ }
+}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataServerSelector.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataServerSelector.java
new file mode 100644
index 00000000000..2e5e1f5dbff
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataServerSelector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.input.table;
+
+import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.jboss.netty.util.internal.ThreadLocalRandom;
+
+import java.util.Set;
+import java.util.function.Function;
+
+public enum DataServerSelector
+{
+ RANDOM(servers -> servers.stream()
+ .skip(ThreadLocalRandom.current().nextInt(servers.size()))
+ .findFirst()
+ .orElse(null));
+
+ private final Function<Set<DruidServerMetadata>, DruidServerMetadata> selectServer;
+
+ DataServerSelector(Function<Set<DruidServerMetadata>, DruidServerMetadata> selectServer)
+ {
+ this.selectServer = selectServer;
+ }
+
+ public Function<Set<DruidServerMetadata>, DruidServerMetadata> getSelectServerFunction()
+ {
+ return selectServer;
+ }
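+
+ // Usage sketch (illustrative): pick a server at random from a candidate set, as
+ // DataServerQueryHandler does. The set is assumed non-empty; nextInt(0) would throw.
+ //
+ //   DruidServerMetadata chosen =
+ //       DataServerSelector.RANDOM.getSelectServerFunction().apply(servers);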
+}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java
index 04e4e601b07..27f5202b6ce 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java
@@ -23,54 +23,45 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Objects;
-import java.util.Set;
/**
- * Like {@link SegmentDescriptor}, but provides both the full interval and the clipped interval for a segment
- * (SegmentDescriptor only provides the clipped interval.), as well as the metadata of the servers it is loaded on.
+ * Like {@link SegmentDescriptor}, but provides both the full interval and the clipped interval for a segment.
+ * (SegmentDescriptor only provides the clipped interval.)
*
* To keep the serialized form lightweight, the full interval is only serialized if it is different from the
* clipped interval.
*
* It is possible to deserialize this class as {@link SegmentDescriptor}. However, going the other direction is
- * not a good idea, because the {@link #fullInterval} and {@link #servers} will not end up being set correctly.
+ * not a good idea, because the {@link #fullInterval} will not end up being set correctly.
*/
public class RichSegmentDescriptor extends SegmentDescriptor
{
@Nullable
private final Interval fullInterval;
- private final Set<DruidServerMetadata> servers;
public RichSegmentDescriptor(
final Interval fullInterval,
final Interval interval,
final String version,
- final int partitionNumber,
- final Set servers
+ final int partitionNumber
)
{
super(interval, version, partitionNumber);
this.fullInterval = interval.equals(Preconditions.checkNotNull(fullInterval, "fullInterval")) ? null : fullInterval;
- this.servers = servers;
}
public RichSegmentDescriptor(
SegmentDescriptor segmentDescriptor,
- @Nullable Interval fullInterval,
- Set<DruidServerMetadata> servers
+ @Nullable Interval fullInterval
)
{
super(segmentDescriptor.getInterval(), segmentDescriptor.getVersion(), segmentDescriptor.getPartitionNumber());
this.fullInterval = fullInterval;
- this.servers = servers;
}
@JsonCreator
@@ -78,33 +69,17 @@ public class RichSegmentDescriptor extends SegmentDescriptor
@JsonProperty("fi") @Nullable final Interval fullInterval,
@JsonProperty("itvl") final Interval interval,
@JsonProperty("ver") final String version,
- @JsonProperty("part") final int partitionNumber,
- @JsonProperty("servers") @Nullable final Set servers
+ @JsonProperty("part") final int partitionNumber
)
{
return new RichSegmentDescriptor(
fullInterval != null ? fullInterval : interval,
interval,
version,
- partitionNumber,
- servers == null ? ImmutableSet.of() : servers
+ partitionNumber
);
}
- /**
- * Returns true if the location the segment is loaded is available, and false if it is not.
- */
- public boolean isLoadedOnServer()
- {
- return !CollectionUtils.isNullOrEmpty(getServers());
- }
-
- @JsonProperty("servers")
- public Set<DruidServerMetadata> getServers()
- {
- return servers;
- }
-
public Interval getFullInterval()
{
return fullInterval == null ? getInterval() : fullInterval;
@@ -131,13 +106,13 @@ public class RichSegmentDescriptor extends SegmentDescriptor
return false;
}
RichSegmentDescriptor that = (RichSegmentDescriptor) o;
- return Objects.equals(fullInterval, that.fullInterval) && Objects.equals(servers, that.servers);
+ return Objects.equals(fullInterval, that.fullInterval);
}
@Override
public int hashCode()
{
- return Objects.hash(super.hashCode(), fullInterval, servers);
+ return Objects.hash(super.hashCode(), fullInterval);
}
@Override
@@ -145,7 +120,6 @@ public class RichSegmentDescriptor extends SegmentDescriptor
{
return "RichSegmentDescriptor{" +
"fullInterval=" + (fullInterval == null ? getInterval() : fullInterval) +
- ", servers=" + getServers() +
", interval=" + getInterval() +
", version='" + getVersion() + '\'' +
", partitionNumber=" + getPartitionNumber() +
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java
index 137129ed338..b9026c7b9fb 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java
@@ -21,19 +21,9 @@ package org.apache.druid.msq.input.table;
import com.google.common.base.Preconditions;
import org.apache.druid.collections.ResourceHolder;
-import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.msq.exec.LoadedSegmentDataProvider;
-import org.apache.druid.query.Query;
import org.apache.druid.segment.Segment;
-import javax.annotation.Nullable;
-import java.io.IOException;
import java.util.Objects;
-import java.util.function.Function;
import java.util.function.Supplier;
/**
@@ -42,8 +32,6 @@ import java.util.function.Supplier;
public class SegmentWithDescriptor
{
private final Supplier<? extends ResourceHolder<Segment>> segmentSupplier;
- @Nullable
- private final LoadedSegmentDataProvider loadedSegmentDataProvider;
private final RichSegmentDescriptor descriptor;
/**
@@ -51,18 +39,14 @@ public class SegmentWithDescriptor
*
* @param segmentSupplier supplier of a {@link ResourceHolder} of segment. The {@link ResourceHolder#close()}
* logic must include a delegated call to {@link Segment#close()}.
- * @param loadedSegmentDataProvider {@link LoadedSegmentDataProvider} which fetches the corresponding results from a
- * data server where the segment is loaded. The call will fetch the
* @param descriptor segment descriptor
*/
public SegmentWithDescriptor(
final Supplier<? extends ResourceHolder<Segment>> segmentSupplier,
- final @Nullable LoadedSegmentDataProvider loadedSegmentDataProvider,
final RichSegmentDescriptor descriptor
)
{
this.segmentSupplier = Preconditions.checkNotNull(segmentSupplier, "segment");
- this.loadedSegmentDataProvider = loadedSegmentDataProvider;
this.descriptor = Preconditions.checkNotNull(descriptor, "descriptor");
}
@@ -80,19 +64,6 @@ public class SegmentWithDescriptor
return segmentSupplier.get();
}
- public <QueryType, RowType> Pair<LoadedSegmentDataProvider.DataServerQueryStatus, Yielder<RowType>> fetchRowsFromDataServer(
- Query<QueryType> query,
- Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
- Closer closer
- ) throws IOException
- {
- if (loadedSegmentDataProvider == null) {
- throw DruidException.defensive("loadedSegmentDataProvider was null. Fetching segments from servers is not "
- + "supported for segment[%s]", descriptor);
- }
- return loadedSegmentDataProvider.fetchRowsFromDataServer(query, descriptor, mappingFunction, closer);
- }
-
/**
* The segment descriptor associated with this physical segment.
*/
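With the data-server fetch path gone, construction takes just the two remaining arguments. A hypothetical usage sketch (the segment and descriptor variables are invented for illustration):

    // The supplier owns the segment lifecycle; per the javadoc above, closing the
    // ResourceHolder must also close the wrapped Segment.
    Supplier<ResourceHolder<Segment>> supplier =
        () -> ReferenceCountingResourceHolder.fromCloseable(segment);
    SegmentWithDescriptor segmentWithDescriptor = new SegmentWithDescriptor(supplier, descriptor);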
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java
index dadaf0254da..8cc6cf4ca83 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java
@@ -29,11 +29,11 @@ import java.util.Objects;
/**
* Input slice representing a set of segments to read.
- *
+ *
* Sliced from {@link TableInputSpec} by {@link TableInputSpecSlicer}.
- *
+ *
* Similar to {@link org.apache.druid.query.spec.MultipleSpecificSegmentSpec} from native queries.
- *
+ *
* These use {@link RichSegmentDescriptor}, not {@link org.apache.druid.timeline.DataSegment}, to minimize overhead
* in scenarios where the target server already has the segment cached. If the segment isn't cached, the target
* server does need to fetch the full {@link org.apache.druid.timeline.DataSegment} object, so it can get the
@@ -44,15 +44,18 @@ public class SegmentsInputSlice implements InputSlice
{
private final String dataSource;
private final List<RichSegmentDescriptor> descriptors;
+ private final List<DataServerRequestDescriptor> servedSegments;
@JsonCreator
public SegmentsInputSlice(
@JsonProperty("dataSource") String dataSource,
- @JsonProperty("segments") List descriptors
+ @JsonProperty("segments") List descriptors,
+ @JsonProperty("servedSegments") List servedSegments
)
{
this.dataSource = dataSource;
this.descriptors = descriptors;
+ this.servedSegments = servedSegments;
}
@JsonProperty
@@ -67,6 +70,12 @@ public class SegmentsInputSlice implements InputSlice
return descriptors;
}
+ @JsonProperty("servedSegments")
+ public List<DataServerRequestDescriptor> getServedSegments()
+ {
+ return servedSegments;
+ }
+
@Override
public int fileCount()
{
@@ -83,13 +92,16 @@ public class SegmentsInputSlice implements InputSlice
return false;
}
SegmentsInputSlice that = (SegmentsInputSlice) o;
- return Objects.equals(dataSource, that.dataSource) && Objects.equals(descriptors, that.descriptors);
+ return Objects.equals(dataSource, that.dataSource) && Objects.equals(
+ descriptors,
+ that.descriptors
+ ) && Objects.equals(servedSegments, that.servedSegments);
}
@Override
public int hashCode()
{
- return Objects.hash(dataSource, descriptors);
+ return Objects.hash(dataSource, descriptors, servedSegments);
}
@Override
@@ -98,6 +110,7 @@ public class SegmentsInputSlice implements InputSlice
return "SegmentsInputSlice{" +
"dataSource='" + dataSource + '\'' +
", descriptors=" + descriptors +
+ ", servedSegments=" + servedSegments +
'}';
}
}
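A slice now carries two independent workloads. A hypothetical construction (the datasource name and element variables are invented for illustration):

    SegmentsInputSlice slice = new SegmentsInputSlice(
        "wikipedia",
        ImmutableList.of(richDescriptor),             // read as files from deep storage
        ImmutableList.of(dataServerRequestDescriptor) // answered by querying data servers
    );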
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
index 8bc67dbb4e8..648bd95c6c3 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
@@ -23,7 +23,8 @@ import com.google.common.collect.Iterators;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterNames;
import org.apache.druid.msq.counters.CounterTracker;
-import org.apache.druid.msq.exec.LoadedSegmentDataProviderFactory;
+import org.apache.druid.msq.exec.DataServerQueryHandler;
+import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSliceReader;
import org.apache.druid.msq.input.ReadableInput;
@@ -42,13 +43,13 @@ import java.util.function.Consumer;
public class SegmentsInputSliceReader implements InputSliceReader
{
private final DataSegmentProvider dataSegmentProvider;
- private final LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory;
+ private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
private final boolean isReindex;
public SegmentsInputSliceReader(final FrameContext frameContext, final boolean isReindex)
{
this.dataSegmentProvider = frameContext.dataSegmentProvider();
- this.loadedSegmentDataProviderFactory = frameContext.loadedSegmentDataProviderFactory();
+ this.dataServerQueryHandlerFactory = frameContext.dataServerQueryHandlerFactory();
this.isReindex = isReindex;
}
@@ -56,7 +57,7 @@ public class SegmentsInputSliceReader implements InputSliceReader
public int numReadableInputs(InputSlice slice)
{
final SegmentsInputSlice segmentsInputSlice = (SegmentsInputSlice) slice;
- return segmentsInputSlice.getDescriptors().size();
+ return segmentsInputSlice.getDescriptors().size() + segmentsInputSlice.getServedSegments().size();
}
@Override
@@ -69,16 +70,23 @@ public class SegmentsInputSliceReader implements InputSliceReader
{
final SegmentsInputSlice segmentsInputSlice = (SegmentsInputSlice) slice;
- return ReadableInputs.segments(
- () -> Iterators.transform(
Iterator<ReadableInput> segmentIterator =
+ Iterators.transform(
dataSegmentIterator(
segmentsInputSlice.getDataSource(),
segmentsInputSlice.getDescriptors(),
counters.channel(CounterNames.inputChannel(inputNumber)).setTotalFiles(slice.fileCount())
- ),
- ReadableInput::segment
- )
- );
+ ), ReadableInput::segment);
+
+ Iterator<ReadableInput> dataServerIterator =
+ Iterators.transform(
+ dataServerIterator(
+ segmentsInputSlice.getDataSource(),
+ segmentsInputSlice.getServedSegments(),
+ counters.channel(CounterNames.inputChannel(inputNumber)).setTotalFiles(slice.fileCount())
+ ), ReadableInput::dataServerQuery);
+
+ return ReadableInputs.segments(() -> Iterators.concat(dataServerIterator, segmentIterator));
}
private Iterator<SegmentWithDescriptor> dataSegmentIterator(
@@ -98,10 +106,24 @@ public class SegmentsInputSliceReader implements InputSliceReader
return new SegmentWithDescriptor(
dataSegmentProvider.fetchSegment(segmentId, channelCounters, isReindex),
- descriptor.isLoadedOnServer() ? loadedSegmentDataProviderFactory.createLoadedSegmentDataProvider(dataSource, channelCounters) : null,
descriptor
);
}
).iterator();
}
+
+ private Iterator<DataServerQueryHandler> dataServerIterator(
+ final String dataSource,
+ final List<DataServerRequestDescriptor> servedSegments,
+ final ChannelCounters channelCounters
+ )
+ {
+ return servedSegments.stream().map(
+ dataServerRequestDescriptor -> dataServerQueryHandlerFactory.createDataServerQueryHandler(
+ dataSource,
+ channelCounters,
+ dataServerRequestDescriptor
+ )
+ ).iterator();
+ }
}
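The slice reader interleaves the two input kinds by concatenating transformed iterators, as shown above. The same Guava pattern in isolation, with hypothetical source iterators standing in for the factory calls:

    Iterator<ReadableInput> dataServerInputs =
        Iterators.transform(handlerIterator, ReadableInput::dataServerQuery);
    Iterator<ReadableInput> segmentInputs =
        Iterators.transform(segmentIterator, ReadableInput::segment);
    // Data-server inputs are deliberately placed ahead of segment inputs,
    // mirroring the Iterators.concat(...) order above.
    Iterator<ReadableInput> allInputs = Iterators.concat(dataServerInputs, segmentInputs);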
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
index 1cd82f726ed..7e93324ce68 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
@@ -20,6 +20,8 @@
package org.apache.druid.msq.input.table;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
@@ -27,6 +29,7 @@ import org.apache.druid.msq.input.NilInputSlice;
import org.apache.druid.msq.input.SlicerUtils;
import org.apache.druid.msq.querykit.DataSegmentTimelineView;
import org.apache.druid.query.filter.DimFilterUtils;
+import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
import org.joda.time.Interval;
@@ -34,9 +37,12 @@ import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
@@ -61,10 +67,24 @@ public class TableInputSpecSlicer implements InputSpecSlicer
public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
{
final TableInputSpec tableInputSpec = (TableInputSpec) inputSpec;
- final List<List<DataSegmentWithInterval>> assignments =
+
+ final List<DataSegmentWithInterval> prunedPublishedSegments = new ArrayList<>();
+ final List<DataSegmentWithInterval> prunedServedSegments = new ArrayList<>();
+
+ for (DataSegmentWithInterval dataSegmentWithInterval : getPrunedSegmentSet(tableInputSpec)) {
+ if (dataSegmentWithInterval.segment instanceof DataSegmentWithLocation) {
+ prunedServedSegments.add(dataSegmentWithInterval);
+ } else {
+ prunedPublishedSegments.add(dataSegmentWithInterval);
+ }
+ }
+
+ final List<WeightedInputInstance> groupedServedSegments = createWeightedSegmentSet(prunedServedSegments);
+
+ final List<List<WeightedInputInstance>> assignments =
SlicerUtils.makeSlicesStatic(
- getPrunedSegmentSet(tableInputSpec).iterator(),
- segment -> segment.getSegment().getSize(),
+ Iterators.concat(groupedServedSegments.iterator(), prunedPublishedSegments.iterator()),
+ WeightedInputInstance::getWeight,
maxNumSlices
);
return makeSlices(tableInputSpec, assignments);
@@ -79,10 +99,25 @@ public class TableInputSpecSlicer implements InputSpecSlicer
)
{
final TableInputSpec tableInputSpec = (TableInputSpec) inputSpec;
- final List<List<DataSegmentWithInterval>> assignments =
+
+ final List<WeightedInputInstance> prunedSegments = new ArrayList<>();
+ final List<DataSegmentWithInterval> prunedServedSegments = new ArrayList<>();
+
+ for (DataSegmentWithInterval dataSegmentWithInterval : getPrunedSegmentSet(tableInputSpec)) {
+ if (dataSegmentWithInterval.segment instanceof DataSegmentWithLocation) {
+ prunedServedSegments.add(dataSegmentWithInterval);
+ } else {
+ prunedSegments.add(dataSegmentWithInterval);
+ }
+ }
List<WeightedInputInstance> groupedServedSegments = createWeightedSegmentSet(prunedServedSegments);
+
+ prunedSegments.addAll(groupedServedSegments);
+
+ final List<List<WeightedInputInstance>> assignments =
SlicerUtils.makeSlicesDynamic(
- getPrunedSegmentSet(tableInputSpec).iterator(),
- segment -> segment.getSegment().getSize(),
+ prunedSegments.iterator(),
+ WeightedInputInstance::getWeight,
maxNumSlices,
maxFilesPerSlice,
maxBytesPerSlice
@@ -126,28 +161,75 @@ public class TableInputSpecSlicer implements InputSpecSlicer
private static List<InputSlice> makeSlices(
final TableInputSpec tableInputSpec,
- final List<List<DataSegmentWithInterval>> assignments
+ final List<List<WeightedInputInstance>> assignments
)
{
final List<InputSlice> retVal = new ArrayList<>(assignments.size());
- for (final List<DataSegmentWithInterval> assignment : assignments) {
+ for (final List<WeightedInputInstance> assignment : assignments) {
+
final List<RichSegmentDescriptor> descriptors = new ArrayList<>();
- for (final DataSegmentWithInterval dataSegmentWithInterval : assignment) {
- descriptors.add(dataSegmentWithInterval.toRichSegmentDescriptor());
+ final List<DataServerRequestDescriptor> dataServerRequests = new ArrayList<>();
+
+ for (final WeightedInputInstance weightedSegment : assignment) {
+ if (weightedSegment instanceof DataSegmentWithInterval) {
+ DataSegmentWithInterval dataSegmentWithInterval = (DataSegmentWithInterval) weightedSegment;
+ descriptors.add(dataSegmentWithInterval.toRichSegmentDescriptor());
+ } else {
+ DataServerRequest serverRequest = (DataServerRequest) weightedSegment;
+ dataServerRequests.add(serverRequest.toDataServerRequestDescriptor());
+ }
}
- if (descriptors.isEmpty()) {
+ if (descriptors.isEmpty() && dataServerRequests.isEmpty()) {
retVal.add(NilInputSlice.INSTANCE);
} else {
- retVal.add(new SegmentsInputSlice(tableInputSpec.getDataSource(), descriptors));
+ retVal.add(new SegmentsInputSlice(tableInputSpec.getDataSource(), descriptors, dataServerRequests));
}
}
return retVal;
}
- private static class DataSegmentWithInterval
+ /**
+ * Creates a list of {@link WeightedInputInstance} from the prunedServedSegments parameter.
+ * The function selects a data server from the servers hosting each segment, and then groups the segments by
+ * data server.
+ * The returned value is a list of {@link WeightedInputInstance}, each of which is either a {@link DataSegmentWithInterval},
+ * in the case of a single segment, or a {@link DataServerRequest}, in the case of a request to a data server. A data
+ * server request fetches the results of all relevant segments from that data server.
+ */
+ private static List<WeightedInputInstance> createWeightedSegmentSet(List<DataSegmentWithInterval> prunedServedSegments)
+ {
+ // Create a map of server to segment for loaded segments.
+ final Map<DruidServerMetadata, Set<DataSegmentWithInterval>> serverVsSegmentsMap = new HashMap<>();
+ for (DataSegmentWithInterval dataSegmentWithInterval : prunedServedSegments) {
+ DataSegmentWithLocation segmentWithLocation = (DataSegmentWithLocation) dataSegmentWithInterval.segment;
+ // Choose a server out of the ones available.
+ DruidServerMetadata druidServerMetadata = DataServerSelector.RANDOM.getSelectServerFunction().apply(segmentWithLocation.getServers());
+
+ serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new HashSet<>());
+ serverVsSegmentsMap.get(druidServerMetadata).add(dataSegmentWithInterval);
+ }
+
+ List<WeightedInputInstance> retVal = new ArrayList<>();
+ for (Map.Entry<DruidServerMetadata, Set<DataSegmentWithInterval>> druidServerMetadataSetEntry : serverVsSegmentsMap.entrySet()) {
+ DataServerRequest dataServerRequest = new DataServerRequest(
+ druidServerMetadataSetEntry.getKey(),
+ ImmutableList.copyOf(druidServerMetadataSetEntry.getValue())
+ );
+ retVal.add(dataServerRequest);
+ }
+
+ return retVal;
+ }
+
+ private interface WeightedInputInstance
+ {
+ long getWeight();
+ }
+
+ private static class DataSegmentWithInterval implements WeightedInputInstance
{
private final DataSegment segment;
private final Interval interval;
@@ -169,9 +251,42 @@ public class TableInputSpecSlicer implements InputSpecSlicer
segment.getInterval(),
interval,
segment.getVersion(),
- segment.getShardSpec().getPartitionNum(),
- segment instanceof DataSegmentWithLocation ? ((DataSegmentWithLocation) segment).getServers() : null
+ segment.getShardSpec().getPartitionNum()
);
}
+
+ @Override
+ public long getWeight()
+ {
+ return segment.getSize();
+ }
+ }
+
+ private static class DataServerRequest implements WeightedInputInstance
+ {
+ private static final long DATA_SERVER_WEIGHT_ESTIMATION = 5000L;
+ private final List<DataSegmentWithInterval> segments;
+ private final DruidServerMetadata serverMetadata;
+
+ public DataServerRequest(DruidServerMetadata serverMetadata, List<DataSegmentWithInterval> segments)
+ {
+ this.segments = Preconditions.checkNotNull(segments, "segments");
+ this.serverMetadata = Preconditions.checkNotNull(serverMetadata, "server");
+ }
+
+ @Override
+ public long getWeight()
+ {
+ // Estimate the size of each realtime segment as DATA_SERVER_WEIGHT_ESTIMATION, since the coordinator does not
+ // store an accurate row count for realtime segments.
+ return segments.size() * DATA_SERVER_WEIGHT_ESTIMATION;
+ }
+
+ public DataServerRequestDescriptor toDataServerRequestDescriptor()
+ {
+ return new DataServerRequestDescriptor(
+ serverMetadata,
+ segments.stream().map(DataSegmentWithInterval::toRichSegmentDescriptor).collect(Collectors.toList()));
+ }
}
}
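The two getWeight() implementations are what the slicer balances on. A worked comparison under this scheme (the sizes are hypothetical):

    // A published segment weighs its size in bytes.
    long publishedWeight = segment.getSize();  // e.g. 150_000_000 for a 150 MB segment
    // A data-server request weighs a flat 5000 per served segment, since no
    // reliable size estimate exists for segments that are not yet published.
    long servedWeight = 3 * 5_000L;            // a request covering 3 segments = 15_000
    // SlicerUtils.makeSlicesStatic/makeSlicesDynamic balance slices on these
    // weights, so a single large published segment can outweigh many requests.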
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/FrameContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/FrameContext.java
index 49871cecc1d..7db2fa1a9dd 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/FrameContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/FrameContext.java
@@ -20,7 +20,7 @@
package org.apache.druid.msq.kernel;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.msq.exec.LoadedSegmentDataProviderFactory;
+import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.query.groupby.GroupingEngine;
@@ -44,7 +44,8 @@ public interface FrameContext
RowIngestionMeters rowIngestionMeters();
DataSegmentProvider dataSegmentProvider();
- LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory();
+
+ DataServerQueryHandlerFactory dataServerQueryHandlerFactory();
File tempDir();
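Implementations of FrameContext supply the renamed factory. A hypothetical implementor fragment (the field name and wiring are invented for illustration):

    @Override
    public DataServerQueryHandlerFactory dataServerQueryHandlerFactory()
    {
      // Typically a per-worker instance, later consumed by SegmentsInputSliceReader.
      return dataServerQueryHandlerFactory;
    }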
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
index f67f30d0c5c..b3349a9be70 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
@@ -29,8 +29,10 @@ import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
+import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
@@ -63,7 +65,7 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor