diff --git a/docs/content/design/broker.md b/docs/content/design/broker.md index 1510030fd60..7f20dfa0a48 100644 --- a/docs/content/design/broker.md +++ b/docs/content/design/broker.md @@ -59,7 +59,17 @@ Returns the dimensions of the datasource. Returns the metrics of the datasource. +* `/druid/v2/datasources/{dataSourceName}/candidates?intervals={comma-separated-intervals-in-ISO8601-format}&numCandidates={numCandidates}` + +Returns segment information lists including server locations for the given datasource and intervals. If "numCandidates" is not specified, it will return all servers for each interval. + * `/druid/broker/v1/loadstatus` Returns a flag indicating if the broker knows about all segments in Zookeeper. This can be used to know when a broker node is ready to be queried after a restart. + +### POST + +* `/druid/v2/candidates/` + +Returns segment information lists including server locations for the given query. diff --git a/processing/src/main/java/io/druid/query/QueryInterruptedException.java b/processing/src/main/java/io/druid/query/QueryInterruptedException.java index 8f8fe2cc34d..4ca01358787 100644 --- a/processing/src/main/java/io/druid/query/QueryInterruptedException.java +++ b/processing/src/main/java/io/druid/query/QueryInterruptedException.java @@ -145,4 +145,9 @@ public class QueryInterruptedException extends RuntimeException return null; } } + + public static QueryInterruptedException wrapIfNeeded(Throwable e) + { + return e instanceof QueryInterruptedException ? (QueryInterruptedException) e : new QueryInterruptedException(e); + } } diff --git a/server/src/main/java/io/druid/client/ServerViewUtil.java b/server/src/main/java/io/druid/client/ServerViewUtil.java new file mode 100644 index 00000000000..eca4c639b66 --- /dev/null +++ b/server/src/main/java/io/druid/client/ServerViewUtil.java @@ -0,0 +1,78 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.google.common.collect.Lists; +import io.druid.client.selector.ServerSelector; +import io.druid.query.DataSource; +import io.druid.query.LocatedSegmentDescriptor; +import io.druid.query.SegmentDescriptor; +import io.druid.query.TableDataSource; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.TimelineLookup; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.partition.PartitionChunk; +import org.joda.time.Interval; + +import java.util.Collections; +import java.util.List; + +/** + */ +public class ServerViewUtil +{ + public static List getTargetLocations( + TimelineServerView serverView, + String datasource, + List intervals, + int numCandidates + ) + { + return getTargetLocations(serverView, new TableDataSource(datasource), intervals, numCandidates); + } + + public static List getTargetLocations( + TimelineServerView serverView, + DataSource datasource, + List intervals, + int numCandidates + ) + { + TimelineLookup timeline = serverView.getTimeline(datasource); + if (timeline == null) { + return Collections.emptyList(); + } + List located = Lists.newArrayList(); + for (Interval interval : intervals) { + for (TimelineObjectHolder holder : timeline.lookup(interval)) { + for (PartitionChunk chunk : holder.getObject()) { + ServerSelector selector = chunk.getObject(); + final SegmentDescriptor descriptor = new SegmentDescriptor( + holder.getInterval(), holder.getVersion(), chunk.getChunkNumber() + ); + long size = selector.getSegment().getSize(); + List candidates = selector.getCandidates(numCandidates); + located.add(new LocatedSegmentDescriptor(descriptor, size, candidates)); + } + } + } + return located; + } +} diff --git a/server/src/main/java/io/druid/client/selector/QueryableDruidServer.java b/server/src/main/java/io/druid/client/selector/QueryableDruidServer.java index e1e583db387..e4e298256f9 100644 --- a/server/src/main/java/io/druid/client/selector/QueryableDruidServer.java +++ b/server/src/main/java/io/druid/client/selector/QueryableDruidServer.java @@ -44,4 +44,13 @@ public class QueryableDruidServer { return client; } + + @Override + public String toString() + { + return "QueryableDruidServer{" + + "server=" + server + + ", client=" + client + + '}'; + } } diff --git a/server/src/main/java/io/druid/client/selector/ServerSelector.java b/server/src/main/java/io/druid/client/selector/ServerSelector.java index 3a4f58cfb8b..3236c4f8829 100644 --- a/server/src/main/java/io/druid/client/selector/ServerSelector.java +++ b/server/src/main/java/io/druid/client/selector/ServerSelector.java @@ -19,10 +19,15 @@ package io.druid.client.selector; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.metamx.emitter.EmittingLogger; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; @@ -78,20 +83,49 @@ public class ServerSelector implements DiscoverySelector } } + public List getCandidates(final int numCandidates) { + List result = Lists.newArrayList(); + synchronized (this) { + final DataSegment target = segment.get(); + for (Map.Entry> entry : toPrioritizedServers().entrySet()) { + Set servers = entry.getValue(); + TreeMap> tieredMap = Maps.newTreeMap(); + while (!servers.isEmpty()) { + tieredMap.put(entry.getKey(), servers); // strategy.pick() removes entry + QueryableDruidServer server = strategy.pick(tieredMap, target); + if (server == null) { + // regard this as any server in tieredMap is not appropriate + break; + } + result.add(server.getServer().getMetadata()); + if (numCandidates > 0 && result.size() >= numCandidates) { + return result; + } + servers.remove(server); + } + } + } + return result; + } + public QueryableDruidServer pick() { synchronized (this) { - final TreeMap> prioritizedServers = new TreeMap<>(strategy.getComparator()); - for (QueryableDruidServer server : servers) { - Set theServers = prioritizedServers.get(server.getServer().getPriority()); - if (theServers == null) { - theServers = Sets.newHashSet(); - prioritizedServers.put(server.getServer().getPriority(), theServers); - } - theServers.add(server); - } - - return strategy.pick(prioritizedServers, segment.get()); + return strategy.pick(toPrioritizedServers(), segment.get()); } } + + private TreeMap> toPrioritizedServers() + { + final TreeMap> prioritizedServers = new TreeMap<>(strategy.getComparator()); + for (QueryableDruidServer server : servers) { + Set theServers = prioritizedServers.get(server.getServer().getPriority()); + if (theServers == null) { + theServers = Sets.newHashSet(); + prioritizedServers.put(server.getServer().getPriority(), theServers); + } + theServers.add(server); + } + return prioritizedServers; + } } diff --git a/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java b/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java new file mode 100644 index 00000000000..74418750919 --- /dev/null +++ b/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java @@ -0,0 +1,134 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import io.druid.server.coordination.DruidServerMetadata; +import org.joda.time.Interval; + +import java.util.List; +import java.util.Objects; + +/** + * public, evolving + *

