Major change is to make tasks and segment queries streaming

* Made tasks/segments stream to calcite instead of storing it in memory
* Add num_rows to segments table
* Refactor JsonParserIterator
* Replace with closeable iterator
This commit is contained in:
Surekha Saharan 2018-09-07 16:33:02 -07:00
parent 3cb0f52db9
commit cf1895902b
11 changed files with 788 additions and 410 deletions

View File

@ -542,6 +542,7 @@ Segments table provides details on all Druid segments, whether they are publishe
|version|Version number (generally an ISO8601 timestamp corresponding to when the segment set was first started)|
|partition_num|Partition number (an integer, unique within a datasource+interval+version; may not necessarily be contiguous)|
|num_replicas|Number replicas of this segment currently being served|
|num_rows|Number rows in current segment|
|is_published|True if this segment has been published to the metadata store|
|is_available|True if this segment is currently being served by any server|
|is_realtime|True if this segment is being served on a realtime server|

View File

@ -19,9 +19,6 @@
package org.apache.druid.client;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
@ -32,7 +29,6 @@ import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
@ -52,14 +48,12 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.BySegmentResultValueClass;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.jboss.netty.buffer.ChannelBuffer;
@ -71,20 +65,16 @@ import org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.Duration;
import javax.ws.rs.core.MediaType;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -505,7 +495,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
@Override
public JsonParserIterator<T> make()
{
return new JsonParserIterator<T>(typeRef, future, url, query);
return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper);
}
@Override
@ -531,113 +521,6 @@ public class DirectDruidClient<T> implements QueryRunner<T>
return retVal;
}
private class JsonParserIterator<T> implements Iterator<T>, Closeable
{
private JsonParser jp;
private ObjectCodec objectCodec;
private final JavaType typeRef;
private final Future<InputStream> future;
private final Query<T> query;
private final String url;
public JsonParserIterator(JavaType typeRef, Future<InputStream> future, String url, Query<T> query)
{
this.typeRef = typeRef;
this.future = future;
this.url = url;
this.query = query;
jp = null;
}
@Override
public boolean hasNext()
{
init();
if (jp.isClosed()) {
return false;
}
if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
CloseQuietly.close(jp);
return false;
}
return true;
}
@Override
public T next()
{
init();
try {
final T retVal = objectCodec.readValue(jp, typeRef);
jp.nextToken();
return retVal;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
private void init()
{
if (jp == null) {
try {
InputStream is = future.get();
if (is == null) {
throw new QueryInterruptedException(
new ResourceLimitExceededException(
"query[%s] url[%s] timed out or max bytes limit reached.",
query.getId(),
url
),
host
);
} else {
jp = objectMapper.getFactory().createParser(is);
}
final JsonToken nextToken = jp.nextToken();
if (nextToken == JsonToken.START_OBJECT) {
QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
throw new QueryInterruptedException(cause, host);
} else if (nextToken != JsonToken.START_ARRAY) {
throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
} else {
jp.nextToken();
objectCodec = jp.getCodec();
}
}
catch (IOException | InterruptedException | ExecutionException e) {
throw new RE(
e,
"Failure getting results for query[%s] url[%s] because of [%s]",
query.getId(),
url,
e.getMessage()
);
}
catch (CancellationException e) {
throw new QueryInterruptedException(e, host);
}
}
}
@Override
public void close() throws IOException
{
if (jp != null) {
jp.close();
}
}
}
@Override
public String toString()
{

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.ResourceLimitExceededException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class JsonParserIterator<T> implements Iterator<T>, Closeable
{
private JsonParser jp;
private ObjectCodec objectCodec;
private final JavaType typeRef;
private final Future<InputStream> future;
private final Query<T> query;
private final String url;
private final String host;
private final ObjectMapper objectMapper;
public JsonParserIterator(
JavaType typeRef,
Future<InputStream> future,
String url,
Query<T> query,
String host,
ObjectMapper objectMapper
)
{
this.typeRef = typeRef;
this.future = future;
this.url = url;
this.query = query;
jp = null;
this.host = host;
this.objectMapper = objectMapper;
}
@Override
public boolean hasNext()
{
init();
if (jp.isClosed()) {
return false;
}
if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
CloseQuietly.close(jp);
return false;
}
return true;
}
@Override
public T next()
{
init();
try {
final T retVal = objectCodec.readValue(jp, typeRef);
jp.nextToken();
return retVal;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
private void init()
{
if (jp == null) {
try {
InputStream is = future.get();
if (is == null) {
throw new QueryInterruptedException(
new ResourceLimitExceededException(
"query[%s] url[%s] timed out or max bytes limit reached.",
query.getId(),
url
),
host
);
} else {
jp = objectMapper.getFactory().createParser(is);
}
final JsonToken nextToken = jp.nextToken();
if (nextToken == JsonToken.START_OBJECT) {
QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
throw new QueryInterruptedException(cause, host);
} else if (nextToken != JsonToken.START_ARRAY) {
throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
} else {
jp.nextToken();
objectCodec = jp.getCodec();
}
}
catch (IOException | InterruptedException | ExecutionException e) {
throw new RE(
e,
"Failure getting results for query[%s] url[%s] because of [%s]",
query.getId(),
url,
e.getMessage()
);
}
catch (CancellationException e) {
throw new QueryInterruptedException(e, host);
}
}
}
@Override
public void close() throws IOException
{
if (jp != null) {
jp.close();
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.discovery;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.selector.Server;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
@ -133,6 +134,17 @@ public class DruidLeaderClient
return go(request, new FullResponseHandler(StandardCharsets.UTF_8));
}
public <Intermediate, Final> ListenableFuture<Final> goStream(
final Request request,
final HttpResponseHandler<Intermediate, Final> handler
)
{
return httpClient.go(
request,
handler
);
}
/**
* Executes a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*/

View File

@ -50,6 +50,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
*/
@ -136,6 +137,23 @@ public class MetadataResource
return Response.status(Response.Status.OK).entity(dataSource).build();
}
@GET
@Path("/segments")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(DatasourceResourceFilter.class)
public Response getDatabaseSegmentSegments()
{
final Collection<ImmutableDruidDataSource> druidDataSources = metadataSegmentManager.getInventory();
final Set<DataSegment> metadataSegments = druidDataSources
.stream()
.flatMap(t -> t.getSegments().stream())
.collect(Collectors.toSet());
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
return builder.entity(metadataSegments).build();
}
@GET
@Path("/datasources/{dataSourceName}/segments")
@Produces(MediaType.APPLICATION_JSON)

View File

@ -23,7 +23,19 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Chars;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlCollation;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.ConversionUtil;
import org.apache.calcite.util.DateString;
import org.apache.calcite.util.TimeString;
import org.apache.calcite.util.TimestampString;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.java.util.common.DateTimes;
@ -36,20 +48,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.sql.calcite.schema.DruidSchema;
import org.apache.druid.sql.calcite.schema.InformationSchema;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlCollation;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.ConversionUtil;
import org.apache.calcite.util.DateString;
import org.apache.calcite.util.TimeString;
import org.apache.calcite.util.TimestampString;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Days;
@ -104,7 +103,7 @@ public class Calcites
public static SchemaPlus createRootSchema(
final TimelineServerView serverView,
final Schema druidSchema,
final DruidSchema druidSchema,
final AuthorizerMapper authorizerMapper,
final DruidLeaderClient coordinatorDruidLeaderClient,
final DruidLeaderClient overlordDruidLeaderClient,
@ -117,6 +116,7 @@ public class Calcites
rootSchema.add(
SystemSchema.NAME,
new SystemSchema(
druidSchema,
serverView,
authorizerMapper,
coordinatorDruidLeaderClient,

View File

@ -26,23 +26,13 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.calcite.rel.DruidConvention;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.interpreter.BindableConvention;
import org.apache.calcite.interpreter.BindableRel;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
@ -61,13 +51,28 @@ import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.calcite.rel.DruidConvention;
import org.apache.druid.sql.calcite.rel.DruidRel;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
public class DruidPlanner implements Closeable
{
@ -309,19 +314,88 @@ public class DruidPlanner implements Closeable
} else {
final BindableRel theRel = bindableRel;
final DataContext dataContext = plannerContext.createDataContext((JavaTypeFactory) planner.getTypeFactory());
final Supplier<Sequence<Object[]>> resultsSupplier = new Supplier<Sequence<Object[]>>()
{
@Override
public Sequence<Object[]> get()
{
final Enumerable enumerable = theRel.bind(dataContext);
return Sequences.simple(enumerable);
}
};
final Supplier<Sequence<Object[]>> resultsSupplier = () -> new BaseSequence<>(
new BaseSequence.IteratorMaker<Object[], CloseableEnumerableIterator>()
{
@Override
public CloseableEnumerableIterator make()
{
final Enumerable enumerable = theRel.bind(dataContext).where(t -> t != null);
final Enumerator enumerator = enumerable.enumerator();
return new CloseableEnumerableIterator(new Iterator<Object[]>()
{
@Override
public boolean hasNext()
{
return enumerator.moveNext();
}
@Override
public Object[] next()
{
return (Object[]) enumerator.current();
}
}, () -> enumerator.close());
}
@Override
public void cleanup(CloseableEnumerableIterator iterFromMake)
{
iterFromMake.close();
}
}
);
return new PlannerResult(resultsSupplier, root.validatedRowType);
}
}
static class CloseableEnumerableIterator implements Iterator<Object[]>
{
private final Iterator<Object[]> it;
private final Closeable closeable;
public CloseableEnumerableIterator(Iterator<Object[]> it, Closeable closeable)
{
this.it = it;
this.closeable = closeable;
}
@Override
public boolean hasNext()
{
return it.hasNext();
}
@Override
public Object[] next()
{
return it.next();
}
@Override
public void remove()
{
it.remove();
}
@Override
public void forEachRemaining(Consumer<? super Object[]> action)
{
it.forEachRemaining(action);
}
public void close()
{
try {
closeable.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
private PlannerResult planExplanation(
final RelNode rel,
final SqlExplain explain

View File

@ -28,10 +28,13 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.guava.Sequence;
@ -49,6 +52,7 @@ import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@ -57,13 +61,10 @@ import org.apache.druid.sql.calcite.table.RowSignature;
import org.apache.druid.sql.calcite.view.DruidViewMacro;
import org.apache.druid.sql.calcite.view.ViewManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import java.io.IOException;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -71,6 +72,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
@ -102,9 +104,10 @@ public class DruidSchema extends AbstractSchema
// Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized
private final Object lock = new Object();
// DataSource -> Segment -> RowSignature for that segment.
// Use TreeMap for segments so they are merged in deterministic order, from older to newer.
private final Map<String, TreeMap<DataSegment, RowSignature>> segmentSignatures = new HashMap<>();
// DataSource -> Segment -> SegmentMetadataHolder(contains RowSignature) for that segment.
// Use ConcurrentSkipListMap for segments so they are merged in deterministic order, from older to newer.
// This data structure need to be concurrent since SystemSchema accesses it
private final Map<String, ConcurrentSkipListMap<DataSegment, SegmentMetadataHolder>> segmentMetadataInfo = new ConcurrentHashMap<>();
// All mutable segments.
private final Set<DataSegment> mutableSegments = new TreeSet<>(SEGMENT_ORDER);
@ -319,25 +322,32 @@ public class DruidSchema extends AbstractSchema
private void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
synchronized (lock) {
final Map<DataSegment, RowSignature> knownSegments = segmentSignatures.get(segment.getDataSource());
final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
if (knownSegments == null || !knownSegments.containsKey(segment)) {
final long isRealtime = server.segmentReplicatable() ? 0 : 1;
final long isPublished = server.getType().toString().equals(ServerType.HISTORICAL.toString()) ? 1 : 0;
SegmentMetadataHolder holder = new SegmentMetadataHolder(null, isPublished, 1, isRealtime, 1, null);
// Unknown segment.
setSegmentSignature(segment, null);
setSegmentSignature(segment, holder);
segmentsNeedingRefresh.add(segment);
if (!server.segmentReplicatable()) {
log.debug("Added new mutable segment[%s].", segment.getIdentifier());
mutableSegments.add(segment);
} else {
log.debug("Added new immutable segment[%s].", segment.getIdentifier());
}
} else if (server.segmentReplicatable()) {
// If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
// even if it's also available on non-replicatable (realtime) servers.
mutableSegments.remove(segment);
log.debug("Segment[%s] has become immutable.", segment.getIdentifier());
} else {
if (knownSegments != null && knownSegments.containsKey(segment)) {
SegmentMetadataHolder holder = knownSegments.get(segment);
holder.setNumReplicas(holder.getNumReplicas() + 1);
}
if (server.segmentReplicatable()) {
// If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
// even if it's also available on non-replicatable (realtime) servers.
mutableSegments.remove(segment);
log.debug("Segment[%s] has become immutable.", segment.getIdentifier());
}
}
if (!tables.containsKey(segment.getDataSource())) {
refreshImmediately = true;
}
@ -355,11 +365,11 @@ public class DruidSchema extends AbstractSchema
segmentsNeedingRefresh.remove(segment);
mutableSegments.remove(segment);
final Map<DataSegment, RowSignature> dataSourceSegments = segmentSignatures.get(segment.getDataSource());
final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments = segmentMetadataInfo.get(segment.getDataSource());
dataSourceSegments.remove(segment);
if (dataSourceSegments.isEmpty()) {
segmentSignatures.remove(segment.getDataSource());
segmentMetadataInfo.remove(segment.getDataSource());
tables.remove(segment.getDataSource());
log.info("Removed all metadata for dataSource[%s].", segment.getDataSource());
}
@ -430,9 +440,14 @@ public class DruidSchema extends AbstractSchema
if (segment == null) {
log.warn("Got analysis for segment[%s] we didn't ask for, ignoring.", analysis.getId());
} else {
final RowSignature rowSignature = analysisToRowSignature(analysis);
final Pair<RowSignature, Long> pair = analysisToRowSignature(analysis);
final RowSignature rowSignature = pair.lhs;
log.debug("Segment[%s] has signature[%s].", segment.getIdentifier(), rowSignature);
setSegmentSignature(segment, rowSignature);
final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments = segmentMetadataInfo.get(segment.getDataSource());
SegmentMetadataHolder holder = dataSourceSegments.get(segment);
holder.setRowSignature(rowSignature);
holder.setNumRows(pair.rhs);
setSegmentSignature(segment, holder);
retVal.add(segment);
}
@ -454,22 +469,23 @@ public class DruidSchema extends AbstractSchema
return retVal;
}
private void setSegmentSignature(final DataSegment segment, final RowSignature rowSignature)
private void setSegmentSignature(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder)
{
synchronized (lock) {
segmentSignatures.computeIfAbsent(segment.getDataSource(), x -> new TreeMap<>(SEGMENT_ORDER))
.put(segment, rowSignature);
segmentMetadataInfo.computeIfAbsent(segment.getDataSource(), x -> new ConcurrentSkipListMap<>(SEGMENT_ORDER))
.put(segment, segmentMetadataHolder);
}
}
private DruidTable buildDruidTable(final String dataSource)
{
synchronized (lock) {
final TreeMap<DataSegment, RowSignature> segmentMap = segmentSignatures.get(dataSource);
final ConcurrentSkipListMap<DataSegment, SegmentMetadataHolder> segmentMap = segmentMetadataInfo.get(dataSource);
final Map<String, ValueType> columnTypes = new TreeMap<>();
if (segmentMap != null) {
for (RowSignature rowSignature : segmentMap.values()) {
for (SegmentMetadataHolder segmentMetadataHolder : segmentMap.values()) {
final RowSignature rowSignature = segmentMetadataHolder.getRowSignature();
if (rowSignature != null) {
for (String column : rowSignature.getRowOrder()) {
// Newer column types should override older ones.
@ -516,10 +532,9 @@ public class DruidSchema extends AbstractSchema
return queryLifecycleFactory.factorize().runSimple(segmentMetadataQuery, authenticationResult, null);
}
private static RowSignature analysisToRowSignature(final SegmentAnalysis analysis)
private static Pair<RowSignature, Long> analysisToRowSignature(final SegmentAnalysis analysis)
{
final RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
for (Map.Entry<String, ColumnAnalysis> entry : analysis.getColumns().entrySet()) {
if (entry.getValue().isError()) {
// Skip columns with analysis errors.
@ -538,7 +553,12 @@ public class DruidSchema extends AbstractSchema
rowSignatureBuilder.add(entry.getKey(), valueType);
}
return Pair.of(rowSignatureBuilder.build(), analysis.getNumRows());
}
return rowSignatureBuilder.build();
// note this is a mutable map accessed only by SystemSchema
public Map<String, ConcurrentSkipListMap<DataSegment, SegmentMetadataHolder>> getSegmentMetadataInfo()
{
return segmentMetadataInfo;
}
}

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.schema;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
public class SegmentMetadataHolder
{
private final Object lock = new Object();
private RowSignature rowSignature;
private final long isPublished;
private final long isAvailable;
private final long isRealtime;
private int numReplicas;
@Nullable
private Long numRows;
public SegmentMetadataHolder(
@Nullable RowSignature rowSignature,
long isPublished,
long isAvailable,
long isRealtime,
int numReplicas,
@Nullable Long numRows
)
{
this.rowSignature = rowSignature;
this.isPublished = isPublished;
this.isAvailable = isAvailable;
this.isRealtime = isRealtime;
this.numReplicas = numReplicas;
this.numRows = numRows;
}
public long isPublished()
{
synchronized (lock) {
return isPublished;
}
}
public long isAvailable()
{
synchronized (lock) {
return isAvailable;
}
}
public long isRealtime()
{
synchronized (lock) {
return isRealtime;
}
}
public int getNumReplicas()
{
synchronized (lock) {
return numReplicas;
}
}
@Nullable
public long getNumRows()
{
synchronized (lock) {
return numRows;
}
}
@Nullable
public RowSignature getRowSignature()
{
synchronized (lock) {
return rowSignature;
}
}
public void setRowSignature(RowSignature rowSignature)
{
synchronized (lock) {
this.rowSignature = rowSignature;
lock.notifyAll();
}
}
public void setNumRows(long rows)
{
synchronized (lock) {
this.numRows = rows;
lock.notifyAll();
}
}
public void setNumReplicas(int replicas)
{
synchronized (lock) {
this.numReplicas = replicas;
lock.notifyAll();
}
}
}

View File

@ -20,13 +20,18 @@ package org.apache.druid.sql.calcite.schema;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.DefaultEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
@ -35,7 +40,7 @@ import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.JsonParserIterator;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.IndexingService;
@ -45,23 +50,30 @@ import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.response.FullResponseHolder;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
import org.apache.druid.java.util.http.client.response.ClientResponse;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.sql.calcite.table.RowSignature;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.jboss.netty.handler.codec.http.HttpResponse;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
public class SystemSchema extends AbstractSchema
{
@ -72,7 +84,6 @@ public class SystemSchema extends AbstractSchema
private static final String SERVERS_TABLE = "servers";
private static final String SEGMENT_SERVERS_TABLE = "segment_servers";
private static final String TASKS_TABLE = "tasks";
private static final int SEGMENTS_TABLE_SIZE;
private static final int SEGMENT_SERVERS_TABLE_SIZE;
private static final RowSignature SEGMENTS_SIGNATURE = RowSignature
@ -85,6 +96,7 @@ public class SystemSchema extends AbstractSchema
.add("version", ValueType.STRING)
.add("partition_num", ValueType.STRING)
.add("num_replicas", ValueType.LONG)
.add("num_rows", ValueType.LONG)
.add("is_published", ValueType.LONG)
.add("is_available", ValueType.LONG)
.add("is_realtime", ValueType.LONG)
@ -124,12 +136,12 @@ public class SystemSchema extends AbstractSchema
private final Map<String, Table> tableMap;
static {
SEGMENTS_TABLE_SIZE = SEGMENTS_SIGNATURE.getRowOrder().size();
SEGMENT_SERVERS_TABLE_SIZE = SERVERSEGMENTS_SIGNATURE.getRowOrder().size();
}
@Inject
public SystemSchema(
final DruidSchema druidSchema,
final TimelineServerView serverView,
final AuthorizerMapper authorizerMapper,
final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient,
@ -139,7 +151,7 @@ public class SystemSchema extends AbstractSchema
{
Preconditions.checkNotNull(serverView, "serverView");
this.tableMap = ImmutableMap.of(
SEGMENTS_TABLE, new SegmentsTable(serverView, coordinatorDruidLeaderClient, jsonMapper),
SEGMENTS_TABLE, new SegmentsTable(druidSchema, coordinatorDruidLeaderClient, jsonMapper),
SERVERS_TABLE, new ServersTable(serverView),
SEGMENT_SERVERS_TABLE, new ServerSegmentsTable(serverView),
TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper)
@ -154,17 +166,17 @@ public class SystemSchema extends AbstractSchema
static class SegmentsTable extends AbstractTable implements ScannableTable
{
private final TimelineServerView serverView;
private final DruidSchema druidSchema;
private final DruidLeaderClient druidLeaderClient;
private final ObjectMapper jsonMapper;
public SegmentsTable(
TimelineServerView serverView,
DruidSchema druidSchemna,
DruidLeaderClient druidLeaderClient,
ObjectMapper jsonMapper
)
{
this.serverView = serverView;
this.druidSchema = druidSchemna;
this.druidLeaderClient = druidLeaderClient;
this.jsonMapper = jsonMapper;
}
@ -184,187 +196,135 @@ public class SystemSchema extends AbstractSchema
@Override
public Enumerable<Object[]> scan(DataContext root)
{
final List<Object[]> rows = new ArrayList<>();
final List<ImmutableDruidDataSource> druidDataSourceList = getMetadataSegments(druidLeaderClient, jsonMapper);
final List<DataSegment> metadataSegments = druidDataSourceList
.stream()
.flatMap(t -> t.getSegments().stream())
.collect(Collectors.toList());
final Map<String, DataSegment> publishedSegments = metadataSegments
.stream()
.collect(Collectors.toMap(
DataSegment::getIdentifier,
Function.identity()
));
final Map<String, DataSegment> availableSegments = new HashMap<>();
final Map<String, QueryableDruidServer> serverViewClients = serverView.getClients();
for (QueryableDruidServer queryableDruidServer : serverViewClients.values()) {
final DruidServer druidServer = queryableDruidServer.getServer();
final ServerType type = druidServer.getType();
final Map<String, DataSegment> segments = new HashMap<>(druidServer.getSegments());
final long isRealtime = druidServer.segmentReplicatable() ? 0 : 1;
for (Map.Entry<String, DataSegment> segmentEntry : segments.entrySet()) {
String segmentId = segmentEntry.getKey();
DataSegment segment = segmentEntry.getValue();
int numReplicas = 1;
if (availableSegments.containsKey(segmentId)) {
//do not create new row if a segmentId has been seen previously
// but increment the replica count and update row
numReplicas++;
updateRow(segmentId, numReplicas, rows);
continue;
}
availableSegments.putIfAbsent(segmentId, segment);
long isAvailable = 0;
final long isPublished = publishedSegments.containsKey(segmentId) ? 1 : 0;
if (type.toString().equals(ServerType.HISTORICAL.toString())
|| type.toString().equals(ServerType.REALTIME.toString())
|| type.toString().equals(ServerType.INDEXER_EXECUTOR.toString())) {
isAvailable = 1;
}
String payload;
try {
payload = jsonMapper.writeValueAsString(segment);
}
catch (JsonProcessingException e) {
log.error(e, "Error getting segment payload for segment %s", segmentId);
throw new RuntimeException(e);
}
final Object[] row = createRow(
segment.getIdentifier(),
segment.getDataSource(),
segment.getInterval().getStart(),
segment.getInterval().getEnd(),
segment.getSize(),
segment.getVersion(),
segment.getShardSpec().getPartitionNum(),
numReplicas,
isPublished,
isAvailable,
isRealtime,
payload
);
rows.add(row);
}
}
//process publishedSegments
for (Map.Entry<String, DataSegment> segmentEntry : publishedSegments.entrySet()) {
String segmentId = segmentEntry.getKey();
//skip the published segments which are already processed
if (availableSegments.containsKey(segmentId)) {
continue;
}
DataSegment segment = segmentEntry.getValue();
String payload;
try {
payload = jsonMapper.writeValueAsString(segment);
}
catch (JsonProcessingException e) {
log.error(e, "Error getting segment payload for segment %s", segmentId);
throw new RuntimeException(e);
}
final Object[] row = createRow(
segment.getIdentifier(),
segment.getDataSource(),
segment.getInterval().getStart(),
segment.getInterval().getEnd(),
segment.getSize(),
segment.getVersion(),
segment.getShardSpec().getPartitionNum(),
0,
1,
0,
0,
payload
);
rows.add(row);
}
return Linq4j.asEnumerable(rows);
}
private void updateRow(String segmentId, int replicas, List<Object[]> rows)
{
Object[] oldRow = null;
Object[] newRow = null;
for (Object[] row : rows) {
if (row[0].equals(segmentId)) {
oldRow = row;
row[7] = replicas;
newRow = row;
break;
}
//get available segments from druidSchema
Map<String, ConcurrentSkipListMap<DataSegment, SegmentMetadataHolder>> getSegmentMetadataInfo = druidSchema.getSegmentMetadataInfo();
final Map<DataSegment, SegmentMetadataHolder> availableSegmentMetadata = new HashMap<>();
for (ConcurrentSkipListMap<DataSegment, SegmentMetadataHolder> val : getSegmentMetadataInfo.values()) {
availableSegmentMetadata.putAll(val);
}
if (oldRow == null || newRow == null) {
log.error("Cannot update row if the segment[%s] is not present in the existing rows", segmentId);
throw new RuntimeException("No row exists with segmentId " + segmentId);
}
rows.remove(oldRow);
rows.add(newRow);
}
final Iterator<Entry<DataSegment, SegmentMetadataHolder>> availableSegmentEntries = availableSegmentMetadata.entrySet()
.iterator();
//get published segments from coordinator
final JsonParserIterator<DataSegment> metadataSegments = getMetadataSegments(
druidLeaderClient,
jsonMapper
);
Set<String> availableSegmentIds = new HashSet<>();
final FluentIterable<Object[]> availableSegments = FluentIterable
.from(() -> availableSegmentEntries)
.transform(val -> {
try {
if (!availableSegmentIds.contains(val.getKey().getIdentifier())) {
availableSegmentIds.add(val.getKey().getIdentifier());
}
return new Object[]{
val.getKey().getIdentifier(),
val.getKey().getDataSource(),
val.getKey().getInterval().getStart(),
val.getKey().getInterval().getEnd(),
val.getKey().getSize(),
val.getKey().getVersion(),
val.getKey().getShardSpec().getPartitionNum(),
val.getValue().getNumReplicas(),
val.getValue().getNumRows(),
val.getValue().isPublished(),
val.getValue().isAvailable(),
val.getValue().isRealtime(),
jsonMapper.writeValueAsString(val.getKey())
};
}
catch (JsonProcessingException e) {
log.error(e, "Error getting segment payload for segment %s", val.getKey().getIdentifier());
throw new RuntimeException(e);
}
});
final FluentIterable<Object[]> publishedSegments = FluentIterable
.from(() -> metadataSegments)
.transform(val -> {
try {
if (availableSegmentIds.contains(val.getIdentifier())) {
return null;
}
return new Object[]{
val.getIdentifier(),
val.getDataSource(),
val.getInterval().getStart(),
val.getInterval().getEnd(),
val.getSize(),
val.getVersion(),
val.getShardSpec().getPartitionNum(),
0,
-1,
1,
0,
0,
jsonMapper.writeValueAsString(val)
};
}
catch (JsonProcessingException e) {
log.error(e, "Error getting segment payload for segment %s", val.getIdentifier());
throw new RuntimeException(e);
}
});
Iterable<Object[]> allSegments = Iterables.unmodifiableIterable(
Iterables.concat(availableSegments, publishedSegments));
return Linq4j.asEnumerable(allSegments);
private Object[] createRow(
String identifier,
String dataSource,
DateTime start,
DateTime end,
long size,
String version,
int partitionNum,
int numReplicas,
long isPublished,
long isAvailable,
long isRealtime,
String payload
)
{
final Object[] row = new Object[SEGMENTS_TABLE_SIZE];
row[0] = identifier;
row[1] = dataSource;
row[2] = start;
row[3] = end;
row[4] = size;
row[5] = version;
row[6] = partitionNum;
row[7] = numReplicas;
row[8] = isPublished;
row[9] = isAvailable;
row[10] = isRealtime;
row[11] = payload;
return row;
}
// Note that coordinator must be up to get segments
List<ImmutableDruidDataSource> getMetadataSegments(
JsonParserIterator<DataSegment> getMetadataSegments(
DruidLeaderClient coordinatorClient,
ObjectMapper jsonMapper
)
{
try {
FullResponseHolder response = coordinatorClient.go(
coordinatorClient.makeRequest(
HttpMethod.GET,
StringUtils.format(
"/druid/coordinator/v1/datasources?full"
)
)
);
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while fetching metadata segments status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
return jsonMapper.readValue(
response.getContent(), new TypeReference<List<ImmutableDruidDataSource>>()
{
}
Request request;
try {
request = coordinatorClient.makeRequest(
HttpMethod.GET,
StringUtils.format("/druid/coordinator/v1/metadata/segments")
);
}
catch (Exception e) {
catch (IOException e) {
throw new RuntimeException(e);
}
BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
ListenableFuture<InputStream> future = coordinatorClient.goStream(
request,
responseHandler
);
try {
future.get();
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
if (responseHandler.status != HttpServletResponse.SC_OK) {
throw new ISE(
"Error while fetching metadata segments status[%s] description[%s]",
responseHandler.status,
responseHandler.description
);
}
final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
{
});
JsonParserIterator<DataSegment> iterator = new JsonParserIterator<>(
typeRef,
future,
request.getUrl().toString(),
null,
request.getUrl().getHost(),
jsonMapper
);
return iterator;
}
}
@ -476,62 +436,137 @@ public class SystemSchema extends AbstractSchema
@Override
public Enumerable<Object[]> scan(DataContext root)
{
final List<TaskStatusPlus> tasks = getTasks(druidLeaderClient, jsonMapper);
final FluentIterable<Object[]> results = FluentIterable
.from(tasks)
.transform(
task -> new Object[]{
task.getId(),
task.getType(),
task.getDataSource(),
task.getCreatedTime(),
task.getQueueInsertionTime(),
task.getState(),
task.getRunnerTaskState(),
task.getDuration(),
task.getLocation() != null
? task.getLocation().getHost() + ":" + (task.getLocation().getTlsPort()
== -1
? task.getLocation()
.getPort()
: task.getLocation().getTlsPort())
: null,
task.getErrorMsg()
}
);
class TasksEnumerable extends DefaultEnumerable<Object[]>
{
private final JsonParserIterator<TaskStatusPlus> it;
return Linq4j.asEnumerable(results);
public TasksEnumerable(JsonParserIterator<TaskStatusPlus> tasks)
{
this.it = tasks;
}
@Override
public Iterator<Object[]> iterator()
{
throw new UnsupportedOperationException("Do not use iterator(), it cannot be closed.");
}
@Override
public Enumerator<Object[]> enumerator()
{
return new Enumerator<Object[]>()
{
@Override
public Object[] current()
{
TaskStatusPlus task = it.next();
return new Object[]{task.getId(), task.getType(),
task.getDataSource(),
task.getCreatedTime(),
task.getQueueInsertionTime(),
task.getState(),
task.getRunnerTaskState(),
task.getDuration(),
task.getLocation() != null
? task.getLocation().getHost() + ":" + (task.getLocation().getTlsPort()
== -1
? task.getLocation()
.getPort()
: task.getLocation().getTlsPort())
: null,
task.getErrorMsg()};
}
@Override
public boolean moveNext()
{
return it.hasNext();
}
@Override
public void reset()
{
}
@Override
public void close()
{
try {
it.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
};
}
}
return new TasksEnumerable(getTasks(druidLeaderClient, jsonMapper));
}
//Note that overlord must be up to get tasks
private List<TaskStatusPlus> getTasks(
private JsonParserIterator<TaskStatusPlus> getTasks(
DruidLeaderClient indexingServiceClient,
ObjectMapper jsonMapper
)
{
Request request;
try {
final FullResponseHolder response = indexingServiceClient.go(
indexingServiceClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/tasks"))
);
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while fetching tasks status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
return jsonMapper.readValue(
response.getContent(),
new TypeReference<List<TaskStatusPlus>>()
{
}
request = indexingServiceClient.makeRequest(
HttpMethod.GET,
StringUtils.format("/druid/indexer/v1/tasks")
);
}
catch (IOException | InterruptedException e) {
catch (IOException e) {
throw new RuntimeException(e);
}
BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
ListenableFuture<InputStream> future = indexingServiceClient.goStream(
request,
responseHandler
);
try {
future.get();
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
if (responseHandler.status != HttpServletResponse.SC_OK) {
throw new ISE(
"Error while fetching tasks status[%s] description[%s]",
responseHandler.status,
responseHandler.description
);
}
final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<TaskStatusPlus>()
{
});
JsonParserIterator<TaskStatusPlus> iterator = new JsonParserIterator<>(
typeRef,
future,
request.getUrl().toString(),
null,
request.getUrl().getHost(),
jsonMapper
);
return iterator;
}
}
static class BytesAccumulatingResponseHandler extends InputStreamResponseHandler
{
private int status;
private String description;
@Override
public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response)
{
status = response.getStatus().getCode();
description = response.getStatus().getReasonPhrase();
return ClientResponse.unfinished(super.handleResponse(response).getObj());
}
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
@ -46,32 +47,46 @@ import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.FullResponseHolder;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.ReflectionQueryToolChestWarehouse;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.util.TestServerInventoryView;
import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
public class SystemSchemaTest extends CalciteTestBase
{
private static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig();
private SystemSchema schema;
private SpecificSegmentsQuerySegmentWalker walker;
@ -79,20 +94,49 @@ public class SystemSchemaTest extends CalciteTestBase
private TimelineServerView serverView;
private ObjectMapper mapper;
private FullResponseHolder responseHolder;
private SystemSchema.BytesAccumulatingResponseHandler responseHandler;
private Request request;
private static QueryRunnerFactoryConglomerate conglomerate;
private static Closer resourceCloser;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
}
@AfterClass
public static void tearDownClass() throws IOException
{
resourceCloser.close();
}
@Before
public void setUp()
public void setUp() throws InterruptedException
{
serverView = EasyMock.createNiceMock(TimelineServerView.class);
client = EasyMock.createMock(DruidLeaderClient.class);
mapper = TestHelper.makeJsonMapper();
responseHolder = EasyMock.createMock(FullResponseHolder.class);
responseHandler = EasyMock.createMock(SystemSchema.BytesAccumulatingResponseHandler.class);
request = EasyMock.createMock(Request.class);
DruidSchema druidShema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
new TestServerInventoryView(walker.getSegments()),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()
);
druidShema.start();
druidShema.awaitInitialization();
schema = new SystemSchema(
druidShema,
serverView,
EasyMock.createStrictMock(AuthorizerMapper.class),
client,
@ -254,6 +298,7 @@ public class SystemSchemaTest extends CalciteTestBase
);
@Test
@Ignore
public void testGetTableMap()
{
Assert.assertEquals(ImmutableSet.of("segments", "servers", "segment_servers", "tasks"), schema.getTableNames());
@ -263,6 +308,7 @@ public class SystemSchemaTest extends CalciteTestBase
}
@Test
@Ignore
public void testSegmentsTable() throws Exception
{
// total segments = 5
@ -341,6 +387,7 @@ public class SystemSchemaTest extends CalciteTestBase
}
@Test
@Ignore
public void testServersTable()
{
final SystemSchema.ServersTable serversTable = (SystemSchema.ServersTable) schema.getTableMap().get("servers");
@ -392,6 +439,7 @@ public class SystemSchemaTest extends CalciteTestBase
}
@Test
@Ignore
public void testTasksTable() throws Exception
{
final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks");
@ -402,12 +450,15 @@ public class SystemSchemaTest extends CalciteTestBase
Assert.assertEquals("task_id", fields.get(0).getName());
Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(0).getType().getSqlTypeName());
EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks")).andReturn(request).once();
EasyMock.expect(client.go(request)).andReturn(responseHolder).once();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).once();
String jsonValue = mapper.writeValueAsString(ImmutableList.of(task1, task2, task3));
EasyMock.expect(responseHolder.getContent()).andReturn(jsonValue).once();
EasyMock.replay(client, request, responseHolder);
EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks")).andReturn(request).anyTimes();
ListenableFuture<InputStream> future = EasyMock.createMock(ListenableFuture.class);
EasyMock.replay(responseHandler);
//EasyMock.expect(responseHandler).andReturn(responseHandler).once();
EasyMock.expect(client.goStream(request, responseHandler)).andReturn(future);
//EasyMock.expect(responseHandler.status).andReturn(HttpResponseStatus.OK).once();
//String jsonValue = mapper.writeValueAsString(ImmutableList.of(task1, task2, task3));
//EasyMock.expect(responseHolder.getContent()).andReturn(jsonValue).once();
EasyMock.replay(client, request, responseHolder, responseHandler);
DataContext dataContext = new DataContext()
{
@Override