mirror of https://github.com/apache/druid.git
Show candidate hosts for the given query (#2282)
* Show candidate hosts for the given query * Added test cases & minor changes to address comments * Changed path-param to query-pram for intervals/numCandidates
This commit is contained in:
parent
67920c114e
commit
49c0fe0e8b
|
@ -59,7 +59,17 @@ Returns the dimensions of the datasource.
|
|||
|
||||
Returns the metrics of the datasource.
|
||||
|
||||
* `/druid/v2/datasources/{dataSourceName}/candidates?intervals={comma-separated-intervals-in-ISO8601-format}&numCandidates={numCandidates}`
|
||||
|
||||
Returns segment information lists including server locations for the given datasource and intervals. If "numCandidates" is not specified, it will return all servers for each interval.
|
||||
|
||||
* `/druid/broker/v1/loadstatus`
|
||||
|
||||
Returns a flag indicating if the broker knows about all segments in Zookeeper. This can be used to know when a broker node is ready to be queried after a restart.
|
||||
|
||||
|
||||
### POST
|
||||
|
||||
* `/druid/v2/candidates/`
|
||||
|
||||
Returns segment information lists including server locations for the given query.
|
||||
|
|
|
@ -145,4 +145,9 @@ public class QueryInterruptedException extends RuntimeException
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static QueryInterruptedException wrapIfNeeded(Throwable e)
|
||||
{
|
||||
return e instanceof QueryInterruptedException ? (QueryInterruptedException) e : new QueryInterruptedException(e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.client.selector.ServerSelector;
|
||||
import io.druid.query.DataSource;
|
||||
import io.druid.query.LocatedSegmentDescriptor;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
import io.druid.query.TableDataSource;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.timeline.TimelineLookup;
|
||||
import io.druid.timeline.TimelineObjectHolder;
|
||||
import io.druid.timeline.partition.PartitionChunk;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ServerViewUtil
|
||||
{
|
||||
public static List<LocatedSegmentDescriptor> getTargetLocations(
|
||||
TimelineServerView serverView,
|
||||
String datasource,
|
||||
List<Interval> intervals,
|
||||
int numCandidates
|
||||
)
|
||||
{
|
||||
return getTargetLocations(serverView, new TableDataSource(datasource), intervals, numCandidates);
|
||||
}
|
||||
|
||||
public static List<LocatedSegmentDescriptor> getTargetLocations(
|
||||
TimelineServerView serverView,
|
||||
DataSource datasource,
|
||||
List<Interval> intervals,
|
||||
int numCandidates
|
||||
)
|
||||
{
|
||||
TimelineLookup<String, ServerSelector> timeline = serverView.getTimeline(datasource);
|
||||
if (timeline == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
List<LocatedSegmentDescriptor> located = Lists.newArrayList();
|
||||
for (Interval interval : intervals) {
|
||||
for (TimelineObjectHolder<String, ServerSelector> holder : timeline.lookup(interval)) {
|
||||
for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
|
||||
ServerSelector selector = chunk.getObject();
|
||||
final SegmentDescriptor descriptor = new SegmentDescriptor(
|
||||
holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
|
||||
);
|
||||
long size = selector.getSegment().getSize();
|
||||
List<DruidServerMetadata> candidates = selector.getCandidates(numCandidates);
|
||||
located.add(new LocatedSegmentDescriptor(descriptor, size, candidates));
|
||||
}
|
||||
}
|
||||
}
|
||||
return located;
|
||||
}
|
||||
}
|
|
@ -44,4 +44,13 @@ public class QueryableDruidServer
|
|||
{
|
||||
return client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "QueryableDruidServer{" +
|
||||
"server=" + server +
|
||||
", client=" + client +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,10 +19,15 @@
|
|||
|
||||
package io.druid.client.selector;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -78,9 +83,40 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
|
|||
}
|
||||
}
|
||||
|
||||
public List<DruidServerMetadata> getCandidates(final int numCandidates) {
|
||||
List<DruidServerMetadata> result = Lists.newArrayList();
|
||||
synchronized (this) {
|
||||
final DataSegment target = segment.get();
|
||||
for (Map.Entry<Integer, Set<QueryableDruidServer>> entry : toPrioritizedServers().entrySet()) {
|
||||
Set<QueryableDruidServer> servers = entry.getValue();
|
||||
TreeMap<Integer, Set<QueryableDruidServer>> tieredMap = Maps.newTreeMap();
|
||||
while (!servers.isEmpty()) {
|
||||
tieredMap.put(entry.getKey(), servers); // strategy.pick() removes entry
|
||||
QueryableDruidServer server = strategy.pick(tieredMap, target);
|
||||
if (server == null) {
|
||||
// regard this as any server in tieredMap is not appropriate
|
||||
break;
|
||||
}
|
||||
result.add(server.getServer().getMetadata());
|
||||
if (numCandidates > 0 && result.size() >= numCandidates) {
|
||||
return result;
|
||||
}
|
||||
servers.remove(server);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public QueryableDruidServer pick()
|
||||
{
|
||||
synchronized (this) {
|
||||
return strategy.pick(toPrioritizedServers(), segment.get());
|
||||
}
|
||||
}
|
||||
|
||||
private TreeMap<Integer, Set<QueryableDruidServer>> toPrioritizedServers()
|
||||
{
|
||||
final TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers = new TreeMap<>(strategy.getComparator());
|
||||
for (QueryableDruidServer server : servers) {
|
||||
Set<QueryableDruidServer> theServers = prioritizedServers.get(server.getServer().getPriority());
|
||||
|
@ -90,8 +126,6 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
|
|||
}
|
||||
theServers.add(server);
|
||||
}
|
||||
|
||||
return strategy.pick(prioritizedServers, segment.get());
|
||||
}
|
||||
return prioritizedServers;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* public, evolving
|
||||
* <p/>
|
||||
* extended version of SegmentDescriptor, which is internal class, with location and size information attached
|
||||
*/
|
||||
public class LocatedSegmentDescriptor
|
||||
{
|
||||
private final Interval interval;
|
||||
private final String version;
|
||||
private final int partitionNumber;
|
||||
private final long size;
|
||||
private final List<DruidServerMetadata> locations;
|
||||
|
||||
@JsonCreator
|
||||
public LocatedSegmentDescriptor(
|
||||
@JsonProperty("interval") Interval interval,
|
||||
@JsonProperty("version") String version,
|
||||
@JsonProperty("partitionNumber") int partitionNumber,
|
||||
@JsonProperty("size") long size,
|
||||
@JsonProperty("locations") List<DruidServerMetadata> locations
|
||||
)
|
||||
{
|
||||
this.interval = interval;
|
||||
this.version = version;
|
||||
this.partitionNumber = partitionNumber;
|
||||
this.size = size;
|
||||
this.locations = locations == null ? ImmutableList.<DruidServerMetadata>of() : locations;
|
||||
}
|
||||
|
||||
public LocatedSegmentDescriptor(SegmentDescriptor descriptor, long size, List<DruidServerMetadata> candidates)
|
||||
{
|
||||
this(descriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber(), size, candidates);
|
||||
}
|
||||
|
||||
@JsonProperty("interval")
|
||||
public Interval getInterval()
|
||||
{
|
||||
return interval;
|
||||
}
|
||||
|
||||
@JsonProperty("version")
|
||||
public String getVersion()
|
||||
{
|
||||
return version;
|
||||
}
|
||||
|
||||
@JsonProperty("partitionNumber")
|
||||
public int getPartitionNumber()
|
||||
{
|
||||
return partitionNumber;
|
||||
}
|
||||
|
||||
@JsonProperty("size")
|
||||
public long getSize()
|
||||
{
|
||||
return size;
|
||||
}
|
||||
|
||||
@JsonProperty("locations")
|
||||
public List<DruidServerMetadata> getLocations()
|
||||
{
|
||||
return locations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (!(o instanceof LocatedSegmentDescriptor)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
LocatedSegmentDescriptor other = (LocatedSegmentDescriptor) o;
|
||||
|
||||
if (partitionNumber != other.partitionNumber) {
|
||||
return false;
|
||||
}
|
||||
if (!Objects.equals(interval, other.interval)) {
|
||||
return false;
|
||||
}
|
||||
if (!Objects.equals(version, other.version)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(partitionNumber, interval, version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "LocatedSegmentDescriptor{" +
|
||||
"interval=" + interval +
|
||||
", version='" + version + '\'' +
|
||||
", partitionNumber=" + partitionNumber +
|
||||
", size=" + size +
|
||||
", locations=" + locations +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.server;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import io.druid.client.ServerViewUtil;
|
||||
import io.druid.client.TimelineServerView;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QuerySegmentWalker;
|
||||
import io.druid.query.QueryToolChestWarehouse;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.server.log.RequestLogger;
|
||||
import io.druid.server.security.AuthConfig;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
/**
|
||||
*/
|
||||
@Path("/druid/v2/")
|
||||
public class BrokerQueryResource extends QueryResource
|
||||
{
|
||||
private final TimelineServerView brokerServerView;
|
||||
|
||||
@Inject
|
||||
public BrokerQueryResource(
|
||||
QueryToolChestWarehouse warehouse,
|
||||
ServerConfig config,
|
||||
@Json ObjectMapper jsonMapper,
|
||||
@Smile ObjectMapper smileMapper,
|
||||
QuerySegmentWalker texasRanger,
|
||||
ServiceEmitter emitter,
|
||||
RequestLogger requestLogger,
|
||||
QueryManager queryManager,
|
||||
AuthConfig authConfig,
|
||||
TimelineServerView brokerServerView
|
||||
)
|
||||
{
|
||||
super(warehouse, config, jsonMapper, smileMapper, texasRanger, emitter, requestLogger, queryManager, authConfig);
|
||||
this.brokerServerView = brokerServerView;
|
||||
}
|
||||
|
||||
@POST
|
||||
@Path("/candidates")
|
||||
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
|
||||
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE})
|
||||
public Response getQueryTargets(
|
||||
InputStream in,
|
||||
@QueryParam("pretty") String pretty,
|
||||
@QueryParam("numCandidates") @DefaultValue("-1") int numCandidates,
|
||||
@Context final HttpServletRequest req
|
||||
) throws IOException
|
||||
{
|
||||
final ResponseContext context = createContext(req.getContentType(), pretty != null);
|
||||
try {
|
||||
Query<?> query = context.getObjectMapper().readValue(in, Query.class);
|
||||
return context.ok(
|
||||
ServerViewUtil.getTargetLocations(
|
||||
brokerServerView,
|
||||
query.getDataSource(),
|
||||
query.getIntervals(),
|
||||
numCandidates
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
return context.gotError(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -33,9 +33,11 @@ import com.sun.jersey.spi.container.ResourceFilters;
|
|||
import io.druid.client.DruidDataSource;
|
||||
import io.druid.client.DruidServer;
|
||||
import io.druid.client.FilteredServerInventoryView;
|
||||
import io.druid.client.InventoryView;
|
||||
import io.druid.client.ServerViewUtil;
|
||||
import io.druid.client.TimelineServerView;
|
||||
import io.druid.client.selector.ServerSelector;
|
||||
import io.druid.common.utils.JodaUtils;
|
||||
import io.druid.query.LocatedSegmentDescriptor;
|
||||
import io.druid.query.TableDataSource;
|
||||
import io.druid.query.metadata.SegmentMetadataQueryConfig;
|
||||
import io.druid.server.http.security.DatasourceResourceFilter;
|
||||
|
@ -53,6 +55,7 @@ import org.joda.time.DateTime;
|
|||
import org.joda.time.Interval;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
|
@ -60,6 +63,7 @@ import javax.ws.rs.Produces;
|
|||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
|
@ -300,6 +304,25 @@ public class ClientInfoResource
|
|||
return metrics;
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/{dataSourceName}/candidates")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
@ResourceFilters(DatasourceResourceFilter.class)
|
||||
public Iterable<LocatedSegmentDescriptor> getQueryTargets(
|
||||
@PathParam("dataSourceName") String datasource,
|
||||
@QueryParam("intervals") String intervals,
|
||||
@QueryParam("numCandidates") @DefaultValue("-1") int numCandidates,
|
||||
@Context final HttpServletRequest req
|
||||
) throws IOException
|
||||
{
|
||||
List<Interval> intervalList = Lists.newArrayList();
|
||||
for (String interval : intervals.split(",")) {
|
||||
intervalList.add(Interval.parse(interval.trim()));
|
||||
}
|
||||
List<Interval> condensed = JodaUtils.condenseIntervals(intervalList);
|
||||
return ServerViewUtil.getTargetLocations(timelineServerView, datasource, condensed, numCandidates);
|
||||
}
|
||||
|
||||
protected DateTime getCurrentTime()
|
||||
{
|
||||
return new DateTime();
|
||||
|
|
|
@ -79,21 +79,21 @@ import java.util.UUID;
|
|||
@Path("/druid/v2/")
|
||||
public class QueryResource
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(QueryResource.class);
|
||||
protected static final EmittingLogger log = new EmittingLogger(QueryResource.class);
|
||||
@Deprecated // use SmileMediaTypes.APPLICATION_JACKSON_SMILE
|
||||
private static final String APPLICATION_SMILE = "application/smile";
|
||||
protected static final String APPLICATION_SMILE = "application/smile";
|
||||
|
||||
private static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7 * 1024;
|
||||
protected static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7 * 1024;
|
||||
|
||||
private final QueryToolChestWarehouse warehouse;
|
||||
private final ServerConfig config;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ObjectMapper smileMapper;
|
||||
private final QuerySegmentWalker texasRanger;
|
||||
private final ServiceEmitter emitter;
|
||||
private final RequestLogger requestLogger;
|
||||
private final QueryManager queryManager;
|
||||
private final AuthConfig authConfig;
|
||||
protected final QueryToolChestWarehouse warehouse;
|
||||
protected final ServerConfig config;
|
||||
protected final ObjectMapper jsonMapper;
|
||||
protected final ObjectMapper smileMapper;
|
||||
protected final QuerySegmentWalker texasRanger;
|
||||
protected final ServiceEmitter emitter;
|
||||
protected final RequestLogger requestLogger;
|
||||
protected final QueryManager queryManager;
|
||||
protected final AuthConfig authConfig;
|
||||
|
||||
@Inject
|
||||
public QueryResource(
|
||||
|
@ -167,19 +167,11 @@ public class QueryResource
|
|||
QueryToolChest toolChest = null;
|
||||
String queryId = null;
|
||||
|
||||
final String reqContentType = req.getContentType();
|
||||
final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType)
|
||||
|| APPLICATION_SMILE.equals(reqContentType);
|
||||
final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON;
|
||||
|
||||
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
|
||||
final ObjectWriter jsonWriter = pretty != null
|
||||
? objectMapper.writerWithDefaultPrettyPrinter()
|
||||
: objectMapper.writer();
|
||||
final ResponseContext context = createContext(req.getContentType(), pretty != null);
|
||||
|
||||
final String currThreadName = Thread.currentThread().getName();
|
||||
try {
|
||||
query = objectMapper.readValue(in, Query.class);
|
||||
query = context.getObjectMapper().readValue(in, Query.class);
|
||||
queryId = query.getId();
|
||||
if (queryId == null) {
|
||||
queryId = UUID.randomUUID().toString();
|
||||
|
@ -244,6 +236,7 @@ public class QueryResource
|
|||
try {
|
||||
final Query theQuery = query;
|
||||
final QueryToolChest theToolChest = toolChest;
|
||||
final ObjectWriter jsonWriter = context.newOutputWriter();
|
||||
Response.ResponseBuilder builder = Response
|
||||
.ok(
|
||||
new StreamingOutput()
|
||||
|
@ -285,7 +278,7 @@ public class QueryResource
|
|||
);
|
||||
}
|
||||
},
|
||||
contentType
|
||||
context.getContentType()
|
||||
)
|
||||
.header("X-Druid-Query-Id", queryId);
|
||||
|
||||
|
@ -344,9 +337,7 @@ public class QueryResource
|
|||
catch (Exception e2) {
|
||||
log.error(e2, "Unable to log query [%s]!", query);
|
||||
}
|
||||
return Response.serverError().type(contentType).entity(
|
||||
jsonWriter.writeValueAsBytes(new QueryInterruptedException(e))
|
||||
).build();
|
||||
return context.gotError(e);
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Input stream has already been consumed by the json object mapper if query == null
|
||||
|
@ -390,12 +381,60 @@ public class QueryResource
|
|||
.addData("peer", req.getRemoteAddr())
|
||||
.emit();
|
||||
|
||||
return Response.serverError().type(contentType).entity(
|
||||
jsonWriter.writeValueAsBytes(new QueryInterruptedException(e))
|
||||
).build();
|
||||
return context.gotError(e);
|
||||
}
|
||||
finally {
|
||||
Thread.currentThread().setName(currThreadName);
|
||||
}
|
||||
}
|
||||
|
||||
protected ResponseContext createContext(String requestType, boolean pretty)
|
||||
{
|
||||
boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(requestType) ||
|
||||
APPLICATION_SMILE.equals(requestType);
|
||||
String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON;
|
||||
return new ResponseContext(contentType, isSmile ? smileMapper : jsonMapper, pretty);
|
||||
}
|
||||
|
||||
protected static class ResponseContext
|
||||
{
|
||||
private final String contentType;
|
||||
private final ObjectMapper inputMapper;
|
||||
private final boolean isPretty;
|
||||
|
||||
ResponseContext(String contentType, ObjectMapper inputMapper, boolean isPretty)
|
||||
{
|
||||
this.contentType = contentType;
|
||||
this.inputMapper = inputMapper;
|
||||
this.isPretty = isPretty;
|
||||
}
|
||||
|
||||
String getContentType()
|
||||
{
|
||||
return contentType;
|
||||
}
|
||||
|
||||
public ObjectMapper getObjectMapper()
|
||||
{
|
||||
return inputMapper;
|
||||
}
|
||||
|
||||
ObjectWriter newOutputWriter()
|
||||
{
|
||||
return isPretty ? inputMapper.writerWithDefaultPrettyPrinter() : inputMapper.writer();
|
||||
}
|
||||
|
||||
Response ok(Object object) throws IOException
|
||||
{
|
||||
return Response.ok(newOutputWriter().writeValueAsString(object), contentType).build();
|
||||
}
|
||||
|
||||
Response gotError(Exception e) throws IOException
|
||||
{
|
||||
return Response.serverError()
|
||||
.type(contentType)
|
||||
.entity(newOutputWriter().writeValueAsBytes(QueryInterruptedException.wrapIfNeeded(e)))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.client.selector;
|
|||
import com.google.common.collect.Lists;
|
||||
import io.druid.client.DirectDruidClient;
|
||||
import io.druid.client.DruidServer;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.partition.NoneShardSpec;
|
||||
import org.easymock.EasyMock;
|
||||
|
@ -31,6 +32,7 @@ import org.junit.Assert;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class TierSelectorStrategyTest
|
||||
|
@ -51,8 +53,7 @@ public class TierSelectorStrategyTest
|
|||
|
||||
testTierSelectorStrategy(
|
||||
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()),
|
||||
Arrays.asList(lowPriority, highPriority),
|
||||
highPriority
|
||||
highPriority, lowPriority
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -71,8 +72,7 @@ public class TierSelectorStrategyTest
|
|||
|
||||
testTierSelectorStrategy(
|
||||
new LowestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()),
|
||||
Arrays.asList(lowPriority, highPriority),
|
||||
lowPriority
|
||||
lowPriority, highPriority
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -104,15 +104,13 @@ public class TierSelectorStrategyTest
|
|||
}
|
||||
}
|
||||
),
|
||||
Arrays.asList(lowPriority, mediumPriority, highPriority),
|
||||
mediumPriority
|
||||
mediumPriority, lowPriority, highPriority
|
||||
);
|
||||
}
|
||||
|
||||
private void testTierSelectorStrategy(
|
||||
TierSelectorStrategy tierSelectorStrategy,
|
||||
List<QueryableDruidServer> servers,
|
||||
QueryableDruidServer expectedSelection
|
||||
QueryableDruidServer... expectedSelection
|
||||
)
|
||||
{
|
||||
final ServerSelector serverSelector = new ServerSelector(
|
||||
|
@ -129,10 +127,21 @@ public class TierSelectorStrategyTest
|
|||
),
|
||||
tierSelectorStrategy
|
||||
);
|
||||
|
||||
List<QueryableDruidServer> servers = Lists.newArrayList(expectedSelection);
|
||||
|
||||
List<DruidServerMetadata> expectedCandidates = Lists.newArrayList();
|
||||
for (QueryableDruidServer server : servers) {
|
||||
expectedCandidates.add(server.getServer().getMetadata());
|
||||
}
|
||||
Collections.shuffle(servers);
|
||||
for (QueryableDruidServer server : servers) {
|
||||
serverSelector.addServerAndUpdateSegment(server, serverSelector.getSegment());
|
||||
}
|
||||
Assert.assertEquals(expectedSelection, serverSelector.pick());
|
||||
|
||||
Assert.assertEquals(expectedSelection[0], serverSelector.pick());
|
||||
Assert.assertEquals(expectedCandidates, serverSelector.getCandidates(-1));
|
||||
Assert.assertEquals(expectedCandidates.subList(0, 2), serverSelector.getCandidates(2));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class LocatedSegmentDescriptorSerdeTest
|
||||
{
|
||||
private final ObjectMapper mapper = new DefaultObjectMapper();
|
||||
|
||||
@Test
|
||||
public void testDimensionsSpecSerde() throws Exception
|
||||
{
|
||||
LocatedSegmentDescriptor expected = new LocatedSegmentDescriptor(
|
||||
new SegmentDescriptor(new Interval(100, 200), "version", 100),
|
||||
65535,
|
||||
Arrays.asList(
|
||||
new DruidServerMetadata("server1", "host1", 30000L, "historical", "tier1", 0),
|
||||
new DruidServerMetadata("server2", "host2", 40000L, "historical", "tier1", 1),
|
||||
new DruidServerMetadata("server3", "host3", 50000L, "realtime", "tier2", 2)
|
||||
)
|
||||
);
|
||||
|
||||
LocatedSegmentDescriptor actual = mapper.readValue(
|
||||
mapper.writeValueAsString(expected),
|
||||
LocatedSegmentDescriptor.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(expected, actual);
|
||||
}
|
||||
}
|
|
@ -44,9 +44,9 @@ import io.druid.query.QuerySegmentWalker;
|
|||
import io.druid.query.QueryToolChestWarehouse;
|
||||
import io.druid.query.RetryQueryRunnerConfig;
|
||||
import io.druid.query.lookup.LookupModule;
|
||||
import io.druid.server.BrokerQueryResource;
|
||||
import io.druid.server.ClientInfoResource;
|
||||
import io.druid.server.ClientQuerySegmentWalker;
|
||||
import io.druid.server.QueryResource;
|
||||
import io.druid.server.coordination.broker.DruidBroker;
|
||||
import io.druid.server.http.BrokerResource;
|
||||
import io.druid.server.initialization.jetty.JettyServerInitializer;
|
||||
|
@ -101,10 +101,10 @@ public class CliBroker extends ServerRunnable
|
|||
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
|
||||
|
||||
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
|
||||
Jerseys.addResource(binder, QueryResource.class);
|
||||
Jerseys.addResource(binder, BrokerQueryResource.class);
|
||||
Jerseys.addResource(binder, BrokerResource.class);
|
||||
Jerseys.addResource(binder, ClientInfoResource.class);
|
||||
LifecycleModule.register(binder, QueryResource.class);
|
||||
LifecycleModule.register(binder, BrokerQueryResource.class);
|
||||
LifecycleModule.register(binder, DruidBroker.class);
|
||||
|
||||
MetricsModule.register(binder, CacheMonitor.class);
|
||||
|
|
Loading…
Reference in New Issue