+ * extended version of SegmentDescriptor, which is internal class, with location and size information attached + */ +public class LocatedSegmentDescriptor +{ + private final Interval interval; + private final String version; + private final int partitionNumber; + private final long size; + private final List locations; + + @JsonCreator + public LocatedSegmentDescriptor( + @JsonProperty("interval") Interval interval, + @JsonProperty("version") String version, + @JsonProperty("partitionNumber") int partitionNumber, + @JsonProperty("size") long size, + @JsonProperty("locations") List locations + ) + { + this.interval = interval; + this.version = version; + this.partitionNumber = partitionNumber; + this.size = size; + this.locations = locations == null ? ImmutableList.of() : locations; + } + + public LocatedSegmentDescriptor(SegmentDescriptor descriptor, long size, List candidates) + { + this(descriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber(), size, candidates); + } + + @JsonProperty("interval") + public Interval getInterval() + { + return interval; + } + + @JsonProperty("version") + public String getVersion() + { + return version; + } + + @JsonProperty("partitionNumber") + public int getPartitionNumber() + { + return partitionNumber; + } + + @JsonProperty("size") + public long getSize() + { + return size; + } + + @JsonProperty("locations") + public List getLocations() + { + return locations; + } + + @Override + public boolean equals(Object o) + { + if (!(o instanceof LocatedSegmentDescriptor)) { + return false; + } + + LocatedSegmentDescriptor other = (LocatedSegmentDescriptor) o; + + if (partitionNumber != other.partitionNumber) { + return false; + } + if (!Objects.equals(interval, other.interval)) { + return false; + } + if (!Objects.equals(version, other.version)) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return Objects.hash(partitionNumber, interval, version); + } + + @Override + public String toString() + { + return "LocatedSegmentDescriptor{" + + "interval=" + interval + + ", version='" + version + '\'' + + ", partitionNumber=" + partitionNumber + + ", size=" + size + + ", locations=" + locations + + '}'; + } +} diff --git a/server/src/main/java/io/druid/server/BrokerQueryResource.java b/server/src/main/java/io/druid/server/BrokerQueryResource.java new file mode 100644 index 00000000000..d0ac0b1097c --- /dev/null +++ b/server/src/main/java/io/druid/server/BrokerQueryResource.java @@ -0,0 +1,102 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.inject.Inject; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.client.ServerViewUtil; +import io.druid.client.TimelineServerView; +import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; +import io.druid.query.Query; +import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryToolChestWarehouse; +import io.druid.server.initialization.ServerConfig; +import io.druid.server.log.RequestLogger; +import io.druid.server.security.AuthConfig; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; + +/** + */ +@Path("/druid/v2/") +public class BrokerQueryResource extends QueryResource +{ + private final TimelineServerView brokerServerView; + + @Inject + public BrokerQueryResource( + QueryToolChestWarehouse warehouse, + ServerConfig config, + @Json ObjectMapper jsonMapper, + @Smile ObjectMapper smileMapper, + QuerySegmentWalker texasRanger, + ServiceEmitter emitter, + RequestLogger requestLogger, + QueryManager queryManager, + AuthConfig authConfig, + TimelineServerView brokerServerView + ) + { + super(warehouse, config, jsonMapper, smileMapper, texasRanger, emitter, requestLogger, queryManager, authConfig); + this.brokerServerView = brokerServerView; + } + + @POST + @Path("/candidates") + @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) + @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE}) + public Response getQueryTargets( + InputStream in, + @QueryParam("pretty") String pretty, + @QueryParam("numCandidates") @DefaultValue("-1") int numCandidates, + @Context final HttpServletRequest req + ) throws IOException + { + final ResponseContext context = createContext(req.getContentType(), pretty != null); + try { + Query query = context.getObjectMapper().readValue(in, Query.class); + return context.ok( + ServerViewUtil.getTargetLocations( + brokerServerView, + query.getDataSource(), + query.getIntervals(), + numCandidates + ) + ); + } + catch (Exception e) { + return context.gotError(e); + } + } +} diff --git a/server/src/main/java/io/druid/server/ClientInfoResource.java b/server/src/main/java/io/druid/server/ClientInfoResource.java index 9b800b891d7..865d3a814f6 100644 --- a/server/src/main/java/io/druid/server/ClientInfoResource.java +++ b/server/src/main/java/io/druid/server/ClientInfoResource.java @@ -33,9 +33,11 @@ import com.sun.jersey.spi.container.ResourceFilters; import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; import io.druid.client.FilteredServerInventoryView; -import io.druid.client.InventoryView; +import io.druid.client.ServerViewUtil; import io.druid.client.TimelineServerView; import io.druid.client.selector.ServerSelector; +import io.druid.common.utils.JodaUtils; +import io.druid.query.LocatedSegmentDescriptor; import io.druid.query.TableDataSource; import io.druid.query.metadata.SegmentMetadataQueryConfig; import io.druid.server.http.security.DatasourceResourceFilter; @@ -53,6 +55,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -60,6 +63,7 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; +import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -300,6 +304,25 @@ public class ClientInfoResource return metrics; } + @GET + @Path("/{dataSourceName}/candidates") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(DatasourceResourceFilter.class) + public Iterable getQueryTargets( + @PathParam("dataSourceName") String datasource, + @QueryParam("intervals") String intervals, + @QueryParam("numCandidates") @DefaultValue("-1") int numCandidates, + @Context final HttpServletRequest req + ) throws IOException + { + List intervalList = Lists.newArrayList(); + for (String interval : intervals.split(",")) { + intervalList.add(Interval.parse(interval.trim())); + } + List condensed = JodaUtils.condenseIntervals(intervalList); + return ServerViewUtil.getTargetLocations(timelineServerView, datasource, condensed, numCandidates); + } + protected DateTime getCurrentTime() { return new DateTime(); diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 06681e65811..80490da8e5e 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -79,21 +79,21 @@ import java.util.UUID; @Path("/druid/v2/") public class QueryResource { - private static final EmittingLogger log = new EmittingLogger(QueryResource.class); + protected static final EmittingLogger log = new EmittingLogger(QueryResource.class); @Deprecated // use SmileMediaTypes.APPLICATION_JACKSON_SMILE - private static final String APPLICATION_SMILE = "application/smile"; + protected static final String APPLICATION_SMILE = "application/smile"; - private static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7 * 1024; + protected static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7 * 1024; - private final QueryToolChestWarehouse warehouse; - private final ServerConfig config; - private final ObjectMapper jsonMapper; - private final ObjectMapper smileMapper; - private final QuerySegmentWalker texasRanger; - private final ServiceEmitter emitter; - private final RequestLogger requestLogger; - private final QueryManager queryManager; - private final AuthConfig authConfig; + protected final QueryToolChestWarehouse warehouse; + protected final ServerConfig config; + protected final ObjectMapper jsonMapper; + protected final ObjectMapper smileMapper; + protected final QuerySegmentWalker texasRanger; + protected final ServiceEmitter emitter; + protected final RequestLogger requestLogger; + protected final QueryManager queryManager; + protected final AuthConfig authConfig; @Inject public QueryResource( @@ -167,19 +167,11 @@ public class QueryResource QueryToolChest toolChest = null; String queryId = null; - final String reqContentType = req.getContentType(); - final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType) - || APPLICATION_SMILE.equals(reqContentType); - final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON; - - ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; - final ObjectWriter jsonWriter = pretty != null - ? objectMapper.writerWithDefaultPrettyPrinter() - : objectMapper.writer(); + final ResponseContext context = createContext(req.getContentType(), pretty != null); final String currThreadName = Thread.currentThread().getName(); try { - query = objectMapper.readValue(in, Query.class); + query = context.getObjectMapper().readValue(in, Query.class); queryId = query.getId(); if (queryId == null) { queryId = UUID.randomUUID().toString(); @@ -244,6 +236,7 @@ public class QueryResource try { final Query theQuery = query; final QueryToolChest theToolChest = toolChest; + final ObjectWriter jsonWriter = context.newOutputWriter(); Response.ResponseBuilder builder = Response .ok( new StreamingOutput() @@ -285,7 +278,7 @@ public class QueryResource ); } }, - contentType + context.getContentType() ) .header("X-Druid-Query-Id", queryId); @@ -344,9 +337,7 @@ public class QueryResource catch (Exception e2) { log.error(e2, "Unable to log query [%s]!", query); } - return Response.serverError().type(contentType).entity( - jsonWriter.writeValueAsBytes(new QueryInterruptedException(e)) - ).build(); + return context.gotError(e); } catch (Exception e) { // Input stream has already been consumed by the json object mapper if query == null @@ -390,12 +381,60 @@ public class QueryResource .addData("peer", req.getRemoteAddr()) .emit(); - return Response.serverError().type(contentType).entity( - jsonWriter.writeValueAsBytes(new QueryInterruptedException(e)) - ).build(); + return context.gotError(e); } finally { Thread.currentThread().setName(currThreadName); } } + + protected ResponseContext createContext(String requestType, boolean pretty) + { + boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(requestType) || + APPLICATION_SMILE.equals(requestType); + String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON; + return new ResponseContext(contentType, isSmile ? smileMapper : jsonMapper, pretty); + } + + protected static class ResponseContext + { + private final String contentType; + private final ObjectMapper inputMapper; + private final boolean isPretty; + + ResponseContext(String contentType, ObjectMapper inputMapper, boolean isPretty) + { + this.contentType = contentType; + this.inputMapper = inputMapper; + this.isPretty = isPretty; + } + + String getContentType() + { + return contentType; + } + + public ObjectMapper getObjectMapper() + { + return inputMapper; + } + + ObjectWriter newOutputWriter() + { + return isPretty ? inputMapper.writerWithDefaultPrettyPrinter() : inputMapper.writer(); + } + + Response ok(Object object) throws IOException + { + return Response.ok(newOutputWriter().writeValueAsString(object), contentType).build(); + } + + Response gotError(Exception e) throws IOException + { + return Response.serverError() + .type(contentType) + .entity(newOutputWriter().writeValueAsBytes(QueryInterruptedException.wrapIfNeeded(e))) + .build(); + } + } } diff --git a/server/src/test/java/io/druid/client/selector/TierSelectorStrategyTest.java b/server/src/test/java/io/druid/client/selector/TierSelectorStrategyTest.java index b5a3a8f4b7f..bc95553961c 100644 --- a/server/src/test/java/io/druid/client/selector/TierSelectorStrategyTest.java +++ b/server/src/test/java/io/druid/client/selector/TierSelectorStrategyTest.java @@ -22,6 +22,7 @@ package io.druid.client.selector; import com.google.common.collect.Lists; import io.druid.client.DirectDruidClient; import io.druid.client.DruidServer; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; @@ -31,6 +32,7 @@ import org.junit.Assert; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.List; public class TierSelectorStrategyTest @@ -51,8 +53,7 @@ public class TierSelectorStrategyTest testTierSelectorStrategy( new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()), - Arrays.asList(lowPriority, highPriority), - highPriority + highPriority, lowPriority ); } @@ -71,8 +72,7 @@ public class TierSelectorStrategyTest testTierSelectorStrategy( new LowestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()), - Arrays.asList(lowPriority, highPriority), - lowPriority + lowPriority, highPriority ); } @@ -104,15 +104,13 @@ public class TierSelectorStrategyTest } } ), - Arrays.asList(lowPriority, mediumPriority, highPriority), - mediumPriority + mediumPriority, lowPriority, highPriority ); } private void testTierSelectorStrategy( TierSelectorStrategy tierSelectorStrategy, - List servers, - QueryableDruidServer expectedSelection + QueryableDruidServer... expectedSelection ) { final ServerSelector serverSelector = new ServerSelector( @@ -129,10 +127,21 @@ public class TierSelectorStrategyTest ), tierSelectorStrategy ); + + List servers = Lists.newArrayList(expectedSelection); + + List expectedCandidates = Lists.newArrayList(); + for (QueryableDruidServer server : servers) { + expectedCandidates.add(server.getServer().getMetadata()); + } + Collections.shuffle(servers); for (QueryableDruidServer server : servers) { serverSelector.addServerAndUpdateSegment(server, serverSelector.getSegment()); } - Assert.assertEquals(expectedSelection, serverSelector.pick()); + + Assert.assertEquals(expectedSelection[0], serverSelector.pick()); + Assert.assertEquals(expectedCandidates, serverSelector.getCandidates(-1)); + Assert.assertEquals(expectedCandidates.subList(0, 2), serverSelector.getCandidates(2)); } } diff --git a/server/src/test/java/io/druid/query/LocatedSegmentDescriptorSerdeTest.java b/server/src/test/java/io/druid/query/LocatedSegmentDescriptorSerdeTest.java new file mode 100644 index 00000000000..e4c02119601 --- /dev/null +++ b/server/src/test/java/io/druid/query/LocatedSegmentDescriptorSerdeTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.coordination.DruidServerMetadata; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +/** + */ +public class LocatedSegmentDescriptorSerdeTest +{ + private final ObjectMapper mapper = new DefaultObjectMapper(); + + @Test + public void testDimensionsSpecSerde() throws Exception + { + LocatedSegmentDescriptor expected = new LocatedSegmentDescriptor( + new SegmentDescriptor(new Interval(100, 200), "version", 100), + 65535, + Arrays.asList( + new DruidServerMetadata("server1", "host1", 30000L, "historical", "tier1", 0), + new DruidServerMetadata("server2", "host2", 40000L, "historical", "tier1", 1), + new DruidServerMetadata("server3", "host3", 50000L, "realtime", "tier2", 2) + ) + ); + + LocatedSegmentDescriptor actual = mapper.readValue( + mapper.writeValueAsString(expected), + LocatedSegmentDescriptor.class + ); + + Assert.assertEquals(expected, actual); + } +} diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index 3748c832bda..a10e5e5a4f8 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -44,9 +44,9 @@ import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChestWarehouse; import io.druid.query.RetryQueryRunnerConfig; import io.druid.query.lookup.LookupModule; +import io.druid.server.BrokerQueryResource; import io.druid.server.ClientInfoResource; import io.druid.server.ClientQuerySegmentWalker; -import io.druid.server.QueryResource; import io.druid.server.coordination.broker.DruidBroker; import io.druid.server.http.BrokerResource; import io.druid.server.initialization.jetty.JettyServerInitializer; @@ -101,10 +101,10 @@ public class CliBroker extends ServerRunnable binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class); binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); - Jerseys.addResource(binder, QueryResource.class); + Jerseys.addResource(binder, BrokerQueryResource.class); Jerseys.addResource(binder, BrokerResource.class); Jerseys.addResource(binder, ClientInfoResource.class); - LifecycleModule.register(binder, QueryResource.class); + LifecycleModule.register(binder, BrokerQueryResource.class); LifecycleModule.register(binder, DruidBroker.class); MetricsModule.register(binder, CacheMonitor.class);