Optimize MSQ realtime queries (#15399)

Currently, while reading results from realtime tasks, requests are sent at the segment level. This is slightly wasteful: when contacting a data server, it is possible to transfer results for all segments it is hosting, instead of only one segment at a time.

One change this PR makes is to group segments by the server hosting them, which reduces the number of queries made to data servers. Since we don't have access to the number of rows for realtime segments, the grouping uses a fixed estimated number of rows for each realtime segment.
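
As a rough sketch of this grouping (illustrative class and constant names only, not the code added by this PR), the idea is to bucket realtime segments by the server selected to serve them and to weight each resulting per-server request with a fixed row estimate:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the server-level grouping described above; not the PR's actual code.
class ServerGroupingSketch
{
  // Assumed constant: a fixed row estimate is used because realtime segment row counts are unknown.
  static final long ESTIMATED_ROWS_PER_REALTIME_SEGMENT = 5_000_000L;

  record Segment(String id) {}
  record Server(String host) {}
  record ServerRequest(Server server, List<Segment> segments, long estimatedRows) {}

  static List<ServerRequest> groupByServer(Map<Segment, Server> selectedServerForSegment)
  {
    // Bucket segments by the server chosen to serve them, so each server is queried once
    // for all of its segments instead of once per segment.
    final Map<Server, List<Segment>> segmentsByServer = new HashMap<>();
    for (Map.Entry<Segment, Server> entry : selectedServerForSegment.entrySet()) {
      segmentsByServer.computeIfAbsent(entry.getValue(), ignored -> new ArrayList<>()).add(entry.getKey());
    }

    // Weight each grouped request with the fixed per-segment estimate; a slicer can then
    // balance these weights across workers the same way it balances published segments by size.
    final List<ServerRequest> requests = new ArrayList<>();
    for (Map.Entry<Server, List<Segment>> entry : segmentsByServer.entrySet()) {
      requests.add(new ServerRequest(
          entry.getKey(),
          entry.getValue(),
          (long) entry.getValue().size() * ESTIMATED_ROWS_PER_REALTIME_SEGMENT
      ));
    }
    return requests;
  }
}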
Adarsh Sanjeev 2024-02-28 11:32:14 +05:30 committed by GitHub
parent 3df161f73c
commit d2c2036ea2
39 changed files with 1743 additions and 757 deletions

View File

@ -0,0 +1,381 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.discovery.DataServerClient;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
import org.apache.druid.msq.input.table.DataServerSelector;
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.rpc.RpcException;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.server.coordination.DruidServerMetadata;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Class responsible for querying data servers and retrieving results for a given query. Also queries the coordinator
* to check if a segment has been handed off.
*/
public class DataServerQueryHandler
{
private static final Logger log = new Logger(DataServerQueryHandler.class);
private static final int DEFAULT_NUM_TRIES = 3;
private static final int PER_SERVER_QUERY_NUM_TRIES = 5;
private final String dataSource;
private final ChannelCounters channelCounters;
private final ServiceClientFactory serviceClientFactory;
private final CoordinatorClient coordinatorClient;
private final ObjectMapper objectMapper;
private final QueryToolChestWarehouse warehouse;
private final ScheduledExecutorService queryCancellationExecutor;
private final DataServerRequestDescriptor dataServerRequestDescriptor;
public DataServerQueryHandler(
String dataSource,
ChannelCounters channelCounters,
ServiceClientFactory serviceClientFactory,
CoordinatorClient coordinatorClient,
ObjectMapper objectMapper,
QueryToolChestWarehouse warehouse,
ScheduledExecutorService queryCancellationExecutor,
DataServerRequestDescriptor dataServerRequestDescriptor
)
{
this.dataSource = dataSource;
this.channelCounters = channelCounters;
this.serviceClientFactory = serviceClientFactory;
this.coordinatorClient = coordinatorClient;
this.objectMapper = objectMapper;
this.warehouse = warehouse;
this.queryCancellationExecutor = queryCancellationExecutor;
this.dataServerRequestDescriptor = dataServerRequestDescriptor;
}
@VisibleForTesting
DataServerClient makeDataServerClient(ServiceLocation serviceLocation)
{
return new DataServerClient(serviceClientFactory, serviceLocation, objectMapper, queryCancellationExecutor);
}
/**
* First performs some necessary transforms on the query, so that the data server is able to understand it:
* - Changing the datasource to a {@link TableDataSource}
* - Limiting the query to the required segments with {@link Queries#withSpecificSegments(Query, List)}
* <br>
* Then queries a data server and returns a {@link Yielder} for the results, retrying if needed. If a dataserver
* indicates that some segments were not found, checks with the coordinator to see if those segments were handed off.
* - If all the segments were handed off, returns a {@link DataServerQueryResult} with the yielder and list of handed
* off segments.
* - If some segments were not handed off, checks with the coordinator to fetch an updated list of servers. This step is
* repeated up to {@link #DEFAULT_NUM_TRIES} times.
* - If the servers could not be found, checks if the segment was handed-off. If it was, returns a
* {@link DataServerQueryResult} with the yielder and list of handed off segments. Otherwise, throws an exception.
* <br>
* Also applies {@link QueryToolChest#makePreComputeManipulatorFn(Query, MetricManipulationFn)} and reports channel
* metrics on the returned results.
*
* @param <QueryType> result return type for the query from the data server
* @param <RowType> type of the result rows after parsing from QueryType object
*/
public <RowType, QueryType> DataServerQueryResult<RowType> fetchRowsFromDataServer(
Query<QueryType> query,
Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
Closer closer
)
{
// MSQ changes the datasource to a number datasource. This needs to be changed back for data servers to understand.
final Query<QueryType> preparedQuery = query.withDataSource(new TableDataSource(dataSource));
final List<Yielder<RowType>> yielders = new ArrayList<>();
final List<RichSegmentDescriptor> handedOffSegments = new ArrayList<>();
List<DataServerRequestDescriptor> pendingRequests = ImmutableList.of(dataServerRequestDescriptor);
final int maxRetries = preparedQuery.context().getNumRetriesOnMissingSegments(DEFAULT_NUM_TRIES);
int retryCount = 0;
while (!pendingRequests.isEmpty()) {
final ResponseContext responseContext = new DefaultResponseContext();
final Set<RichSegmentDescriptor> processedSegments = new HashSet<>();
for (DataServerRequestDescriptor descriptor : pendingRequests) {
log.info("Querying server [%s] for segments[%s]", descriptor.getServerMetadata(), descriptor.getSegments());
processedSegments.addAll(descriptor.getSegments());
Yielder<RowType> yielder = fetchRowsFromDataServerInternal(descriptor, responseContext, closer, preparedQuery, mappingFunction);
// Add results
if (yielder != null && !yielder.isDone()) {
yielders.add(yielder);
}
}
// Check for missing segments
List<SegmentDescriptor> missingSegments = getMissingSegments(responseContext);
if (missingSegments.isEmpty()) {
// No segments remaining.
break;
}
final List<SegmentDescriptor> handedOffSegmentDescriptors = checkSegmentHandoff(missingSegments);
Set<RichSegmentDescriptor> missingRichSegmentDescriptors = new HashSet<>();
for (RichSegmentDescriptor richSegmentDescriptor : processedSegments) {
SegmentDescriptor segmentDescriptor = toSegmentDescriptorWithFullInterval(richSegmentDescriptor);
if (missingSegments.contains(segmentDescriptor)) {
if (handedOffSegmentDescriptors.contains(segmentDescriptor)) {
handedOffSegments.add(richSegmentDescriptor);
} else {
missingRichSegmentDescriptors.add(richSegmentDescriptor);
}
}
}
pendingRequests = createNextPendingRequests(
missingRichSegmentDescriptors,
MultiStageQueryContext.getSegmentSources(query.context()),
DataServerSelector.RANDOM
);
if (!pendingRequests.isEmpty()) {
retryCount++;
if (retryCount > maxRetries) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Unable to fetch results from dataservers in [%d] retries.", retryCount);
}
}
}
return new DataServerQueryResult<>(yielders, handedOffSegments, dataSource);
}
private <QueryType, RowType> Yielder<RowType> fetchRowsFromDataServerInternal(
final DataServerRequestDescriptor requestDescriptor,
final ResponseContext responseContext,
final Closer closer,
final Query<QueryType> query,
final Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction
)
{
final ServiceLocation serviceLocation = ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata());
final DataServerClient dataServerClient = makeDataServerClient(serviceLocation);
final QueryToolChest<QueryType, Query<QueryType>> toolChest = warehouse.getToolChest(query);
final Function<QueryType, QueryType> preComputeManipulatorFn =
toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing());
final JavaType queryResultType = toolChest.getBaseResultType();
final List<SegmentDescriptor> segmentDescriptors = requestDescriptor.getSegments()
.stream()
.map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval)
.collect(Collectors.toList());
try {
return RetryUtils.retry(
() -> closer.register(createYielder(
dataServerClient.run(
Queries.withSpecificSegments(
query,
requestDescriptor.getSegments()
.stream()
.map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval)
.collect(Collectors.toList())
), responseContext, queryResultType, closer).map(preComputeManipulatorFn), mappingFunction)),
throwable -> !(throwable instanceof QueryInterruptedException
&& throwable.getCause() instanceof InterruptedException),
PER_SERVER_QUERY_NUM_TRIES
);
}
catch (QueryInterruptedException e) {
if (e.getCause() instanceof RpcException) {
// In the case that all the realtime servers for a segment are gone (for example, if they were scaled down),
// we would also be unable to fetch the segment.
responseContext.addMissingSegments(segmentDescriptors);
return Yielders.each(Sequences.empty());
} else {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation);
}
}
catch (Exception e) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation);
}
}
private <RowType, QueryType> Yielder<RowType> createYielder(
final Sequence<QueryType> sequence,
final Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction
)
{
return Yielders.each(
mappingFunction.apply(sequence)
.map(row -> {
channelCounters.incrementRowCount();
return row;
})
);
}
private List<DataServerRequestDescriptor> createNextPendingRequests(
final Set<RichSegmentDescriptor> richSegmentDescriptors,
final SegmentSource includeSegmentSource,
final DataServerSelector dataServerSelector
)
{
final Map<DruidServerMetadata, Set<RichSegmentDescriptor>> serverVsSegmentsMap = new HashMap<>();
Iterable<ImmutableSegmentLoadInfo> immutableSegmentLoadInfos =
coordinatorClient.fetchServerViewSegments(
dataSource,
richSegmentDescriptors.stream().map(RichSegmentDescriptor::getFullInterval).collect(Collectors.toList())
);
Map<SegmentDescriptor, ImmutableSegmentLoadInfo> segmentVsServerMap = new HashMap<>();
immutableSegmentLoadInfos.forEach(immutableSegmentLoadInfo -> {
segmentVsServerMap.put(immutableSegmentLoadInfo.getSegment().toDescriptor(), immutableSegmentLoadInfo);
});
for (RichSegmentDescriptor richSegmentDescriptor : richSegmentDescriptors) {
SegmentDescriptor segmentDescriptorWithFullInterval = toSegmentDescriptorWithFullInterval(richSegmentDescriptor);
if (!segmentVsServerMap.containsKey(segmentDescriptorWithFullInterval)) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Could not find a server for segment[%s]", richSegmentDescriptor);
}
ImmutableSegmentLoadInfo segmentLoadInfo = segmentVsServerMap.get(segmentDescriptorWithFullInterval);
if (segmentLoadInfo.getSegment().toDescriptor().equals(segmentDescriptorWithFullInterval)) {
Set<DruidServerMetadata> servers = segmentLoadInfo.getServers()
.stream()
.filter(druidServerMetadata -> includeSegmentSource.getUsedServerTypes()
.contains(druidServerMetadata.getType()))
.collect(Collectors.toSet());
if (servers.isEmpty()) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Could not find a server matching includeSegmentSource[%s] for segment[%s]. Only found servers [%s]", includeSegmentSource, richSegmentDescriptor, servers);
}
DruidServerMetadata druidServerMetadata = dataServerSelector.getSelectServerFunction().apply(servers);
serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new HashSet<>());
SegmentDescriptor descriptor = segmentLoadInfo.getSegment().toDescriptor();
serverVsSegmentsMap.get(druidServerMetadata)
.add(new RichSegmentDescriptor(richSegmentDescriptor.getFullInterval(), richSegmentDescriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber()));
}
}
final List<DataServerRequestDescriptor> requestDescriptors = new ArrayList<>();
for (Map.Entry<DruidServerMetadata, Set<RichSegmentDescriptor>> druidServerMetadataSetEntry : serverVsSegmentsMap.entrySet()) {
DataServerRequestDescriptor dataServerRequest = new DataServerRequestDescriptor(
druidServerMetadataSetEntry.getKey(),
ImmutableList.copyOf(druidServerMetadataSetEntry.getValue())
);
requestDescriptors.add(dataServerRequest);
}
return requestDescriptors;
}
/**
* Retrieves the list of missing segments from the response context.
*/
private static List<SegmentDescriptor> getMissingSegments(final ResponseContext responseContext)
{
List<SegmentDescriptor> missingSegments = responseContext.getMissingSegments();
if (missingSegments == null) {
return ImmutableList.of();
}
return missingSegments;
}
/**
* Queries the coordinator to check if a list of segments has been handed off.
* Returns a list of segments which have been handed off.
* <br>
* See {@link org.apache.druid.server.http.DataSourcesResource#isHandOffComplete(String, String, int, String)}
*/
private List<SegmentDescriptor> checkSegmentHandoff(List<SegmentDescriptor> segmentDescriptors)
{
try {
List<SegmentDescriptor> handedOffSegments = new ArrayList<>();
for (SegmentDescriptor segmentDescriptor : segmentDescriptors) {
Boolean wasHandedOff = FutureUtils.get(
coordinatorClient.isHandoffComplete(dataSource, segmentDescriptor),
true
);
if (Boolean.TRUE.equals(wasHandedOff)) {
handedOffSegments.add(segmentDescriptor);
}
}
return handedOffSegments;
}
catch (Exception e) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(e, "Could not contact coordinator");
}
}
static SegmentDescriptor toSegmentDescriptorWithFullInterval(RichSegmentDescriptor richSegmentDescriptor)
{
return new SegmentDescriptor(
richSegmentDescriptor.getFullInterval(),
richSegmentDescriptor.getVersion(),
richSegmentDescriptor.getPartitionNumber()
);
}
}
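
Below is a minimal, hypothetical usage sketch for the class above (the caller, the query, and the identity mapping function are assumptions, not part of this change): it drains the per-server yielders for rows still held on data servers, and takes the handed-off segments as a SegmentsInputSlice for the regular deep-storage read path.

import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.exec.DataServerQueryResult;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.query.Query;
import java.io.IOException;

// Hypothetical caller, not part of this PR.
class DataServerQueryHandlerUsageSketch
{
  void consume(DataServerQueryHandler handler, Query<Object[]> query) throws IOException
  {
    final Closer closer = Closer.create();
    try {
      final DataServerQueryResult<Object[]> result = handler.fetchRowsFromDataServer(
          query,
          sequence -> sequence, // assumed identity mapping from query results to rows
          closer
      );
      // Rows still held on data servers arrive through one yielder per server queried.
      for (Yielder<Object[]> yielder : result.getResultsYielders()) {
        while (!yielder.isDone()) {
          final Object[] row = yielder.get();
          // ... process row ...
          yielder = yielder.next(null);
        }
      }
      // Segments that were already handed off come back as a SegmentsInputSlice,
      // so the caller can fall back to reading them from deep storage.
      final SegmentsInputSlice handedOff = result.getHandedOffSegments();
      // ... schedule 'handedOff' for the regular segment-reading path ...
    }
    finally {
      closer.close();
    }
  }
}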

View File

@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.rpc.ServiceClientFactory;
@ -33,11 +34,11 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Creates new instances of {@link LoadedSegmentDataProvider} and manages the cancellation threadpool.
* Creates new instances of {@link DataServerQueryHandler} and manages the cancellation threadpool.
*/
public class LoadedSegmentDataProviderFactory implements Closeable
public class DataServerQueryHandlerFactory implements Closeable
{
private static final Logger log = new Logger(LoadedSegmentDataProviderFactory.class);
private static final Logger log = new Logger(DataServerQueryHandlerFactory.class);
private static final int DEFAULT_THREAD_COUNT = 4;
private final CoordinatorClient coordinatorClient;
private final ServiceClientFactory serviceClientFactory;
@ -45,7 +46,7 @@ public class LoadedSegmentDataProviderFactory implements Closeable
private final QueryToolChestWarehouse warehouse;
private final ScheduledExecutorService queryCancellationExecutor;
public LoadedSegmentDataProviderFactory(
public DataServerQueryHandlerFactory(
CoordinatorClient coordinatorClient,
ServiceClientFactory serviceClientFactory,
ObjectMapper objectMapper,
@ -59,19 +60,21 @@ public class LoadedSegmentDataProviderFactory implements Closeable
this.queryCancellationExecutor = ScheduledExecutors.fixed(DEFAULT_THREAD_COUNT, "query-cancellation-executor");
}
public LoadedSegmentDataProvider createLoadedSegmentDataProvider(
public DataServerQueryHandler createDataServerQueryHandler(
String dataSource,
ChannelCounters channelCounters
ChannelCounters channelCounters,
DataServerRequestDescriptor dataServerRequestDescriptor
)
{
return new LoadedSegmentDataProvider(
return new DataServerQueryHandler(
dataSource,
channelCounters,
serviceClientFactory,
coordinatorClient,
objectMapper,
warehouse,
queryCancellationExecutor
queryCancellationExecutor,
dataServerRequestDescriptor
);
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import java.util.List;
/**
* Contains the results for a query to a data server. {@link #resultsYielders} contains the fetched results, and
* {@link #segmentsInputSlice} is a {@link SegmentsInputSlice} containing the segments which have already been handed
* off, so that they can be fetched from deep storage instead.
*/
public class DataServerQueryResult<RowType>
{
private final List<Yielder<RowType>> resultsYielders;
private final SegmentsInputSlice segmentsInputSlice;
public DataServerQueryResult(
List<Yielder<RowType>> resultsYielders,
List<RichSegmentDescriptor> handedOffSegments,
String dataSource
)
{
this.resultsYielders = resultsYielders;
this.segmentsInputSlice = new SegmentsInputSlice(dataSource, handedOffSegments, ImmutableList.of());
}
public List<Yielder<RowType>> getResultsYielders()
{
return resultsYielders;
}
public SegmentsInputSlice getHandedOffSegments()
{
return segmentsInputSlice;
}
}

View File

@ -1,268 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.discovery.DataServerClient;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.rpc.FixedSetServiceLocator;
import org.apache.druid.rpc.RpcException;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.utils.CollectionUtils;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
/**
* Class responsible for querying dataservers and retriving results for a given query. Also queries the coordinator
* to check if a segment has been handed off.
*/
public class LoadedSegmentDataProvider
{
private static final Logger log = new Logger(LoadedSegmentDataProvider.class);
private static final int DEFAULT_NUM_TRIES = 5;
private final String dataSource;
private final ChannelCounters channelCounters;
private final ServiceClientFactory serviceClientFactory;
private final CoordinatorClient coordinatorClient;
private final ObjectMapper objectMapper;
private final QueryToolChestWarehouse warehouse;
private final ScheduledExecutorService queryCancellationExecutor;
public LoadedSegmentDataProvider(
String dataSource,
ChannelCounters channelCounters,
ServiceClientFactory serviceClientFactory,
CoordinatorClient coordinatorClient,
ObjectMapper objectMapper,
QueryToolChestWarehouse warehouse,
ScheduledExecutorService queryCancellationExecutor
)
{
this.dataSource = dataSource;
this.channelCounters = channelCounters;
this.serviceClientFactory = serviceClientFactory;
this.coordinatorClient = coordinatorClient;
this.objectMapper = objectMapper;
this.warehouse = warehouse;
this.queryCancellationExecutor = queryCancellationExecutor;
}
@VisibleForTesting
DataServerClient makeDataServerClient(ServiceLocation serviceLocation)
{
return new DataServerClient(serviceClientFactory, serviceLocation, objectMapper, queryCancellationExecutor);
}
/**
* Performs some necessary transforms to the query, so that the dataserver is able to understand it first.
* - Changing the datasource to a {@link TableDataSource}
* - Limiting the query to a single required segment with {@link Queries#withSpecificSegments(Query, List)}
* <br>
* Then queries a data server and returns a {@link Yielder} for the results, retrying if needed. If a dataserver
* indicates that the segment was not found, checks with the coordinator to see if the segment was handed off.
* - If the segment was handed off, returns with a {@link DataServerQueryStatus#HANDOFF} status.
* - If the segment was not handed off, retries with the known list of servers and throws an exception if the retry
* count is exceeded.
* - If the servers could not be found, checks if the segment was handed-off. If it was, returns with a
* {@link DataServerQueryStatus#HANDOFF} status. Otherwise, throws an exception.
* <br>
* Also applies {@link QueryToolChest#makePreComputeManipulatorFn(Query, MetricManipulationFn)} and reports channel
* metrics on the returned results.
*
* @param <QueryType> result return type for the query from the data server
* @param <RowType> type of the result rows after parsing from QueryType object
*/
public <RowType, QueryType> Pair<DataServerQueryStatus, Yielder<RowType>> fetchRowsFromDataServer(
Query<QueryType> query,
RichSegmentDescriptor segmentDescriptor,
Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
Closer closer
) throws IOException
{
final Query<QueryType> preparedQuery = Queries.withSpecificSegments(
query.withDataSource(new TableDataSource(dataSource)),
ImmutableList.of(segmentDescriptor)
);
final Set<DruidServerMetadata> servers = segmentDescriptor.getServers();
final FixedSetServiceLocator fixedSetServiceLocator = FixedSetServiceLocator.forDruidServerMetadata(servers);
final QueryToolChest<QueryType, Query<QueryType>> toolChest = warehouse.getToolChest(query);
final Function<QueryType, QueryType> preComputeManipulatorFn =
toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing());
final JavaType queryResultType = toolChest.getBaseResultType();
final int numRetriesOnMissingSegments = preparedQuery.context().getNumRetriesOnMissingSegments(DEFAULT_NUM_TRIES);
log.debug("Querying severs[%s] for segment[%s], retries:[%d]", servers, segmentDescriptor, numRetriesOnMissingSegments);
final ResponseContext responseContext = new DefaultResponseContext();
Pair<DataServerQueryStatus, Yielder<RowType>> statusSequencePair;
try {
// We need to check for handoff to decide if we need to retry. Therefore, we handle it here instead of inside
// the client.
statusSequencePair = RetryUtils.retry(
() -> {
ServiceLocation serviceLocation = CollectionUtils.getOnlyElement(
fixedSetServiceLocator.locate().get().getLocations(),
serviceLocations -> {
throw DruidException.defensive("Should only have one location");
}
);
DataServerClient dataServerClient = makeDataServerClient(serviceLocation);
Sequence<QueryType> sequence = dataServerClient.run(preparedQuery, responseContext, queryResultType, closer)
.map(preComputeManipulatorFn);
final List<SegmentDescriptor> missingSegments = getMissingSegments(responseContext);
// Only one segment is fetched, so this should be empty if it was fetched
if (missingSegments.isEmpty()) {
log.debug("Successfully fetched rows from server for segment[%s]", segmentDescriptor);
// Segment was found
Yielder<RowType> yielder = closer.register(
Yielders.each(mappingFunction.apply(sequence)
.map(row -> {
channelCounters.incrementRowCount();
return row;
}))
);
return Pair.of(DataServerQueryStatus.SUCCESS, yielder);
} else {
Boolean wasHandedOff = checkSegmentHandoff(coordinatorClient, dataSource, segmentDescriptor);
if (Boolean.TRUE.equals(wasHandedOff)) {
log.debug("Segment[%s] was handed off.", segmentDescriptor);
return Pair.of(DataServerQueryStatus.HANDOFF, null);
} else {
log.error("Segment[%s] could not be found on data server, but segment was not handed off.", segmentDescriptor);
throw new IOE(
"Segment[%s] could not be found on data server, but segment was not handed off.",
segmentDescriptor
);
}
}
},
throwable -> !(throwable instanceof QueryInterruptedException && throwable.getCause() instanceof InterruptedException),
numRetriesOnMissingSegments
);
return statusSequencePair;
}
catch (QueryInterruptedException e) {
if (e.getCause() instanceof RpcException) {
// In the case that all the realtime servers for a segment are gone (for example, if they were scaled down),
// we would also be unable to fetch the segment. Check if the segment was handed off, just in case, instead of
// failing the query.
boolean wasHandedOff = checkSegmentHandoff(coordinatorClient, dataSource, segmentDescriptor);
if (wasHandedOff) {
log.debug("Segment[%s] was handed off.", segmentDescriptor);
return Pair.of(DataServerQueryStatus.HANDOFF, null);
}
}
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(e, "Exception while fetching rows for query from dataservers[%s]", servers);
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOE.class);
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(e, "Exception while fetching rows for query from dataservers[%s]", servers);
}
}
/**
* Retreives the list of missing segments from the response context.
*/
private static List<SegmentDescriptor> getMissingSegments(final ResponseContext responseContext)
{
List<SegmentDescriptor> missingSegments = responseContext.getMissingSegments();
if (missingSegments == null) {
return ImmutableList.of();
}
return missingSegments;
}
/**
* Queries the coordinator to check if a segment has been handed off.
* <br>
* See {@link org.apache.druid.server.http.DataSourcesResource#isHandOffComplete(String, String, int, String)}
*/
private static boolean checkSegmentHandoff(
CoordinatorClient coordinatorClient,
String dataSource,
SegmentDescriptor segmentDescriptor
) throws IOE
{
Boolean wasHandedOff;
try {
wasHandedOff = FutureUtils.get(coordinatorClient.isHandoffComplete(dataSource, segmentDescriptor), true);
}
catch (Exception e) {
throw new IOE(e, "Could not contact coordinator for segment[%s]", segmentDescriptor);
}
return Boolean.TRUE.equals(wasHandedOff);
}
/**
* Represents the status of fetching a segment from a data server
*/
public enum DataServerQueryStatus
{
/**
* Segment was found on the data server and fetched successfully.
*/
SUCCESS,
/**
* Segment was not found on the realtime server as it has been handed off to a historical. Only returned while
* querying a realtime server.
*/
HANDOFF
}
}

View File

@ -74,7 +74,7 @@ public interface WorkerContext
DruidNode selfNode();
Bouncer processorBouncer();
LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory();
DataServerQueryHandlerFactory dataServerQueryHandlerFactory();
default File tempDir(int stageNumber, String id)
{

View File

@ -295,7 +295,7 @@ public class WorkerImpl implements Worker
{
this.controllerClient = context.makeControllerClient(task.getControllerTaskId());
closer.register(controllerClient::close);
closer.register(context.loadedSegmentDataProviderFactory());
closer.register(context.dataServerQueryHandlerFactory());
context.registerWorker(this, closer); // Uses controllerClient, so must be called after that is initialized
this.workerClient = new ExceptionWrappingWorkerClient(context.makeWorkerClient());

View File

@ -20,7 +20,7 @@
package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.msq.exec.LoadedSegmentDataProviderFactory;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.querykit.DataSegmentProvider;
@ -39,20 +39,20 @@ public class IndexerFrameContext implements FrameContext
private final IndexIO indexIO;
private final DataSegmentProvider dataSegmentProvider;
private final WorkerMemoryParameters memoryParameters;
private final LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory;
private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
public IndexerFrameContext(
IndexerWorkerContext context,
IndexIO indexIO,
DataSegmentProvider dataSegmentProvider,
LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory,
DataServerQueryHandlerFactory dataServerQueryHandlerFactory,
WorkerMemoryParameters memoryParameters
)
{
this.context = context;
this.indexIO = indexIO;
this.dataSegmentProvider = dataSegmentProvider;
this.loadedSegmentDataProviderFactory = loadedSegmentDataProviderFactory;
this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
this.memoryParameters = memoryParameters;
}
@ -81,9 +81,9 @@ public class IndexerFrameContext implements FrameContext
}
@Override
public LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory()
public DataServerQueryHandlerFactory dataServerQueryHandlerFactory()
{
return loadedSegmentDataProviderFactory;
return dataServerQueryHandlerFactory;
}

View File

@ -34,7 +34,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.ControllerClient;
import org.apache.druid.msq.exec.LoadedSegmentDataProviderFactory;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.TaskDataSegmentProvider;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerClient;
@ -71,7 +71,7 @@ public class IndexerWorkerContext implements WorkerContext
private final Injector injector;
private final IndexIO indexIO;
private final TaskDataSegmentProvider dataSegmentProvider;
private final LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory;
private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
private final ServiceClientFactory clientFactory;
@GuardedBy("this")
@ -85,7 +85,7 @@ public class IndexerWorkerContext implements WorkerContext
final Injector injector,
final IndexIO indexIO,
final TaskDataSegmentProvider dataSegmentProvider,
final LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory,
final DataServerQueryHandlerFactory dataServerQueryHandlerFactory,
final ServiceClientFactory clientFactory
)
{
@ -93,7 +93,7 @@ public class IndexerWorkerContext implements WorkerContext
this.injector = injector;
this.indexIO = indexIO;
this.dataSegmentProvider = dataSegmentProvider;
this.loadedSegmentDataProviderFactory = loadedSegmentDataProviderFactory;
this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
this.clientFactory = clientFactory;
}
@ -117,7 +117,7 @@ public class IndexerWorkerContext implements WorkerContext
segmentCacheManager,
indexIO
),
new LoadedSegmentDataProviderFactory(
new DataServerQueryHandlerFactory(
toolbox.getCoordinatorClient(),
serviceClientFactory,
smileMapper,
@ -245,7 +245,7 @@ public class IndexerWorkerContext implements WorkerContext
this,
indexIO,
dataSegmentProvider,
loadedSegmentDataProviderFactory,
dataServerQueryHandlerFactory,
WorkerMemoryParameters.createProductionInstanceForWorker(injector, queryDef, stageNumber)
);
}
@ -269,9 +269,9 @@ public class IndexerWorkerContext implements WorkerContext
}
@Override
public LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory()
public DataServerQueryHandlerFactory dataServerQueryHandlerFactory()
{
return loadedSegmentDataProviderFactory;
return dataServerQueryHandlerFactory;
}
private synchronized OverlordClient makeOverlordClient()

View File

@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.kernel.StagePartition;
@ -30,7 +31,7 @@ import javax.annotation.Nullable;
/**
* A single item of readable input. Generated by {@link InputSliceReader#attach} from an {@link InputSlice}.
*
* <br>
* Each item is either readable as a {@link org.apache.druid.segment.Segment} or as a {@link ReadableFrameChannel},
* but not both. Check {@link #hasSegment()} and {@link #hasChannel()} to see which one you have.
*/
@ -39,6 +40,9 @@ public class ReadableInput
@Nullable
private final SegmentWithDescriptor segment;
@Nullable
private final DataServerQueryHandler dataServerQuery;
@Nullable
private final ReadableFrameChannel channel;
@ -50,18 +54,20 @@ public class ReadableInput
private ReadableInput(
@Nullable SegmentWithDescriptor segment,
@Nullable DataServerQueryHandler dataServerQuery,
@Nullable ReadableFrameChannel channel,
@Nullable FrameReader frameReader,
@Nullable StagePartition stagePartition
)
{
this.segment = segment;
this.dataServerQuery = dataServerQuery;
this.channel = channel;
this.frameReader = frameReader;
this.stagePartition = stagePartition;
if ((segment == null) == (channel == null)) {
throw new ISE("Provide either 'segment' or 'channel'");
if ((segment == null) && (channel == null) && (dataServerQuery == null)) {
throw new ISE("Provide 'segment', 'dataServerQuery' or 'channel'");
}
}
@ -72,7 +78,17 @@ public class ReadableInput
*/
public static ReadableInput segment(final SegmentWithDescriptor segment)
{
return new ReadableInput(Preconditions.checkNotNull(segment, "segment"), null, null, null);
return new ReadableInput(Preconditions.checkNotNull(segment, "segment"), null, null, null, null);
}
/**
* Create an input associated with a query to a data server
*
* @param dataServerQueryHandler the data server query handler
*/
public static ReadableInput dataServerQuery(final DataServerQueryHandler dataServerQueryHandler)
{
return new ReadableInput(null, Preconditions.checkNotNull(dataServerQueryHandler, "dataServerQuery"), null, null, null);
}
/**
@ -90,6 +106,7 @@ public class ReadableInput
)
{
return new ReadableInput(
null,
null,
Preconditions.checkNotNull(channel, "channel"),
Preconditions.checkNotNull(frameReader, "frameReader"),
@ -98,13 +115,21 @@ public class ReadableInput
}
/**
* Whether this input is a segment (from {@link #segment(SegmentWithDescriptor)}.
* Whether this input is a segment (from {@link #segment(SegmentWithDescriptor)}).
*/
public boolean hasSegment()
{
return segment != null;
}
/**
* Whether this input is a dataserver query (from {@link #dataServerQuery(DataServerQueryHandler)}).
*/
public boolean hasDataServerQuery()
{
return dataServerQuery != null;
}
/**
* Whether this input is a channel (from {@link #channel(ReadableFrameChannel, FrameReader, StagePartition)}.
*/
@ -122,6 +147,15 @@ public class ReadableInput
return segment;
}
/**
* The data server query for this input. Only valid if {@link #hasDataServerQuery()}}.
*/
public DataServerQueryHandler getDataServerQuery()
{
checkIsDataServerQuery();
return dataServerQuery;
}
/**
* The channel for this input. Only valid if {@link #hasChannel()}.
*/
@ -158,7 +192,14 @@ public class ReadableInput
private void checkIsSegment()
{
if (!hasSegment()) {
throw new ISE("Not a channel input; cannot call this method");
throw new ISE("Not a segment; cannot call this method");
}
}
private void checkIsDataServerQuery()
{
if (!hasDataServerQuery()) {
throw new ISE("Not a data server query; cannot call this method");
}
}

View File

@ -159,8 +159,7 @@ public class ExternalInputSliceReader implements InputSliceReader
);
return new SegmentWithDescriptor(
() -> ResourceHolder.fromCloseable(segment),
null,
new RichSegmentDescriptor(segmentId.toDescriptor(), null, null)
new RichSegmentDescriptor(segmentId.toDescriptor(), null)
);
}
);

View File

@ -44,7 +44,7 @@ public class InlineInputSliceReader implements InputSliceReader
{
public static final String SEGMENT_ID = "__inline";
private static final RichSegmentDescriptor DUMMY_SEGMENT_DESCRIPTOR
= new RichSegmentDescriptor(SegmentId.dummy(SEGMENT_ID).toDescriptor(), null, null);
= new RichSegmentDescriptor(SegmentId.dummy(SEGMENT_ID).toDescriptor(), null);
private final SegmentWrangler segmentWrangler;
@ -75,7 +75,6 @@ public class InlineInputSliceReader implements InputSliceReader
segment -> ReadableInput.segment(
new SegmentWithDescriptor(
() -> ResourceHolder.fromCloseable(segment),
null,
DUMMY_SEGMENT_DESCRIPTOR
)
)

View File

@ -100,8 +100,7 @@ public class LookupInputSliceReader implements InputSliceReader
return ResourceHolder.fromCloseable(segment);
},
null,
new RichSegmentDescriptor(SegmentId.dummy(lookupName).toDescriptor(), null, null)
new RichSegmentDescriptor(SegmentId.dummy(lookupName).toDescriptor(), null)
)
)
)

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.input.table;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.server.coordination.DruidServerMetadata;
import java.util.List;
import java.util.Objects;
/**
* Contains information on a set of segments, and the {@link DruidServerMetadata} of a data server serving
* those segments. Used by MSQ to query dataservers directly.
*/
public class DataServerRequestDescriptor
{
private final DruidServerMetadata serverMetadata;
private final List<RichSegmentDescriptor> segments;
@JsonCreator
public DataServerRequestDescriptor(
@JsonProperty("serverMetadata") DruidServerMetadata serverMetadata,
@JsonProperty("segments") List<RichSegmentDescriptor> segments
)
{
this.segments = segments;
this.serverMetadata = serverMetadata;
}
@JsonProperty("serverMetadata")
public DruidServerMetadata getServerMetadata()
{
return serverMetadata;
}
@JsonProperty("segments")
public List<RichSegmentDescriptor> getSegments()
{
return segments;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DataServerRequestDescriptor that = (DataServerRequestDescriptor) o;
return Objects.equals(serverMetadata, that.serverMetadata) && Objects.equals(
segments,
that.segments
);
}
@Override
public int hashCode()
{
return Objects.hash(serverMetadata, segments);
}
@Override
public String toString()
{
return "DataServerRequestDescriptor{" +
"serverMetadata=" + serverMetadata +
", segments=" + segments +
'}';
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.input.table;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.jboss.netty.util.internal.ThreadLocalRandom;
import java.util.Set;
import java.util.function.Function;
public enum DataServerSelector
{
RANDOM(servers -> servers.stream()
.skip(ThreadLocalRandom.current().nextInt(servers.size()))
.findFirst()
.orElse(null));
private final Function<Set<DruidServerMetadata>, DruidServerMetadata> selectServer;
DataServerSelector(Function<Set<DruidServerMetadata>, DruidServerMetadata> selectServer)
{
this.selectServer = selectServer;
}
public Function<Set<DruidServerMetadata>, DruidServerMetadata> getSelectServerFunction()
{
return selectServer;
}
}

View File

@ -23,54 +23,45 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Set;
/**
* Like {@link SegmentDescriptor}, but provides both the full interval and the clipped interval for a segment
* (SegmentDescriptor only provides the clipped interval.), as well as the metadata of the servers it is loaded on.
* Like {@link SegmentDescriptor}, but provides both the full interval and the clipped interval for a segment.
* (SegmentDescriptor only provides the clipped interval.)
* <br>
* To keep the serialized form lightweight, the full interval is only serialized if it is different from the
* clipped interval.
* <br>
* It is possible to deserialize this class as {@link SegmentDescriptor}. However, going the other direction is
* not a good idea, because the {@link #fullInterval} and {@link #servers} will not end up being set correctly.
* not a good idea, because the {@link #fullInterval} will not end up being set correctly.
*/
public class RichSegmentDescriptor extends SegmentDescriptor
{
@Nullable
private final Interval fullInterval;
private final Set<DruidServerMetadata> servers;
public RichSegmentDescriptor(
final Interval fullInterval,
final Interval interval,
final String version,
final int partitionNumber,
final Set<DruidServerMetadata> servers
final int partitionNumber
)
{
super(interval, version, partitionNumber);
this.fullInterval = interval.equals(Preconditions.checkNotNull(fullInterval, "fullInterval")) ? null : fullInterval;
this.servers = servers;
}
public RichSegmentDescriptor(
SegmentDescriptor segmentDescriptor,
@Nullable Interval fullInterval,
Set<DruidServerMetadata> servers
@Nullable Interval fullInterval
)
{
super(segmentDescriptor.getInterval(), segmentDescriptor.getVersion(), segmentDescriptor.getPartitionNumber());
this.fullInterval = fullInterval;
this.servers = servers;
}
@JsonCreator
@ -78,33 +69,17 @@ public class RichSegmentDescriptor extends SegmentDescriptor
@JsonProperty("fi") @Nullable final Interval fullInterval,
@JsonProperty("itvl") final Interval interval,
@JsonProperty("ver") final String version,
@JsonProperty("part") final int partitionNumber,
@JsonProperty("servers") @Nullable final Set<DruidServerMetadata> servers
@JsonProperty("part") final int partitionNumber
)
{
return new RichSegmentDescriptor(
fullInterval != null ? fullInterval : interval,
interval,
version,
partitionNumber,
servers == null ? ImmutableSet.of() : servers
partitionNumber
);
}
/**
* Returns true if the location the segment is loaded is available, and false if it is not.
*/
public boolean isLoadedOnServer()
{
return !CollectionUtils.isNullOrEmpty(getServers());
}
@JsonProperty("servers")
public Set<DruidServerMetadata> getServers()
{
return servers;
}
public Interval getFullInterval()
{
return fullInterval == null ? getInterval() : fullInterval;
@ -131,13 +106,13 @@ public class RichSegmentDescriptor extends SegmentDescriptor
return false;
}
RichSegmentDescriptor that = (RichSegmentDescriptor) o;
return Objects.equals(fullInterval, that.fullInterval) && Objects.equals(servers, that.servers);
return Objects.equals(fullInterval, that.fullInterval);
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), fullInterval, servers);
return Objects.hash(super.hashCode(), fullInterval);
}
@Override
@ -145,7 +120,6 @@ public class RichSegmentDescriptor extends SegmentDescriptor
{
return "RichSegmentDescriptor{" +
"fullInterval=" + (fullInterval == null ? getInterval() : fullInterval) +
", servers=" + getServers() +
", interval=" + getInterval() +
", version='" + getVersion() + '\'' +
", partitionNumber=" + getPartitionNumber() +

View File

@ -21,19 +21,9 @@ package org.apache.druid.msq.input.table;
import com.google.common.base.Preconditions;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.msq.exec.LoadedSegmentDataProvider;
import org.apache.druid.query.Query;
import org.apache.druid.segment.Segment;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
/**
@ -42,8 +32,6 @@ import java.util.function.Supplier;
public class SegmentWithDescriptor
{
private final Supplier<? extends ResourceHolder<Segment>> segmentSupplier;
@Nullable
private final LoadedSegmentDataProvider loadedSegmentDataProvider;
private final RichSegmentDescriptor descriptor;
/**
@ -51,18 +39,14 @@ public class SegmentWithDescriptor
*
* @param segmentSupplier supplier of a {@link ResourceHolder} of segment. The {@link ResourceHolder#close()}
* logic must include a delegated call to {@link Segment#close()}.
* @param loadedSegmentDataProvider {@link LoadedSegmentDataProvider} which fetches the corresponding results from a
* data server where the segment is loaded. The call will fetch the
* @param descriptor segment descriptor
*/
public SegmentWithDescriptor(
final Supplier<? extends ResourceHolder<Segment>> segmentSupplier,
final @Nullable LoadedSegmentDataProvider loadedSegmentDataProvider,
final RichSegmentDescriptor descriptor
)
{
this.segmentSupplier = Preconditions.checkNotNull(segmentSupplier, "segment");
this.loadedSegmentDataProvider = loadedSegmentDataProvider;
this.descriptor = Preconditions.checkNotNull(descriptor, "descriptor");
}
@ -80,19 +64,6 @@ public class SegmentWithDescriptor
return segmentSupplier.get();
}
public <QueryType, RowType> Pair<LoadedSegmentDataProvider.DataServerQueryStatus, Yielder<RowType>> fetchRowsFromDataServer(
Query<QueryType> query,
Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
Closer closer
) throws IOException
{
if (loadedSegmentDataProvider == null) {
throw DruidException.defensive("loadedSegmentDataProvider was null. Fetching segments from servers is not "
+ "supported for segment[%s]", descriptor);
}
return loadedSegmentDataProvider.fetchRowsFromDataServer(query, descriptor, mappingFunction, closer);
}
/**
* The segment descriptor associated with this physical segment.
*/

View File

@ -29,11 +29,11 @@ import java.util.Objects;
/**
* Input slice representing a set of segments to read.
*
* <br>
* Sliced from {@link TableInputSpec} by {@link TableInputSpecSlicer}.
*
* <br>
* Similar to {@link org.apache.druid.query.spec.MultipleSpecificSegmentSpec} from native queries.
*
* <br>
* These use {@link RichSegmentDescriptor}, not {@link org.apache.druid.timeline.DataSegment}, to minimize overhead
* in scenarios where the target server already has the segment cached. If the segment isn't cached, the target
* server does need to fetch the full {@link org.apache.druid.timeline.DataSegment} object, so it can get the
@ -44,15 +44,18 @@ public class SegmentsInputSlice implements InputSlice
{
private final String dataSource;
private final List<RichSegmentDescriptor> descriptors;
private final List<DataServerRequestDescriptor> servedSegments;
@JsonCreator
public SegmentsInputSlice(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<RichSegmentDescriptor> descriptors
@JsonProperty("segments") List<RichSegmentDescriptor> descriptors,
@JsonProperty("servedSegments") List<DataServerRequestDescriptor> servedSegments
)
{
this.dataSource = dataSource;
this.descriptors = descriptors;
this.servedSegments = servedSegments;
}
@JsonProperty
@ -67,6 +70,12 @@ public class SegmentsInputSlice implements InputSlice
return descriptors;
}
@JsonProperty("servedSegments")
public List<DataServerRequestDescriptor> getServedSegments()
{
return servedSegments;
}
@Override
public int fileCount()
{
@ -83,13 +92,16 @@ public class SegmentsInputSlice implements InputSlice
return false;
}
SegmentsInputSlice that = (SegmentsInputSlice) o;
return Objects.equals(dataSource, that.dataSource) && Objects.equals(descriptors, that.descriptors);
return Objects.equals(dataSource, that.dataSource) && Objects.equals(
descriptors,
that.descriptors
) && Objects.equals(servedSegments, that.servedSegments);
}
@Override
public int hashCode()
{
return Objects.hash(dataSource, descriptors);
return Objects.hash(dataSource, descriptors, servedSegments);
}
@Override
@ -98,6 +110,7 @@ public class SegmentsInputSlice implements InputSlice
return "SegmentsInputSlice{" +
"dataSource='" + dataSource + '\'' +
", descriptors=" + descriptors +
", servedSegments=" + servedSegments +
'}';
}
}

View File

@ -23,7 +23,8 @@ import com.google.common.collect.Iterators;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterNames;
import org.apache.druid.msq.counters.CounterTracker;
import org.apache.druid.msq.exec.LoadedSegmentDataProviderFactory;
import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSliceReader;
import org.apache.druid.msq.input.ReadableInput;
@ -42,13 +43,13 @@ import java.util.function.Consumer;
public class SegmentsInputSliceReader implements InputSliceReader
{
private final DataSegmentProvider dataSegmentProvider;
private final LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory;
private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
private final boolean isReindex;
public SegmentsInputSliceReader(final FrameContext frameContext, final boolean isReindex)
{
this.dataSegmentProvider = frameContext.dataSegmentProvider();
this.loadedSegmentDataProviderFactory = frameContext.loadedSegmentDataProviderFactory();
this.dataServerQueryHandlerFactory = frameContext.dataServerQueryHandlerFactory();
this.isReindex = isReindex;
}
@ -56,7 +57,7 @@ public class SegmentsInputSliceReader implements InputSliceReader
public int numReadableInputs(InputSlice slice)
{
final SegmentsInputSlice segmentsInputSlice = (SegmentsInputSlice) slice;
return segmentsInputSlice.getDescriptors().size();
return segmentsInputSlice.getDescriptors().size() + segmentsInputSlice.getServedSegments().size();
}
@Override
@ -69,16 +70,23 @@ public class SegmentsInputSliceReader implements InputSliceReader
{
final SegmentsInputSlice segmentsInputSlice = (SegmentsInputSlice) slice;
return ReadableInputs.segments(
() -> Iterators.transform(
Iterator<ReadableInput> segmentIterator =
Iterators.transform(
dataSegmentIterator(
segmentsInputSlice.getDataSource(),
segmentsInputSlice.getDescriptors(),
counters.channel(CounterNames.inputChannel(inputNumber)).setTotalFiles(slice.fileCount())
),
ReadableInput::segment
)
);
), ReadableInput::segment);
Iterator<ReadableInput> dataServerIterator =
Iterators.transform(
dataServerIterator(
segmentsInputSlice.getDataSource(),
segmentsInputSlice.getServedSegments(),
counters.channel(CounterNames.inputChannel(inputNumber)).setTotalFiles(slice.fileCount())
), ReadableInput::dataServerQuery);
return ReadableInputs.segments(() -> Iterators.concat(dataServerIterator, segmentIterator));
}
private Iterator<SegmentWithDescriptor> dataSegmentIterator(
@ -98,10 +106,24 @@ public class SegmentsInputSliceReader implements InputSliceReader
return new SegmentWithDescriptor(
dataSegmentProvider.fetchSegment(segmentId, channelCounters, isReindex),
descriptor.isLoadedOnServer() ? loadedSegmentDataProviderFactory.createLoadedSegmentDataProvider(dataSource, channelCounters) : null,
descriptor
);
}
).iterator();
}
private Iterator<DataServerQueryHandler> dataServerIterator(
final String dataSource,
final List<DataServerRequestDescriptor> servedSegments,
final ChannelCounters channelCounters
)
{
return servedSegments.stream().map(
dataServerRequestDescriptor -> dataServerQueryHandlerFactory.createDataServerQueryHandler(
dataSource,
channelCounters,
dataServerRequestDescriptor
)
).iterator();
}
}

View File

@ -20,6 +20,8 @@
package org.apache.druid.msq.input.table;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
@ -27,6 +29,7 @@ import org.apache.druid.msq.input.NilInputSlice;
import org.apache.druid.msq.input.SlicerUtils;
import org.apache.druid.msq.querykit.DataSegmentTimelineView;
import org.apache.druid.query.filter.DimFilterUtils;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
import org.joda.time.Interval;
@ -34,9 +37,12 @@ import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
@ -61,10 +67,24 @@ public class TableInputSpecSlicer implements InputSpecSlicer
public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
{
final TableInputSpec tableInputSpec = (TableInputSpec) inputSpec;
final List<List<DataSegmentWithInterval>> assignments =
final List<WeightedInputInstance> prunedPublishedSegments = new ArrayList<>();
final List<DataSegmentWithInterval> prunedServedSegments = new ArrayList<>();
for (DataSegmentWithInterval dataSegmentWithInterval : getPrunedSegmentSet(tableInputSpec)) {
if (dataSegmentWithInterval.segment instanceof DataSegmentWithLocation) {
prunedServedSegments.add(dataSegmentWithInterval);
} else {
prunedPublishedSegments.add(dataSegmentWithInterval);
}
}
final List<WeightedInputInstance> groupedServedSegments = createWeightedSegmentSet(prunedServedSegments);
final List<List<WeightedInputInstance>> assignments =
SlicerUtils.makeSlicesStatic(
getPrunedSegmentSet(tableInputSpec).iterator(),
segment -> segment.getSegment().getSize(),
Iterators.concat(groupedServedSegments.iterator(), prunedPublishedSegments.iterator()),
WeightedInputInstance::getWeight,
maxNumSlices
);
return makeSlices(tableInputSpec, assignments);
@ -79,10 +99,25 @@ public class TableInputSpecSlicer implements InputSpecSlicer
)
{
final TableInputSpec tableInputSpec = (TableInputSpec) inputSpec;
final List<List<DataSegmentWithInterval>> assignments =
final List<WeightedInputInstance> prunedSegments = new ArrayList<>();
final List<DataSegmentWithInterval> prunedServedSegments = new ArrayList<>();
for (DataSegmentWithInterval dataSegmentWithInterval : getPrunedSegmentSet(tableInputSpec)) {
if (dataSegmentWithInterval.segment instanceof DataSegmentWithLocation) {
prunedServedSegments.add(dataSegmentWithInterval);
} else {
prunedSegments.add(dataSegmentWithInterval);
}
}
List<WeightedInputInstance> groupedServedSegments = createWeightedSegmentSet(prunedServedSegments);
prunedSegments.addAll(groupedServedSegments);
final List<List<WeightedInputInstance>> assignments =
SlicerUtils.makeSlicesDynamic(
getPrunedSegmentSet(tableInputSpec).iterator(),
segment -> segment.getSegment().getSize(),
prunedSegments.iterator(),
WeightedInputInstance::getWeight,
maxNumSlices,
maxFilesPerSlice,
maxBytesPerSlice
@ -126,28 +161,75 @@ public class TableInputSpecSlicer implements InputSpecSlicer
private static List<InputSlice> makeSlices(
final TableInputSpec tableInputSpec,
final List<List<DataSegmentWithInterval>> assignments
final List<List<WeightedInputInstance>> assignments
)
{
final List<InputSlice> retVal = new ArrayList<>(assignments.size());
for (final List<DataSegmentWithInterval> assignment : assignments) {
for (final List<WeightedInputInstance> assignment : assignments) {
final List<RichSegmentDescriptor> descriptors = new ArrayList<>();
for (final DataSegmentWithInterval dataSegmentWithInterval : assignment) {
descriptors.add(dataSegmentWithInterval.toRichSegmentDescriptor());
final List<DataServerRequestDescriptor> dataServerRequests = new ArrayList<>();
for (final WeightedInputInstance weightedSegment : assignment) {
if (weightedSegment instanceof DataSegmentWithInterval) {
DataSegmentWithInterval dataSegmentWithInterval = (DataSegmentWithInterval) weightedSegment;
descriptors.add(dataSegmentWithInterval.toRichSegmentDescriptor());
} else {
DataServerRequest serverRequest = (DataServerRequest) weightedSegment;
dataServerRequests.add(serverRequest.toDataServerRequestDescriptor());
}
}
if (descriptors.isEmpty()) {
if (descriptors.isEmpty() && dataServerRequests.isEmpty()) {
retVal.add(NilInputSlice.INSTANCE);
} else {
retVal.add(new SegmentsInputSlice(tableInputSpec.getDataSource(), descriptors));
retVal.add(new SegmentsInputSlice(tableInputSpec.getDataSource(), descriptors, dataServerRequests));
}
}
return retVal;
}
private static class DataSegmentWithInterval
/**
* Creates a list of {@link WeightedInputInstance} from the prunedServedSegments parameter.
* The function selects a data server from the servers hosting each segment, and then groups the segments by the
* chosen data server.
* The returned value is a list of {@link WeightedInputInstance}, each of which denotes either a {@link DataSegmentWithInterval},
* in the case of a segment, or a {@link DataServerRequest}, in the case of a request to a data server. A data server
* request fetches the results of all relevant segments from that data server.
*/
private static List<WeightedInputInstance> createWeightedSegmentSet(List<DataSegmentWithInterval> prunedServedSegments)
{
// Create a map of server to segment for loaded segments.
final Map<DruidServerMetadata, Set<DataSegmentWithInterval>> serverVsSegmentsMap = new HashMap<>();
for (DataSegmentWithInterval dataSegmentWithInterval : prunedServedSegments) {
DataSegmentWithLocation segmentWithLocation = (DataSegmentWithLocation) dataSegmentWithInterval.segment;
// Choose a server out of the ones available.
DruidServerMetadata druidServerMetadata = DataServerSelector.RANDOM.getSelectServerFunction().apply(segmentWithLocation.getServers());
serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new HashSet<>()).add(dataSegmentWithInterval);
}
List<WeightedInputInstance> retVal = new ArrayList<>();
for (Map.Entry<DruidServerMetadata, Set<DataSegmentWithInterval>> druidServerMetadataSetEntry : serverVsSegmentsMap.entrySet()) {
DataServerRequest dataServerRequest = new DataServerRequest(
druidServerMetadataSetEntry.getKey(),
ImmutableList.copyOf(druidServerMetadataSetEntry.getValue())
);
retVal.add(dataServerRequest);
}
return retVal;
}
private interface WeightedInputInstance
{
long getWeight();
}
private static class DataSegmentWithInterval implements WeightedInputInstance
{
private final DataSegment segment;
private final Interval interval;
@ -169,9 +251,42 @@ public class TableInputSpecSlicer implements InputSpecSlicer
segment.getInterval(),
interval,
segment.getVersion(),
segment.getShardSpec().getPartitionNum(),
segment instanceof DataSegmentWithLocation ? ((DataSegmentWithLocation) segment).getServers() : null
segment.getShardSpec().getPartitionNum()
);
}
@Override
public long getWeight()
{
return segment.getSize();
}
}
private static class DataServerRequest implements WeightedInputInstance
{
private static final long DATA_SERVER_WEIGHT_ESTIMATION = 5000L;
private final List<DataSegmentWithInterval> segments;
private final DruidServerMetadata serverMetadata;
public DataServerRequest(DruidServerMetadata serverMetadata, List<DataSegmentWithInterval> segments)
{
this.segments = Preconditions.checkNotNull(segments, "segments");
this.serverMetadata = Preconditions.checkNotNull(serverMetadata, "server");
}
@Override
public long getWeight()
{
// Estimate the size of a realtime segment as DATA_SERVER_WEIGHT_ESTIMATION, since the coordinator does not store an
// accurate row count for realtime segments.
return segments.size() * DATA_SERVER_WEIGHT_ESTIMATION;
}
public DataServerRequestDescriptor toDataServerRequestDescriptor()
{
return new DataServerRequestDescriptor(
serverMetadata,
segments.stream().map(DataSegmentWithInterval::toRichSegmentDescriptor).collect(Collectors.toList()));
}
}
}
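The grouping done by createWeightedSegmentSet can be pictured with plain collections. Below is a hedged sketch (outside the patch; class and parameter names are made up) that picks one hosting server per segment at random and buckets segments by that server, which is what collapses many per-segment requests into one request per data server. Each resulting group is then weighted at a fixed 5000 per segment, since accurate row counts for realtime segments are not available.

// Hedged sketch, not from the patch: group segment ids by a randomly chosen hosting server.
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

public class GroupSegmentsByServerSketch
{
  public static Map<String, Set<String>> groupByServer(Map<String, List<String>> segmentToServers)
  {
    final Random random = new Random();
    final Map<String, Set<String>> serverToSegments = new HashMap<>();
    for (Map.Entry<String, List<String>> entry : segmentToServers.entrySet()) {
      final List<String> servers = entry.getValue();
      // Mirrors DataServerSelector.RANDOM: pick any one of the servers hosting this segment.
      final String chosen = servers.get(random.nextInt(servers.size()));
      serverToSegments.computeIfAbsent(chosen, ignored -> new HashSet<>()).add(entry.getKey());
    }
    return serverToSegments;
  }
}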

View File

@ -20,7 +20,7 @@
package org.apache.druid.msq.kernel;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.msq.exec.LoadedSegmentDataProviderFactory;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.query.groupby.GroupingEngine;
@ -44,7 +44,8 @@ public interface FrameContext
RowIngestionMeters rowIngestionMeters();
DataSegmentProvider dataSegmentProvider();
LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory();
DataServerQueryHandlerFactory dataServerQueryHandlerFactory();
File tempDir();

View File

@ -29,8 +29,10 @@ import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
@ -63,7 +65,7 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Object>
@Override
public List<ReadableFrameChannel> inputChannels()
{
if (baseInput.hasSegment()) {
if (baseInput.hasSegment() || baseInput.hasDataServerQuery()) {
return Collections.emptyList();
} else {
return Collections.singletonList(baseInput.getChannel());
@ -79,21 +81,19 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Object>
@Override
public ReturnOrAwait<Object> runIncrementally(final IntSet readableInputs) throws IOException
{
final ReturnOrAwait<Unit> retVal;
//noinspection rawtypes
final ReturnOrAwait retVal;
if (baseInput.hasSegment()) {
SegmentWithDescriptor segment = baseInput.getSegment();
if (segment.getDescriptor().isLoadedOnServer()) {
retVal = runWithLoadedSegment(baseInput.getSegment());
} else {
retVal = runWithSegment(baseInput.getSegment());
}
retVal = runWithSegment(baseInput.getSegment());
} else if (baseInput.hasDataServerQuery()) {
retVal = runWithDataServerQuery(baseInput.getDataServerQuery());
} else {
retVal = runWithInputChannel(baseInput.getChannel(), baseInput.getChannelFrameReader());
}
//noinspection rawtypes,unchecked
return (ReturnOrAwait) retVal;
//noinspection rawtypes,unchecked
return retVal;
}
@Override
@ -109,8 +109,17 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Object>
return frameWriterFactoryHolder.get();
}
/**
* Runs the leaf processor using a segment described by the {@link SegmentWithDescriptor} as the input. This may result
* in calls to fetch the segment from an external source.
*/
protected abstract ReturnOrAwait<Unit> runWithSegment(SegmentWithDescriptor segment) throws IOException;
protected abstract ReturnOrAwait<Unit> runWithLoadedSegment(SegmentWithDescriptor segment) throws IOException;
/**
* Runs the leaf processor using the results from a data server as the input. The query and data server details are
* described by {@link DataServerQueryHandler}.
*/
protected abstract ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(DataServerQueryHandler dataServerQueryHandler) throws IOException;
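// A hedged sketch (not part of the patch) of the general shape of a runWithDataServerQuery
// implementation; query, mapResults, and writeRowsToFrames below are hypothetical members of
// the subclass. The key point is that the handed-off segments are returned as the result so
// that they can be re-read from deep storage afterwards. Kept as a comment here since a
// concrete body cannot sit next to the abstract declaration above.
//
//   @Override
//   protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(DataServerQueryHandler handler) throws IOException
//   {
//     final DataServerQueryResult<Object[]> result = handler.fetchRowsFromDataServer(query, mapResults, closer);
//     for (Yielder<Object[]> yielder : result.getResultsYielders()) {
//       writeRowsToFrames(yielder); // hypothetical helper: append rows to the output frame channel
//     }
//     return ReturnOrAwait.returnObject(result.getHandedOffSegments());
//   }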
protected abstract ReturnOrAwait<Unit> runWithInputChannel(
ReadableFrameChannel inputChannel,

View File

@ -19,11 +19,13 @@
package org.apache.druid.msq.querykit;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.channel.ReadableConcatFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
@ -42,11 +44,13 @@ import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.ReadableInputs;
import org.apache.druid.msq.input.external.ExternalInputSlice;
import org.apache.druid.msq.input.stage.StageInputSlice;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.kernel.ProcessorsAndChannels;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.query.Query;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.io.IOException;
@ -125,14 +129,6 @@ public abstract class BaseLeafFrameProcessorFactory extends BaseFrameProcessorFa
);
}
// Read all base inputs in separate processors, one per processor.
final Iterable<ReadableInput> processorBaseInputs = readBaseInputs(
stageDefinition,
inputSlices,
inputSliceReader,
counters,
warningPublisher
);
// SegmentMapFn processor, if needed. May be null.
final FrameProcessor<Function<SegmentReference, SegmentReference>> segmentMapFnProcessor =
@ -146,16 +142,21 @@ public abstract class BaseLeafFrameProcessorFactory extends BaseFrameProcessorFa
);
// Function to generate a processor manager for the regular processors, which run after the segmentMapFnProcessor.
final Function<Function<SegmentReference, SegmentReference>, ProcessorManager<Object, Long>> processorManagerFn =
segmentMapFn ->
new BaseLeafFrameProcessorManager(
processorBaseInputs,
segmentMapFn,
frameWriterFactoryQueue,
channelQueue,
frameContext,
this
);
final Function<List<Function<SegmentReference, SegmentReference>>, ProcessorManager<Object, Long>> processorManagerFn = segmentMapFnList -> {
final Function<SegmentReference, SegmentReference> segmentMapFunction =
CollectionUtils.getOnlyElement(segmentMapFnList, throwable -> DruidException.defensive("Only one segment map function expected"));
return createBaseLeafProcessorManagerWithHandoff(
stageDefinition,
inputSlices,
inputSliceReader,
counters,
warningPublisher,
segmentMapFunction,
frameWriterFactoryQueue,
channelQueue,
frameContext
);
};
//noinspection rawtypes
final ProcessorManager processorManager;
@ -163,15 +164,71 @@ public abstract class BaseLeafFrameProcessorFactory extends BaseFrameProcessorFa
if (segmentMapFnProcessor == null) {
final Function<SegmentReference, SegmentReference> segmentMapFn =
query.getDataSource().createSegmentMapFunction(query, new AtomicLong());
processorManager = processorManagerFn.apply(segmentMapFn);
processorManager = processorManagerFn.apply(ImmutableList.of(segmentMapFn));
} else {
processorManager = new ChainedProcessorManager<>(segmentMapFnProcessor, processorManagerFn);
processorManager = new ChainedProcessorManager<>(ProcessorManagers.of(() -> segmentMapFnProcessor), processorManagerFn);
}
//noinspection unchecked,rawtypes
return new ProcessorsAndChannels<>(processorManager, OutputChannels.wrapReadOnly(outputChannels));
}
private ProcessorManager<Object, Long> createBaseLeafProcessorManagerWithHandoff(
final StageDefinition stageDefinition,
final List<InputSlice> inputSlices,
final InputSliceReader inputSliceReader,
final CounterTracker counters,
final Consumer<Throwable> warningPublisher,
final Function<SegmentReference, SegmentReference> segmentMapFunction,
final Queue<FrameWriterFactory> frameWriterFactoryQueue,
final Queue<WritableFrameChannel> channelQueue,
final FrameContext frameContext
)
{
final BaseLeafFrameProcessorFactory factory = this;
// Read all base inputs in separate processors, one input per processor.
final Iterable<ReadableInput> processorBaseInputs = readBaseInputs(
stageDefinition,
inputSlices,
inputSliceReader,
counters,
warningPublisher
);
return new ChainedProcessorManager<>(
new BaseLeafFrameProcessorManager(
processorBaseInputs,
segmentMapFunction,
frameWriterFactoryQueue,
channelQueue,
frameContext,
factory
),
objects -> {
if (objects == null || objects.isEmpty()) {
return ProcessorManagers.none();
}
List<InputSlice> handedOffSegments = new ArrayList<>();
for (Object o : objects) {
if (o instanceof SegmentsInputSlice) {
SegmentsInputSlice slice = (SegmentsInputSlice) o;
handedOffSegments.add(slice);
}
}
// Fetch any handed off segments from deep storage.
return new BaseLeafFrameProcessorManager(
readBaseInputs(stageDefinition, handedOffSegments, inputSliceReader, counters, warningPublisher),
segmentMapFunction,
frameWriterFactoryQueue,
channelQueue,
frameContext,
factory
);
}
);
}
protected abstract FrameProcessor<Object> makeProcessor(
ReadableInput baseInput,
Function<SegmentReference, SegmentReference> segmentMapFn,

View File

@ -24,32 +24,34 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.manager.ProcessorAndCallback;
import org.apache.druid.frame.processor.manager.ProcessorManager;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
/**
* Manager that chains processors: runs {@link #first} first, then based on its result, creates {@link #restFuture}
* using {@link #restFactory} and runs that next.
* Manager that chains processors: runs all processors generated by {@link #first} first, then, based on their results,
* creates {@link #restFuture} using {@link #restFactory} and runs that next.
*/
public class ChainedProcessorManager<A, B, R> implements ProcessorManager<Object, R>
{
/**
* First processor. This one blocks all the others. The reference is set to null once the processor has been
* returned by the channel.
* First processor manager. FrameProcessors created by this run before all the others.
* The reference is set to null once all the processors have been returned by the channel.
*/
@Nullable
private FrameProcessor<A> first;
private ProcessorManager<A, List<A>> first;
/**
* Produces {@link #restFuture}.
*/
private final Function<A, ProcessorManager<B, R>> restFactory;
private final Function<List<A>, ProcessorManager<B, R>> restFactory;
/**
* The rest of the processors. Produced by {@link #restFactory} once {@link #first} has completed.
@ -61,12 +63,25 @@ public class ChainedProcessorManager<A, B, R> implements ProcessorManager<Object
*/
private boolean closed;
private final List<A> firstProcessorResult = new CopyOnWriteArrayList<>();
private final AtomicInteger firstProcessorCount = new AtomicInteger(0);
public ChainedProcessorManager(
final FrameProcessor<A> first,
final Function<A, ProcessorManager<B, R>> restFactory
final ProcessorManager<A, ?> firstProcessor,
final Function<List<A>, ProcessorManager<B, R>> restFactory
)
{
this.first = Preconditions.checkNotNull(first, "first");
Preconditions.checkNotNull(firstProcessor, "first");
this.first = firstProcessor.withAccumulation(
firstProcessorResult,
(acc, a) -> {
acc.add(a);
checkFirstProcessorComplete();
return acc;
}
);
this.restFactory = Preconditions.checkNotNull(restFactory, "restFactory");
}
@ -76,22 +91,31 @@ public class ChainedProcessorManager<A, B, R> implements ProcessorManager<Object
if (closed) {
throw new IllegalStateException();
} else if (first != null) {
//noinspection unchecked
final FrameProcessor<Object> tmp = (FrameProcessor<Object>) first;
first = null;
return Futures.immediateFuture(Optional.of(new ProcessorAndCallback<>(tmp, this::onFirstProcessorComplete)));
} else {
return FutureUtils.transformAsync(
restFuture,
rest -> (ListenableFuture) rest.next()
);
Optional<ProcessorAndCallback<A>> processorAndCallbackOptional = Futures.getUnchecked(first.next());
if (processorAndCallbackOptional.isPresent()) {
// More processors left to run.
firstProcessorCount.incrementAndGet();
ProcessorAndCallback<A> aProcessorAndCallback = processorAndCallbackOptional.get();
//noinspection unchecked
return Futures.immediateFuture(Optional.of((ProcessorAndCallback<Object>) aProcessorAndCallback));
} else {
first = null;
checkFirstProcessorComplete();
}
}
//noinspection unchecked
return FutureUtils.transformAsync(
restFuture,
rest -> (ListenableFuture) rest.next()
);
}
private void onFirstProcessorComplete(final Object firstResult)
private synchronized void checkFirstProcessorComplete()
{
//noinspection unchecked
restFuture.set(restFactory.apply((A) firstResult));
if (first == null && (firstProcessorResult.size() == firstProcessorCount.get())) {
restFuture.set(restFactory.apply(firstProcessorResult));
}
}
@Override
@ -106,7 +130,7 @@ public class ChainedProcessorManager<A, B, R> implements ProcessorManager<Object
if (!closed) {
closed = true;
CloseableUtils.closeAndWrapExceptions(() -> CloseableUtils.closeAll(
first != null ? first::cleanup : null,
first != null ? first : null,
restFuture.isDone() ? FutureUtils.getUnchecked(restFuture, false) : null
));
}
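A hedged usage sketch (not from the patch; segmentMapFnProcessor and makeSecondStage are hypothetical stand-ins for the wiring shown in BaseLeafFrameProcessorFactory above): every processor produced by the first manager runs to completion, its results are accumulated into a list, and only then is restFactory invoked once with the full list to build the second stage.

// Hedged sketch, not from the patch: chain a single first-stage processor into a
// second stage built from the accumulated first-stage results.
ProcessorManager<Object, Long> chained = new ChainedProcessorManager<>(
    ProcessorManagers.of(() -> segmentMapFnProcessor),        // first stage
    firstStageResults -> makeSecondStage(firstStageResults)   // invoked once, with all first-stage results
);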

View File

@ -33,16 +33,18 @@ import org.apache.druid.frame.segment.FrameSegment;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.LoadedSegmentDataProvider;
import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.exec.DataServerQueryResult;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessor;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupingEngine;
@ -57,6 +59,7 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.SegmentId;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
/**
@ -74,6 +77,8 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor
private Yielder<ResultRow> resultYielder;
private FrameWriter frameWriter;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
private SegmentsInputSlice handedOffSegments = null;
private Yielder<Yielder<ResultRow>> currentResultsYielder;
public GroupByPreShuffleFrameProcessor(
final GroupByQuery query,
@ -100,23 +105,38 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor
}
@Override
protected ReturnOrAwait<Unit> runWithLoadedSegment(SegmentWithDescriptor segment) throws IOException
protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(DataServerQueryHandler dataServerQueryHandler) throws IOException
{
if (resultYielder == null) {
Pair<LoadedSegmentDataProvider.DataServerQueryStatus, Yielder<ResultRow>> statusSequencePair =
segment.fetchRowsFromDataServer(groupingEngine.prepareGroupByQuery(query), Function.identity(), closer);
if (LoadedSegmentDataProvider.DataServerQueryStatus.HANDOFF.equals(statusSequencePair.lhs)) {
log.info("Segment[%s] was handed off, falling back to fetching the segment from deep storage.",
segment.getDescriptor());
return runWithSegment(segment);
if (resultYielder == null || resultYielder.isDone()) {
if (currentResultsYielder == null) {
final DataServerQueryResult<ResultRow> dataServerQueryResult =
dataServerQueryHandler.fetchRowsFromDataServer(
groupingEngine.prepareGroupByQuery(query),
Function.identity(),
closer
);
handedOffSegments = dataServerQueryResult.getHandedOffSegments();
if (!handedOffSegments.getDescriptors().isEmpty()) {
log.info(
"Query to dataserver for segments found [%d] handed off segments",
handedOffSegments.getDescriptors().size()
);
}
List<Yielder<ResultRow>> yielders = dataServerQueryResult.getResultsYielders();
currentResultsYielder = Yielders.each(Sequences.simple(yielders));
}
if (currentResultsYielder.isDone()) {
return ReturnOrAwait.returnObject(handedOffSegments);
} else {
resultYielder = currentResultsYielder.get();
currentResultsYielder = currentResultsYielder.next(null);
}
resultYielder = statusSequencePair.rhs;
}
populateFrameWriterAndFlushIfNeeded();
if (resultYielder == null || resultYielder.isDone()) {
return ReturnOrAwait.returnObject(Unit.instance());
if ((resultYielder == null || resultYielder.isDone()) && currentResultsYielder.isDone()) {
return ReturnOrAwait.returnObject(handedOffSegments);
} else {
return ReturnOrAwait.runAgain();
}

View File

@ -50,11 +50,13 @@ import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.LoadedSegmentDataProvider;
import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.exec.DataServerQueryResult;
import org.apache.druid.msq.input.ParseExceptionUtils;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.external.ExternalSegment;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessor;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.query.Druids;
@ -86,6 +88,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* A {@link FrameProcessor} that reads one {@link Frame} at a time from a particular segment, writes them
@ -106,6 +109,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
private final SimpleSettableOffset cursorOffset = new SimpleAscendingOffset(Integer.MAX_VALUE);
private FrameWriter frameWriter;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
private SegmentsInputSlice handedOffSegments = null;
public ScanQueryFrameProcessor(
final ScanQuery query,
@ -192,39 +196,45 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
}
@Override
protected ReturnOrAwait<Unit> runWithLoadedSegment(final SegmentWithDescriptor segment) throws IOException
protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(final DataServerQueryHandler dataServerQueryHandler) throws IOException
{
if (cursor == null) {
ScanQuery preparedQuery = prepareScanQueryForDataServer(query);
final Pair<LoadedSegmentDataProvider.DataServerQueryStatus, Yielder<Object[]>> statusSequencePair =
segment.fetchRowsFromDataServer(
final DataServerQueryResult<Object[]> dataServerQueryResult =
dataServerQueryHandler.fetchRowsFromDataServer(
preparedQuery,
ScanQueryFrameProcessor::mappingFunction,
closer
);
if (LoadedSegmentDataProvider.DataServerQueryStatus.HANDOFF.equals(statusSequencePair.lhs)) {
log.info("Segment[%s] was handed off, falling back to fetching the segment from deep storage.",
segment.getDescriptor());
return runWithSegment(segment);
handedOffSegments = dataServerQueryResult.getHandedOffSegments();
if (!handedOffSegments.getDescriptors().isEmpty()) {
log.info(
"Query to dataserver for segments found [%d] handed off segments",
handedOffSegments.getDescriptors().size()
);
}
RowSignature rowSignature = ScanQueryKit.getAndValidateSignature(preparedQuery, jsonMapper);
Pair<Cursor, Closeable> cursorFromIterable = IterableRowsCursorHelper.getCursorFromYielder(
statusSequencePair.rhs,
rowSignature
);
List<Cursor> cursors = dataServerQueryResult.getResultsYielders().stream().map(yielder -> {
Pair<Cursor, Closeable> cursorFromIterable = IterableRowsCursorHelper.getCursorFromYielder(
yielder,
rowSignature
);
closer.register(cursorFromIterable.rhs);
return cursorFromIterable.lhs;
}).collect(Collectors.toList());
closer.register(cursorFromIterable.rhs);
final Yielder<Cursor> cursorYielder = Yielders.each(Sequences.simple(ImmutableList.of(cursorFromIterable.lhs)));
final Yielder<Cursor> cursorYielder = Yielders.each(Sequences.simple(cursors));
if (cursorYielder.isDone()) {
// No cursors!
cursorYielder.close();
return ReturnOrAwait.returnObject(Unit.instance());
return ReturnOrAwait.returnObject(handedOffSegments);
} else {
final long rowsFlushed = setNextCursor(cursorYielder.get(), null);
assert rowsFlushed == 0; // There's only ever one cursor when running with a segment
closer.register(cursorYielder);
if (rowsFlushed > 0) {
return ReturnOrAwait.runAgain();
}
}
}
@ -235,7 +245,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
}
if (cursor.isDone() && (frameWriter == null || frameWriter.getNumRows() == 0)) {
return ReturnOrAwait.returnObject(Unit.instance());
return ReturnOrAwait.returnObject(handedOffSegments);
} else {
return ReturnOrAwait.runAgain();
}

View File

@ -23,27 +23,29 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.discovery.DataServerClient;
import org.apache.druid.discovery.DruidServiceTestUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.querykit.InputNumberDataSource;
import org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessor;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.MapQueryToolChestWarehouse;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryQueryToolChest;
@ -51,17 +53,20 @@ import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.rpc.RpcException;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import java.io.IOException;
import java.util.List;
import static org.apache.druid.msq.exec.DataServerQueryHandler.toSegmentDescriptorWithFullInterval;
import static org.apache.druid.query.Druids.newScanQueryBuilder;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
@ -73,7 +78,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class LoadedSegmentDataProviderTest
public class DataServerQueryHandlerTest
{
private static final String DATASOURCE1 = "dataSource1";
private static final DruidServerMetadata DRUID_SERVER_1 = new DruidServerMetadata(
@ -85,25 +90,79 @@ public class LoadedSegmentDataProviderTest
"tier1",
0
);
private static final DruidServerMetadata DRUID_SERVER_2 = new DruidServerMetadata(
"name2",
"host2:5050",
null,
100L,
ServerType.REALTIME,
"tier1",
0
);
private static final RichSegmentDescriptor SEGMENT_1 = new RichSegmentDescriptor(
Intervals.of("2003/2004"),
Intervals.of("2003/2004"),
"v1",
1,
ImmutableSet.of(DRUID_SERVER_1)
0
);
private DataServerClient dataServerClient;
private static final RichSegmentDescriptor SEGMENT_2 = new RichSegmentDescriptor(
Intervals.of("2004/2005"),
Intervals.of("2004/2005"),
"v1",
0
);
private DataServerClient dataServerClient1;
private DataServerClient dataServerClient2;
private CoordinatorClient coordinatorClient;
private ScanResultValue scanResultValue;
private ScanQuery query;
private LoadedSegmentDataProvider target;
private DataServerQueryHandler target;
@Before
public void setUp()
{
dataServerClient = mock(DataServerClient.class);
dataServerClient1 = mock(DataServerClient.class);
dataServerClient2 = mock(DataServerClient.class);
coordinatorClient = mock(CoordinatorClient.class);
scanResultValue = new ScanResultValue(
query = newScanQueryBuilder()
.dataSource(new InputNumberDataSource(1))
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2003/2004"))))
.columns("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 1, MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE, SegmentSource.REALTIME.toString()))
.build();
QueryToolChestWarehouse queryToolChestWarehouse = new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(ScanQuery.class, new ScanQueryQueryToolChest(null, null))
.build()
);
target = spy(
new DataServerQueryHandler(
DATASOURCE1,
new ChannelCounters(),
mock(ServiceClientFactory.class),
coordinatorClient,
DruidServiceTestUtils.newJsonMapper(),
queryToolChestWarehouse,
Execs.scheduledSingleThreaded("query-cancellation-executor"),
new DataServerRequestDescriptor(DRUID_SERVER_1, ImmutableList.of(SEGMENT_1, SEGMENT_2))
)
);
doAnswer(invocationOnMock -> {
ServiceLocation serviceLocation = invocationOnMock.getArgument(0);
if (ServiceLocation.fromDruidServerMetadata(DRUID_SERVER_1).equals(serviceLocation)) {
return dataServerClient1;
} else if (ServiceLocation.fromDruidServerMetadata(DRUID_SERVER_2).equals(serviceLocation)) {
return dataServerClient2;
} else {
throw new IllegalStateException();
}
}).when(target).makeDataServerClient(any());
}
@Test
public void testFetchRowsFromServer()
{
ScanResultValue scanResultValue = new ScanResultValue(
null,
ImmutableList.of(),
ImmutableList.of(
@ -112,47 +171,18 @@ public class LoadedSegmentDataProviderTest
ImmutableList.of("xyz", "789")
)
);
query = newScanQueryBuilder()
.dataSource(new InputNumberDataSource(1))
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2003/2004"))))
.columns("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 1))
.build();
QueryToolChestWarehouse queryToolChestWarehouse = new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(ScanQuery.class, new ScanQueryQueryToolChest(null, null))
.build()
);
target = spy(
new LoadedSegmentDataProvider(
DATASOURCE1,
new ChannelCounters(),
mock(ServiceClientFactory.class),
coordinatorClient,
DruidServiceTestUtils.newJsonMapper(),
queryToolChestWarehouse,
Execs.scheduledSingleThreaded("query-cancellation-executor")
)
);
doReturn(dataServerClient).when(target).makeDataServerClient(any());
}
@Test
public void testFetchRowsFromServer() throws IOException
{
doReturn(Sequences.simple(ImmutableList.of(scanResultValue))).when(dataServerClient).run(any(), any(), any(), any());
doReturn(Sequences.simple(ImmutableList.of(scanResultValue))).when(dataServerClient1).run(any(), any(), any(), any());
Pair<LoadedSegmentDataProvider.DataServerQueryStatus, Yielder<Object[]>> dataServerQueryStatusYielderPair = target.fetchRowsFromDataServer(
DataServerQueryResult<Object[]> dataServerQueryResult = target.fetchRowsFromDataServer(
query,
SEGMENT_1,
ScanQueryFrameProcessor::mappingFunction,
Closer.create()
);
Assert.assertEquals(LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS, dataServerQueryStatusYielderPair.lhs);
Assert.assertTrue(dataServerQueryResult.getHandedOffSegments().getDescriptors().isEmpty());
List<List<Object>> events = (List<List<Object>>) scanResultValue.getEvents();
Yielder<Object[]> yielder = dataServerQueryStatusYielderPair.rhs;
Yielder<Object[]> yielder = dataServerQueryResult.getResultsYielders().get(0);
events.forEach(
event -> {
Assert.assertArrayEquals(event.toArray(), yielder.get());
@ -162,24 +192,100 @@ public class LoadedSegmentDataProviderTest
}
@Test
public void testHandoff() throws IOException
public void testOneSegmentRelocated()
{
ScanResultValue scanResultValue1 = new ScanResultValue(
null,
ImmutableList.of(),
ImmutableList.of(
ImmutableList.of("abc", "123"),
ImmutableList.of("ghi", "456")
)
);
doAnswer(invocation -> {
ResponseContext responseContext = invocation.getArgument(1);
responseContext.addMissingSegments(ImmutableList.of(SEGMENT_1));
return Sequences.empty();
}).when(dataServerClient).run(any(), any(), any(), any());
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, SEGMENT_1);
responseContext.addMissingSegments(
ImmutableList.of(
toSegmentDescriptorWithFullInterval(SEGMENT_2)
)
);
return Sequences.simple(ImmutableList.of(scanResultValue1));
}).when(dataServerClient1).run(any(), any(), any(), any());
Pair<LoadedSegmentDataProvider.DataServerQueryStatus, Yielder<Object[]>> dataServerQueryStatusYielderPair = target.fetchRowsFromDataServer(
ScanResultValue scanResultValue2 = new ScanResultValue(
null,
ImmutableList.of(),
ImmutableList.of(
ImmutableList.of("pit", "579"),
ImmutableList.of("xyz", "897")
)
);
doReturn(Sequences.simple(ImmutableList.of(scanResultValue2))).when(dataServerClient2).run(any(), any(), any(), any());
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_2));
doReturn(ImmutableList.of(
new ImmutableSegmentLoadInfo(
DataSegment.builder()
.interval(SEGMENT_2.getInterval())
.version(SEGMENT_2.getVersion())
.shardSpec(new NumberedShardSpec(SEGMENT_2.getPartitionNumber(), SEGMENT_2.getPartitionNumber()))
.dataSource(DATASOURCE1)
.size(1)
.build(),
ImmutableSet.of(DRUID_SERVER_2)
))).when(coordinatorClient).fetchServerViewSegments(DATASOURCE1, ImmutableList.of(SEGMENT_2.getFullInterval()));
DataServerQueryResult<Object[]> dataServerQueryResult = target.fetchRowsFromDataServer(
query,
SEGMENT_1,
ScanQueryFrameProcessor::mappingFunction,
Closer.create()
);
Assert.assertEquals(LoadedSegmentDataProvider.DataServerQueryStatus.HANDOFF, dataServerQueryStatusYielderPair.lhs);
Assert.assertNull(dataServerQueryStatusYielderPair.rhs);
Assert.assertTrue(dataServerQueryResult.getHandedOffSegments().getDescriptors().isEmpty());
Yielder<Object[]> yielder1 = dataServerQueryResult.getResultsYielders().get(0);
((List<List<Object>>) scanResultValue1.getEvents()).forEach(
event -> {
Assert.assertArrayEquals(event.toArray(), yielder1.get());
yielder1.next(null);
}
);
Yielder<Object[]> yielder2 = dataServerQueryResult.getResultsYielders().get(1);
((List<List<Object>>) scanResultValue2.getEvents()).forEach(
event -> {
Assert.assertArrayEquals(event.toArray(), yielder2.get());
yielder2.next(null);
}
);
}
@Test
public void testHandoff()
{
doAnswer(invocation -> {
ResponseContext responseContext = invocation.getArgument(1);
responseContext.addMissingSegments(
ImmutableList.of(
toSegmentDescriptorWithFullInterval(SEGMENT_1),
toSegmentDescriptorWithFullInterval(SEGMENT_2)
)
);
return Sequences.empty();
}).when(dataServerClient1).run(any(), any(), any(), any());
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_1));
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_2));
DataServerQueryResult<Object[]> dataServerQueryResult = target.fetchRowsFromDataServer(
query,
ScanQueryFrameProcessor::mappingFunction,
Closer.create()
);
Assert.assertEquals(ImmutableList.of(SEGMENT_1, SEGMENT_2), dataServerQueryResult.getHandedOffSegments().getDescriptors());
Assert.assertTrue(dataServerQueryResult.getResultsYielders().isEmpty());
}
@Test
@ -187,58 +293,57 @@ public class LoadedSegmentDataProviderTest
{
doThrow(
new QueryInterruptedException(new RpcException("Could not connect to server"))
).when(dataServerClient).run(any(), any(), any(), any());
).when(dataServerClient1).run(any(), any(), any(), any());
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, SEGMENT_1);
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_1));
ScanQuery queryWithRetry = query.withOverriddenContext(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 3));
Assert.assertThrows(DruidException.class, () ->
target.fetchRowsFromDataServer(
queryWithRetry,
SEGMENT_1,
ScanQueryFrameProcessor::mappingFunction,
Closer.create()
)
);
verify(dataServerClient, times(3)).run(any(), any(), any(), any());
verify(dataServerClient1, times(5)).run(any(), any(), any(), any());
}
@Test
public void testServerNotFoundButHandoffShouldReturnWithStatus() throws IOException
public void testServerNotFoundButHandoffShouldReturnWithStatus()
{
doThrow(
new QueryInterruptedException(new RpcException("Could not connect to server"))
).when(dataServerClient).run(any(), any(), any(), any());
).when(dataServerClient1).run(any(), any(), any(), any());
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, SEGMENT_1);
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_1));
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_2));
Pair<LoadedSegmentDataProvider.DataServerQueryStatus, Yielder<Object[]>> dataServerQueryStatusYielderPair = target.fetchRowsFromDataServer(
DataServerQueryResult<Object[]> dataServerQueryResult = target.fetchRowsFromDataServer(
query,
SEGMENT_1,
ScanQueryFrameProcessor::mappingFunction,
Closer.create()
);
Assert.assertEquals(LoadedSegmentDataProvider.DataServerQueryStatus.HANDOFF, dataServerQueryStatusYielderPair.lhs);
Assert.assertNull(dataServerQueryStatusYielderPair.rhs);
Assert.assertEquals(ImmutableList.of(SEGMENT_1, SEGMENT_2), dataServerQueryResult.getHandedOffSegments().getDescriptors());
Assert.assertTrue(dataServerQueryResult.getResultsYielders().isEmpty());
}
@Test
public void testQueryFail()
{
SegmentDescriptor segmentDescriptorWithFullInterval = toSegmentDescriptorWithFullInterval(SEGMENT_1);
doAnswer(invocation -> {
ResponseContext responseContext = invocation.getArgument(1);
responseContext.addMissingSegments(ImmutableList.of(SEGMENT_1));
responseContext.addMissingSegments(ImmutableList.of(segmentDescriptorWithFullInterval));
return Sequences.empty();
}).when(dataServerClient).run(any(), any(), any(), any());
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, SEGMENT_1);
}).when(dataServerClient1).run(any(), any(), any(), any());
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, segmentDescriptorWithFullInterval);
Assert.assertThrows(IOE.class, () ->
Assert.assertThrows(DruidException.class, () ->
target.fetchRowsFromDataServer(
query,
SEGMENT_1,
ScanQueryFrameProcessor::mappingFunction,
Closer.create()
)

View File

@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielders;
@ -55,7 +54,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Map;
import static org.mockito.ArgumentMatchers.any;
@ -96,7 +94,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
}
@Test
public void testSelectWithLoadedSegmentsOnFoo() throws IOException
public void testSelectWithLoadedSegmentsOnFoo()
{
RowSignature resultSignature = RowSignature.builder()
.add("cnt", ColumnType.LONG)
@ -104,20 +102,20 @@ public class MSQLoadedSegmentTests extends MSQTestBase
.build();
doReturn(
Pair.of(
LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS,
Yielders.each(
Sequences.simple(
ImmutableList.of(
new Object[]{1L, "qwe"},
new Object[]{1L, "tyu"}
new DataServerQueryResult<>(
ImmutableList.of(
Yielders.each(
Sequences.simple(
ImmutableList.of(
new Object[]{1L, "qwe"},
new Object[]{1L, "tyu"}
)
)
)
)
)
)
.when(loadedSegmentDataProvider)
.fetchRowsFromDataServer(any(), any(), any(), any());
)),
ImmutableList.of(),
"foo"
)).when(dataServerQueryHandler)
.fetchRowsFromDataServer(any(), any(), any());
testSelectQuery()
.setSql("select cnt, dim1 from foo")
@ -139,10 +137,10 @@ public class MSQLoadedSegmentTests extends MSQTestBase
.setQueryContext(REALTIME_QUERY_CTX)
.setExpectedRowSignature(resultSignature)
.setExpectedResultRows(ImmutableList.of(
new Object[]{1L, ""},
new Object[]{1L, "qwe"},
new Object[]{1L, "tyu"},
new Object[]{1L, ""},
new Object[]{1L, "10.1"},
new Object[]{1L, "tyu"},
new Object[]{1L, "2"},
new Object[]{1L, "1"},
new Object[]{1L, "def"},
@ -152,7 +150,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
}
@Test
public void testSelectWithLoadedSegmentsOnFooWithOrderBy() throws IOException
public void testSelectWithLoadedSegmentsOnFooWithOrderBy()
{
RowSignature resultSignature = RowSignature.builder()
.add("cnt", ColumnType.LONG)
@ -164,22 +162,23 @@ public class MSQLoadedSegmentTests extends MSQTestBase
ScanQuery query = invocationOnMock.getArgument(0);
ScanQuery.verifyOrderByForNativeExecution(query);
Assert.assertEquals(Long.MAX_VALUE, query.getScanRowsLimit());
return Pair.of(
LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS,
Yielders.each(
Sequences.simple(
ImmutableList.of(
new Object[]{1L, "qwe"},
new Object[]{1L, "tyu"}
return new DataServerQueryResult<>(
ImmutableList.of(
Yielders.each(
Sequences.simple(
ImmutableList.of(
new Object[]{1L, "qwe"},
new Object[]{1L, "tyu"}
)
)
)
)
)),
ImmutableList.of(),
"foo"
);
}
)
.when(loadedSegmentDataProvider)
.fetchRowsFromDataServer(any(), any(), any(), any());
.when(dataServerQueryHandler)
.fetchRowsFromDataServer(any(), any(), any());
testSelectQuery()
.setSql("select cnt, dim1 from foo order by dim1")
@ -215,7 +214,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
}
@Test
public void testGroupByWithLoadedSegmentsOnFoo() throws IOException
public void testGroupByWithLoadedSegmentsOnFoo()
{
RowSignature rowSignature = RowSignature.builder()
.add("cnt", ColumnType.LONG)
@ -223,18 +222,21 @@ public class MSQLoadedSegmentTests extends MSQTestBase
.build();
doReturn(
Pair.of(LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS,
new DataServerQueryResult<>(
ImmutableList.of(
Yielders.each(
Sequences.simple(
ImmutableList.of(
ResultRow.of(1L, 2L)
)
)
)
)),
ImmutableList.of(),
"foo"
)
)
.when(loadedSegmentDataProvider)
.fetchRowsFromDataServer(any(), any(), any(), any());
.when(dataServerQueryHandler)
.fetchRowsFromDataServer(any(), any(), any());
testSelectQuery()
.setSql("select cnt,count(*) as cnt1 from foo group by cnt")
@ -272,7 +274,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
}
@Test
public void testGroupByWithOnlyLoadedSegmentsOnFoo() throws IOException
public void testGroupByWithOnlyLoadedSegmentsOnFoo()
{
RowSignature rowSignature = RowSignature.builder()
.add("cnt", ColumnType.LONG)
@ -280,13 +282,21 @@ public class MSQLoadedSegmentTests extends MSQTestBase
.build();
doReturn(
Pair.of(LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS,
new DataServerQueryResult<>(
ImmutableList.of(
Yielders.each(
Sequences.simple(
ImmutableList.of(
ResultRow.of(1L, 2L)))))
).when(loadedSegmentDataProvider)
.fetchRowsFromDataServer(any(), any(), any(), any());
ResultRow.of(1L, 2L)
)
)
)),
ImmutableList.of(),
"foo"
)
)
.when(dataServerQueryHandler)
.fetchRowsFromDataServer(any(), any(), any());
testSelectQuery()
.setSql("select cnt,count(*) as cnt1 from foo where (TIMESTAMP '2003-01-01 00:00:00' <= \"__time\" AND \"__time\" < TIMESTAMP '2005-01-01 00:00:00') group by cnt")
@ -324,7 +334,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
}
@Test
public void testDataServerQueryFailedShouldFail() throws IOException
public void testDataServerQueryFailedShouldFail()
{
RowSignature rowSignature = RowSignature.builder()
.add("cnt", ColumnType.LONG)
@ -334,8 +344,8 @@ public class MSQLoadedSegmentTests extends MSQTestBase
doThrow(
new ISE("Segment could not be found on data server, but segment was not handed off.")
)
.when(loadedSegmentDataProvider)
.fetchRowsFromDataServer(any(), any(), any(), any());
.when(dataServerQueryHandler)
.fetchRowsFromDataServer(any(), any(), any());
testSelectQuery()
.setSql("select cnt,count(*) as cnt1 from foo where (TIMESTAMP '2003-01-01 00:00:00' <= \"__time\" AND \"__time\" < TIMESTAMP '2005-01-01 00:00:00') group by cnt")

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.input.table;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.junit.Assert;
import org.junit.Test;
public class DataServerRequestDescriptorTest
{
@Test
public void testSerde() throws JsonProcessingException
{
DataServerRequestDescriptor segment = new DataServerRequestDescriptor(
new DruidServerMetadata(
"testServer",
"localhost:8081",
null,
1,
ServerType.INDEXER_EXECUTOR,
"tier1", 2
),
ImmutableList.of(new RichSegmentDescriptor(Intervals.ETERNITY, Intervals.ETERNITY, "v1", 2))
);
final ObjectMapper mapper = TestHelper.makeJsonMapper()
.registerModules(new MSQIndexingModule().getJacksonModules());
Assert.assertEquals(
segment,
mapper.readValue(mapper.writeValueAsString(segment), DataServerRequestDescriptor.class)
);
}
}

View File

@ -20,28 +20,15 @@
package org.apache.druid.msq.input.table;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.junit.Assert;
import org.junit.Test;
public class RichSegmentDescriptorTest
{
private static final DruidServerMetadata DRUID_SERVER_1 = new DruidServerMetadata(
"name1",
"host1",
null,
100L,
ServerType.REALTIME,
"tier1",
0
);
@Test
public void testSerdeWithFullIntervalDifferentFromInterval() throws Exception
{
@ -50,8 +37,7 @@ public class RichSegmentDescriptorTest
Intervals.of("2000/2002"),
Intervals.of("2000/2001"),
"2",
3,
ImmutableSet.of(DRUID_SERVER_1)
3
);
Assert.assertEquals(
@ -68,8 +54,7 @@ public class RichSegmentDescriptorTest
Intervals.of("2000/2001"),
Intervals.of("2000/2001"),
"2",
3,
ImmutableSet.of(DRUID_SERVER_1)
3
);
Assert.assertEquals(
@ -86,8 +71,7 @@ public class RichSegmentDescriptorTest
Intervals.of("2000/2002"),
Intervals.of("2000/2001"),
"2",
3,
ImmutableSet.of(DRUID_SERVER_1)
3
);
Assert.assertEquals(

View File

@ -19,7 +19,6 @@
package org.apache.druid.msq.input.table;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Test;
@ -29,8 +28,6 @@ public class SegmentWithDescriptorTest
public void testEquals()
{
EqualsVerifier.forClass(SegmentWithDescriptor.class)
.withPrefabValues(ObjectMapper.class, new ObjectMapper(), new ObjectMapper())
.withIgnoredFields("loadedSegmentDataProvider")
.usingGetClass()
.verify();
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.msq.input.table;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.guice.MSQIndexingModule;
@ -47,9 +46,12 @@ public class SegmentsInputSliceTest
Intervals.of("2000/P1M"),
Intervals.of("2000/P1M"),
"1",
0,
ImmutableSet.of(
new DruidServerMetadata(
0
)
),
ImmutableList.of(
new DataServerRequestDescriptor(
new DruidServerMetadata(
"name1",
"host1",
null,
@ -57,6 +59,13 @@ public class SegmentsInputSliceTest
ServerType.REALTIME,
"tier1",
0
),
ImmutableList.of(
new RichSegmentDescriptor(
Intervals.of("2002/P1M"),
Intervals.of("2002/P1M"),
"1",
0
)
)
)

View File

@ -147,31 +147,28 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT1.getInterval(),
Intervals.of("2000/P1M"),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum(),
null
SEGMENT1.getShardSpec().getPartitionNum()
),
new RichSegmentDescriptor(
SEGMENT2.getInterval(),
Intervals.of("2000/P1M"),
SEGMENT2.getVersion(),
SEGMENT2.getShardSpec().getPartitionNum(),
null
SEGMENT2.getShardSpec().getPartitionNum()
),
new RichSegmentDescriptor(
SEGMENT1.getInterval(),
Intervals.of("2000-06-01/P1M"),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum(),
null
SEGMENT1.getShardSpec().getPartitionNum()
),
new RichSegmentDescriptor(
SEGMENT2.getInterval(),
Intervals.of("2000-06-01/P1M"),
SEGMENT2.getVersion(),
SEGMENT2.getShardSpec().getPartitionNum(),
null
SEGMENT2.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
)
),
slicer.sliceStatic(spec, 1)
@ -213,10 +210,10 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT1.getInterval(),
SEGMENT1.getInterval(),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum(),
null
SEGMENT1.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
),
NilInputSlice.INSTANCE
),
@ -243,17 +240,16 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT1.getInterval(),
SEGMENT1.getInterval(),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum(),
null
SEGMENT1.getShardSpec().getPartitionNum()
),
new RichSegmentDescriptor(
SEGMENT2.getInterval(),
SEGMENT2.getInterval(),
SEGMENT2.getVersion(),
SEGMENT2.getShardSpec().getPartitionNum(),
null
SEGMENT2.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
)
),
slicer.sliceStatic(spec, 1)
@ -282,10 +278,10 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT1.getInterval(),
Intervals.of("2000/P1M"),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum(),
null
SEGMENT1.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
),
new SegmentsInputSlice(
DATASOURCE,
@ -294,10 +290,10 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT1.getInterval(),
Intervals.of("2000-06-01/P1M"),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum(),
null
SEGMENT1.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
)
),
slicer.sliceStatic(spec, 2)
@ -317,17 +313,16 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT1.getInterval(),
SEGMENT1.getInterval(),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum(),
null
SEGMENT1.getShardSpec().getPartitionNum()
),
new RichSegmentDescriptor(
SEGMENT2.getInterval(),
SEGMENT2.getInterval(),
SEGMENT2.getVersion(),
SEGMENT2.getShardSpec().getPartitionNum(),
null
SEGMENT2.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
)
),
slicer.sliceStatic(spec, 1)
@ -347,10 +342,10 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT1.getInterval(),
SEGMENT1.getInterval(),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum(),
null
SEGMENT1.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
),
new SegmentsInputSlice(
DATASOURCE,
@ -359,10 +354,10 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT2.getInterval(),
SEGMENT2.getInterval(),
SEGMENT2.getVersion(),
SEGMENT2.getShardSpec().getPartitionNum(),
null
SEGMENT2.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
)
),
slicer.sliceStatic(spec, 2)
@ -382,10 +377,10 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT1.getInterval(),
SEGMENT1.getInterval(),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum(),
null
SEGMENT1.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
),
new SegmentsInputSlice(
DATASOURCE,
@ -394,10 +389,10 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT2.getInterval(),
SEGMENT2.getInterval(),
SEGMENT2.getVersion(),
SEGMENT2.getShardSpec().getPartitionNum(),
null
SEGMENT2.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
),
NilInputSlice.INSTANCE
),
@ -440,17 +435,16 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT1.getInterval(),
Intervals.of("2000/P1M"),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum(),
null
SEGMENT1.getShardSpec().getPartitionNum()
),
new RichSegmentDescriptor(
SEGMENT2.getInterval(),
Intervals.of("2000/P1M"),
SEGMENT2.getVersion(),
SEGMENT2.getShardSpec().getPartitionNum(),
null
SEGMENT2.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
)
),
slicer.sliceDynamic(spec, 1, 1, 1)
@ -476,17 +470,16 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT1.getInterval(),
Intervals.of("2000/P1M"),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum(),
null
SEGMENT1.getShardSpec().getPartitionNum()
),
new RichSegmentDescriptor(
SEGMENT2.getInterval(),
Intervals.of("2000/P1M"),
SEGMENT2.getVersion(),
SEGMENT2.getShardSpec().getPartitionNum(),
null
SEGMENT2.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
)
),
slicer.sliceDynamic(spec, 100, 5, BYTES_PER_SEGMENT * 5)
@ -512,10 +505,10 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT1.getInterval(),
Intervals.of("2000/P1M"),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum(),
null
SEGMENT1.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
),
new SegmentsInputSlice(
DATASOURCE,
@ -524,10 +517,10 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT2.getInterval(),
Intervals.of("2000/P1M"),
SEGMENT2.getVersion(),
SEGMENT2.getShardSpec().getPartitionNum(),
null
SEGMENT2.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
)
),
slicer.sliceDynamic(spec, 100, 1, BYTES_PER_SEGMENT * 5)
@ -553,10 +546,10 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT1.getInterval(),
Intervals.of("2000/P1M"),
SEGMENT1.getVersion(),
SEGMENT1.getShardSpec().getPartitionNum(),
null
SEGMENT1.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
),
new SegmentsInputSlice(
DATASOURCE,
@ -565,10 +558,10 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
SEGMENT2.getInterval(),
Intervals.of("2000/P1M"),
SEGMENT2.getVersion(),
SEGMENT2.getShardSpec().getPartitionNum(),
null
SEGMENT2.getShardSpec().getPartitionNum()
)
)
),
ImmutableList.of()
)
),
slicer.sliceDynamic(spec, 100, 5, BYTES_PER_SEGMENT)

@ -41,8 +41,8 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.msq.exec.LoadedSegmentDataProvider;
import org.apache.druid.msq.exec.LoadedSegmentDataProviderFactory;
import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.querykit.DataSegmentProvider;
@ -175,7 +175,7 @@ public class CalciteMSQTestsHelper
binder.bind(DataSegmentAnnouncer.class).toInstance(new NoopDataSegmentAnnouncer());
binder.bind(DataSegmentProvider.class)
.toInstance((segmentId, channelCounters, isReindex) -> getSupplierForSegment(segmentId));
binder.bind(LoadedSegmentDataProviderFactory.class).toInstance(getTestLoadedSegmentDataProviderFactory());
binder.bind(DataServerQueryHandlerFactory.class).toInstance(getTestDataServerQueryHandlerFactory());
GroupByQueryConfig groupByQueryConfig = new GroupByQueryConfig();
GroupingEngine groupingEngine = GroupByQueryRunnerTest.makeQueryRunnerFactory(
@ -193,21 +193,16 @@ public class CalciteMSQTestsHelper
);
}
private static LoadedSegmentDataProviderFactory getTestLoadedSegmentDataProviderFactory()
private static DataServerQueryHandlerFactory getTestDataServerQueryHandlerFactory()
{
// Currently, there is no metadata in this test for loaded segments. Therefore, this should not be called.
// In the future, if this needs to be supported, mocks for LoadedSegmentDataProvider should be added like
// In the future, if this needs to be supported, mocks for DataServerQueryHandler should be added, similar to
// org.apache.druid.msq.exec.MSQLoadedSegmentTests.
LoadedSegmentDataProviderFactory mockFactory = Mockito.mock(LoadedSegmentDataProviderFactory.class);
LoadedSegmentDataProvider loadedSegmentDataProvider = Mockito.mock(LoadedSegmentDataProvider.class);
try {
doThrow(new AssertionError("Test does not support loaded segment query"))
.when(loadedSegmentDataProvider).fetchRowsFromDataServer(any(), any(), any(), any());
doReturn(loadedSegmentDataProvider).when(mockFactory).createLoadedSegmentDataProvider(anyString(), any());
}
catch (IOException e) {
throw new RuntimeException(e);
}
DataServerQueryHandlerFactory mockFactory = Mockito.mock(DataServerQueryHandlerFactory.class);
DataServerQueryHandler dataServerQueryHandler = Mockito.mock(DataServerQueryHandler.class);
doThrow(new AssertionError("Test does not support loaded segment query"))
.when(dataServerQueryHandler).fetchRowsFromDataServer(any(), any(), any());
doReturn(dataServerQueryHandler).when(mockFactory).createDataServerQueryHandler(anyString(), any(), any());
return mockFactory;
}
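For a test that eventually does need loaded-segment metadata, the wiring hinted at in the comment above would look roughly like the following. This is a hedged sketch: the stubbed return value of fetchRowsFromDataServer is left out because its concrete type is not shown in this diff.

// Hypothetical future wiring, mirroring the mock shape used in this class and in MSQLoadedSegmentTests.
DataServerQueryHandlerFactory factory = Mockito.mock(DataServerQueryHandlerFactory.class);
DataServerQueryHandler handler = Mockito.mock(DataServerQueryHandler.class);
doReturn(handler).when(factory).createDataServerQueryHandler(anyString(), any(), any());
// doReturn(cannedResult).when(handler).fetchRowsFromDataServer(any(), any(), any());
binder.bind(DataServerQueryHandlerFactory.class).toInstance(factory);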

@ -84,8 +84,8 @@ import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.counters.QueryCounterSnapshot;
import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.LoadedSegmentDataProvider;
import org.apache.druid.msq.exec.LoadedSegmentDataProviderFactory;
import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.guice.MSQDurableStorageModule;
import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
@ -317,7 +317,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
// Contains the metadata of loaded segments
protected List<ImmutableSegmentLoadInfo> loadedSegmentsMetadata = new ArrayList<>();
// Mocks the return of data from data servers
protected LoadedSegmentDataProvider loadedSegmentDataProvider = mock(LoadedSegmentDataProvider.class);
protected DataServerQueryHandler dataServerQueryHandler = mock(DataServerQueryHandler.class);
private MSQTestSegmentManager segmentManager;
private SegmentCacheManager segmentCacheManager;
@ -446,7 +446,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
.toInstance(new ForwardingQueryProcessingPool(Execs.singleThreaded("Test-runner-processing-pool")));
binder.bind(DataSegmentProvider.class)
.toInstance((segmentId, channelCounters, isReindex) -> getSupplierForSegment(segmentId));
binder.bind(LoadedSegmentDataProviderFactory.class).toInstance(getTestLoadedSegmentDataProviderFactory());
binder.bind(DataServerQueryHandlerFactory.class).toInstance(getTestDataServerQueryHandlerFactory());
binder.bind(IndexIO.class).toInstance(indexIO);
binder.bind(SpecificSegmentsQuerySegmentWalker.class).toInstance(qf.walker());
@ -610,12 +610,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
return array;
}
private LoadedSegmentDataProviderFactory getTestLoadedSegmentDataProviderFactory()
private DataServerQueryHandlerFactory getTestDataServerQueryHandlerFactory()
{
LoadedSegmentDataProviderFactory mockFactory = Mockito.mock(LoadedSegmentDataProviderFactory.class);
doReturn(loadedSegmentDataProvider)
DataServerQueryHandlerFactory mockFactory = Mockito.mock(DataServerQueryHandlerFactory.class);
doReturn(dataServerQueryHandler)
.when(mockFactory)
.createLoadedSegmentDataProvider(anyString(), any());
.createDataServerQueryHandler(anyString(), any(), any());
return mockFactory;
}

@ -29,7 +29,7 @@ import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerClient;
import org.apache.druid.msq.exec.LoadedSegmentDataProviderFactory;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerClient;
import org.apache.druid.msq.exec.WorkerContext;
@ -160,7 +160,7 @@ public class MSQTestWorkerContext implements WorkerContext
),
indexIO,
injector.getInstance(DataSegmentProvider.class),
injector.getInstance(LoadedSegmentDataProviderFactory.class),
injector.getInstance(DataServerQueryHandlerFactory.class),
workerMemoryParameters
);
}
@ -184,8 +184,8 @@ public class MSQTestWorkerContext implements WorkerContext
}
@Override
public LoadedSegmentDataProviderFactory loadedSegmentDataProviderFactory()
public DataServerQueryHandlerFactory dataServerQueryHandlerFactory()
{
return injector.getInstance(LoadedSegmentDataProviderFactory.class);
return injector.getInstance(DataServerQueryHandlerFactory.class);
}
}

@ -20,55 +20,89 @@
package org.apache.druid.discovery;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.druid.client.InputStreamHolder;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
import org.apache.druid.java.util.http.client.response.ClientResponse;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.server.QueryResource;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Response handler for the {@link DataServerClient}. Handles the input stream from the data server and updates
 * the {@link ResponseContext} from the response header. Applies backpressure based on the query's maxQueuedBytes
 * context value and enforces the query timeout.
 */
public class DataServerResponseHandler implements HttpResponseHandler<AppendableByteArrayInputStream, InputStream>
public class DataServerResponseHandler implements HttpResponseHandler<InputStream, InputStream>
{
private static final Logger log = new Logger(DataServerResponseHandler.class);
private final String queryId;
private final Query<?> query;
private final ResponseContext responseContext;
private final AtomicLong totalByteCount = new AtomicLong(0);
private final ObjectMapper objectMapper;
private final AtomicReference<TrafficCop> trafficCopRef = new AtomicReference<>();
private final long maxQueuedBytes;
final boolean usingBackpressure;
private final AtomicLong queuedByteCount = new AtomicLong(0);
private final AtomicBoolean done = new AtomicBoolean(false);
private final BlockingQueue<InputStreamHolder> queue = new LinkedBlockingQueue<>();
private final AtomicReference<String> fail = new AtomicReference<>();
private final long failTime;
public <T> DataServerResponseHandler(Query<T> query, ResponseContext responseContext, ObjectMapper objectMapper)
public DataServerResponseHandler(Query<?> query, ResponseContext responseContext, ObjectMapper objectMapper)
{
this.queryId = query.getId();
this.query = query;
this.responseContext = responseContext;
this.objectMapper = objectMapper;
QueryContext queryContext = query.context();
maxQueuedBytes = queryContext.getMaxQueuedBytes(0);
usingBackpressure = maxQueuedBytes > 0;
long startTimeMillis = System.currentTimeMillis();
if (queryContext.hasTimeout()) {
failTime = startTimeMillis + queryContext.getTimeout();
} else {
failTime = 0;
}
}
@Override
public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response, TrafficCop trafficCop)
public ClientResponse<InputStream> handleResponse(HttpResponse response, TrafficCop trafficCop)
{
log.debug("Received response status[%s] for queryId[%s]", response.getStatus(), queryId);
AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
in.add(getContentBytes(response.getContent()));
trafficCopRef.set(trafficCop);
checkQueryTimeout();
log.debug("Received response status[%s] for queryId[%s]", response.getStatus(), query.getId());
final boolean continueReading;
try {
final String queryResponseHeaders = response.headers().get(QueryResource.HEADER_RESPONSE_CONTEXT);
if (queryResponseHeaders != null) {
responseContext.merge(ResponseContext.deserialize(queryResponseHeaders, objectMapper));
}
return ClientResponse.finished(in);
continueReading = enqueue(response.getContent(), 0L);
}
catch (IOException e) {
catch (final IOException e) {
return ClientResponse.finished(
new AppendableByteArrayInputStream()
new InputStream()
{
@Override
public int read() throws IOException
@ -78,38 +112,171 @@ public class DataServerResponseHandler implements HttpResponseHandler<AppendableByteArrayInputStream, InputStream>
}
);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return ClientResponse.finished(
new SequenceInputStream(
new Enumeration<InputStream>()
{
@Override
public boolean hasMoreElements()
{
if (fail.get() != null) {
throw new RE(fail.get());
}
checkQueryTimeout();
// done stays false until the last stream has been put in the queue, so keep returning true until then.
// After that, return true only while the queue still has InputStreams to hand out.
synchronized (done) {
return !done.get() || !queue.isEmpty();
}
}
@Override
public InputStream nextElement()
{
if (fail.get() != null) {
throw new RE(fail.get());
}
try {
return dequeue();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
),
continueReading
);
}
@Override
public ClientResponse<AppendableByteArrayInputStream> handleChunk(
ClientResponse<AppendableByteArrayInputStream> clientResponse,
public ClientResponse<InputStream> handleChunk(
ClientResponse<InputStream> clientResponse,
HttpChunk chunk,
long chunkNum
)
{
clientResponse.getObj().add(getContentBytes(chunk.getContent()));
return clientResponse;
checkQueryTimeout();
final ChannelBuffer channelBuffer = chunk.getContent();
final int bytes = channelBuffer.readableBytes();
boolean continueReading = true;
if (bytes > 0) {
try {
continueReading = enqueue(channelBuffer, chunkNum);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
totalByteCount.addAndGet(bytes);
}
return ClientResponse.finished(clientResponse.getObj(), continueReading);
}
@Override
public ClientResponse<InputStream> done(ClientResponse<AppendableByteArrayInputStream> clientResponse)
public ClientResponse<InputStream> done(ClientResponse<InputStream> clientResponse)
{
final AppendableByteArrayInputStream obj = clientResponse.getObj();
obj.done();
return ClientResponse.finished(obj);
log.debug("Finished reading response for queryId[%s]. Read total[%d]", query.getId(), totalByteCount.get());
synchronized (done) {
try {
// An empty buffer is put at the end so that SequenceInputStream.close() has something to close out
// after done is set to true, regardless of the rest of the stream's state.
queue.put(InputStreamHolder.fromChannelBuffer(ChannelBuffers.EMPTY_BUFFER, Long.MAX_VALUE));
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
finally {
done.set(true);
}
}
return ClientResponse.finished(clientResponse.getObj());
}
@Override
public void exceptionCaught(ClientResponse<AppendableByteArrayInputStream> clientResponse, Throwable e)
public void exceptionCaught(ClientResponse<InputStream> clientResponse, Throwable e)
{
final AppendableByteArrayInputStream obj = clientResponse.getObj();
obj.exceptionCaught(e);
String msg = StringUtils.format(
"Query[%s] failed with exception msg [%s]",
query.getId(),
e.getMessage()
);
setupResponseReadFailure(msg, e);
}
private byte[] getContentBytes(ChannelBuffer content)
private boolean enqueue(ChannelBuffer buffer, long chunkNum) throws InterruptedException
{
byte[] contentBytes = new byte[content.readableBytes()];
content.readBytes(contentBytes);
return contentBytes;
// Increment queuedByteCount before queueing the object, so queuedByteCount is at least as high as
// the actual number of queued bytes at any particular time.
final InputStreamHolder holder = InputStreamHolder.fromChannelBuffer(buffer, chunkNum);
final long currentQueuedByteCount = queuedByteCount.addAndGet(holder.getLength());
queue.put(holder);
// True if we should keep reading.
return !usingBackpressure || currentQueuedByteCount < maxQueuedBytes;
}
private InputStream dequeue() throws InterruptedException
{
final InputStreamHolder holder = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS);
if (holder == null) {
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] timed out.", query.getId()));
}
final long currentQueuedByteCount = queuedByteCount.addAndGet(-holder.getLength());
if (usingBackpressure && currentQueuedByteCount < maxQueuedBytes) {
Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, how can this be?").resume(holder.getChunkNum());
}
return holder.getStream();
}
// Returns remaining timeout or throws exception if timeout already elapsed.
private long checkQueryTimeout()
{
long timeLeft = failTime - System.currentTimeMillis();
if (timeLeft <= 0) {
String msg = StringUtils.format("Query[%s] timed out.", query.getId());
setupResponseReadFailure(msg, null);
throw new QueryTimeoutException(msg);
} else {
return timeLeft;
}
}
private void setupResponseReadFailure(String msg, Throwable th)
{
fail.set(msg);
queue.clear();
queue.offer(
InputStreamHolder.fromStream(
new InputStream()
{
@Override
public int read() throws IOException
{
if (th != null) {
throw new IOException(msg, th);
} else {
throw new IOException(msg);
}
}
},
-1,
0
)
);
}
}
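The handler above combines three ideas: response chunks are handed off through a queue of InputStreamHolders, the producer stops reading once the queued byte count exceeds maxQueuedBytes, and the consumer resumes reading via the TrafficCop once the backlog drains below the budget. A self-contained sketch of that handoff pattern in plain Java (a hypothetical class, not Druid code):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

final class BoundedChunkQueue
{
  private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
  private final AtomicLong queuedBytes = new AtomicLong(0);
  private final long maxQueuedBytes;
  private final Runnable resumeReading;

  BoundedChunkQueue(long maxQueuedBytes, Runnable resumeReading)
  {
    this.maxQueuedBytes = maxQueuedBytes;
    this.resumeReading = resumeReading;
  }

  /** Producer side. Returns true if the caller should keep reading from the network. */
  boolean offerChunk(byte[] chunk) throws InterruptedException
  {
    // Increment before queueing so the counter is never lower than the bytes actually queued.
    final long current = queuedBytes.addAndGet(chunk.length);
    queue.put(chunk);
    return maxQueuedBytes <= 0 || current < maxQueuedBytes;
  }

  /** Consumer side. Blocks until a chunk is available and resumes the producer when under budget. */
  byte[] takeChunk() throws InterruptedException
  {
    final byte[] chunk = queue.take();
    if (maxQueuedBytes > 0 && queuedBytes.addAndGet(-chunk.length) < maxQueuedBytes) {
      resumeReading.run();
    }
    return chunk;
  }
}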

@ -67,10 +67,15 @@ public class FixedSetServiceLocator implements ServiceLocator
}
Set<ServiceLocation> locationSet = serviceLocations.getLocations();
int size = locationSet.size();
if (size == 1) {
return Futures.immediateFuture(ServiceLocations.forLocation(locationSet.stream().findFirst().get()));
}
return Futures.immediateFuture(
ServiceLocations.forLocation(
locationSet.stream()
.skip(ThreadLocalRandom.current().nextInt(locationSet.size()))
.skip(ThreadLocalRandom.current().nextInt(size))
.findFirst()
.orElse(null)
)
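The locator now short-circuits the single-location case and otherwise picks a location uniformly at random by skipping a random number of stream elements. A standalone illustration of that idiom (a hypothetical helper, not Druid code):

import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

final class RandomPick
{
  // skip(n) discards the first n elements, so findFirst() lands on a uniformly random element.
  static <T> T pick(Set<T> items)
  {
    if (items.isEmpty()) {
      return null;
    }
    return items.stream()
                .skip(ThreadLocalRandom.current().nextInt(items.size()))
                .findFirst()
                .orElse(null);
  }
}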

@ -19,6 +19,8 @@
package org.apache.druid.discovery;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
@ -27,6 +29,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
@ -37,6 +40,7 @@ import org.apache.druid.rpc.MockServiceClient;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.server.QueryResource;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Assert;
@ -46,36 +50,32 @@ import org.junit.Test;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import java.util.Collections;
import java.util.List;
import static org.apache.druid.query.Druids.newScanQueryBuilder;
import static org.mockito.Mockito.mock;
public class DataServerClientTest
{
MockServiceClient serviceClient;
ServiceClientFactory serviceClientFactory;
ObjectMapper jsonMapper;
ScanQuery query;
DataServerClient target;
private static final SegmentDescriptor SEGMENT_1 = new SegmentDescriptor(Intervals.of("2003/2004"), "v0", 1);
private MockServiceClient serviceClient;
private ObjectMapper jsonMapper;
private ScanQuery query;
private DataServerClient target;
@Before
public void setUp()
{
jsonMapper = DruidServiceTestUtils.newJsonMapper();
serviceClient = new MockServiceClient();
serviceClientFactory = (serviceName, serviceLocator, retryPolicy) -> serviceClient;
ServiceClientFactory serviceClientFactory = (serviceName, serviceLocator, retryPolicy) -> serviceClient;
query = newScanQueryBuilder()
.dataSource("dataSource1")
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(Intervals.of("2003/2004"), "v0", 1)
)
)
)
.intervals(new MultipleSpecificSegmentSpec(ImmutableList.of(SEGMENT_1)))
.columns("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(ImmutableMap.of("defaultTimeout", 5000L))
.build();
target = new DataServerClient(
@ -116,4 +116,78 @@ public class DataServerClientTest
Assert.assertEquals(ImmutableList.of(scanResultValue), result.toList());
}
@Test
public void testMissingSegmentsHeaderShouldAccumulate() throws JsonProcessingException
{
DataServerResponse dataServerResponse = new DataServerResponse(ImmutableList.of(SEGMENT_1));
RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.POST, "/druid/v2/")
.jsonContent(jsonMapper, query);
serviceClient.expectAndRespond(
requestBuilder,
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON, QueryResource.HEADER_RESPONSE_CONTEXT, jsonMapper.writeValueAsString(dataServerResponse)),
jsonMapper.writeValueAsBytes(null)
);
ResponseContext responseContext = new DefaultResponseContext();
target.run(
query,
responseContext,
jsonMapper.getTypeFactory().constructType(ScanResultValue.class),
Closer.create()
);
Assert.assertEquals(1, responseContext.getMissingSegments().size());
}
@Test
public void testQueryFailure() throws JsonProcessingException
{
ScanQuery scanQueryWithTimeout = query.withOverriddenContext(ImmutableMap.of("maxQueuedBytes", 1, "timeout", 0));
ScanResultValue scanResultValue = new ScanResultValue(
null,
ImmutableList.of("id", "name"),
ImmutableList.of(
ImmutableList.of(1, "abc"),
ImmutableList.of(5, "efg")
));
RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.POST, "/druid/v2/")
.jsonContent(jsonMapper, scanQueryWithTimeout);
serviceClient.expectAndRespond(
requestBuilder,
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(Collections.singletonList(scanResultValue))
);
ResponseContext responseContext = new DefaultResponseContext();
Assert.assertThrows(
QueryTimeoutException.class,
() -> target.run(
scanQueryWithTimeout,
responseContext,
jsonMapper.getTypeFactory().constructType(ScanResultValue.class),
Closer.create()
).toList()
);
}
private static class DataServerResponse
{
List<SegmentDescriptor> missingSegments;
@JsonCreator
public DataServerResponse(@JsonProperty("missingSegments") List<SegmentDescriptor> missingSegments)
{
this.missingSegments = missingSegments;
}
@JsonProperty("missingSegments")
public List<SegmentDescriptor> getMissingSegments()
{
return missingSegments;
}
}
}
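The missing-segments test relies on the data server reporting unavailable segments through the serialized ResponseContext header, which the private DataServerResponse class above mirrors. A hedged sketch of just that header round trip, using only calls that appear elsewhere in this diff, would look like this additional test method:

// Sketch: serialize a missingSegments payload, deserialize it as a ResponseContext, merge it,
// and read back the accumulated descriptors, as DataServerResponseHandler does for the real header.
@Test
public void testResponseContextHeaderMergeSketch() throws Exception
{
  ObjectMapper mapper = DruidServiceTestUtils.newJsonMapper();
  String headerJson = mapper.writeValueAsString(
      ImmutableMap.of(
          "missingSegments",
          ImmutableList.of(new SegmentDescriptor(Intervals.of("2003/2004"), "v0", 1))
      )
  );
  ResponseContext responseContext = new DefaultResponseContext();
  responseContext.merge(ResponseContext.deserialize(headerJson, mapper));
  Assert.assertEquals(1, responseContext.getMissingSegments().size());
}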

@ -39,6 +39,16 @@ public class FixedSetServiceLocatorTest
2
);
public static final DruidServerMetadata DATA_SERVER_2 = new DruidServerMetadata(
"TestDataServer",
"hostName:8083",
null,
2,
ServerType.REALTIME,
"tier1",
2
);
@Test
public void testLocateNullShouldBeClosed() throws ExecutionException, InterruptedException
{
@ -48,7 +58,6 @@ public class FixedSetServiceLocatorTest
Assert.assertTrue(serviceLocator.locate().get().isClosed());
}
@Test
public void testLocateSingleServer() throws ExecutionException, InterruptedException
{
@ -60,4 +69,18 @@ public class FixedSetServiceLocatorTest
serviceLocator.locate().get()
);
}
@Test
public void testLocateMultipleServers() throws ExecutionException, InterruptedException
{
FixedSetServiceLocator serviceLocator
= FixedSetServiceLocator.forDruidServerMetadata(ImmutableSet.of(DATA_SERVER_1, DATA_SERVER_2));
Assert.assertTrue(
ImmutableSet.of(
ServiceLocations.forLocation(ServiceLocation.fromDruidServerMetadata(DATA_SERVER_1)),
ServiceLocations.forLocation(ServiceLocation.fromDruidServerMetadata(DATA_SERVER_2))
).contains(serviceLocator.locate().get())
);
}
}