Merge pull request #319 from metamx/select

A Select Query for Druid
fjy 2014-01-20 15:31:25 -08:00
commit 1057e2e165
36 changed files with 3088 additions and 2030 deletions

View File

@ -25,6 +25,7 @@ import com.metamx.common.guava.Sequence;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.search.search.SearchQuery;
import io.druid.query.select.SelectQuery;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeseries.TimeseriesQuery;
@ -42,6 +43,7 @@ import java.util.Map;
@JsonSubTypes.Type(name = Query.TIME_BOUNDARY, value = TimeBoundaryQuery.class),
@JsonSubTypes.Type(name = Query.GROUP_BY, value = GroupByQuery.class),
@JsonSubTypes.Type(name = Query.SEGMENT_METADATA, value = SegmentMetadataQuery.class),
@JsonSubTypes.Type(name = Query.SELECT, value = SelectQuery.class),
@JsonSubTypes.Type(name = Query.TOPN, value = TopNQuery.class)
})
public interface Query<T>
@ -51,6 +53,7 @@ public interface Query<T>
public static final String TIME_BOUNDARY = "timeBoundary";
public static final String GROUP_BY = "groupBy";
public static final String SEGMENT_METADATA = "segmentMetadata";
public static final String SELECT = "select";
public static final String TOPN = "topN";
public String getDataSource();

View File

@ -0,0 +1,117 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
import org.joda.time.DateTime;
import java.util.Map;
/**
*/
public class EventHolder
{
public static final String timestampKey = "timestamp";
private final String segmentId;
private final int offset;
private final Map<String, Object> event;
@JsonCreator
public EventHolder(
@JsonProperty("segmentId") String segmentId,
@JsonProperty("offset") int offset,
@JsonProperty("event") Map<String, Object> event
)
{
this.segmentId = segmentId;
this.offset = offset;
this.event = event;
}
public DateTime getTimestamp()
{
return (DateTime) event.get(timestampKey);
}
@JsonProperty
public String getSegmentId()
{
return segmentId;
}
@JsonProperty
public int getOffset()
{
return offset;
}
@JsonProperty
public Map<String, Object> getEvent()
{
return event;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EventHolder that = (EventHolder) o;
if (offset != that.offset) {
return false;
}
if (!Maps.difference(event, that.event).areEqual()) {
return false;
}
if (segmentId != null ? !segmentId.equals(that.segmentId) : that.segmentId != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = segmentId != null ? segmentId.hashCode() : 0;
result = 31 * result + offset;
result = 31 * result + (event != null ? event.hashCode() : 0);
return result;
}
@Override
public String toString()
{
return "EventHolder{" +
"segmentId='" + segmentId + '\'' +
", offset=" + offset +
", event=" + event +
'}';
}
}
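
For illustration only (not part of the diff), a minimal sketch of how an EventHolder fits together: the event map carries a "timestamp" entry that getTimestamp() reads back, while segmentId and offset identify the row for paging. The segment id and values below are placeholders.

Map<String, Object> event = Maps.newLinkedHashMap();
event.put(EventHolder.timestampKey, new DateTime("2013-01-01T00"));
event.put("dim", "first");

EventHolder holder = new EventHolder("some_segment", 0, event);
holder.getTimestamp(); // the DateTime stored under the "timestamp" key
holder.getOffset();    // 0, the row offset inside "some_segment"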

View File

@ -0,0 +1,100 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Ints;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;
/**
*/
public class PagingSpec
{
private final LinkedHashMap<String, Integer> pagingIdentifiers;
private final int threshold;
@JsonCreator
public PagingSpec(
@JsonProperty("pagingIdentifiers") LinkedHashMap<String, Integer> pagingIdentifiers,
@JsonProperty("threshold") int threshold
)
{
this.pagingIdentifiers = pagingIdentifiers;
this.threshold = threshold;
}
@JsonProperty
public Map<String, Integer> getPagingIdentifiers()
{
return pagingIdentifiers;
}
@JsonProperty
public int getThreshold()
{
return threshold;
}
public byte[] getCacheKey()
{
final byte[][] pagingKeys = new byte[pagingIdentifiers.size()][];
final byte[][] pagingValues = new byte[pagingIdentifiers.size()][];
int index = 0;
int pagingKeysSize = 0;
int pagingValuesSize = 0;
for (Map.Entry<String, Integer> entry : pagingIdentifiers.entrySet()) {
pagingKeys[index] = entry.getKey().getBytes();
pagingValues[index] = ByteBuffer.allocate(Ints.BYTES).putInt(entry.getValue()).array();
pagingKeysSize += pagingKeys[index].length;
pagingValuesSize += Ints.BYTES;
index++;
}
final byte[] thresholdBytes = ByteBuffer.allocate(Ints.BYTES).putInt(threshold).array();
final ByteBuffer queryCacheKey = ByteBuffer.allocate(pagingKeysSize + pagingValuesSize + thresholdBytes.length);
for (byte[] pagingKey : pagingKeys) {
queryCacheKey.put(pagingKey);
}
for (byte[] pagingValue : pagingValues) {
queryCacheKey.put(pagingValue);
}
queryCacheKey.put(thresholdBytes);
return queryCacheKey.array();
}
@Override
public String toString()
{
return "PagingSpec{" +
"pagingIdentifiers=" + pagingIdentifiers +
", threshold=" + threshold +
'}';
}
}
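
For illustration only (not part of the diff): pagingIdentifiers maps a segment identifier to the row offset at which the query should resume in that segment, and threshold caps how many rows come back. A sketch, with "testSegment" and the offsets as placeholder values:

// First page: no paging identifiers yet, at most 3 rows.
PagingSpec firstPage = new PagingSpec(null, 3);

// Follow-up page: resume the segment "testSegment" at offset 3.
LinkedHashMap<String, Integer> resumeFrom = Maps.newLinkedHashMap();
resumeFrom.put("testSegment", 3);
PagingSpec nextPage = new PagingSpec(resumeFrom, 3);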

View File

@ -0,0 +1,77 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.metamx.common.guava.nary.BinaryFn;
import io.druid.granularity.AllGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import org.joda.time.DateTime;
/**
*/
public class SelectBinaryFn
implements BinaryFn<Result<SelectResultValue>, Result<SelectResultValue>, Result<SelectResultValue>>
{
private final QueryGranularity gran;
private final PagingSpec pagingSpec;
public SelectBinaryFn(
QueryGranularity granularity,
PagingSpec pagingSpec
)
{
this.gran = granularity;
this.pagingSpec = pagingSpec;
}
@Override
public Result<SelectResultValue> apply(
Result<SelectResultValue> arg1, Result<SelectResultValue> arg2
)
{
if (arg1 == null) {
return arg2;
}
if (arg2 == null) {
return arg1;
}
final DateTime timestamp = (gran instanceof AllGranularity)
? arg1.getTimestamp()
: gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis()));
SelectResultValueBuilder builder = new SelectResultValueBuilder(timestamp, pagingSpec.getThreshold());
SelectResultValue arg1Val = arg1.getValue();
SelectResultValue arg2Val = arg2.getValue();
for (EventHolder event : arg1Val) {
builder.addEntry(event);
}
for (EventHolder event : arg2Val) {
builder.addEntry(event);
}
return builder.build();
}
}

View File

@ -0,0 +1,149 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BaseQuery;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.filter.DimFilter;
import io.druid.query.spec.QuerySegmentSpec;
import java.util.List;
import java.util.Map;
/**
*/
@JsonTypeName("select")
public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
{
private final DimFilter dimFilter;
private final QueryGranularity granularity;
private final List<String> dimensions;
private final List<String> metrics;
private final PagingSpec pagingSpec;
@JsonCreator
public SelectQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("metrics") List<String> metrics,
@JsonProperty("pagingSpec") PagingSpec pagingSpec,
@JsonProperty("context") Map<String, String> context
)
{
super(dataSource, querySegmentSpec, context);
this.dimFilter = dimFilter;
this.granularity = granularity;
this.dimensions = dimensions;
this.metrics = metrics;
this.pagingSpec = pagingSpec;
}
@Override
public boolean hasFilters()
{
return dimFilter != null;
}
@Override
public String getType()
{
return Query.SELECT;
}
@JsonProperty("filter")
public DimFilter getDimensionsFilter()
{
return dimFilter;
}
@JsonProperty
public QueryGranularity getGranularity()
{
return granularity;
}
@JsonProperty
public List<String> getDimensions()
{
return dimensions;
}
@JsonProperty
public PagingSpec getPagingSpec()
{
return pagingSpec;
}
@JsonProperty
public List<String> getMetrics()
{
return metrics;
}
public SelectQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
{
return new SelectQuery(
getDataSource(),
querySegmentSpec,
dimFilter,
granularity,
dimensions,
metrics,
pagingSpec,
getContext()
);
}
public SelectQuery withOverriddenContext(Map<String, String> contextOverrides)
{
return new SelectQuery(
getDataSource(),
getQuerySegmentSpec(),
dimFilter,
granularity,
dimensions,
metrics,
pagingSpec,
computeOverridenContext(contextOverrides)
);
}
@Override
public String toString()
{
return "SelectQuery{" +
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", dimFilter=" + dimFilter +
", granularity=" + granularity +
", dimensions=" + dimensions +
", metrics=" + metrics +
", pagingSpec=" + pagingSpec +
'}';
}
}
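
For illustration only (not part of the diff), a sketch of constructing the new query directly, mirroring the runner tests further down. Empty dimension and metric lists are treated by the engine as "all dimensions" and "all metrics", and the filter and context may be null:

SelectQuery query = new SelectQuery(
    "testing",                                                    // dataSource
    new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")), // intervals
    null,                                                         // filter
    QueryGranularity.ALL,
    Lists.<String>newArrayList(),                                 // dimensions: all
    Lists.<String>newArrayList(),                                 // metrics: all
    new PagingSpec(null, 3),
    null                                                          // context
);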

View File

@ -0,0 +1,167 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence;
import io.druid.query.QueryRunnerHelper;
import io.druid.query.Result;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import io.druid.segment.TimestampColumnSelector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import org.joda.time.DateTime;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*/
public class SelectQueryEngine
{
public Sequence<Result<SelectResultValue>> process(final SelectQuery query, final Segment segment)
{
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Result<SelectResultValue>, Iterator<Result<SelectResultValue>>>()
{
@Override
public Iterator<Result<SelectResultValue>> make()
{
final StorageAdapter adapter = segment.asStorageAdapter();
final Iterable<String> dims;
if (query.getDimensions() == null || query.getDimensions().isEmpty()) {
dims = adapter.getAvailableDimensions();
} else {
dims = query.getDimensions();
}
final Iterable<String> metrics;
if (query.getMetrics() == null || query.getMetrics().isEmpty()) {
metrics = adapter.getAvailableMetrics();
} else {
metrics = query.getMetrics();
}
return QueryRunnerHelper.makeCursorBasedQuery(
adapter,
query.getQuerySegmentSpec().getIntervals(),
Filters.convertDimensionFilters(query.getDimensionsFilter()),
query.getGranularity(),
new Function<Cursor, Result<SelectResultValue>>()
{
@Override
public Result<SelectResultValue> apply(Cursor cursor)
{
final SelectResultValueBuilder builder = new SelectResultValueBuilder(
cursor.getTime(),
query.getPagingSpec()
.getThreshold()
);
final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim);
dimSelectors.put(dim, dimSelector);
}
final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) {
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
metSelectors.put(metric, metricSelector);
}
int startOffset;
if (query.getPagingSpec().getPagingIdentifiers() == null) {
startOffset = 0;
} else {
Integer offset = query.getPagingSpec().getPagingIdentifiers().get(segment.getIdentifier());
startOffset = (offset == null) ? 0 : offset;
}
cursor.advanceTo(startOffset);
int offset = 0;
while (!cursor.isDone() && offset < query.getPagingSpec().getThreshold()) {
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.getTimestamp()));
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
final String dim = dimSelector.getKey();
final DimensionSelector selector = dimSelector.getValue();
final IndexedInts vals = selector.getRow();
if (vals.size() == 1) {
final String dimVal = selector.lookupName(vals.get(0));
theEvent.put(dim, dimVal);
} else {
List<String> dimVals = Lists.newArrayList();
for (int i = 0; i < vals.size(); ++i) {
dimVals.add(selector.lookupName(vals.get(i)));
}
theEvent.put(dim, dimVals);
}
}
for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
final String metric = metSelector.getKey();
final ObjectColumnSelector selector = metSelector.getValue();
theEvent.put(metric, selector.get());
}
builder.addEntry(
new EventHolder(
segment.getIdentifier(),
startOffset + offset,
theEvent
)
);
cursor.advance();
offset++;
}
return builder.build();
}
}
).iterator();
}
@Override
public void cleanup(Iterator<Result<SelectResultValue>> toClean)
{
// https://github.com/metamx/druid/issues/128
while (toClean.hasNext()) {
toClean.next();
}
}
}
);
}
}
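
For illustration only (not part of the diff): the engine advances the cursor to the stored offset for each segment, emits at most threshold rows, and records each row's absolute offset in the result's paging identifiers. Below is a hypothetical helper for building the next page's PagingSpec; resuming at offset + 1 is inferred from the engine above, which starts emitting at exactly the offset it is handed:

static PagingSpec nextPage(SelectResultValue previousPage, int threshold)
{
  LinkedHashMap<String, Integer> next = Maps.newLinkedHashMap();
  for (Map.Entry<String, Integer> entry : previousPage.getPagingIdentifiers().entrySet()) {
    next.put(entry.getKey(), entry.getValue() + 1); // skip the last row already returned
  }
  return new PagingSpec(next, threshold);
}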

View File

@ -0,0 +1,291 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.ResultGranularTimestampComparator;
import io.druid.query.ResultMergeQueryRunner;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.filter.DimFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResultValue>, SelectQuery>
{
private static final byte SELECT_QUERY = 0x13;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
new TypeReference<Object>()
{
};
private static final TypeReference<Result<SelectResultValue>> TYPE_REFERENCE =
new TypeReference<Result<SelectResultValue>>()
{
};
private final QueryConfig config;
private final ObjectMapper jsonMapper;
@Inject
public SelectQueryQueryToolChest(QueryConfig config, ObjectMapper jsonMapper)
{
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
public QueryRunner<Result<SelectResultValue>> mergeResults(QueryRunner<Result<SelectResultValue>> queryRunner)
{
return new ResultMergeQueryRunner<Result<SelectResultValue>>(queryRunner)
{
@Override
protected Ordering<Result<SelectResultValue>> makeOrdering(Query<Result<SelectResultValue>> query)
{
return Ordering.from(
new ResultGranularTimestampComparator<SelectResultValue>(
((SelectQuery) query).getGranularity()
)
);
}
@Override
protected BinaryFn<Result<SelectResultValue>, Result<SelectResultValue>, Result<SelectResultValue>> createMergeFn(
Query<Result<SelectResultValue>> input
)
{
SelectQuery query = (SelectQuery) input;
return new SelectBinaryFn(
query.getGranularity(),
query.getPagingSpec()
);
}
};
}
@Override
public Sequence<Result<SelectResultValue>> mergeSequences(Sequence<Sequence<Result<SelectResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<Result<SelectResultValue>>(getOrdering(), seqOfSequences);
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(SelectQuery query)
{
int numMinutes = 0;
for (Interval interval : query.getIntervals()) {
numMinutes += Minutes.minutesIn(interval).getMinutes();
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser4("Select")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString());
}
@Override
public Function<Result<SelectResultValue>, Result<SelectResultValue>> makeMetricManipulatorFn(
final SelectQuery query, final MetricManipulationFn fn
)
{
return Functions.identity();
}
@Override
public TypeReference<Result<SelectResultValue>> getResultTypeReference()
{
return TYPE_REFERENCE;
}
@Override
public CacheStrategy<Result<SelectResultValue>, Object, SelectQuery> getCacheStrategy(final SelectQuery query)
{
return new CacheStrategy<Result<SelectResultValue>, Object, SelectQuery>()
{
@Override
public byte[] computeCacheKey(SelectQuery query)
{
final DimFilter dimFilter = query.getDimensionsFilter();
final byte[] filterBytes = dimFilter == null ? new byte[]{} : dimFilter.getCacheKey();
final byte[] granularityBytes = query.getGranularity().cacheKey();
final Set<String> dimensions = Sets.newTreeSet();
if (query.getDimensions() != null) {
dimensions.addAll(query.getDimensions());
}
final byte[][] dimensionsBytes = new byte[dimensions.size()][];
int dimensionsBytesSize = 0;
int index = 0;
for (String dimension : dimensions) {
dimensionsBytes[index] = dimension.getBytes();
dimensionsBytesSize += dimensionsBytes[index].length;
++index;
}
final Set<String> metrics = Sets.newTreeSet();
if (query.getMetrics() != null) {
metrics.addAll(query.getMetrics());
}
final byte[][] metricBytes = new byte[metrics.size()][];
int metricBytesSize = 0;
index = 0;
for (String metric : metrics) {
metricBytes[index] = metric.getBytes();
metricBytesSize += metricBytes[index].length;
++index;
}
final ByteBuffer queryCacheKey = ByteBuffer
.allocate(
1
+ granularityBytes.length
+ filterBytes.length
+ query.getPagingSpec().getCacheKey().length
+ dimensionsBytesSize
+ metricBytesSize
)
.put(SELECT_QUERY)
.put(granularityBytes)
.put(filterBytes)
.put(query.getPagingSpec().getCacheKey());
for (byte[] dimensionsByte : dimensionsBytes) {
queryCacheKey.put(dimensionsByte);
}
for (byte[] metricByte : metricBytes) {
queryCacheKey.put(metricByte);
}
return queryCacheKey.array();
}
@Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
}
@Override
public Function<Result<SelectResultValue>, Object> prepareForCache()
{
return new Function<Result<SelectResultValue>, Object>()
{
@Override
public Object apply(final Result<SelectResultValue> input)
{
return Arrays.asList(
input.getTimestamp().getMillis(),
input.getValue().getPagingIdentifiers(),
input.getValue().getEvents()
);
}
};
}
@Override
public Function<Object, Result<SelectResultValue>> pullFromCache()
{
return new Function<Object, Result<SelectResultValue>>()
{
private final QueryGranularity granularity = query.getGranularity();
@Override
public Result<SelectResultValue> apply(Object input)
{
List<Object> results = (List<Object>) input;
Iterator<Object> resultIter = results.iterator();
DateTime timestamp = granularity.toDateTime(((Number) resultIter.next()).longValue());
return new Result<SelectResultValue>(
timestamp,
new SelectResultValue(
(Map<String, Integer>) jsonMapper.convertValue(
resultIter.next(), new TypeReference<Map<String, Integer>>()
{
}
),
(List<EventHolder>) jsonMapper.convertValue(
resultIter.next(), new TypeReference<List<EventHolder>>()
{
}
)
)
);
}
};
}
@Override
public Sequence<Result<SelectResultValue>> mergeSequences(Sequence<Sequence<Result<SelectResultValue>>> seqOfSequences)
{
return new MergeSequence<Result<SelectResultValue>>(getOrdering(), seqOfSequences);
}
};
}
@Override
public QueryRunner<Result<SelectResultValue>> preMergeQueryDecoration(QueryRunner<Result<SelectResultValue>> runner)
{
return new IntervalChunkingQueryRunner<Result<SelectResultValue>>(runner, config.getChunkPeriod());
}
public Ordering<Result<SelectResultValue>> getOrdering()
{
return Ordering.natural();
}
}

View File

@ -0,0 +1,106 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.segment.Segment;
import java.util.concurrent.ExecutorService;
/**
*/
public class SelectQueryRunnerFactory
implements QueryRunnerFactory<Result<SelectResultValue>, SelectQuery>
{
public static SelectQueryRunnerFactory create(ObjectMapper jsonMapper)
{
return new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper),
new SelectQueryEngine()
);
}
private final SelectQueryQueryToolChest toolChest;
private final SelectQueryEngine engine;
@Inject
public SelectQueryRunnerFactory(
SelectQueryQueryToolChest toolChest,
SelectQueryEngine engine
)
{
this.toolChest = toolChest;
this.engine = engine;
}
@Override
public QueryRunner<Result<SelectResultValue>> createRunner(final Segment segment)
{
return new SelectQueryRunner(engine, segment);
}
@Override
public QueryRunner<Result<SelectResultValue>> mergeRunners(
ExecutorService queryExecutor, Iterable<QueryRunner<Result<SelectResultValue>>> queryRunners
)
{
return new ChainedExecutionQueryRunner<Result<SelectResultValue>>(
queryExecutor, toolChest.getOrdering(), queryRunners
);
}
@Override
public QueryToolChest<Result<SelectResultValue>, SelectQuery> getToolchest()
{
return toolChest;
}
private static class SelectQueryRunner implements QueryRunner<Result<SelectResultValue>>
{
private final SelectQueryEngine engine;
private final Segment segment;
private SelectQueryRunner(SelectQueryEngine engine, Segment segment)
{
this.engine = engine;
this.segment = segment;
}
@Override
public Sequence<Result<SelectResultValue>> run(Query<Result<SelectResultValue>> input)
{
if (!(input instanceof SelectQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), SelectQuery.class);
}
return engine.process((SelectQuery) input, segment);
}
}
}
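
For illustration only (not part of the diff), a sketch of wiring the factory to a single segment. The create() helper supplies a default QueryConfig; the IncrementalIndex and the "testSegment" identifier here are placeholders:

static Sequence<Result<SelectResultValue>> runOnIncrementalIndex(IncrementalIndex index, SelectQuery query)
{
  SelectQueryRunnerFactory factory = SelectQueryRunnerFactory.create(new DefaultObjectMapper());
  QueryRunner<Result<SelectResultValue>> runner =
      factory.createRunner(new IncrementalIndexSegment(index, "testSegment"));
  return runner.run(query);
}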

View File

@ -0,0 +1,108 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*/
public class SelectResultValue implements Iterable<EventHolder>
{
private final Map<String, Integer> pagingIdentifiers;
private final List<EventHolder> events;
@JsonCreator
public SelectResultValue(
@JsonProperty("pagingIdentifiers") Map<String, Integer> pagingIdentifiers,
@JsonProperty("events") List<EventHolder> events)
{
this.pagingIdentifiers = pagingIdentifiers;
this.events = events;
}
@JsonProperty
public Map<String, Integer> getPagingIdentifiers()
{
return pagingIdentifiers;
}
@JsonProperty
public List<EventHolder> getEvents()
{
return events;
}
@Override
public Iterator<EventHolder> iterator()
{
return events.iterator();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SelectResultValue that = (SelectResultValue) o;
if (events != null ? !events.equals(that.events) : that.events != null) {
return false;
}
if (pagingIdentifiers != null
? !pagingIdentifiers.equals(that.pagingIdentifiers)
: that.pagingIdentifiers != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = pagingIdentifiers != null ? pagingIdentifiers.hashCode() : 0;
result = 31 * result + (events != null ? events.hashCode() : 0);
return result;
}
@Override
public String toString()
{
return "SelectResultValue{" +
"pagingIdentifiers=" + pagingIdentifiers +
", events=" + events +
'}';
}
}

View File

@ -0,0 +1,98 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.primitives.Longs;
import io.druid.query.Result;
import org.joda.time.DateTime;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
/**
*/
public class SelectResultValueBuilder
{
private static final Comparator<EventHolder> comparator = new Comparator<EventHolder>()
{
@Override
public int compare(EventHolder o1, EventHolder o2)
{
int retVal = Longs.compare(o1.getTimestamp().getMillis(), o2.getTimestamp().getMillis());
if (retVal == 0) {
retVal = o1.getSegmentId().compareTo(o2.getSegmentId());
}
if (retVal == 0) {
retVal = Integer.compare(o1.getOffset(), o2.getOffset());
}
return retVal;
}
};
private final DateTime timestamp;
private MinMaxPriorityQueue<EventHolder> pQueue = null;
public SelectResultValueBuilder(
DateTime timestamp,
int threshold
)
{
this.timestamp = timestamp;
instantiatePQueue(threshold, comparator);
}
public void addEntry(
EventHolder event
)
{
pQueue.add(event);
}
public Result<SelectResultValue> build()
{
// Pull out top aggregated values
List<EventHolder> values = Lists.newArrayListWithCapacity(pQueue.size());
Map<String, Integer> pagingIdentifiers = Maps.newLinkedHashMap();
while (!pQueue.isEmpty()) {
EventHolder event = pQueue.remove();
pagingIdentifiers.put(event.getSegmentId(), event.getOffset());
values.add(event);
}
return new Result<SelectResultValue>(
timestamp,
new SelectResultValue(pagingIdentifiers, values)
);
}
private void instantiatePQueue(int threshold, final Comparator comparator)
{
this.pQueue = MinMaxPriorityQueue.orderedBy(comparator).maximumSize(threshold).create();
}
}

View File

@ -24,6 +24,7 @@ package io.druid.segment;
*/
public interface ColumnSelectorFactory
{
public TimestampColumnSelector makeTimestampColumnSelector();
public DimensionSelector makeDimensionSelector(String dimensionName);
public FloatColumnSelector makeFloatColumnSelector(String columnName);
public ObjectColumnSelector makeObjectColumnSelector(String columnName);

View File

@ -21,10 +21,12 @@ package io.druid.segment;import org.joda.time.DateTime;
/**
*/
public interface Cursor extends ColumnSelectorFactory
{
public DateTime getTime();
public void advance();
public void advanceTo(int offset);
public boolean isDone();
public void reset();
}

View File

@ -30,18 +30,21 @@ import java.io.IOException;
public class IncrementalIndexSegment implements Segment
{
private final IncrementalIndex index;
private final String segmentIdentifier;
public IncrementalIndexSegment(
IncrementalIndex index
IncrementalIndex index,
String segmentIdentifier
)
{
this.index = index;
this.segmentIdentifier = segmentIdentifier;
}
@Override
public String getIdentifier()
{
throw new UnsupportedOperationException();
return segmentIdentifier;
}
@Override

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.collect.MoreIterators;
import com.metamx.common.guava.FunctionalIterable;
@ -77,6 +78,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return index.getAvailableDimensions();
}
@Override
public Iterable<String> getAvailableMetrics()
{
return Sets.difference(Sets.newHashSet(index.getColumnNames()), Sets.newHashSet(index.getAvailableDimensions()));
}
@Override
public int getDimensionCardinality(String dimension)
{
@ -224,6 +231,16 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
cursorOffset.increment();
}
@Override
public void advanceTo(int offset)
{
int count = 0;
while (count < offset && !isDone()) {
advance();
count++;
}
}
@Override
public boolean isDone()
{
@ -236,6 +253,19 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
cursorOffset = initOffset.clone();
}
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
return new TimestampColumnSelector()
{
@Override
public long getTimestamp()
{
return timestamps.getLongSingleValueRow(cursorOffset.getOffset());
}
};
}
@Override
public DimensionSelector makeDimensionSelector(String dimension)
{
@ -249,8 +279,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
if (column == null) {
return null;
}
else if (columnDesc.getCapabilities().hasMultipleValues()) {
} else if (columnDesc.getCapabilities().hasMultipleValues()) {
return new DimensionSelector()
{
@Override
@ -608,6 +637,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
++currRow;
}
@Override
public void advanceTo(int offset)
{
currRow += offset;
}
@Override
public boolean isDone()
{
@ -620,6 +655,19 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
currRow = initRow;
}
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
return new TimestampColumnSelector()
{
@Override
public long getTimestamp()
{
return timestamps.getLongSingleValueRow(currRow);
}
};
}
@Override
public DimensionSelector makeDimensionSelector(String dimension)
{
@ -633,8 +681,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
if (dict == null) {
return null;
}
else if (column.getCapabilities().hasMultipleValues()) {
} else if (column.getCapabilities().hasMultipleValues()) {
return new DimensionSelector()
{
@Override

View File

@ -17,7 +17,9 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment;import io.druid.segment.data.Indexed;
package io.druid.segment;
import io.druid.segment.data.Indexed;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -28,6 +30,7 @@ public interface StorageAdapter extends CursorFactory
public String getSegmentIdentifier();
public Interval getInterval();
public Indexed<String> getAvailableDimensions();
public Iterable<String> getAvailableMetrics();
public int getDimensionCardinality(String dimension);
public DateTime getMinTime();
public DateTime getMaxTime();

View File

@ -0,0 +1,27 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment;
/**
*/
public interface TimestampColumnSelector
{
public long getTimestamp();
}

View File

@ -45,6 +45,7 @@ import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.TimestampColumnSelector;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
@ -197,6 +198,19 @@ public class IncrementalIndex implements Iterable<Row>
aggs[i] = agg.factorize(
new ColumnSelectorFactory()
{
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
return new TimestampColumnSelector()
{
@Override
public long getTimestamp()
{
return in.getTimestampFromEpoch();
}
};
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{

View File

@ -1,3 +1,4 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
@ -38,6 +39,7 @@ import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.TimestampColumnSelector;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ListIndexed;
@ -87,6 +89,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return new ListIndexed<String>(index.getDimensions(), String.class);
}
@Override
public Iterable<String> getAvailableMetrics()
{
return index.getMetricNames();
}
@Override
public int getDimensionCardinality(String dimension)
{
@ -205,6 +213,16 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
}
@Override
public void advanceTo(int offset)
{
int count = 0;
while (count < offset && !isDone()) {
advance();
count++;
}
}
@Override
public boolean isDone()
{
@ -237,6 +255,19 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
return new TimestampColumnSelector()
{
@Override
public long getTimestamp()
{
return currEntry.getKey().getTimestamp();
}
};
}
@Override
public DimensionSelector makeDimensionSelector(String dimension)
{

View File

@ -48,11 +48,13 @@ import java.util.List;
*/
public class QueryRunnerTestHelper
{
public static final String segmentId = "testSegment";
public static final String dataSource = "testing";
public static final QueryGranularity dayGran = QueryGranularity.DAY;
public static final QueryGranularity allGran = QueryGranularity.ALL;
public static final String providerDimension = "proVider";
public static final String qualityDimension = "quality";
public static final String placementDimension = "placement";
public static final String placementishDimension = "placementish";
public static final String indexMetric = "index";
public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
@ -110,13 +112,13 @@ public class QueryRunnerTestHelper
return Arrays.asList(
new Object[][]{
{
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex))
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId))
},
{
makeQueryRunner(factory, new QueryableIndexSegment(null, mMappedTestIndex))
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex))
},
{
makeQueryRunner(factory, new QueryableIndexSegment(null, mergedRealtimeIndex))
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex))
}
}
);

View File

@ -47,7 +47,7 @@ public class SegmentAnalyzerTest
public void testIncrementalDoesNotWork() throws Exception
{
final List<SegmentAnalysis> results = getSegmentAnalysises(
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex())
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), null)
);
Assert.assertEquals(0, results.size());

View File

@ -0,0 +1,224 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.junit.Test;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
/**
*/
public class SelectBinaryFnTest
{
private static final String segmentId1 = "testSegment";
private static final String segmentId2 = "testSegment";
@Test
public void testApply() throws Exception
{
SelectBinaryFn binaryFn = new SelectBinaryFn(QueryGranularity.ALL, new PagingSpec(null, 5));
Result<SelectResultValue> res1 = new Result<>(
new DateTime("2013-01-01"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Arrays.asList(
new EventHolder(
segmentId1,
0,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T00"),
"dim",
"first"
)
),
new EventHolder(
segmentId1,
1,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T03"),
"dim",
"fourth"
)
),
new EventHolder(
segmentId1,
2,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T05"),
"dim",
"sixth"
)
)
)
)
);
Result<SelectResultValue> res2 = new Result<>(
new DateTime("2013-01-01"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Arrays.asList(
new EventHolder(
segmentId2,
0,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T00"),
"dim",
"second"
)
),
new EventHolder(
segmentId2,
1,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T02"),
"dim",
"third"
)
),
new EventHolder(
segmentId2,
2,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T04"),
"dim",
"fifth"
)
)
)
)
);
Result<SelectResultValue> merged = binaryFn.apply(res1, res2);
Assert.assertEquals(res1.getTimestamp(), merged.getTimestamp());
LinkedHashMap<String, Integer> expectedPageIds = Maps.newLinkedHashMap();
expectedPageIds.put(segmentId1, 0);
expectedPageIds.put(segmentId2, 0);
expectedPageIds.put(segmentId2, 1);
expectedPageIds.put(segmentId1, 1);
expectedPageIds.put(segmentId2, 2);
Iterator<String> exSegmentIter = expectedPageIds.keySet().iterator();
Iterator<String> acSegmentIter = merged.getValue().getPagingIdentifiers().keySet().iterator();
verifyIters(exSegmentIter, acSegmentIter);
Iterator<Integer> exOffsetIter = expectedPageIds.values().iterator();
Iterator<Integer> acOffsetIter = merged.getValue().getPagingIdentifiers().values().iterator();
verifyIters(exOffsetIter, acOffsetIter);
List<EventHolder> exEvents = Arrays.<EventHolder>asList(
new EventHolder(
segmentId1,
0,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T00"), "dim", "first"
)
),
new EventHolder(
segmentId2,
0,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T00"),
"dim",
"second"
)
),
new EventHolder(
segmentId2,
1,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T02"),
"dim",
"third"
)
),
new EventHolder(
segmentId1,
1,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T03"),
"dim",
"fourth"
)
),
new EventHolder(
segmentId2,
2,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T04"),
"dim",
"fifth"
)
)
);
List<EventHolder> acEvents = merged.getValue().getEvents();
verifyEvents(exEvents, acEvents);
}
private void verifyIters(Iterator iter1, Iterator iter2)
{
while (iter1.hasNext()) {
Assert.assertEquals(iter1.next(), iter2.next());
}
if (iter2.hasNext()) {
throw new ISE("This should be empty!");
}
}
private void verifyEvents(List<EventHolder> events1, List<EventHolder> events2)
{
Iterator<EventHolder> ex = events1.iterator();
Iterator<EventHolder> ac = events2.iterator();
verifyIters(ex, ac);
}
}

View File

@ -0,0 +1,403 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequences;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.spec.LegacySegmentSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*/
@RunWith(Parameterized.class)
public class SelectQueryRunnerTest
{
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
SelectQueryRunnerFactory.create(new DefaultObjectMapper())
);
}
private static final String providerLowercase = "provider";
private final QueryRunner runner;
public SelectQueryRunnerTest(
QueryRunner runner
)
{
this.runner = runner;
}
@Test
public void testFullOnSelect()
{
SelectQuery query = new SelectQuery(
QueryRunnerTestHelper.dataSource,
QueryRunnerTestHelper.fullOnInterval,
null,
QueryRunnerTestHelper.allGran,
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new PagingSpec(null, 3),
null
);
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query),
Lists.<Result<SelectResultValue>>newArrayList()
);
List<Result<SelectResultValue>> expectedResults = Arrays.asList(
new Result<SelectResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
0,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(providerLowercase, "spot")
.put(QueryRunnerTestHelper.qualityDimension, "automotive")
.put(QueryRunnerTestHelper.placementDimension, "preferred")
.put(QueryRunnerTestHelper.placementishDimension, Lists.newArrayList("a", "preferred"))
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
1,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(providerLowercase, "spot")
.put(QueryRunnerTestHelper.qualityDimension, "business")
.put(QueryRunnerTestHelper.placementDimension, "preferred")
.put(QueryRunnerTestHelper.placementishDimension, Lists.newArrayList("b", "preferred"))
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
2,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(providerLowercase, "spot")
.put(QueryRunnerTestHelper.qualityDimension, "entertainment")
.put(QueryRunnerTestHelper.placementDimension, "preferred")
.put(QueryRunnerTestHelper.placementishDimension, Lists.newArrayList("e", "preferred"))
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
)
)
)
)
);
verify(expectedResults, results);
}
@Test
public void testSelectWithDimsAndMets()
{
SelectQuery query = new SelectQuery(
QueryRunnerTestHelper.dataSource,
QueryRunnerTestHelper.fullOnInterval,
null,
QueryRunnerTestHelper.allGran,
Lists.<String>newArrayList(providerLowercase),
Lists.<String>newArrayList(QueryRunnerTestHelper.indexMetric),
new PagingSpec(null, 3),
null
);
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query),
Lists.<Result<SelectResultValue>>newArrayList()
);
List<Result<SelectResultValue>> expectedResults = Arrays.asList(
new Result<SelectResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
0,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(providerLowercase, "spot")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
1,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(providerLowercase, "spot")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
2,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(providerLowercase, "spot")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
)
)
)
)
);
verify(expectedResults, results);
}
@Test
public void testSelectPagination()
{
SelectQuery query = new SelectQuery(
QueryRunnerTestHelper.dataSource,
QueryRunnerTestHelper.fullOnInterval,
null,
QueryRunnerTestHelper.allGran,
Lists.<String>newArrayList(QueryRunnerTestHelper.qualityDimension),
Lists.<String>newArrayList(QueryRunnerTestHelper.indexMetric),
new PagingSpec(Maps.newLinkedHashMap(ImmutableMap.of(QueryRunnerTestHelper.segmentId, 3)), 3),
null
);
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query),
Lists.<Result<SelectResultValue>>newArrayList()
);
List<Result<SelectResultValue>> expectedResults = Arrays.asList(
new Result<SelectResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, 5),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
3,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "health")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
4,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "mezzanine")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
5,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "news")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
)
)
)
)
);
verify(expectedResults, results);
}
@Test
public void testFullOnSelectWithFilter()
{
SelectQuery query = new SelectQuery(
QueryRunnerTestHelper.dataSource,
new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")),
new SelectorDimFilter(QueryRunnerTestHelper.providerDimension, "spot"),
QueryRunnerTestHelper.dayGran,
Lists.<String>newArrayList(QueryRunnerTestHelper.qualityDimension),
Lists.<String>newArrayList(QueryRunnerTestHelper.indexMetric),
new PagingSpec(Maps.newLinkedHashMap(ImmutableMap.of(QueryRunnerTestHelper.segmentId, 3)), 3),
null
);
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query),
Lists.<Result<SelectResultValue>>newArrayList()
);
List<Result<SelectResultValue>> expectedResults = Arrays.asList(
new Result<SelectResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, 5),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
3,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "health")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
4,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "mezzanine")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
5,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "news")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
)
)
)
),
new Result<SelectResultValue>(
new DateTime("2011-01-13T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, 5),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
3,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-13T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "health")
.put(QueryRunnerTestHelper.indexMetric, 114.947403F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
4,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-13T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "mezzanine")
.put(QueryRunnerTestHelper.indexMetric, 104.465767F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
5,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-13T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "news")
.put(QueryRunnerTestHelper.indexMetric, 102.851683F)
.build()
)
)
)
)
);
verify(expectedResults, results);
}
private static void verify(
Iterable<Result<SelectResultValue>> expectedResults,
Iterable<Result<SelectResultValue>> actualResults
)
{
Iterator<Result<SelectResultValue>> expectedIter = expectedResults.iterator();
Iterator<Result<SelectResultValue>> actualIter = actualResults.iterator();
while (expectedIter.hasNext()) {
Result<SelectResultValue> expected = expectedIter.next();
Result<SelectResultValue> actual = actualIter.next();
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
for (Map.Entry<String, Integer> entry : expected.getValue().getPagingIdentifiers().entrySet()) {
Assert.assertEquals(entry.getValue(), actual.getValue().getPagingIdentifiers().get(entry.getKey()));
}
Iterator<EventHolder> expectedEvts = expected.getValue().getEvents().iterator();
Iterator<EventHolder> actualEvts = actual.getValue().getEvents().iterator();
while (expectedEvts.hasNext()) {
EventHolder exHolder = expectedEvts.next();
EventHolder acHolder = actualEvts.next();
Assert.assertEquals(exHolder.getTimestamp(), acHolder.getTimestamp());
Assert.assertEquals(exHolder.getOffset(), acHolder.getOffset());
for (Map.Entry<String, Object> ex : exHolder.getEvent().entrySet()) {
Object actVal = acHolder.getEvent().get(ex.getKey());
// workaround for current IncrementalIndex limitations: ingested float metrics are returned as Doubles
if (actVal instanceof Double) {
actVal = ((Double) actVal).floatValue();
}
Assert.assertEquals(ex.getValue(), actVal);
}
}
if (actualEvts.hasNext()) {
throw new ISE("This event iterator should be exhausted!");
}
}
if (actualIter.hasNext()) {
throw new ISE("This iterator should be exhausted!");
}
}
}
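The paging flow exercised by these tests (seed a PagingSpec at some offset, read back pagingIdentifiers that point at the last offset returned) suggests the client-side loop below. This is only an illustrative sketch and not part of this commit: it reuses the fixtures from the test class above, starts at offset 0, and assumes the next page should begin one past the last returned offset, which matches the expected results above (a page starting at offset 3 reports 5 as its last offset).

Map<String, Integer> paging = Maps.newLinkedHashMap();
paging.put(QueryRunnerTestHelper.segmentId, 0);
for (int page = 0; page < 3; page++) {
  SelectQuery query = new SelectQuery(
      QueryRunnerTestHelper.dataSource,
      new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")),
      null,
      QueryRunnerTestHelper.dayGran,
      Lists.<String>newArrayList(QueryRunnerTestHelper.qualityDimension),
      Lists.<String>newArrayList(QueryRunnerTestHelper.indexMetric),
      new PagingSpec(Maps.newLinkedHashMap(paging), 3),
      null
  );
  Iterable<Result<SelectResultValue>> pageResults = Sequences.toList(
      runner.run(query),
      Lists.<Result<SelectResultValue>>newArrayList()
  );
  for (Result<SelectResultValue> result : pageResults) {
    for (Map.Entry<String, Integer> id : result.getValue().getPagingIdentifiers().entrySet()) {
      // resume one past the last offset returned for this segment (assumption)
      paging.put(id.getKey(), id.getValue() + 1);
    }
  }
}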

View File

@@ -90,7 +90,7 @@ public class TimeseriesQueryRunnerBonusTest
final QueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
final QueryRunner<Result<TimeseriesResultValue>> runner = makeQueryRunner(
factory,
new IncrementalIndexSegment(index)
new IncrementalIndexSegment(index, null)
);
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()

View File

@@ -48,7 +48,7 @@ public class TopNQueryRunnerTestHelper
return Arrays.asList(
new Object[][]{
{
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex))
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, null))
},
{
makeQueryRunner(factory, new QueryableIndexSegment(null, mMappedTestIndex))

View File

@@ -83,7 +83,7 @@ public class SpatialFilterBonusTest
return Arrays.asList(
new Object[][]{
{
new IncrementalIndexSegment(rtIndex)
new IncrementalIndexSegment(rtIndex, null)
},
{
new QueryableIndexSegment(null, mMappedTestIndex)

View File

@@ -83,7 +83,7 @@ public class SpatialFilterTest
return Arrays.asList(
new Object[][]{
{
new IncrementalIndexSegment(rtIndex)
new IncrementalIndexSegment(rtIndex, null)
},
{
new QueryableIndexSegment(null, mMappedTestIndex)

File diff suppressed because it is too large

View File

@@ -31,6 +31,8 @@ import io.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.search.SearchQueryRunnerFactory;
import io.druid.query.search.search.SearchQuery;
import io.druid.query.select.SelectQuery;
import io.druid.query.select.SelectQueryRunnerFactory;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesQuery;
@@ -51,6 +53,7 @@ public class QueryRunnerFactoryModule extends QueryToolChestModule
.put(TimeBoundaryQuery.class, TimeBoundaryQueryRunnerFactory.class)
.put(SegmentMetadataQuery.class, SegmentMetadataQueryRunnerFactory.class)
.put(GroupByQuery.class, GroupByQueryRunnerFactory.class)
.put(SelectQuery.class, SelectQueryRunnerFactory.class)
.put(TopNQuery.class, TopNQueryRunnerFactory.class)
.build();

View File

@@ -34,6 +34,8 @@ import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.search.SearchQueryQueryToolChest;
import io.druid.query.search.search.SearchQuery;
import io.druid.query.search.search.SearchQueryConfig;
import io.druid.query.select.SelectQuery;
import io.druid.query.select.SelectQueryQueryToolChest;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeboundary.TimeBoundaryQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQuery;
@@ -55,6 +57,7 @@ public class QueryToolChestModule implements Module
.put(TimeBoundaryQuery.class, TimeBoundaryQueryQueryToolChest.class)
.put(SegmentMetadataQuery.class, SegmentMetadataQueryQueryToolChest.class)
.put(GroupByQuery.class, GroupByQueryQueryToolChest.class)
.put(SelectQuery.class, SelectQueryQueryToolChest.class)
.put(TopNQuery.class, TopNQueryQueryToolChest.class)
.build();

View File

@@ -33,11 +33,12 @@ public class FireHydrant
public FireHydrant(
IncrementalIndex index,
int count
int count,
String segmentIdentifier
)
{
this.index = index;
this.adapter = new IncrementalIndexSegment(index);
this.adapter = new IncrementalIndexSegment(index, segmentIdentifier);
this.count = count;
}

View File

@@ -0,0 +1,197 @@
package io.druid.segment.realtime.plumber;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexGranularity;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
import io.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class FlushingPlumber extends RealtimePlumber
{
private static final EmittingLogger log = new EmittingLogger(FlushingPlumber.class);
private final Duration flushDuration;
private volatile ScheduledExecutorService flushScheduledExec = null;
private volatile boolean stopped = false;
public FlushingPlumber(
Duration flushDuration,
Period windowPeriod,
File basePersistDirectory,
IndexGranularity segmentGranularity,
Schema schema,
FireDepartmentMetrics metrics,
RejectionPolicy rejectionPolicy,
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService,
VersioningPolicy versioningPolicy,
int maxPendingPersists
)
{
super(
windowPeriod,
basePersistDirectory,
segmentGranularity,
schema,
metrics,
rejectionPolicy,
emitter,
conglomerate,
segmentAnnouncer,
queryExecutorService,
versioningPolicy,
null,
null,
null,
maxPendingPersists
);
this.flushDuration = flushDuration;
}
@Override
public void startJob()
{
log.info("Starting job for %s", getSchema().getDataSource());
computeBaseDir(getSchema()).mkdirs();
initializeExecutors();
if (flushScheduledExec == null) {
flushScheduledExec = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("flushing_scheduled_%d")
.build()
);
}
bootstrapSinksFromDisk();
startFlushThread();
}
protected void flushAfterDuration(final long truncatedTime, final Sink sink)
{
log.info(
"Abandoning segment %s at %s",
sink.getSegment().getIdentifier(),
new DateTime().plusMillis((int) flushDuration.getMillis())
);
ScheduledExecutors.scheduleWithFixedDelay(
flushScheduledExec,
flushDuration,
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call() throws Exception
{
log.info("Abandoning segment %s", sink.getSegment().getIdentifier());
abandonSegment(truncatedTime, sink);
return ScheduledExecutors.Signal.STOP;
}
}
);
}
private void startFlushThread()
{
final long truncatedNow = getSegmentGranularity().truncate(new DateTime()).getMillis();
final long windowMillis = getWindowPeriod().toStandardDuration().getMillis();
log.info(
"Expect to run at [%s]",
new DateTime().plus(
new Duration(System.currentTimeMillis(), getSegmentGranularity().increment(truncatedNow) + windowMillis)
)
);
ScheduledExecutors
.scheduleAtFixedRate(
flushScheduledExec,
new Duration(System.currentTimeMillis(), getSegmentGranularity().increment(truncatedNow) + windowMillis),
new Duration(truncatedNow, getSegmentGranularity().increment(truncatedNow)),
new ThreadRenamingCallable<ScheduledExecutors.Signal>(
String.format(
"%s-flusher-%d",
getSchema().getDataSource(),
getSchema().getShardSpec().getPartitionNum()
)
)
{
@Override
public ScheduledExecutors.Signal doCall()
{
if (stopped) {
log.info("Stopping flusher thread");
return ScheduledExecutors.Signal.STOP;
}
long minTimestamp = getSegmentGranularity().truncate(
getRejectionPolicy().getCurrMaxTime().minus(windowMillis)
).getMillis();
List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
for (Map.Entry<Long, Sink> entry : getSinks().entrySet()) {
final Long intervalStart = entry.getKey();
if (intervalStart < minTimestamp) {
log.info("Adding entry[%s] to flush.", entry);
sinksToPush.add(entry);
}
}
for (final Map.Entry<Long, Sink> entry : sinksToPush) {
flushAfterDuration(entry.getKey(), entry.getValue());
}
if (stopped) {
log.info("Stopping flusher thread");
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
}
}
);
}
@Override
public void finishJob()
{
log.info("Stopping job");
for (final Map.Entry<Long, Sink> entry : getSinks().entrySet()) {
abandonSegment(entry.getKey(), entry.getValue());
}
shutdownExecutors();
if (flushScheduledExec != null) {
flushScheduledExec.shutdown();
}
stopped = true;
}
}

View File

@@ -0,0 +1,131 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexGranularity;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
import io.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Duration;
import org.joda.time.Period;
import javax.validation.constraints.NotNull;
import java.io.File;
import java.util.concurrent.ExecutorService;
/**
* This plumber just drops segments at the end of a flush duration instead of handing them off. It is only useful if you want to run
* a real time node without the rest of the Druid cluster.
*/
public class FlushingPlumberSchool implements PlumberSchool
{
private static final EmittingLogger log = new EmittingLogger(FlushingPlumberSchool.class);
private final Duration flushDuration;
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
private final int maxPendingPersists;
@JacksonInject
@NotNull
private volatile ServiceEmitter emitter;
@JacksonInject
@NotNull
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
@JacksonInject
@NotNull
private volatile DataSegmentAnnouncer segmentAnnouncer = null;
@JacksonInject
@NotNull
@Processing
private volatile ExecutorService queryExecutorService = null;
private volatile VersioningPolicy versioningPolicy = null;
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
@JsonCreator
public FlushingPlumberSchool(
@JsonProperty("flushDuration") Duration flushDuration,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("maxPendingPersists") int maxPendingPersists
)
{
this.flushDuration = flushDuration;
this.windowPeriod = windowPeriod;
this.basePersistDirectory = basePersistDirectory;
this.segmentGranularity = segmentGranularity;
this.versioningPolicy = new IntervalStartVersioningPolicy();
this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
this.maxPendingPersists = maxPendingPersists;
Preconditions.checkArgument(maxPendingPersists > 0, "FlushingPlumberSchool requires maxPendingPersists > 0");
Preconditions.checkNotNull(flushDuration, "FlushingPlumberSchool requires a flushDuration.");
Preconditions.checkNotNull(windowPeriod, "FlushingPlumberSchool requires a windowPeriod.");
Preconditions.checkNotNull(basePersistDirectory, "FlushingPlumberSchool requires a basePersistDirectory.");
Preconditions.checkNotNull(segmentGranularity, "FlushingPlumberSchool requires a segmentGranularity.");
}
@Override
public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics)
{
verifyState();
final RejectionPolicy rejectionPolicy = rejectionPolicyFactory.create(windowPeriod);
log.info("Creating plumber using rejectionPolicy[%s]", rejectionPolicy);
return new FlushingPlumber(
flushDuration,
windowPeriod,
basePersistDirectory,
segmentGranularity,
schema,
metrics,
rejectionPolicy,
emitter,
conglomerate,
segmentAnnouncer,
queryExecutorService,
versioningPolicy,
maxPendingPersists
);
}
private void verifyState()
{
Preconditions.checkNotNull(conglomerate, "must specify a queryRunnerFactoryConglomerate to do this action.");
Preconditions.checkNotNull(segmentAnnouncer, "must specify a segmentAnnouncer to do this action.");
Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action.");
}
}
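Because FlushingPlumberSchool is registered under the "flushing" name in the PlumberSchool sub-types later in this diff, it can stand in for the default realtime plumber in a realtime spec. The snippet below is only a construction sketch with made-up values; in practice Jackson builds the object from the spec and supplies the emitter, conglomerate, segmentAnnouncer and queryExecutorService via @JacksonInject, and IndexGranularity.HOUR is an assumed enum constant.

// Illustrative values only; the arguments mirror the @JsonCreator constructor above.
FlushingPlumberSchool flushingSchool = new FlushingPlumberSchool(
    Duration.standardHours(1),               // flushDuration: abandon a sink this long after it falls outside the window
    new Period("PT10M"),                     // windowPeriod
    new File("/tmp/realtime/basePersist"),   // basePersistDirectory
    IndexGranularity.HOUR,                   // segmentGranularity (assumed constant)
    2                                        // maxPendingPersists, must be > 0
);
// Once the injected fields are populated, findPlumber(schema, metrics) returns a
// FlushingPlumber that drops segments instead of handing them off.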

View File

@@ -27,9 +27,10 @@ import io.druid.segment.realtime.Schema;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
@JsonSubTypes.Type(name = "realtime", value = RealtimePlumberSchool.class)
)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "realtime", value = RealtimePlumberSchool.class),
@JsonSubTypes.Type(name = "flushing", value = FlushingPlumberSchool.class)
})
public interface PlumberSchool
{
/**

View File

@@ -0,0 +1,735 @@
package io.druid.segment.realtime.plumber;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.IndexGranularity;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class RealtimePlumber implements Plumber
{
private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
private final Schema schema;
private final FireDepartmentMetrics metrics;
private final RejectionPolicy rejectionPolicy;
private final ServiceEmitter emitter;
private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ExecutorService queryExecutorService;
private final VersioningPolicy versioningPolicy;
private final DataSegmentPusher dataSegmentPusher;
private final SegmentPublisher segmentPublisher;
private final ServerView serverView;
private final int maxPendingPersists;
private final Object handoffCondition = new Object();
private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
String.CASE_INSENSITIVE_ORDER
);
private volatile boolean shuttingDown = false;
private volatile boolean stopped = false;
private volatile ExecutorService persistExecutor = null;
private volatile ScheduledExecutorService scheduledExecutor = null;
public RealtimePlumber(
Period windowPeriod,
File basePersistDirectory,
IndexGranularity segmentGranularity,
Schema schema,
FireDepartmentMetrics metrics,
RejectionPolicy rejectionPolicy,
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService,
VersioningPolicy versioningPolicy,
DataSegmentPusher dataSegmentPusher,
SegmentPublisher segmentPublisher,
ServerView serverView,
int maxPendingPersists
)
{
this.windowPeriod = windowPeriod;
this.basePersistDirectory = basePersistDirectory;
this.segmentGranularity = segmentGranularity;
this.schema = schema;
this.metrics = metrics;
this.rejectionPolicy = rejectionPolicy;
this.emitter = emitter;
this.conglomerate = conglomerate;
this.segmentAnnouncer = segmentAnnouncer;
this.queryExecutorService = queryExecutorService;
this.versioningPolicy = versioningPolicy;
this.dataSegmentPusher = dataSegmentPusher;
this.segmentPublisher = segmentPublisher;
this.serverView = serverView;
this.maxPendingPersists = maxPendingPersists;
}
public Schema getSchema()
{
return schema;
}
public Period getWindowPeriod()
{
return windowPeriod;
}
public IndexGranularity getSegmentGranularity()
{
return segmentGranularity;
}
public VersioningPolicy getVersioningPolicy()
{
return versioningPolicy;
}
public RejectionPolicy getRejectionPolicy()
{
return rejectionPolicy;
}
public Map<Long, Sink> getSinks()
{
return sinks;
}
@Override
public void startJob()
{
computeBaseDir(schema).mkdirs();
initializeExecutors();
bootstrapSinksFromDisk();
registerServerViewCallback();
startPersistThread();
}
@Override
public Sink getSink(long timestamp)
{
if (!rejectionPolicy.accept(timestamp)) {
return null;
}
final long truncatedTime = segmentGranularity.truncate(timestamp);
Sink retVal = sinks.get(truncatedTime);
if (retVal == null) {
final Interval sinkInterval = new Interval(
new DateTime(truncatedTime),
segmentGranularity.increment(new DateTime(truncatedTime))
);
retVal = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval));
try {
segmentAnnouncer.announceSegment(retVal.getSegment());
sinks.put(truncatedTime, retVal);
sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk<Sink>(retVal));
}
catch (IOException e) {
log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource())
.addData("interval", retVal.getInterval())
.emit();
}
}
return retVal;
}
@Override
public <T> QueryRunner<T> getQueryRunner(final Query<T> query)
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
final QueryToolChest<T, Query<T>> toolchest = factory.getToolchest();
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn =
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
{
return toolchest.makeMetricBuilder(query);
}
};
List<TimelineObjectHolder<String, Sink>> querySinks = Lists.newArrayList();
for (Interval interval : query.getIntervals()) {
querySinks.addAll(sinkTimeline.lookup(interval));
}
return toolchest.mergeResults(
factory.mergeRunners(
queryExecutorService,
FunctionalIterable
.create(querySinks)
.transform(
new Function<TimelineObjectHolder<String, Sink>, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(TimelineObjectHolder<String, Sink> holder)
{
final Sink theSink = holder.getObject().getChunk(0).getObject();
return new SpecificSegmentQueryRunner<T>(
new MetricsEmittingQueryRunner<T>(
emitter,
builderFn,
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Iterables.transform(
theSink,
new Function<FireHydrant, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(FireHydrant input)
{
return factory.createRunner(input.getSegment());
}
}
)
)
),
new SpecificSegmentSpec(
new SegmentDescriptor(
holder.getInterval(),
theSink.getSegment().getVersion(),
theSink.getSegment().getShardSpec().getPartitionNum()
)
)
);
}
}
)
)
);
}
@Override
public void persist(final Runnable commitRunnable)
{
final List<Pair<FireHydrant, Interval>> indexesToPersist = Lists.newArrayList();
for (Sink sink : sinks.values()) {
if (sink.swappable()) {
indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval()));
}
}
log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource());
persistExecutor.execute(
new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource()))
{
@Override
public void doRun()
{
for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs));
}
commitRunnable.run();
}
}
);
}
// Submits persist-n-merge task for a Sink to the persistExecutor
private void persistAndMerge(final long truncatedTime, final Sink sink)
{
final String threadName = String.format(
"%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(truncatedTime)
);
persistExecutor.execute(
new ThreadRenamingRunnable(threadName)
{
@Override
public void doRun()
{
final Interval interval = sink.getInterval();
for (FireHydrant hydrant : sink) {
if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
final int rowCount = persistHydrant(hydrant, schema, interval);
metrics.incrementRowOutputCount(rowCount);
}
}
final File mergedTarget = new File(computePersistDir(schema, interval), "merged");
if (mergedTarget.exists()) {
log.info("Skipping already-merged sink: %s", sink);
return;
}
File mergedFile = null;
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Segment segment = fireHydrant.getSegment();
final QueryableIndex queryableIndex = segment.asQueryableIndex();
log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(queryableIndex);
}
mergedFile = IndexMerger.mergeQueryableIndex(
indexes,
schema.getAggregators(),
mergedTarget
);
QueryableIndex index = IndexIO.loadIndex(mergedFile);
DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
);
segmentPublisher.publishSegment(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
.addData("interval", interval)
.emit();
if (shuttingDown) {
// We're trying to shut down, and this segment failed to push. Let's just get rid of it.
abandonSegment(truncatedTime, sink);
}
}
if (mergedFile != null) {
try {
log.info("Deleting Index File[%s]", mergedFile);
FileUtils.deleteDirectory(mergedFile);
}
catch (IOException e) {
log.warn(e, "Error deleting directory[%s]", mergedFile);
}
}
}
}
);
}
@Override
public void finishJob()
{
log.info("Shutting down...");
shuttingDown = true;
for (final Map.Entry<Long, Sink> entry : sinks.entrySet()) {
persistAndMerge(entry.getKey(), entry.getValue());
}
while (!sinks.isEmpty()) {
try {
log.info(
"Cannot shut down yet! Sinks remaining: %s",
Joiner.on(", ").join(
Iterables.transform(
sinks.values(),
new Function<Sink, String>()
{
@Override
public String apply(Sink input)
{
return input.getSegment().getIdentifier();
}
}
)
)
);
synchronized (handoffCondition) {
while (!sinks.isEmpty()) {
handoffCondition.wait();
}
}
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
shutdownExecutors();
stopped = true;
}
protected void initializeExecutors()
{
if (persistExecutor == null) {
// use a blocking single-threaded executor to throttle the firehose when writes to disk are slow
persistExecutor = Execs.newBlockingSingleThreaded(
"plumber_persist_%d", maxPendingPersists
);
}
if (scheduledExecutor == null) {
scheduledExecutor = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("plumber_scheduled_%d")
.build()
);
}
}
protected void shutdownExecutors()
{
// scheduledExecutor is shutdown here, but persistExecutor is shutdown when the
// ServerView sends it a new segment callback
if (scheduledExecutor != null) {
scheduledExecutor.shutdown();
}
}
protected void bootstrapSinksFromDisk()
{
File baseDir = computeBaseDir(schema);
if (baseDir == null || !baseDir.exists()) {
return;
}
File[] files = baseDir.listFiles();
if (files == null) {
return;
}
for (File sinkDir : files) {
Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/"));
//final File[] sinkFiles = sinkDir.listFiles();
// Avoid reading and listing the "merged" dir
final File[] sinkFiles = sinkDir.listFiles(
new FilenameFilter()
{
@Override
public boolean accept(File dir, String fileName)
{
return !(Ints.tryParse(fileName) == null);
}
}
);
Arrays.sort(
sinkFiles,
new Comparator<File>()
{
@Override
public int compare(File o1, File o2)
{
try {
return Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName()));
}
catch (NumberFormatException e) {
log.error(e, "Couldn't compare as numbers? [%s][%s]", o1, o2);
return o1.compareTo(o2);
}
}
}
);
try {
List<FireHydrant> hydrants = Lists.newArrayList();
for (File segmentDir : sinkFiles) {
log.info("Loading previously persisted segment at [%s]", segmentDir);
// The FilenameFilter above should already have excluded the "merged" dir;
// this is just a double-check so it cannot end up in the hydrants.
// If that filter is guaranteed to be sufficient, this check can be removed.
if (Ints.tryParse(segmentDir.getName()) == null) {
continue;
}
hydrants.add(
new FireHydrant(
new QueryableIndexSegment(
DataSegment.makeDataSegmentIdentifier(
schema.getDataSource(),
sinkInterval.getStart(),
sinkInterval.getEnd(),
versioningPolicy.getVersion(sinkInterval),
schema.getShardSpec()
),
IndexIO.loadIndex(segmentDir)
),
Integer.parseInt(segmentDir.getName())
)
);
}
Sink currSink = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval), hydrants);
sinks.put(sinkInterval.getStartMillis(), currSink);
sinkTimeline.add(
currSink.getInterval(),
currSink.getVersion(),
new SingleElementPartitionChunk<Sink>(currSink)
);
segmentAnnouncer.announceSegment(currSink.getSegment());
}
catch (IOException e) {
log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource())
.addData("interval", sinkInterval)
.emit();
}
}
}
protected void startPersistThread()
{
final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis();
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
log.info(
"Expect to run at [%s]",
new DateTime().plus(
new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis)
)
);
ScheduledExecutors
.scheduleAtFixedRate(
scheduledExecutor,
new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis),
new Duration(truncatedNow, segmentGranularity.increment(truncatedNow)),
new ThreadRenamingCallable<ScheduledExecutors.Signal>(
String.format(
"%s-overseer-%d",
schema.getDataSource(),
schema.getShardSpec().getPartitionNum()
)
)
{
@Override
public ScheduledExecutors.Signal doCall()
{
if (stopped) {
log.info("Stopping merge-n-push overseer thread");
return ScheduledExecutors.Signal.STOP;
}
log.info("Starting merge and push.");
long minTimestamp = segmentGranularity.truncate(
rejectionPolicy.getCurrMaxTime().minus(windowMillis)
).getMillis();
List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
final Long intervalStart = entry.getKey();
if (intervalStart < minTimestamp) {
log.info("Adding entry[%s] for merge and push.", entry);
sinksToPush.add(entry);
}
}
for (final Map.Entry<Long, Sink> entry : sinksToPush) {
persistAndMerge(entry.getKey(), entry.getValue());
}
if (stopped) {
log.info("Stopping merge-n-push overseer thread");
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
}
}
);
}
/**
* Unannounces a given sink and removes all local references to it.
*/
protected void abandonSegment(final long truncatedTime, final Sink sink)
{
try {
segmentAnnouncer.unannounceSegment(sink.getSegment());
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
sinks.remove(truncatedTime);
sinkTimeline.remove(
sink.getInterval(),
sink.getVersion(),
new SingleElementPartitionChunk<>(sink)
);
synchronized (handoffCondition) {
handoffCondition.notifyAll();
}
}
catch (IOException e) {
log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.emit();
}
}
protected File computeBaseDir(Schema schema)
{
return new File(basePersistDirectory, schema.getDataSource());
}
protected File computePersistDir(Schema schema, Interval interval)
{
return new File(computeBaseDir(schema), interval.toString().replace("/", "_"));
}
/**
* Persists the given hydrant and returns the number of rows persisted
*
* @param indexToPersist the hydrant whose incremental index should be persisted
* @param schema         schema of the datasource, used to locate the persist directory
* @param interval       interval covered by the hydrant
*
* @return the number of rows persisted
*/
protected int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval)
{
if (indexToPersist.hasSwapped()) {
log.info(
"DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
schema.getDataSource(), interval, indexToPersist
);
return 0;
}
log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist);
try {
int numRows = indexToPersist.getIndex().size();
File persistedFile = IndexMerger.persist(
indexToPersist.getIndex(),
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
);
indexToPersist.swapSegment(
new QueryableIndexSegment(
indexToPersist.getSegment().getIdentifier(),
IndexIO.loadIndex(persistedFile)
)
);
return numRows;
}
catch (IOException e) {
log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
.addData("interval", interval)
.addData("count", indexToPersist.getCount())
.emit();
throw Throwables.propagate(e);
}
}
private void registerServerViewCallback()
{
serverView.registerSegmentCallback(
persistExecutor,
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment segment)
{
if (stopped) {
log.info("Unregistering ServerViewCallback");
persistExecutor.shutdown();
return ServerView.CallbackAction.UNREGISTER;
}
if ("realtime".equals(server.getType())) {
return ServerView.CallbackAction.CONTINUE;
}
log.debug("Checking segment[%s] on server[%s]", segment, server);
if (schema.getDataSource().equals(segment.getDataSource())) {
final Interval interval = segment.getInterval();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
final Long sinkKey = entry.getKey();
if (interval.contains(sinkKey)) {
final Sink sink = entry.getValue();
log.info("Segment[%s] matches sink[%s] on server[%s]", segment, sink, server);
final String segmentVersion = segment.getVersion();
final String sinkVersion = sink.getSegment().getVersion();
if (segmentVersion.compareTo(sinkVersion) >= 0) {
log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion);
abandonSegment(sinkKey, sink);
}
}
}
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
}
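RealtimePlumber now carries the full plumber lifecycle that previously lived inside RealtimePlumberSchool's anonymous class (see the diff below). A rough usage sketch follows, assuming a plumber obtained from a PlumberSchool and an incoming InputRow; the row-handling calls are assumptions, since only the Plumber methods themselves appear in this file.

plumber.startJob();                                        // mkdirs, executors, bootstrap sinks, server-view callback
Sink sink = plumber.getSink(row.getTimestampFromEpoch());  // null if the rejection policy refuses the timestamp
if (sink != null) {
  sink.add(row);                                           // assumed Sink API, not shown in this diff
}
plumber.persist(new Runnable() {
  @Override
  public void run() {
    // commit firehose offsets here once the incremental persists finish
  }
});
plumber.finishJob();                                       // persist-n-merge remaining sinks, wait for handoff, shut down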

View File

@@ -22,81 +22,36 @@ package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Processing;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.IndexGranularity;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class RealtimePlumberSchool implements PlumberSchool
{
private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class);
private static final int defaultPending = 2;
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
private final Object handoffCondition = new Object();
private final int maxPendingPersists;
private volatile boolean shuttingDown = false;
@JacksonInject
@NotNull
private volatile ServiceEmitter emitter;
@@ -135,9 +90,9 @@ public class RealtimePlumberSchool implements PlumberSchool
this.segmentGranularity = segmentGranularity;
this.versioningPolicy = new IntervalStartVersioningPolicy();
this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
this.maxPendingPersists = maxPendingPersists;
this.maxPendingPersists = (maxPendingPersists > 0) ? maxPendingPersists : defaultPending;
Preconditions.checkArgument(maxPendingPersists > 0);
Preconditions.checkArgument(maxPendingPersists > 0, "RealtimePlumberSchool requires maxPendingPersists > 0");
Preconditions.checkNotNull(windowPeriod, "RealtimePlumberSchool requires a windowPeriod.");
Preconditions.checkNotNull(basePersistDirectory, "RealtimePlumberSchool requires a basePersistDirectory.");
Preconditions.checkNotNull(segmentGranularity, "RealtimePlumberSchool requires a segmentGranularity.");
@@ -198,564 +153,23 @@ public class RealtimePlumberSchool implements PlumberSchool
final RejectionPolicy rejectionPolicy = rejectionPolicyFactory.create(windowPeriod);
log.info("Creating plumber using rejectionPolicy[%s]", rejectionPolicy);
return new Plumber()
{
private volatile boolean stopped = false;
private volatile ExecutorService persistExecutor = null;
private volatile ScheduledExecutorService scheduledExecutor = null;
private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
String.CASE_INSENSITIVE_ORDER
);
@Override
public void startJob()
{
computeBaseDir(schema).mkdirs();
initializeExecutors();
bootstrapSinksFromDisk();
registerServerViewCallback();
startPersistThread();
}
@Override
public Sink getSink(long timestamp)
{
if (!rejectionPolicy.accept(timestamp)) {
return null;
}
final long truncatedTime = segmentGranularity.truncate(timestamp);
Sink retVal = sinks.get(truncatedTime);
if (retVal == null) {
final Interval sinkInterval = new Interval(
new DateTime(truncatedTime),
segmentGranularity.increment(new DateTime(truncatedTime))
);
retVal = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval));
try {
segmentAnnouncer.announceSegment(retVal.getSegment());
sinks.put(truncatedTime, retVal);
sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk<Sink>(retVal));
}
catch (IOException e) {
log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource())
.addData("interval", retVal.getInterval())
.emit();
}
}
return retVal;
}
@Override
public <T> QueryRunner<T> getQueryRunner(final Query<T> query)
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
final QueryToolChest<T, Query<T>> toolchest = factory.getToolchest();
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn =
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
{
return toolchest.makeMetricBuilder(query);
}
};
List<TimelineObjectHolder<String, Sink>> querySinks = Lists.newArrayList();
for (Interval interval : query.getIntervals()) {
querySinks.addAll(sinkTimeline.lookup(interval));
}
return toolchest.mergeResults(
factory.mergeRunners(
queryExecutorService,
FunctionalIterable
.create(querySinks)
.transform(
new Function<TimelineObjectHolder<String, Sink>, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(TimelineObjectHolder<String, Sink> holder)
{
final Sink theSink = holder.getObject().getChunk(0).getObject();
return new SpecificSegmentQueryRunner<T>(
new MetricsEmittingQueryRunner<T>(
emitter,
builderFn,
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Iterables.transform(
theSink,
new Function<FireHydrant, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(FireHydrant input)
{
return factory.createRunner(input.getSegment());
}
}
)
)
),
new SpecificSegmentSpec(
new SegmentDescriptor(
holder.getInterval(),
theSink.getSegment().getVersion(),
theSink.getSegment().getShardSpec().getPartitionNum()
)
)
);
}
}
)
)
);
}
@Override
public void persist(final Runnable commitRunnable)
{
final List<Pair<FireHydrant, Interval>> indexesToPersist = Lists.newArrayList();
for (Sink sink : sinks.values()) {
if (sink.swappable()) {
indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval()));
}
}
log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource());
persistExecutor.execute(
new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource()))
{
@Override
public void doRun()
{
for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs));
}
commitRunnable.run();
}
}
);
}
// Submits persist-n-merge task for a Sink to the persistExecutor
private void persistAndMerge(final long truncatedTime, final Sink sink)
{
final String threadName = String.format(
"%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(truncatedTime)
);
persistExecutor.execute(
new ThreadRenamingRunnable(threadName)
{
@Override
public void doRun()
{
final Interval interval = sink.getInterval();
for (FireHydrant hydrant : sink) {
if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
final int rowCount = persistHydrant(hydrant, schema, interval);
metrics.incrementRowOutputCount(rowCount);
}
}
final File mergedTarget = new File(computePersistDir(schema, interval), "merged");
if (mergedTarget.exists()) {
log.info("Skipping already-merged sink: %s", sink);
return;
}
File mergedFile = null;
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Segment segment = fireHydrant.getSegment();
final QueryableIndex queryableIndex = segment.asQueryableIndex();
log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(queryableIndex);
}
mergedFile = IndexMerger.mergeQueryableIndex(
indexes,
schema.getAggregators(),
mergedTarget
);
QueryableIndex index = IndexIO.loadIndex(mergedFile);
DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
);
segmentPublisher.publishSegment(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
.addData("interval", interval)
.emit();
if (shuttingDown) {
// We're trying to shut down, and this segment failed to push. Let's just get rid of it.
abandonSegment(truncatedTime, sink);
}
}
if (mergedFile != null) {
try {
log.info("Deleting Index File[%s]", mergedFile);
FileUtils.deleteDirectory(mergedFile);
}
catch (IOException e) {
log.warn(e, "Error deleting directory[%s]", mergedFile);
}
}
}
}
);
}
@Override
public void finishJob()
{
log.info("Shutting down...");
shuttingDown = true;
for (final Map.Entry<Long, Sink> entry : sinks.entrySet()) {
persistAndMerge(entry.getKey(), entry.getValue());
}
while (!sinks.isEmpty()) {
try {
log.info(
"Cannot shut down yet! Sinks remaining: %s",
Joiner.on(", ").join(
Iterables.transform(
sinks.values(),
new Function<Sink, String>()
{
@Override
public String apply(Sink input)
{
return input.getSegment().getIdentifier();
}
}
)
)
);
synchronized (handoffCondition) {
while (!sinks.isEmpty()) {
handoffCondition.wait();
}
}
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
// scheduledExecutor is shutdown here, but persistExecutor is shutdown when the
// ServerView sends it a new segment callback
if (scheduledExecutor != null) {
scheduledExecutor.shutdown();
}
stopped = true;
}
private void initializeExecutors()
{
if (persistExecutor == null) {
// use a blocking single threaded executor to throttle the firehose when write to disk is slow
persistExecutor = Execs.newBlockingSingleThreaded(
"plumber_persist_%d", maxPendingPersists
);
}
if (scheduledExecutor == null) {
scheduledExecutor = Execs.scheduledSingleThreaded("plumber_scheduled_%d");
}
}
private void bootstrapSinksFromDisk()
{
File baseDir = computeBaseDir(schema);
if (baseDir == null || !baseDir.exists()) {
return;
}
File[] files = baseDir.listFiles();
if (files == null) {
return;
}
for (File sinkDir : files) {
Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/"));
//final File[] sinkFiles = sinkDir.listFiles();
// To avoid reading and listing of "merged" dir
final File[] sinkFiles = sinkDir.listFiles(
new FilenameFilter()
{
@Override
public boolean accept(File dir, String fileName)
{
return !(Ints.tryParse(fileName) == null);
}
}
);
Arrays.sort(
sinkFiles,
new Comparator<File>()
{
@Override
public int compare(File o1, File o2)
{
try {
return Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName()));
}
catch (NumberFormatException e) {
log.error(e, "Couldn't compare as numbers? [%s][%s]", o1, o2);
return o1.compareTo(o2);
}
}
}
);
try {
List<FireHydrant> hydrants = Lists.newArrayList();
for (File segmentDir : sinkFiles) {
log.info("Loading previously persisted segment at [%s]", segmentDir);
// Although this has been tackled at start of this method.
// Just a doubly-check added to skip "merged" dir. from being added to hydrants
// If 100% sure that this is not needed, this check can be removed.
if (Ints.tryParse(segmentDir.getName()) == null) {
continue;
}
hydrants.add(
new FireHydrant(
new QueryableIndexSegment(null, IndexIO.loadIndex(segmentDir)),
Integer.parseInt(segmentDir.getName())
)
);
}
Sink currSink = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval), hydrants);
sinks.put(sinkInterval.getStartMillis(), currSink);
sinkTimeline.add(
currSink.getInterval(),
currSink.getVersion(),
new SingleElementPartitionChunk<Sink>(currSink)
);
segmentAnnouncer.announceSegment(currSink.getSegment());
}
catch (IOException e) {
log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource())
.addData("interval", sinkInterval)
.emit();
}
}
}
private void registerServerViewCallback()
{
serverView.registerSegmentCallback(
persistExecutor,
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment segment)
{
if (stopped) {
log.info("Unregistering ServerViewCallback");
persistExecutor.shutdown();
return ServerView.CallbackAction.UNREGISTER;
}
if ("realtime".equals(server.getType())) {
return ServerView.CallbackAction.CONTINUE;
}
log.debug("Checking segment[%s] on server[%s]", segment, server);
if (schema.getDataSource().equals(segment.getDataSource())) {
final Interval interval = segment.getInterval();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
final Long sinkKey = entry.getKey();
if (interval.contains(sinkKey)) {
final Sink sink = entry.getValue();
log.info("Segment[%s] matches sink[%s] on server[%s]", segment, sink, server);
final String segmentVersion = segment.getVersion();
final String sinkVersion = sink.getSegment().getVersion();
if (segmentVersion.compareTo(sinkVersion) >= 0) {
log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion);
abandonSegment(sinkKey, sink);
}
}
}
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
private void startPersistThread()
{
final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis();
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
log.info(
"Expect to run at [%s]",
new DateTime().plus(
new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis)
)
);
ScheduledExecutors
.scheduleAtFixedRate(
scheduledExecutor,
new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis),
new Duration(truncatedNow, segmentGranularity.increment(truncatedNow)),
new ThreadRenamingCallable<ScheduledExecutors.Signal>(
String.format(
"%s-overseer-%d",
schema.getDataSource(),
schema.getShardSpec().getPartitionNum()
)
)
{
@Override
public ScheduledExecutors.Signal doCall()
{
if (stopped) {
log.info("Stopping merge-n-push overseer thread");
return ScheduledExecutors.Signal.STOP;
}
log.info("Starting merge and push.");
long minTimestamp = segmentGranularity.truncate(
rejectionPolicy.getCurrMaxTime().minus(windowMillis)
).getMillis();
List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
final Long intervalStart = entry.getKey();
if (intervalStart < minTimestamp) {
log.info("Adding entry[%s] for merge and push.", entry);
sinksToPush.add(entry);
}
}
for (final Map.Entry<Long, Sink> entry : sinksToPush) {
persistAndMerge(entry.getKey(), entry.getValue());
}
if (stopped) {
log.info("Stopping merge-n-push overseer thread");
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
}
}
);
}
/**
* Unannounces a given sink and removes all local references to it.
*/
private void abandonSegment(final long truncatedTime, final Sink sink)
{
try {
segmentAnnouncer.unannounceSegment(sink.getSegment());
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
sinks.remove(truncatedTime);
sinkTimeline.remove(
sink.getInterval(),
sink.getVersion(),
new SingleElementPartitionChunk<>(sink)
);
synchronized (handoffCondition) {
handoffCondition.notifyAll();
}
}
catch (Exception e) {
log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.emit();
}
}
};
}
private File computeBaseDir(Schema schema)
{
return new File(basePersistDirectory, schema.getDataSource());
}
private File computePersistDir(Schema schema, Interval interval)
{
return new File(computeBaseDir(schema), interval.toString().replace("/", "_"));
}
/**
* Persists the given hydrant and returns the number of rows persisted
*
* @param indexToPersist
* @param schema
* @param interval
*
* @return the number of rows persisted
*/
private int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval)
{
if (indexToPersist.hasSwapped()) {
log.info(
"DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
schema.getDataSource(), interval, indexToPersist
);
return 0;
}
log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist);
try {
int numRows = indexToPersist.getIndex().size();
File persistedFile = IndexMerger.persist(
indexToPersist.getIndex(),
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
);
indexToPersist.swapSegment(new QueryableIndexSegment(null, IndexIO.loadIndex(persistedFile)));
return numRows;
}
catch (IOException e) {
log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
.addData("interval", interval)
.addData("count", indexToPersist.getCount())
.emit();
throw Throwables.propagate(e);
}
return new RealtimePlumber(
windowPeriod,
basePersistDirectory,
segmentGranularity,
schema,
metrics,
rejectionPolicy,
emitter,
conglomerate,
segmentAnnouncer,
queryExecutorService,
versioningPolicy,
dataSegmentPusher,
segmentPublisher,
serverView,
maxPendingPersists
);
}
private void verifyState()

View File

@@ -178,12 +178,12 @@ public class Sink implements Iterable<FireHydrant>
FireHydrant old;
if (currIndex == null) { // Only happens on initialization, cannot synchronize on null
old = currIndex;
currIndex = new FireHydrant(newIndex, hydrants.size());
currIndex = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier());
hydrants.add(currIndex);
} else {
synchronized (currIndex) {
old = currIndex;
currIndex = new FireHydrant(newIndex, hydrants.size());
currIndex = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier());
hydrants.add(currIndex);
}
}