Merge branch 'master' of github.com:metamx/druid into task-stuff

Fangjin Yang 2013-01-29 12:57:56 -08:00
commit 41ca33c734
53 changed files with 1402 additions and 1071 deletions

View File

@ -22,6 +22,7 @@ package com.metamx.druid;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.segment.QuerySegmentSpec;
import com.metamx.druid.query.segment.QuerySegmentWalker;
import org.codehaus.jackson.annotate.JsonProperty;
@ -72,7 +73,12 @@ public abstract class BaseQuery<T> implements Query<T>
@Override
public Sequence<T> run(QuerySegmentWalker walker)
{
return querySegmentSpec.lookup(this, walker).run(this);
return run(querySegmentSpec.lookup(this, walker));
}
public Sequence<T> run(QueryRunner<T> runner)
{
return runner.run(this);
}
@Override

View File

@ -20,6 +20,7 @@
package com.metamx.druid;
import com.metamx.common.guava.Sequence;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.search.SearchQuery;
@ -57,6 +58,8 @@ public interface Query<T>
public Sequence<T> run(QuerySegmentWalker walker);
public Sequence<T> run(QueryRunner<T> runner);
public List<Interval> getIntervals();
public Duration getDuration();
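Query gains a second run overload here, so a caller that already holds a QueryRunner (for example one obtained from a walker lookup, or a test double) can execute against it directly instead of always going back through QuerySegmentWalker. A minimal self-contained sketch of the delegation pattern, using simplified stand-in interfaces rather than the real Druid types:

// Simplified stand-ins, only to illustrate the delegation added in BaseQuery;
// these are not the real com.metamx.druid interfaces.
interface SequenceSketch<T> {}
interface QueryRunnerSketch<T> { SequenceSketch<T> run(QuerySketch<T> query); }
interface WalkerSketch { <T> QueryRunnerSketch<T> lookup(QuerySketch<T> query); }

abstract class QuerySketch<T>
{
  // Before: lookup(...).run(this) in one step.
  // After: the walker variant delegates to the new runner variant, so both
  // entry points share the same execution path.
  public SequenceSketch<T> run(WalkerSketch walker)
  {
    return run(walker.lookup(this));
  }

  public SequenceSketch<T> run(QueryRunnerSketch<T> runner)
  {
    return runner.run(this);
  }
}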

View File

@ -54,6 +54,7 @@ import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.druid.result.BySegmentResultValueClass;
import com.metamx.druid.result.Result;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -110,7 +111,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
public Sequence<T> run(final Query<T> query)
{
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
final CacheStrategy<T, Query<T>> strategy = toolChest.getCacheStrategy(query);
final CacheStrategy<T, Object, Query<T>> strategy = toolChest.getCacheStrategy(query);
final Map<DruidServer, List<SegmentDescriptor>> serverSegments = Maps.newTreeMap();
@ -241,6 +242,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache();
final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
for (Pair<DateTime, byte[]> cachedResultPair : cachedResults) {
final byte[] cachedResult = cachedResultPair.rhs;
Sequence<Object> cachedSequence = new BaseSequence<Object, Iterator<Object>>(
@ -255,7 +257,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
return objectMapper.readValues(
objectMapper.getJsonFactory().createJsonParser(cachedResult), Object.class
objectMapper.getJsonFactory().createJsonParser(cachedResult),
cacheObjectClazz
);
}
catch (IOException e) {

View File

@ -148,6 +148,7 @@ public class QueryServlet extends HttpServlet
ImmutableMap.<String, Object>builder()
.put("exception", e.toString())
.put("query", queryString)
.put("host", req.getRemoteAddr())
.build()
)
);

View File

@ -22,16 +22,19 @@ package com.metamx.druid.query;
import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;
import com.metamx.druid.Query;
import org.codehaus.jackson.type.TypeReference;
/**
*/
public interface CacheStrategy<T, QueryType extends Query<T>>
public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
{
public byte[] computeCacheKey(QueryType query);
public Function<T, Object> prepareForCache();
public TypeReference<CacheType> getCacheObjectClazz();
public Function<Object, T> pullFromCache();
public Function<T, CacheType> prepareForCache();
public Function<CacheType, T> pullFromCache();
public Sequence<T> mergeSequences(Sequence<Sequence<T>> seqOfSequences);
}
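The extra CacheType parameter and the new getCacheObjectClazz() let each tool chest state what shape its results take while sitting in the cache, so the broker can hand Jackson a real TypeReference instead of deserializing everything as plain Object (see the CachingClusteredClient hunk above). A hedged sketch of the consuming side, using only the methods shown in this diff; pullCachedResults is a hypothetical helper, not code from this commit:

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.druid.Query;
import com.metamx.druid.query.CacheStrategy;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public class CacheStrategySketch
{
  // Deserialize cached bytes into the strategy's cache type, then map each
  // entry back to the query's result type with pullFromCache().
  public static <T, CacheType, QueryType extends Query<T>> List<T> pullCachedResults(
      CacheStrategy<T, CacheType, QueryType> strategy,
      ObjectMapper objectMapper,
      byte[] cachedBytes
  ) throws IOException
  {
    Iterator<CacheType> cached = objectMapper.readValues(
        objectMapper.getJsonFactory().createJsonParser(cachedBytes),
        strategy.getCacheObjectClazz()
    );

    final Function<CacheType, T> pull = strategy.pullFromCache();
    List<T> results = Lists.newArrayList();
    while (cached.hasNext()) {
      results.add(pull.apply(cached.next()));
    }
    return results;
  }
}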

View File

@ -44,7 +44,7 @@ public interface QueryToolChest<ResultType, QueryType extends Query<ResultType>>
public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
public Function<ResultType, ResultType> makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn);
public TypeReference<ResultType> getResultTypeReference();
public CacheStrategy<ResultType, QueryType> getCacheStrategy(QueryType query);
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query);
public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner);
public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner);
}

View File

@ -29,9 +29,11 @@ import com.metamx.common.guava.ConcatSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.input.MapBasedRow;
import com.metamx.druid.input.Row;
import com.metamx.druid.input.Rows;
import com.metamx.druid.query.CacheStrategy;
@ -99,10 +101,11 @@ public class GroupByQueryQueryToolChest implements QueryToolChest<Row, GroupByQu
}
);
final QueryGranularity gran = query.getGranularity();
final IncrementalIndex index = runner.run(query).accumulate(
new IncrementalIndex(
condensed.get(0).getStartMillis(),
query.getGranularity(),
gran.truncate(condensed.get(0).getStartMillis()),
gran,
aggs.toArray(new AggregatorFactory[aggs.size()])
),
new Accumulator<IncrementalIndex, Row>()
@ -119,7 +122,21 @@ public class GroupByQueryQueryToolChest implements QueryToolChest<Row, GroupByQu
}
);
return Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs()));
// convert millis back to timestamp according to granularity to preserve time zone information
return Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
new Function<Row, Row>()
{
private final QueryGranularity granularity = query.getGranularity();
@Override
public Row apply(Row input)
{
final MapBasedRow row = (MapBasedRow) input;
return new MapBasedRow(granularity.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
}
}
);
}
};
}
@ -161,7 +178,7 @@ public class GroupByQueryQueryToolChest implements QueryToolChest<Row, GroupByQu
}
@Override
public CacheStrategy<Row, GroupByQuery> getCacheStrategy(GroupByQuery query)
public CacheStrategy<Row, Object, GroupByQuery> getCacheStrategy(GroupByQuery query)
{
return null;
}
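The intent of the granularity change above is easier to see with a plain Joda-Time example: epoch millis carry no zone, so rebuilding timestamps through the query's granularity (which knows its zone) is what keeps, say, a Pacific-time day boundary from coming back as a bare UTC instant. A small illustrative sketch (the values are made up):

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class TimeZoneRoundTrip
{
  public static void main(String[] args)
  {
    long millis = 1359331200000L; // 2013-01-28T00:00:00Z as epoch millis

    // Same instant, rendered in two zones: only the zone-aware reconstruction
    // preserves what the caller asked for.
    System.out.println(new DateTime(millis, DateTimeZone.UTC));
    // 2013-01-28T00:00:00.000Z
    System.out.println(new DateTime(millis, DateTimeZone.forID("America/Los_Angeles")));
    // 2013-01-27T16:00:00.000-08:00
  }
}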

View File

@ -0,0 +1,37 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
/**
*/
public class AllColumnIncluderator implements ColumnIncluderator
{
@Override
public boolean include(String columnName)
{
return true;
}
@Override
public byte[] getCacheKey()
{
return ALL_CACHE_PREFIX;
}
}

View File

@ -0,0 +1,119 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.google.common.base.Preconditions;
import com.metamx.druid.index.column.ValueType;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
/**
*/
public class ColumnAnalysis
{
private static final String ERROR_PREFIX = "error:";
public static ColumnAnalysis error(String reason)
{
return new ColumnAnalysis(ERROR_PREFIX + reason, -1, null);
}
private final String type;
private final long size;
private final Integer cardinality;
@JsonCreator
public ColumnAnalysis(
@JsonProperty("type") ValueType type,
@JsonProperty("size") long size,
@JsonProperty("cardinality") Integer cardinality
)
{
this(type.name(), size, cardinality);
}
private ColumnAnalysis(
String type,
long size,
Integer cardinality
)
{
this.type = type;
this.size = size;
this.cardinality = cardinality;
}
@JsonProperty
public String getType()
{
return type;
}
@JsonProperty
public long getSize()
{
return size;
}
@JsonProperty
public Integer getCardinality()
{
return cardinality;
}
public boolean isError()
{
return type.startsWith(ERROR_PREFIX);
}
public ColumnAnalysis fold(ColumnAnalysis rhs)
{
if (rhs == null) {
return this;
}
if (!type.equals(rhs.getType())) {
return ColumnAnalysis.error("cannot_merge_diff_types");
}
Integer cardinality = getCardinality();
final Integer rhsCardinality = rhs.getCardinality();
if (cardinality == null) {
cardinality = rhsCardinality;
}
else {
if (rhsCardinality != null) {
cardinality = Math.max(cardinality, rhsCardinality);
}
}
return new ColumnAnalysis(type, size + rhs.getSize(), cardinality);
}
@Override
public String toString()
{
return "ColumnAnalysis{" +
"type='" + type + '\'' +
", size=" + size +
", cardinality=" + cardinality +
'}';
}
}
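fold() is what the merged SegmentMetadataQuery path (later in this commit) uses to combine per-segment column analyses: sizes add, cardinality takes the max, and a type mismatch degrades to an error entry. A hedged usage sketch; it assumes ValueType has STRING and LONG constants, which this diff references but does not show:

import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.query.metadata.ColumnAnalysis;

public class ColumnAnalysisFoldExample
{
  public static void main(String[] args)
  {
    ColumnAnalysis left = new ColumnAnalysis(ValueType.STRING, 1000L, 50);
    ColumnAnalysis right = new ColumnAnalysis(ValueType.STRING, 2500L, 75);

    // Sizes add, cardinality takes the max of the two segments.
    System.out.println(left.fold(right)); // type='STRING', size=3500, cardinality=75

    // Folding analyses of different types yields an "error:..." analysis.
    System.out.println(left.fold(new ColumnAnalysis(ValueType.LONG, 10L, null)).isError()); // true
  }
}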

View File

@ -0,0 +1,41 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonTypeInfo;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "none", value= NoneColumnIncluderator.class),
@JsonSubTypes.Type(name = "all", value= AllColumnIncluderator.class),
@JsonSubTypes.Type(name = "list", value= ListColumnIncluderator.class)
})
public interface ColumnIncluderator
{
public static final byte[] NONE_CACHE_PREFIX = new byte[]{0x0};
public static final byte[] ALL_CACHE_PREFIX = new byte[]{0x1};
public static final byte[] LIST_CACHE_PREFIX = new byte[]{0x2};
public boolean include(String columnName);
public byte[] getCacheKey();
}

View File

@ -0,0 +1,82 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
*/
public class ListColumnIncluderator implements ColumnIncluderator
{
private final Set<String> columns;
@JsonCreator
public ListColumnIncluderator(
@JsonProperty("columns") List<String> columns
)
{
this.columns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
this.columns.addAll(columns);
}
@JsonProperty
public Set<String> getColumns()
{
return Collections.unmodifiableSet(columns);
}
@Override
public boolean include(String columnName)
{
return columns.contains(columnName);
}
@Override
public byte[] getCacheKey()
{
int size = 1;
List<byte[]> columns = Lists.newArrayListWithExpectedSize(this.columns.size());
for (String column : this.columns) {
final byte[] bytes = column.getBytes(Charsets.UTF_8);
columns.add(bytes);
size += bytes.length;
}
final ByteBuffer bytes = ByteBuffer.allocate(size).put(LIST_CACHE_PREFIX);
for (byte[] column : columns) {
bytes.put(column);
}
return bytes.array();
}
}

View File

@ -0,0 +1,37 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
/**
*/
public class NoneColumnIncluderator implements ColumnIncluderator
{
@Override
public boolean include(String columnName)
{
return false;
}
@Override
public byte[] getCacheKey()
{
return NONE_CACHE_PREFIX;
}
}

View File

@ -17,61 +17,34 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.result;
package com.metamx.druid.query.metadata;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
public class SegmentMetadataResultValue
public class SegmentAnalysis
{
public static class Dimension {
@JsonProperty public long size;
@JsonProperty public int cardinality;
@JsonCreator
public Dimension(
@JsonProperty("size") long size,
@JsonProperty("cardinality") int cardinality
)
{
this.size = size;
this.cardinality = cardinality;
}
}
public static class Metric {
@JsonProperty public String type;
@JsonProperty public long size;
@JsonCreator
public Metric(
@JsonProperty("type") String type,
@JsonProperty("size") long size
)
{
this.type = type;
this.size = size;
}
}
private final String id;
private final Map<String, Dimension> dimensions;
private final Map<String, Metric> metrics;
private final List<Interval> interval;
private final Map<String, ColumnAnalysis> columns;
private final long size;
@JsonCreator
public SegmentMetadataResultValue(
public SegmentAnalysis(
@JsonProperty("id") String id,
@JsonProperty("dimensions") Map<String, Dimension> dimensions,
@JsonProperty("metrics") Map<String, Metric> metrics,
@JsonProperty("intervals") List<Interval> interval,
@JsonProperty("columns") Map<String, ColumnAnalysis> columns,
@JsonProperty("size") long size
)
{
this.id = id;
this.dimensions = dimensions;
this.metrics = metrics;
this.interval = interval;
this.columns = columns;
this.size = size;
}
@ -82,15 +55,15 @@ public class SegmentMetadataResultValue
}
@JsonProperty
public Map<String, Dimension> getDimensions()
public List<Interval> getIntervals()
{
return dimensions;
return interval;
}
@JsonProperty
public Map<String, Metric> getMetrics()
public Map<String, ColumnAnalysis> getColumns()
{
return metrics;
return columns;
}
@JsonProperty
@ -98,4 +71,24 @@ public class SegmentMetadataResultValue
{
return size;
}
public String toDetailedString()
{
return "SegmentAnalysis{" +
"id='" + id + '\'' +
", interval=" + interval +
", columns=" + columns +
", size=" + size +
'}';
}
@Override
public String toString()
{
return "SegmentAnalysis{" +
"id='" + id + '\'' +
", interval=" + interval +
", size=" + size +
'}';
}
}

View File

@ -22,26 +22,40 @@ package com.metamx.druid.query.metadata;
import com.metamx.druid.BaseQuery;
import com.metamx.druid.Query;
import com.metamx.druid.query.segment.QuerySegmentSpec;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SegmentMetadataResultValue;
import org.codehaus.jackson.annotate.JsonProperty;
import java.util.Map;
public class SegmentMetadataQuery extends BaseQuery<Result<SegmentMetadataResultValue>>
public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
{
private final ColumnIncluderator toInclude;
private final boolean merge;
public SegmentMetadataQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("toInclude") ColumnIncluderator toInclude,
@JsonProperty("merge") Boolean merge,
@JsonProperty("context") Map<String, String> context
)
{
super(
dataSource,
querySegmentSpec,
context
);
super(dataSource, querySegmentSpec, context);
this.toInclude = toInclude == null ? new AllColumnIncluderator() : toInclude;
this.merge = merge == null ? false : merge;
}
@JsonProperty
public ColumnIncluderator getToInclude()
{
return toInclude;
}
@JsonProperty
public boolean isMerge()
{
return merge;
}
@Override
@ -57,22 +71,16 @@ public class SegmentMetadataQuery extends BaseQuery<Result<SegmentMetadataResult
}
@Override
public Query<Result<SegmentMetadataResultValue>> withOverriddenContext(Map<String, String> contextOverride)
public Query<SegmentAnalysis> withOverriddenContext(Map<String, String> contextOverride)
{
return new SegmentMetadataQuery(
getDataSource(),
getQuerySegmentSpec(),
computeOverridenContext(contextOverride)
getDataSource(), getQuerySegmentSpec(), toInclude, merge, computeOverridenContext(contextOverride)
);
}
@Override
public Query<Result<SegmentMetadataResultValue>> withQuerySegmentSpec(QuerySegmentSpec spec)
public Query<SegmentAnalysis> withQuerySegmentSpec(QuerySegmentSpec spec)
{
return new SegmentMetadataQuery(
getDataSource(),
spec,
getContext()
);
return new SegmentMetadataQuery(getDataSource(), spec, toInclude, merge, getContext());
}
}
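With the new toInclude and merge arguments, a segment-metadata query can now restrict which columns get analyzed and ask for the per-segment analyses to be folded into one. A hedged construction sketch; the dataSource name, interval, and column names are example values only:

import com.metamx.druid.query.metadata.ListColumnIncluderator;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.segment.QuerySegmentSpecs;

import java.util.Arrays;

public class SegmentMetadataQueryExample
{
  public static SegmentMetadataQuery build()
  {
    return new SegmentMetadataQuery(
        "wikipedia",                                          // dataSource (example)
        QuerySegmentSpecs.create("2013-01-01/2013-02-01"),    // intervals
        new ListColumnIncluderator(Arrays.asList("page", "language")), // analyze only these columns
        true,                                                 // merge: fold all segments into one SegmentAnalysis
        null                                                  // context
    );
  }
}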

View File

@ -22,32 +22,116 @@ package com.metamx.druid.query.metadata;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.metamx.common.guava.ConcatSequence;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.druid.Query;
import com.metamx.druid.collect.OrderedMergeSequence;
import com.metamx.druid.query.CacheStrategy;
import com.metamx.druid.query.ConcatQueryRunner;
import com.metamx.druid.query.MetricManipulationFn;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SegmentMetadataResultValue;
import com.metamx.druid.query.ResultMergeQueryRunner;
import com.metamx.druid.utils.JodaUtils;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<Result<SegmentMetadataResultValue>, SegmentMetadataQuery>
public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
{
private static final TypeReference<Result<SegmentMetadataResultValue>> TYPE_REFERENCE = new TypeReference<Result<SegmentMetadataResultValue>>(){};
private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>(){};
private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> mergeResults(final QueryRunner<Result<SegmentMetadataResultValue>> runner)
public QueryRunner<SegmentAnalysis> mergeResults(final QueryRunner<SegmentAnalysis> runner)
{
return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(Sequences.simple(ImmutableList.of(runner)));
return new ResultMergeQueryRunner<SegmentAnalysis>(runner)
{
@Override
protected Ordering<SegmentAnalysis> makeOrdering(Query<SegmentAnalysis> query)
{
if (((SegmentMetadataQuery) query).isMerge()) {
// Merge everything always
return new Ordering<SegmentAnalysis>()
{
@Override
public int compare(
@Nullable SegmentAnalysis left, @Nullable SegmentAnalysis right
)
{
return 0;
}
};
}
return getOrdering(); // No two elements should be equal, so it should never merge
}
@Override
protected BinaryFn<SegmentAnalysis, SegmentAnalysis, SegmentAnalysis> createMergeFn(final Query<SegmentAnalysis> inQ)
{
return new BinaryFn<SegmentAnalysis, SegmentAnalysis, SegmentAnalysis>()
{
private final SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
@Override
public SegmentAnalysis apply(SegmentAnalysis arg1, SegmentAnalysis arg2)
{
if (arg1 == null) {
return arg2;
}
if (arg2 == null) {
return arg1;
}
if (!query.isMerge()) {
throw new ISE("Merging when a merge isn't supposed to happen[%s], [%s]", arg1, arg2);
}
List<Interval> newIntervals = JodaUtils.condenseIntervals(
Iterables.concat(arg1.getIntervals(), arg2.getIntervals())
);
final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns();
final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns();
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
Set<String> rightColumnNames = Sets.newHashSet(rightColumns.keySet());
for (Map.Entry<String, ColumnAnalysis> entry : leftColumns.entrySet()) {
final String columnName = entry.getKey();
columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName)));
rightColumnNames.remove(columnName);
}
for (String columnName : rightColumnNames) {
columns.put(columnName, rightColumns.get(columnName));
}
return new SegmentAnalysis("merged", newIntervals, columns, arg1.getSize() + arg2.getSize());
}
};
}
};
}
@Override
public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
{
return new OrderedMergeSequence<SegmentAnalysis>(getOrdering(), seqOfSequences);
}
@Override
@ -67,13 +151,7 @@ public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<Result
}
@Override
public Sequence<Result<SegmentMetadataResultValue>> mergeSequences(Sequence<Sequence<Result<SegmentMetadataResultValue>>> seqOfSequences)
{
return new ConcatSequence<Result<SegmentMetadataResultValue>>(seqOfSequences);
}
@Override
public Function<Result<SegmentMetadataResultValue>, Result<SegmentMetadataResultValue>> makeMetricManipulatorFn(
public Function<SegmentAnalysis, SegmentAnalysis> makeMetricManipulatorFn(
SegmentMetadataQuery query, MetricManipulationFn fn
)
{
@ -81,26 +159,87 @@ public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<Result
}
@Override
public TypeReference<Result<SegmentMetadataResultValue>> getResultTypeReference()
public TypeReference<SegmentAnalysis> getResultTypeReference()
{
return TYPE_REFERENCE;
}
@Override
public CacheStrategy<Result<SegmentMetadataResultValue>, SegmentMetadataQuery> getCacheStrategy(SegmentMetadataQuery query)
public CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> getCacheStrategy(SegmentMetadataQuery query)
{
return null;
return new CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery>()
{
@Override
public byte[] computeCacheKey(SegmentMetadataQuery query)
{
byte[] includerBytes = query.getToInclude().getCacheKey();
return ByteBuffer.allocate(1 + includerBytes.length)
.put(SEGMENT_METADATA_CACHE_PREFIX)
.put(includerBytes)
.array();
}
@Override
public TypeReference<SegmentAnalysis> getCacheObjectClazz()
{
return getResultTypeReference();
}
@Override
public Function<SegmentAnalysis, SegmentAnalysis> prepareForCache()
{
return new Function<SegmentAnalysis, SegmentAnalysis>()
{
@Override
public SegmentAnalysis apply(@Nullable SegmentAnalysis input)
{
return input;
}
};
}
@Override
public Function<SegmentAnalysis, SegmentAnalysis> pullFromCache()
{
return new Function<SegmentAnalysis, SegmentAnalysis>()
{
@Override
public SegmentAnalysis apply(@Nullable SegmentAnalysis input)
{
return input;
}
};
}
@Override
public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
{
return new MergeSequence<SegmentAnalysis>(getOrdering(), seqOfSequences);
}
};
}
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> preMergeQueryDecoration(QueryRunner<Result<SegmentMetadataResultValue>> runner)
public QueryRunner<SegmentAnalysis> preMergeQueryDecoration(QueryRunner<SegmentAnalysis> runner)
{
return runner;
}
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> postMergeQueryDecoration(QueryRunner<Result<SegmentMetadataResultValue>> runner)
public QueryRunner<SegmentAnalysis> postMergeQueryDecoration(QueryRunner<SegmentAnalysis> runner)
{
return runner;
}
private Ordering<SegmentAnalysis> getOrdering()
{
return new Ordering<SegmentAnalysis>()
{
@Override
public int compare(SegmentAnalysis left, SegmentAnalysis right)
{
return left.getId().compareTo(right.getId());
}
};
}
}

View File

@ -82,6 +82,10 @@ public class SearchQueryQueryToolChest implements QueryToolChest<Result<SearchRe
maxSearchLimit = PropUtils.getPropertyAsInt(props, "com.metamx.query.search.maxSearchLimit", 1000);
}
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
@Override
public QueryRunner<Result<SearchResultValue>> mergeResults(QueryRunner<Result<SearchResultValue>> runner)
{
@ -143,9 +147,9 @@ public class SearchQueryQueryToolChest implements QueryToolChest<Result<SearchRe
}
@Override
public CacheStrategy<Result<SearchResultValue>, SearchQuery> getCacheStrategy(SearchQuery query)
public CacheStrategy<Result<SearchResultValue>, Object, SearchQuery> getCacheStrategy(SearchQuery query)
{
return new CacheStrategy<Result<SearchResultValue>, SearchQuery>()
return new CacheStrategy<Result<SearchResultValue>, Object, SearchQuery>()
{
@Override
public byte[] computeCacheKey(SearchQuery query)
@ -183,6 +187,12 @@ public class SearchQueryQueryToolChest implements QueryToolChest<Result<SearchRe
return queryCacheKey.array();
}
@Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
}
@Override
public Function<Result<SearchResultValue>, Object> prepareForCache()
{

View File

@ -0,0 +1,45 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.segment;
import org.joda.time.Interval;
import java.util.Arrays;
import java.util.List;
/**
*/
public class QuerySegmentSpecs
{
public static QuerySegmentSpec create(String isoInterval)
{
return new LegacySegmentSpec(isoInterval);
}
public static QuerySegmentSpec create(Interval interval)
{
return create(Arrays.asList(interval));
}
public static QuerySegmentSpec create(List<Interval> intervals)
{
return new MultipleIntervalSegmentSpec(intervals);
}
}
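A quick usage sketch of the new factory; which concrete QuerySegmentSpec backs the result follows the overloads above (interval strings are example values):

import com.metamx.druid.query.segment.QuerySegmentSpec;
import com.metamx.druid.query.segment.QuerySegmentSpecs;
import org.joda.time.Interval;

import java.util.Arrays;

public class QuerySegmentSpecsExample
{
  public static void example()
  {
    // ISO-string form -> LegacySegmentSpec
    QuerySegmentSpec legacy = QuerySegmentSpecs.create("2013-01-01/2013-02-01");

    // Single Interval or a List<Interval> -> MultipleIntervalSegmentSpec
    QuerySegmentSpec single = QuerySegmentSpecs.create(new Interval("2013-01-01/2013-02-01"));
    QuerySegmentSpec many = QuerySegmentSpecs.create(
        Arrays.asList(new Interval("2013-01-01/2013-01-15"), new Interval("2013-01-15/2013-02-01"))
    );
  }
}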

View File

@ -53,6 +53,9 @@ public class TimeBoundaryQueryQueryToolChest
private static final TypeReference<Result<TimeBoundaryResultValue>> TYPE_REFERENCE = new TypeReference<Result<TimeBoundaryResultValue>>()
{
};
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
@Override
public QueryRunner<Result<TimeBoundaryResultValue>> mergeResults(
@ -106,9 +109,9 @@ public class TimeBoundaryQueryQueryToolChest
}
@Override
public CacheStrategy<Result<TimeBoundaryResultValue>, TimeBoundaryQuery> getCacheStrategy(TimeBoundaryQuery query)
public CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery> getCacheStrategy(TimeBoundaryQuery query)
{
return new CacheStrategy<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>()
return new CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>()
{
@Override
public byte[] computeCacheKey(TimeBoundaryQuery query)
@ -119,6 +122,12 @@ public class TimeBoundaryQueryQueryToolChest
.array();
}
@Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
}
@Override
public Function<Result<TimeBoundaryResultValue>, Object> prepareForCache()
{

View File

@ -28,6 +28,7 @@ import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.ResultGranularTimestampComparator;
import com.metamx.druid.TimeseriesBinaryFn;
import com.metamx.druid.aggregation.AggregatorFactory;
@ -49,6 +50,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import org.joda.time.Period;
import org.joda.time.format.ISODateTimeFormat;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@ -66,6 +68,9 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
private static final TypeReference<Result<TimeseriesResultValue>> TYPE_REFERENCE = new TypeReference<Result<TimeseriesResultValue>>()
{
};
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
@Override
public QueryRunner<Result<TimeseriesResultValue>> mergeResults(QueryRunner<Result<TimeseriesResultValue>> queryRunner)
@ -100,10 +105,7 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
@Override
public Sequence<Result<TimeseriesResultValue>> mergeSequences(Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<Result<TimeseriesResultValue>>(
getOrdering(),
seqOfSequences
);
return new OrderedMergeSequence<Result<TimeseriesResultValue>>(getOrdering(), seqOfSequences);
}
@Override
@ -156,9 +158,9 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
}
@Override
public CacheStrategy<Result<TimeseriesResultValue>, TimeseriesQuery> getCacheStrategy(final TimeseriesQuery query)
public CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery> getCacheStrategy(final TimeseriesQuery query)
{
return new CacheStrategy<Result<TimeseriesResultValue>, TimeseriesQuery>()
return new CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery>()
{
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
private final List<PostAggregator> postAggs = query.getPostAggregatorSpecs();
@ -180,6 +182,12 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
.array();
}
@Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
}
@Override
public Function<Result<TimeseriesResultValue>, Object> prepareForCache()
{
@ -206,6 +214,8 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
{
return new Function<Object, Result<TimeseriesResultValue>>()
{
private final QueryGranularity granularity = query.getGranularity();
@Override
public Result<TimeseriesResultValue> apply(@Nullable Object input)
{
@ -215,7 +225,8 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
Iterator<AggregatorFactory> aggsIter = aggs.iterator();
Iterator<Object> resultIter = results.iterator();
DateTime timestamp = new DateTime(resultIter.next());
DateTime timestamp = granularity.toDateTime(((Number) resultIter.next()).longValue());
while (aggsIter.hasNext() && resultIter.hasNext()) {
final AggregatorFactory factory = aggsIter.next();
retVal.put(factory.getName(), factory.deserialize(resultIter.next()));
@ -257,6 +268,4 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
{
return Ordering.natural();
}
}

View File

@ -71,7 +71,7 @@ public class Result<T> implements Comparable<Result<T>>
Result result = (Result) o;
if (timestamp != null ? !timestamp.equals(result.timestamp) : result.timestamp != null) {
if (timestamp != null ? !(timestamp.isEqual(result.timestamp) && timestamp.getZone().getOffset(timestamp) == result.timestamp.getZone().getOffset(result.timestamp)) : result.timestamp != null) {
return false;
}
if (value != null ? !value.equals(result.value) : result.value != null) {
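The rewritten timestamp check makes Result equality instant-plus-offset based rather than relying on Joda's DateTime.equals, which also compares the chronology/zone. A small sketch of the distinction (zone choices are examples):

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class TimestampEqualitySketch
{
  public static void main(String[] args)
  {
    DateTime utc = new DateTime("2013-01-28T00:00:00Z");
    DateTime la = utc.withZone(DateTimeZone.forID("America/Los_Angeles"));

    System.out.println(utc.isEqual(la)); // true  -- same instant
    System.out.println(utc.equals(la));  // false -- Joda equals also compares the zone/chronology
    // Result.equals above accepts two timestamps only when the instants match
    // AND the zone offsets at that instant match, so utc and la still belong
    // to different Results under the new check.
  }
}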

View File

@ -37,7 +37,7 @@ import java.util.Map;
*/
public class MapBasedRow implements Row
{
private final long timestamp;
private final DateTime timestamp;
private final Map<String, Object> event;
@JsonCreator
@ -46,22 +46,21 @@ public class MapBasedRow implements Row
@JsonProperty("event") Map<String, Object> event
)
{
this(timestamp.getMillis(), event);
this.timestamp = timestamp;
this.event = event;
}
public MapBasedRow(
long timestamp,
Map<String, Object> event
)
{
this.timestamp = timestamp;
this.event = event;
) {
this(new DateTime(timestamp), event);
}
@Override
public long getTimestampFromEpoch()
{
return timestamp;
return timestamp.getMillis();
}
@Override
@ -120,7 +119,7 @@ public class MapBasedRow implements Row
@JsonProperty
public DateTime getTimestamp()
{
return new DateTime(timestamp);
return timestamp;
}
@JsonProperty
@ -133,9 +132,38 @@ public class MapBasedRow implements Row
public String toString()
{
return "MapBasedRow{" +
"timestamp=" + new DateTime(timestamp) +
"timestamp=" + timestamp +
", event=" + event +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MapBasedRow that = (MapBasedRow) o;
if (!event.equals(that.event)) {
return false;
}
if (!timestamp.equals(that.timestamp)) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = timestamp.hashCode();
result = 31 * result + event.hashCode();
return result;
}
}
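Storing the DateTime itself (instead of flattening to millis at construction) is what lets the group-by and timeseries changes above hand rows back in the query's time zone; the new equals/hashCode also make rows directly comparable in tests. A hedged usage sketch (dimension and metric values are examples):

import com.google.common.collect.ImmutableMap;
import com.metamx.druid.input.MapBasedRow;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class MapBasedRowExample
{
  public static void main(String[] args)
  {
    DateTime ts = new DateTime("2013-01-28T00:00:00", DateTimeZone.forID("America/Los_Angeles"));
    MapBasedRow row = new MapBasedRow(ts, ImmutableMap.<String, Object>of("page", "Druid", "count", 7));

    System.out.println(row.getTimestamp());          // 2013-01-28T00:00:00.000-08:00 -- zone preserved
    System.out.println(row.getTimestampFromEpoch()); // the same instant as epoch millis

    // Rows with equal timestamps and equal event maps now compare equal.
    System.out.println(
        row.equals(new MapBasedRow(ts, ImmutableMap.<String, Object>of("page", "Druid", "count", 7)))
    ); // true
  }
}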

View File

@ -52,6 +52,12 @@ public class Rows
{
return row.getFloatMetric(metric);
}
@Override
public String toString()
{
return row.toString();
}
};
}
}

View File

@ -25,5 +25,7 @@ import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
*/
public interface BitmapIndex
{
public int getCardinality();
public String getValue(int index);
public ImmutableConciseSet getConciseSet(String value);
}

View File

@ -25,7 +25,7 @@ import com.metamx.druid.kv.IndexedInts;
*/
public interface DictionaryEncodedColumn
{
public int size();
public int length();
public boolean hasMultipleValues();
public int getSingleValueRow(int rowNum);
public IndexedInts getMultiValueRow(int rowNum);

View File

@ -29,7 +29,7 @@ import java.io.Closeable;
*/
public interface GenericColumn extends Closeable
{
public int size();
public int length();
public ValueType getType();
public boolean hasMultipleValues();

View File

@ -38,7 +38,7 @@ public class IndexedFloatsGenericColumn implements GenericColumn
}
@Override
public int size()
public int length()
{
return column.size();
}

View File

@ -38,7 +38,7 @@ public class IndexedLongsGenericColumn implements GenericColumn
}
@Override
public int size()
public int length()
{
return column.size();
}

View File

@ -62,7 +62,7 @@ class SimpleColumn implements Column
GenericColumn column = null;
try {
column = genericColumn.get();
return column.size();
return column.length();
}
finally {
Closeables.closeQuietly(column);

View File

@ -44,7 +44,7 @@ public class SimpleDictionaryEncodedColumn implements DictionaryEncodedColumn
}
@Override
public int size()
public int length()
{
return hasMultipleValues() ? multiValueColumn.size() : column.size();
}

View File

@ -67,7 +67,7 @@ public class StringMultiValueColumn extends AbstractColumn
return new DictionaryEncodedColumn()
{
@Override
public int size()
public int length()
{
return column.size();
}

View File

@ -22,6 +22,7 @@ package com.metamx.druid.index.serde;
import com.google.common.base.Supplier;
import com.metamx.druid.index.column.BitmapIndex;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.Indexed;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
/**
@ -46,6 +47,18 @@ public class BitmapIndexColumnPartSupplier implements Supplier<BitmapIndex>
{
return new BitmapIndex()
{
@Override
public int getCardinality()
{
return dictionary.size();
}
@Override
public String getValue(int index)
{
return dictionary.get(index);
}
@Override
public ImmutableConciseSet getConciseSet(String value)
{

View File

@ -649,8 +649,12 @@ public class IndexIO
}
Set<String> colSet = Sets.newTreeSet();
colSet.addAll(Lists.newArrayList(index.getAvailableDimensions()));
colSet.addAll(Lists.newArrayList(index.getAvailableMetrics()));
for (String dimension : index.getAvailableDimensions()) {
colSet.add(dimension.toLowerCase());
}
for (String metric : index.getAvailableMetrics()) {
colSet.add(metric.toLowerCase());
}
String[] cols = colSet.toArray(new String[colSet.size()]);

View File

@ -19,6 +19,7 @@
package com.metamx.druid.index.v1.serde;
import com.google.common.base.Function;
import com.metamx.druid.index.column.ColumnBuilder;
import com.metamx.druid.index.serde.ColumnPartSerde;
import com.metamx.druid.kv.ObjectStrategy;
@ -27,10 +28,10 @@ import java.nio.ByteBuffer;
/**
*/
public interface ComplexMetricSerde
public abstract class ComplexMetricSerde
{
public String getTypeName();
public ComplexMetricExtractor getExtractor();
public abstract String getTypeName();
public abstract ComplexMetricExtractor getExtractor();
/**
* Deserializes a ByteBuffer and adds it to the ColumnBuilder. This method allows for the ComplexMetricSerde
@ -42,7 +43,7 @@ public interface ComplexMetricSerde
* @param buffer the buffer to deserialize
* @return a ColumnPartSerde that can serialize out the object that was read from the buffer to the builder
*/
public ColumnPartSerde deserializeColumn(ByteBuffer buffer, ColumnBuilder builder);
public abstract ColumnPartSerde deserializeColumn(ByteBuffer buffer, ColumnBuilder builder);
/**
* This is deprecated because its usage is going to be removed from the code.
@ -55,5 +56,20 @@ public interface ComplexMetricSerde
* @return an ObjectStrategy as used by GenericIndexed
*/
@Deprecated
public ObjectStrategy getObjectStrategy();
public abstract ObjectStrategy getObjectStrategy();
/**
* Returns a function that can convert the Object provided by the ComplexColumn created through deserializeColumn
* into a number of expected input bytes to produce that object.
*
* This is used to approximate the size of the input data via the SegmentMetadataQuery and does not need to be
* overridden if you do not care about the query.
*
* @return A function that can compute the size of the complex object or null if you cannot/do not want to compute it
*/
public Function<Object, Long> inputSizeFn()
{
return null;
}
}
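Since ComplexMetricSerde is now an abstract class, existing serdes keep compiling unchanged and only override inputSizeFn() if they want SegmentMetadataQuery to be able to size their column. A hedged sketch of such an override, assuming a complex metric whose column objects are byte arrays (that storage choice is an assumption, not something shown in this diff):

import com.google.common.base.Function;

// Inside a concrete ComplexMetricSerde subclass:
@Override
public Function<Object, Long> inputSizeFn()
{
  return new Function<Object, Long>()
  {
    @Override
    public Long apply(Object o)
    {
      // Approximate the number of input bytes behind this complex object;
      // returning null (the default) opts the type out of size estimation.
      return o == null ? 0L : (long) ((byte[]) o).length;
    }
  };
}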

View File

@ -33,7 +33,6 @@ import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.Query;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerView;
@ -44,9 +43,6 @@ import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.MMappedIndex;
import com.metamx.druid.index.v1.MMappedIndexQueryableIndex;
import com.metamx.druid.index.v1.MMappedIndexStorageAdapter;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.query.MetricsEmittingQueryRunner;
import com.metamx.druid.query.QueryRunner;

View File

@ -29,7 +29,6 @@ import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.collect.CountingMap;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.SegmentIdAttachedStorageAdapter;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.StorageAdapterLoadingException;
import com.metamx.druid.partition.PartitionChunk;
@ -330,7 +329,7 @@ public class ServerManager implements QuerySegmentWalker
}
},
new BySegmentQueryRunner<T>(
adapter.getSegmentIdentifier(),
adapter.getIdentifier(),
adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
)

View File

@ -38,7 +38,7 @@ public class IncrementalIndexSegment implements Segment
}
@Override
public String getSegmentIdentifier()
public String getIdentifier()
{
throw new UnsupportedOperationException();
}

View File

@ -37,7 +37,7 @@ public class QueryableIndexSegment implements Segment
}
@Override
public String getSegmentIdentifier()
public String getIdentifier()
{
return identifier;
}

View File

@ -26,7 +26,7 @@ import org.joda.time.Interval;
*/
public interface Segment
{
public String getSegmentIdentifier();
public String getIdentifier();
public Interval getDataInterval();
public QueryableIndex asQueryableIndex();
public StorageAdapter asStorageAdapter();

View File

@ -1,103 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.v1;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.index.column.ComplexColumnImpl;
import com.metamx.druid.index.column.FloatColumn;
import com.metamx.druid.index.column.LongColumn;
import com.metamx.druid.index.column.StringMultiValueColumn;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.VSizeIndexed;
import org.joda.time.Interval;
/**
*/
public class MMappedIndexQueryableIndex implements QueryableIndex
{
private final MMappedIndex index;
public MMappedIndexQueryableIndex(
MMappedIndex index
)
{
this.index = index;
}
public MMappedIndex getIndex()
{
return index;
}
@Override
public Interval getDataInterval()
{
return index.getDataInterval();
}
@Override
public int getNumRows()
{
return index.getTimestamps().size();
}
@Override
public Indexed<String> getColumnNames()
{
return null;
}
@Override
public Indexed<String> getAvailableDimensions()
{
return index.getAvailableDimensions();
}
@Override
public Column getTimeColumn()
{
return new LongColumn(index.timestamps);
}
@Override
public Column getColumn(String columnName)
{
final MetricHolder metricHolder = index.getMetricHolder(columnName);
if (metricHolder == null) {
final VSizeIndexed dimColumn = index.getDimColumn(columnName);
if (dimColumn == null) {
return null;
}
return new StringMultiValueColumn(
index.getDimValueLookup(columnName),
dimColumn,
index.getInvertedIndexes().get(columnName)
);
}
else if (metricHolder.getType() == MetricHolder.MetricType.FLOAT) {
return new FloatColumn(metricHolder.floatType);
}
else {
return new ComplexColumnImpl(metricHolder.getTypeName(), metricHolder.getComplexType());
}
}
}

View File

@ -1,666 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.metamx.common.collect.MoreIterators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.FunctionalIterator;
import com.metamx.druid.BaseStorageAdapter;
import com.metamx.druid.Capabilities;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.index.brita.BitmapIndexSelector;
import com.metamx.druid.index.brita.Filter;
import com.metamx.druid.index.v1.processing.Cursor;
import com.metamx.druid.index.v1.processing.DimensionSelector;
import com.metamx.druid.index.v1.processing.Offset;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedFloats;
import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.kv.IndexedLongs;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Map;
/**
*/
public class MMappedIndexStorageAdapter extends BaseStorageAdapter
{
private final MMappedIndex index;
public MMappedIndexStorageAdapter(
MMappedIndex index
)
{
this.index = index;
}
public MMappedIndex getIndex()
{
return index;
}
@Override
public String getSegmentIdentifier()
{
throw new UnsupportedOperationException();
}
@Override
public Interval getInterval()
{
return index.getDataInterval();
}
@Override
public int getDimensionCardinality(String dimension)
{
final Indexed<String> dimValueLookup = index.getDimValueLookup(dimension.toLowerCase());
if (dimValueLookup == null) {
return 0;
}
return dimValueLookup.size();
}
@Override
public DateTime getMinTime()
{
final IndexedLongs timestamps = index.getReadOnlyTimestamps();
final DateTime retVal = new DateTime(timestamps.get(0));
Closeables.closeQuietly(timestamps);
return retVal;
}
@Override
public DateTime getMaxTime()
{
final IndexedLongs timestamps = index.getReadOnlyTimestamps();
final DateTime retVal = new DateTime(timestamps.get(timestamps.size() - 1));
Closeables.closeQuietly(timestamps);
return retVal;
}
@Override
public Capabilities getCapabilities()
{
return Capabilities.builder().dimensionValuesSorted(true).build();
}
@Override
public Iterable<Cursor> makeCursors(Filter filter, Interval interval, QueryGranularity gran)
{
Interval actualInterval = interval;
if (!actualInterval.overlaps(index.dataInterval)) {
return ImmutableList.of();
}
if (actualInterval.getStart().isBefore(index.dataInterval.getStart())) {
actualInterval = actualInterval.withStart(index.dataInterval.getStart());
}
if (actualInterval.getEnd().isAfter(index.dataInterval.getEnd())) {
actualInterval = actualInterval.withEnd(index.dataInterval.getEnd());
}
final Iterable<Cursor> iterable;
if (filter == null) {
iterable = new NoFilterCursorIterable(index, actualInterval, gran);
} else {
Offset offset = new ConciseOffset(filter.goConcise(new MMappedBitmapIndexSelector(index)));
iterable = new CursorIterable(index, actualInterval, gran, offset);
}
return FunctionalIterable.create(iterable).keep(Functions.<Cursor>identity());
}
@Override
public Indexed<String> getAvailableDimensions()
{
return index.getAvailableDimensions();
}
@Override
public Indexed<String> getDimValueLookup(String dimension)
{
return index.getDimValueLookup(dimension.toLowerCase());
}
@Override
public ImmutableConciseSet getInvertedIndex(String dimension, String dimVal)
{
return index.getInvertedIndex(dimension.toLowerCase(), dimVal);
}
@Override
public Offset getFilterOffset(Filter filter)
{
return new ConciseOffset(
filter.goConcise(
new MMappedBitmapIndexSelector(index)
)
);
}
private static class CursorIterable implements Iterable<Cursor>
{
private final MMappedIndex index;
private final Interval interval;
private final QueryGranularity gran;
private final Offset offset;
public CursorIterable(
MMappedIndex index,
Interval interval,
QueryGranularity gran,
Offset offset
)
{
this.index = index;
this.interval = interval;
this.gran = gran;
this.offset = offset;
}
@Override
public Iterator<Cursor> iterator()
{
final Offset baseOffset = offset.clone();
final Map<String, Object> metricHolderCache = Maps.newHashMap();
final IndexedLongs timestamps = index.getReadOnlyTimestamps();
final FunctionalIterator<Cursor> retVal = FunctionalIterator
.create(gran.iterable(interval.getStartMillis(), interval.getEndMillis()).iterator())
.transform(
new Function<Long, Cursor>()
{
@Override
public Cursor apply(final Long input)
{
final long timeStart = Math.max(interval.getStartMillis(), input);
while (baseOffset.withinBounds()
&& timestamps.get(baseOffset.getOffset()) < timeStart) {
baseOffset.increment();
}
final Offset offset = new TimestampCheckingOffset(
baseOffset, timestamps, Math.min(interval.getEndMillis(), gran.next(timeStart))
);
return new Cursor()
{
private final Offset initOffset = offset.clone();
private final DateTime myBucket = gran.toDateTime(input);
private Offset cursorOffset = offset;
@Override
public DateTime getTime()
{
return myBucket;
}
@Override
public void advance()
{
cursorOffset.increment();
}
@Override
public boolean isDone()
{
return !cursorOffset.withinBounds();
}
@Override
public void reset()
{
cursorOffset = initOffset.clone();
}
@Override
public DimensionSelector makeDimensionSelector(String dimension)
{
final String dimensionName = dimension.toLowerCase();
final Indexed<? extends IndexedInts> rowVals = index.getDimColumn(dimensionName);
final Indexed<String> dimValueLookup = index.getDimValueLookup(dimensionName);
if (rowVals == null) {
return null;
}
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
return rowVals.get(cursorOffset.getOffset());
}
@Override
public int getValueCardinality()
{
return dimValueLookup.size();
}
@Override
public String lookupName(int id)
{
final String retVal = dimValueLookup.get(id);
return retVal == null ? "" : retVal;
}
@Override
public int lookupId(String name)
{
return ("".equals(name)) ? dimValueLookup.indexOf(null) : dimValueLookup.indexOf(name);
}
};
}
@Override
public FloatMetricSelector makeFloatMetricSelector(String metric)
{
final String metricName = metric.toLowerCase();
IndexedFloats cachedMetricVals = (IndexedFloats) metricHolderCache.get(metricName);
if (cachedMetricVals == null) {
MetricHolder holder = index.getMetricHolder(metricName);
if (holder != null) {
cachedMetricVals = holder.getFloatType();
metricHolderCache.put(metricName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return new FloatMetricSelector()
{
@Override
public float get()
{
return 0.0f;
}
};
}
final IndexedFloats metricVals = cachedMetricVals;
return new FloatMetricSelector()
{
@Override
public float get()
{
return metricVals.get(cursorOffset.getOffset());
}
};
}
@Override
public ComplexMetricSelector makeComplexMetricSelector(String metric)
{
final String metricName = metric.toLowerCase();
Indexed cachedMetricVals = (Indexed) metricHolderCache.get(metricName);
if (cachedMetricVals == null) {
MetricHolder holder = index.getMetricHolder(metricName);
if (holder != null) {
cachedMetricVals = holder.getComplexType();
metricHolderCache.put(metricName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return null;
}
final Indexed metricVals = cachedMetricVals;
return new ComplexMetricSelector()
{
@Override
public Class classOfObject()
{
return metricVals.getClazz();
}
@Override
public Object get()
{
return metricVals.get(cursorOffset.getOffset());
}
};
}
};
}
}
);
// This after call is not perfect, if there is an exception during processing, it will never get called,
// but it's better than nothing and doing this properly all the time requires a lot more fixerating
return MoreIterators.after(
retVal,
new Runnable()
{
@Override
public void run()
{
Closeables.closeQuietly(timestamps);
for (Object object : metricHolderCache.values()) {
if (object instanceof Closeable) {
Closeables.closeQuietly((Closeable) object);
}
}
}
}
);
}
}
private static class TimestampCheckingOffset implements Offset
{
private final Offset baseOffset;
private final IndexedLongs timestamps;
private final long threshold;
public TimestampCheckingOffset(
Offset baseOffset,
IndexedLongs timestamps,
long threshold
)
{
this.baseOffset = baseOffset;
this.timestamps = timestamps;
this.threshold = threshold;
}
@Override
public int getOffset()
{
return baseOffset.getOffset();
}
@Override
public Offset clone()
{
return new TimestampCheckingOffset(baseOffset.clone(), timestamps, threshold);
}
@Override
public boolean withinBounds()
{
return baseOffset.withinBounds() && timestamps.get(baseOffset.getOffset()) < threshold;
}
@Override
public void increment()
{
baseOffset.increment();
}
}
private static class NoFilterCursorIterable implements Iterable<Cursor>
{
private final MMappedIndex index;
private final Interval interval;
private final QueryGranularity gran;
public NoFilterCursorIterable(
MMappedIndex index,
Interval interval,
QueryGranularity gran
)
{
this.index = index;
this.interval = interval;
this.gran = gran;
}
/**
* This produces iterators of Cursor objects that must be fully processed (until isDone() returns true) before the
* next Cursor is processed. It is *not* safe to pass these cursors off to another thread for parallel processing
*
* @return
*/
@Override
public Iterator<Cursor> iterator()
{
final Map<String, Object> metricCacheMap = Maps.newHashMap();
final IndexedLongs timestamps = index.getReadOnlyTimestamps();
final FunctionalIterator<Cursor> retVal = FunctionalIterator
.create(gran.iterable(interval.getStartMillis(), interval.getEndMillis()).iterator())
.transform(
new Function<Long, Cursor>()
{
private int currRow = 0;
@Override
public Cursor apply(final Long input)
{
final long timeStart = Math.max(interval.getStartMillis(), input);
while (currRow < timestamps.size() && timestamps.get(currRow) < timeStart) {
++currRow;
}
return new Cursor()
{
private final DateTime myBucket = gran.toDateTime(input);
private final long nextBucket = Math.min(gran.next(myBucket.getMillis()), interval.getEndMillis());
private final int initRow = currRow;
@Override
public DateTime getTime()
{
return myBucket;
}
@Override
public void advance()
{
++currRow;
}
@Override
public boolean isDone()
{
return currRow >= timestamps.size() || timestamps.get(currRow) >= nextBucket;
}
@Override
public void reset()
{
currRow = initRow;
}
@Override
public DimensionSelector makeDimensionSelector(final String dimension)
{
final String dimensionName = dimension.toLowerCase();
final Indexed<? extends IndexedInts> rowVals = index.getDimColumn(dimensionName);
final Indexed<String> dimValueLookup = index.getDimValueLookup(dimensionName);
if (rowVals == null) {
return null;
}
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
return rowVals.get(currRow);
}
@Override
public int getValueCardinality()
{
return dimValueLookup.size();
}
@Override
public String lookupName(int id)
{
final String retVal = dimValueLookup.get(id);
return retVal == null ? "" : retVal;
}
@Override
public int lookupId(String name)
{
return ("".equals(name)) ? dimValueLookup.indexOf(null) : dimValueLookup.indexOf(name);
}
};
}
@Override
public FloatMetricSelector makeFloatMetricSelector(String metric)
{
final String metricName = metric.toLowerCase();
IndexedFloats cachedMetricVals = (IndexedFloats) metricCacheMap.get(metricName);
if (cachedMetricVals == null) {
final MetricHolder metricHolder = index.getMetricHolder(metricName);
if (metricHolder != null) {
cachedMetricVals = metricHolder.getFloatType();
if (cachedMetricVals != null) {
metricCacheMap.put(metricName, cachedMetricVals);
}
}
}
if (cachedMetricVals == null) {
return new FloatMetricSelector()
{
@Override
public float get()
{
return 0.0f;
}
};
}
final IndexedFloats metricVals = cachedMetricVals;
return new FloatMetricSelector()
{
@Override
public float get()
{
return metricVals.get(currRow);
}
};
}
@Override
public ComplexMetricSelector makeComplexMetricSelector(String metric)
{
final String metricName = metric.toLowerCase();
Indexed cachedMetricVals = (Indexed) metricCacheMap.get(metricName);
if (cachedMetricVals == null) {
final MetricHolder metricHolder = index.getMetricHolder(metricName);
if (metricHolder != null) {
cachedMetricVals = metricHolder.getComplexType();
if (cachedMetricVals != null) {
metricCacheMap.put(metricName, cachedMetricVals);
}
}
}
if (cachedMetricVals == null) {
return null;
}
final Indexed metricVals = cachedMetricVals;
return new ComplexMetricSelector()
{
@Override
public Class classOfObject()
{
return metricVals.getClazz();
}
@Override
public Object get()
{
return metricVals.get(currRow);
}
};
}
};
}
}
);
return MoreIterators.after(
retVal,
new Runnable()
{
@Override
public void run()
{
Closeables.closeQuietly(timestamps);
for (Object object : metricCacheMap.values()) {
if (object instanceof Closeable) {
Closeables.closeQuietly((Closeable) object);
}
}
}
}
);
}
}
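A minimal usage sketch of the contract described in the javadoc above, for illustration only: each Cursor is driven to isDone() on the calling thread before the next one is touched. The sumIndexMetric name and the "index" metric name are assumptions made for this example.

static float sumIndexMetric(Iterable<Cursor> cursors)
{
  float sum = 0.0f;
  for (Cursor cursor : cursors) {
    final FloatMetricSelector selector = cursor.makeFloatMetricSelector("index");
    // Fully drain this cursor before moving on to the next one, and stay on the iterating thread.
    while (!cursor.isDone()) {
      sum += selector.get();
      cursor.advance();
    }
  }
  return sum;
}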
private static class MMappedBitmapIndexSelector implements BitmapIndexSelector
{
private final MMappedIndex index;
public MMappedBitmapIndexSelector(final MMappedIndex index)
{
this.index = index;
}
@Override
public Indexed<String> getDimensionValues(String dimension)
{
return index.getDimValueLookup(dimension.toLowerCase());
}
@Override
public int getNumRows()
{
return index.getReadOnlyTimestamps().size();
}
@Override
public ImmutableConciseSet getConciseInvertedIndex(String dimension, String value)
{
return index.getInvertedIndex(dimension.toLowerCase(), value);
}
}
}
View File
@ -113,7 +113,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
GenericColumn column = null;
try {
column = index.getTimeColumn().getGenericColumn();
return new DateTime(column.getLongSingleValueRow(column.size() - 1));
return new DateTime(column.getLongSingleValueRow(column.length() - 1));
}
finally {
Closeables.closeQuietly(column);
@ -572,7 +572,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
public Cursor apply(final Long input)
{
final long timeStart = Math.max(interval.getStartMillis(), input);
while (currRow < timestamps.size() && timestamps.getLongSingleValueRow(currRow) < timeStart) {
while (currRow < timestamps.length() && timestamps.getLongSingleValueRow(currRow) < timeStart) {
++currRow;
}
@ -597,7 +597,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
@Override
public boolean isDone()
{
return currRow >= timestamps.size() || timestamps.getLongSingleValueRow(currRow) >= nextBucket;
return currRow >= timestamps.length() || timestamps.getLongSingleValueRow(currRow) >= nextBucket;
}
@Override
@ -848,7 +848,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
GenericColumn column = null;
try {
column = index.getTimeColumn().getGenericColumn();
return column.size();
return column.length();
}
finally {
Closeables.closeQuietly(column);
View File
@ -43,6 +43,7 @@ import com.metamx.druid.index.v1.processing.DimensionSelector;
import com.metamx.druid.input.MapBasedRow;
import com.metamx.druid.input.Row;
import com.metamx.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@ -347,7 +348,7 @@ public class GroupByQueryEngine
.transform(
new Function<Map.Entry<ByteBuffer, Integer>, Row>()
{
private final long timestamp = cursor.getTime().getMillis();
private final DateTime timestamp = cursor.getTime();
private final int[] increments = positionMaintainer.getIncrements();
@Override
View File
@ -0,0 +1,160 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.primitives.Floats;
import com.google.common.primitives.Longs;
import com.metamx.common.logger.Logger;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.column.BitmapIndex;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.index.column.ColumnCapabilities;
import com.metamx.druid.index.column.ComplexColumn;
import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics;
import java.util.Map;
public class SegmentAnalyzer
{
private static final Logger log = new Logger(SegmentAnalyzer.class);
/**
* This is based on the minimum size of a timestamp (POSIX seconds). An ISO timestamp will actually be more like 24+ bytes.
*/
private static final int NUM_BYTES_IN_TIMESTAMP = 10;
/**
* This is based on assuming 6 digits of precision, one decimal point and a single digit left of the decimal point.
* A back-of-the-envelope size sketch follows this class.
*/
private static final int NUM_BYTES_IN_TEXT_FLOAT = 8;
public Map<String, ColumnAnalysis> analyze(QueryableIndex index)
{
Preconditions.checkNotNull(index, "Index cannot be null");
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
for (String columnName : index.getColumnNames()) {
final Column column = index.getColumn(columnName);
final ColumnCapabilities capabilities = column.getCapabilities();
final ColumnAnalysis analysis;
final ValueType type = capabilities.getType();
switch(type) {
case LONG:
analysis = analyzeLongColumn(column);
break;
case FLOAT:
analysis = analyzeFloatColumn(column);
break;
case STRING:
analysis = analyzeStringColumn(column);
break;
case COMPLEX:
analysis = analyzeComplexColumn(column);
break;
default:
log.warn("Unknown column type[%s].", type);
analysis = ColumnAnalysis.error(String.format("unknown_type_%s", type));
}
columns.put(columnName, analysis);
}
columns.put("__time", lengthBasedAnalysis(index.getTimeColumn(), NUM_BYTES_IN_TIMESTAMP));
return columns;
}
public ColumnAnalysis analyzeLongColumn(Column column)
{
return lengthBasedAnalysis(column, Longs.BYTES);
}
public ColumnAnalysis analyzeFloatColumn(Column column)
{
return lengthBasedAnalysis(column, NUM_BYTES_IN_TEXT_FLOAT);
}
private ColumnAnalysis lengthBasedAnalysis(Column column, final int numBytes)
{
final ColumnCapabilities capabilities = column.getCapabilities();
if (capabilities.hasMultipleValues()) {
return ColumnAnalysis.error("multi_value");
}
return new ColumnAnalysis(capabilities.getType(), column.getLength() * numBytes, null);
}
public ColumnAnalysis analyzeStringColumn(Column column)
{
final ColumnCapabilities capabilities = column.getCapabilities();
if (capabilities.hasBitmapIndexes()) {
final BitmapIndex bitmapIndex = column.getBitmapIndex();
int cardinality = bitmapIndex.getCardinality();
long size = 0;
for (int i = 0; i < cardinality; ++i) {
String value = bitmapIndex.getValue(i);
if (value != null) {
size += value.getBytes(Charsets.UTF_8).length * bitmapIndex.getConciseSet(value).size();
}
}
return new ColumnAnalysis(capabilities.getType(), size, cardinality);
}
return ColumnAnalysis.error("string_no_bitmap");
}
public ColumnAnalysis analyzeComplexColumn(Column column)
{
final ColumnCapabilities capabilities = column.getCapabilities();
final ComplexColumn complexColumn = column.getComplexColumn();
final String typeName = complexColumn.getTypeName();
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
return ColumnAnalysis.error(String.format("unknown_complex_%s", typeName));
}
final Function<Object, Long> inputSizeFn = serde.inputSizeFn();
if (inputSizeFn == null) {
return ColumnAnalysis.error("noSizeFn");
}
final int length = column.getLength();
long size = 0;
for (int i = 0; i < length; ++i) {
size += inputSizeFn.apply(complexColumn.getRowValue(i));
}
return new ColumnAnalysis(capabilities.getType(), size, null);
}
}
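A back-of-the-envelope sketch of the arithmetic the constants above feed into, for illustration only; the method name and the 1,000,000-row figure are made up for this example.

// Mirrors lengthBasedAnalysis: estimated size = number of rows * assumed bytes per value.
static long estimateLengthBasedSize(int numRows, int bytesPerValue)
{
  return (long) numRows * bytesPerValue;
}

// e.g. for a hypothetical 1,000,000-row segment:
//   float column : 1,000,000 * NUM_BYTES_IN_TEXT_FLOAT (8)  ->  8,000,000 bytes
//   __time column: 1,000,000 * NUM_BYTES_IN_TIMESTAMP (10)  -> 10,000,000 bytes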
View File
@ -1,109 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.IAE;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.SimpleSequence;
import com.metamx.druid.BaseStorageAdapter;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.index.v1.SegmentIdAttachedStorageAdapter;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SegmentMetadataResultValue;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
public class SegmentMetadataQueryEngine
{
public Sequence<Result<SegmentMetadataResultValue>> process(
final SegmentMetadataQuery query,
StorageAdapter storageAdapter
)
{
final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
if (intervals.size() != 1) {
throw new IAE("Should only have one interval, got[%s]", intervals);
}
if(!(storageAdapter instanceof SegmentIdAttachedStorageAdapter) ||
!(((SegmentIdAttachedStorageAdapter)storageAdapter).getDelegate() instanceof BaseStorageAdapter)) {
return Sequences.empty();
}
final BaseStorageAdapter adapter = (BaseStorageAdapter)
((SegmentIdAttachedStorageAdapter) storageAdapter).getDelegate();
Function<String, SegmentMetadataResultValue.Dimension> sizeDimension = new Function<String, SegmentMetadataResultValue.Dimension>()
{
@Override
public SegmentMetadataResultValue.Dimension apply(@Nullable String input)
{
long size = 0;
final Indexed<String> lookup = adapter.getDimValueLookup(input);
for (String dimVal : lookup) {
ImmutableConciseSet index = adapter.getInvertedIndex(input, dimVal);
size += (dimVal == null) ? 0 : index.size() * Charsets.UTF_8.encode(dimVal).capacity();
}
return new SegmentMetadataResultValue.Dimension(
size,
adapter.getDimensionCardinality(input)
);
}
};
// TODO: add metric storage size
long totalSize = 0;
HashMap<String, SegmentMetadataResultValue.Dimension> dimensions = Maps.newHashMap();
for(String input : adapter.getAvailableDimensions()) {
SegmentMetadataResultValue.Dimension d = sizeDimension.apply(input);
dimensions.put(input, d);
totalSize += d.size;
}
return new SimpleSequence<Result<SegmentMetadataResultValue>>(
ImmutableList.of(
new Result<SegmentMetadataResultValue>(
adapter.getMinTime(),
new SegmentMetadataResultValue(
storageAdapter.getSegmentIdentifier(),
dimensions,
ImmutableMap.<String, SegmentMetadataResultValue.Metric>of(),
totalSize
)
)
)
);
}
}
View File
@ -21,83 +21,105 @@ package com.metamx.druid.query.metadata;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.metamx.common.ISE;
import com.google.common.collect.Maps;
import com.metamx.common.guava.ExecutorExecutingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.Query;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.Segment;
import com.metamx.druid.query.ConcatQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQueryEngine;
import com.metamx.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
import com.metamx.druid.result.SegmentMetadataResultValue;
import com.metamx.druid.result.Result;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Result<SegmentMetadataResultValue>, SegmentMetadataQuery>
public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<SegmentAnalysis, SegmentMetadataQuery>
{
private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest()
{
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> mergeResults(QueryRunner<Result<SegmentMetadataResultValue>> runner)
{
return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(Sequences.simple(ImmutableList.of(runner)));
}
};
private static final SegmentAnalyzer analyzer = new SegmentAnalyzer();
private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest();
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> createRunner(final Segment adapter)
public QueryRunner<SegmentAnalysis> createRunner(final Segment segment)
{
return new QueryRunner<Result<SegmentMetadataResultValue>>()
return new QueryRunner<SegmentAnalysis>()
{
@Override
public Sequence<Result<SegmentMetadataResultValue>> run(Query<Result<SegmentMetadataResultValue>> query)
public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ)
{
if (!(query instanceof SegmentMetadataQuery)) {
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), SegmentMetadataQuery.class);
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
final QueryableIndex index = segment.asQueryableIndex();
if (index == null) {
return Sequences.empty();
}
return new SegmentMetadataQueryEngine().process((SegmentMetadataQuery) query, adapter.asStorageAdapter());
final Map<String, ColumnAnalysis> analyzedColumns = analyzer.analyze(index);
// Initialize with the size of the whitespace between values: roughly 1 byte per column per row
long totalSize = analyzedColumns.size() * index.getNumRows();
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
ColumnIncluderator includerator = query.getToInclude();
for (Map.Entry<String, ColumnAnalysis> entry : analyzedColumns.entrySet()) {
final String columnName = entry.getKey();
final ColumnAnalysis column = entry.getValue();
if (!column.isError()) {
totalSize += column.getSize();
}
if (includerator.include(columnName)) {
columns.put(columnName, column);
}
}
return Sequences.simple(
Arrays.asList(
new SegmentAnalysis(
segment.getIdentifier(),
Arrays.asList(segment.getDataInterval()),
columns,
totalSize
)
)
);
}
};
}
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> mergeRunners(
final ExecutorService queryExecutor, Iterable<QueryRunner<Result<SegmentMetadataResultValue>>> queryRunners
public QueryRunner<SegmentAnalysis> mergeRunners(
final ExecutorService queryExecutor, Iterable<QueryRunner<SegmentAnalysis>> queryRunners
)
{
return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(
return new ConcatQueryRunner<SegmentAnalysis>(
Sequences.map(
Sequences.simple(queryRunners),
new Function<QueryRunner<Result<SegmentMetadataResultValue>>, QueryRunner<Result<SegmentMetadataResultValue>>>()
new Function<QueryRunner<SegmentAnalysis>, QueryRunner<SegmentAnalysis>>()
{
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> apply(final QueryRunner<Result<SegmentMetadataResultValue>> input)
public QueryRunner<SegmentAnalysis> apply(final QueryRunner<SegmentAnalysis> input)
{
return new QueryRunner<Result<SegmentMetadataResultValue>>()
return new QueryRunner<SegmentAnalysis>()
{
@Override
public Sequence<Result<SegmentMetadataResultValue>> run(final Query<Result<SegmentMetadataResultValue>> query)
public Sequence<SegmentAnalysis> run(final Query<SegmentAnalysis> query)
{
Future<Sequence<Result<SegmentMetadataResultValue>>> future = queryExecutor.submit(
new Callable<Sequence<Result<SegmentMetadataResultValue>>>()
Future<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
new Callable<Sequence<SegmentAnalysis>>()
{
@Override
public Sequence<Result<SegmentMetadataResultValue>> call() throws Exception
public Sequence<SegmentAnalysis> call() throws Exception
{
return new ExecutorExecutingSequence<Result<SegmentMetadataResultValue>>(
return new ExecutorExecutingSequence<SegmentAnalysis>(
input.run(query),
queryExecutor
);
@ -122,7 +144,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Res
}
@Override
public QueryToolChest getToolchest()
public QueryToolChest<SegmentAnalysis, SegmentMetadataQuery> getToolchest()
{
return toolChest;
}
View File
@ -46,6 +46,16 @@ public class TestHelper
assertResults(expectedResults, results, failMsg);
}
public static <T> void assertExpectedObjects(Iterable<T> expectedResults, Iterable<T> results, String failMsg)
{
assertObjects(expectedResults, results, failMsg);
}
public static <T> void assertExpectedObjects(Iterable<T> expectedResults, Sequence<T> results, String failMsg)
{
assertObjects(expectedResults, Sequences.toList(results, Lists.<T>newArrayList()), failMsg);
}
private static <T> void assertResults(Iterable<Result<T>> expectedResults, Iterable<Result<T>> actualResults, String failMsg)
{
Iterator<? extends Result> resultsIter = actualResults.iterator();
@ -86,6 +96,46 @@ public class TestHelper
}
}
private static <T> void assertObjects(Iterable<T> expectedResults, Iterable<T> actualResults, String failMsg)
{
Iterator resultsIter = actualResults.iterator();
Iterator resultsIter2 = actualResults.iterator();
Iterator expectedResultsIter = expectedResults.iterator();
while (resultsIter.hasNext() && resultsIter2.hasNext() && expectedResultsIter.hasNext()) {
Object expectedNext = expectedResultsIter.next();
final Object next = resultsIter.next();
final Object next2 = resultsIter2.next();
Assert.assertEquals(failMsg, expectedNext, next);
Assert.assertEquals(
String.format("%sSecond iterator bad, multiple calls to iterator() should be safe", failMsg),
expectedNext,
next2
);
}
if (resultsIter.hasNext()) {
Assert.fail(
String.format("%sExpected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next())
);
}
if (resultsIter2.hasNext()) {
Assert.fail(
String.format("%sExpected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next())
);
}
if (expectedResultsIter.hasNext()) {
Assert.fail(
String.format(
"%sExpected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next()
)
);
}
}
private static void assertResult(String msg, Result<?> expected, Result actual)
{
Assert.assertEquals(msg, expected, actual);
View File
@ -328,7 +328,7 @@ public class ServerManagerTest
}
@Override
public String getSegmentIdentifier()
public String getIdentifier()
{
return version;
}
@ -421,7 +421,7 @@ public class ServerManagerTest
}
@Override
public CacheStrategy<T, QueryType> getCacheStrategy(QueryType query)
public <Typer> CacheStrategy<T, Typer, QueryType> getCacheStrategy(QueryType query)
{
return null;
}
View File
@ -35,7 +35,7 @@ public class NoopSegmentLoader implements SegmentLoader
return new Segment()
{
@Override
public String getSegmentIdentifier()
public String getIdentifier()
{
return segment.getIdentifier();
}
View File
@ -22,7 +22,6 @@ package com.metamx.druid.query;
import com.google.common.collect.Lists;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
@ -35,13 +34,6 @@ import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IncrementalIndexStorageAdapter;
import com.metamx.druid.index.v1.Index;
import com.metamx.druid.index.v1.IndexStorageAdapter;
import com.metamx.druid.index.v1.MMappedIndex;
import com.metamx.druid.index.v1.MMappedIndexQueryableIndex;
import com.metamx.druid.index.v1.MMappedIndexStorageAdapter;
import com.metamx.druid.index.v1.QueryableIndexStorageAdapter;
import com.metamx.druid.index.v1.TestIndex;
import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec;
import com.metamx.druid.query.segment.QuerySegmentSpec;
@ -132,7 +124,7 @@ public class QueryRunnerTestHelper
);
}
private static <T> QueryRunner<T> makeQueryRunner(
public static <T> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
Segment adapter
)
View File
@ -0,0 +1,215 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.group;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.Query;
import com.metamx.druid.TestHelper;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.LongSumAggregatorFactory;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.input.MapBasedRow;
import com.metamx.druid.input.Row;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerTestHelper;
import com.metamx.druid.query.dimension.DefaultDimensionSpec;
import com.metamx.druid.query.dimension.DimensionSpec;
import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
public class GroupByQueryRunnerTest
{
private final QueryRunner<Row> runner;
private GroupByQueryRunnerFactory factory;
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
new GroupByQueryEngine(
new GroupByQueryEngineConfig()
{
@Override
public int getMaxIntermediateRows()
{
return 10000;
}
},
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
)
);
return Lists.newArrayList(
Iterables.transform(
QueryRunnerTestHelper.makeQueryRunners(factory), new Function<Object, Object>()
{
@Override
public Object apply(@Nullable Object input)
{
return new Object[]{factory, ((Object[]) input)[0]};
}
}
)
);
}
public GroupByQueryRunnerTest(GroupByQueryRunnerFactory factory, QueryRunner runner) {
this.factory = factory;
this.runner = runner;
}
@Test
public void testGroupBy() {
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = Arrays.<Row>asList(
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),
createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L),
createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
);
Iterable<Row> results = Sequences.toList(runner.run(query), Lists.<Row>newArrayList());
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testMergeResults() {
GroupByQuery.Builder builder = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval("2011-04-02/2011-04-04")
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null));
final GroupByQuery fullQuery = builder.build();
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
new QueryRunner<Row>()
{
@Override
public Sequence run(Query<Row> query)
{
// simulate two daily segments
final Query query1 = query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03")))
);
final Query query2 = query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
);
return Sequences.concat(runner.run(query1), runner.run(query2));
}
}
);
List<Row> expectedResults = Arrays.<Row>asList(
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L),
createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L),
createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 216L),
createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 221L),
createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L),
createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 177L),
createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L)
);
TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct");
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
}
private MapBasedRow createExpectedRow(final String timestamp, Object... vals)
{
Preconditions.checkArgument(vals.length % 2 == 0);
Map<String, Object> theVals = Maps.newHashMap();
for (int i = 0; i < vals.length; i+=2) {
theVals.put(vals[i].toString(), vals[i+1]);
}
return new MapBasedRow(new DateTime(timestamp), theVals);
}
}
View File
@ -112,7 +112,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
MapBasedRow row = (MapBasedRow) input;
return new Result<TimeseriesResultValue>(
new DateTime(input.getTimestampFromEpoch()), new TimeseriesResultValue(row.getEvent())
row.getTimestamp(), new TimeseriesResultValue(row.getEvent())
);
}
}
View File
@ -0,0 +1,102 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.index.IncrementalIndexSegment;
import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.index.v1.TestIndex;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerTestHelper;
import com.metamx.druid.query.segment.QuerySegmentSpecs;
import junit.framework.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
/**
*/
public class SegmentAnalyzerTest
{
@Test
public void testIncrementalDoesNotWork() throws Exception
{
final List<SegmentAnalysis> results = getSegmentAnalysises(
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex())
);
Assert.assertEquals(0, results.size());
}
@Test
public void testMappedWorks() throws Exception
{
final List<SegmentAnalysis> results = getSegmentAnalysises(
new QueryableIndexSegment("test_1", TestIndex.getMMappedTestIndex())
);
Assert.assertEquals(1, results.size());
final SegmentAnalysis analysis = results.get(0);
Assert.assertEquals("test_1", analysis.getId());
final Map<String,ColumnAnalysis> columns = analysis.getColumns();
Assert.assertEquals(TestIndex.COLUMNS.length, columns.size()); // All columns including time
for (String dimension : TestIndex.DIMENSIONS) {
final ColumnAnalysis columnAnalysis = columns.get(dimension.toLowerCase());
Assert.assertEquals(dimension, ValueType.STRING.name(), columnAnalysis.getType());
Assert.assertTrue(dimension, columnAnalysis.getSize() > 0);
Assert.assertTrue(dimension, columnAnalysis.getCardinality() > 0);
}
for (String metric : TestIndex.METRICS) {
final ColumnAnalysis columnAnalysis = columns.get(metric.toLowerCase());
Assert.assertEquals(metric, ValueType.FLOAT.name(), columnAnalysis.getType());
Assert.assertTrue(metric, columnAnalysis.getSize() > 0);
Assert.assertNull(metric, columnAnalysis.getCardinality());
}
}
/**
* *Awesome* method name auto-generated by IntelliJ! I love IntelliJ!
*
* @param index the segment to run the SegmentMetadataQuery against
* @return the SegmentAnalysis results produced for that segment
*/
private List<SegmentAnalysis> getSegmentAnalysises(Segment index)
{
final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
(QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(), index
);
final SegmentMetadataQuery query = new SegmentMetadataQuery(
"test", QuerySegmentSpecs.create("2011/2012"), null, null, null
);
return Sequences.toList(query.run(runner), Lists.<SegmentAnalysis>newArrayList());
}
}
View File
@ -40,6 +40,7 @@ import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.TimeseriesResultValue;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
@ -257,6 +258,46 @@ public class TimeseriesQueryRunnerTest
TestHelper.assertExpectedResults(expectedResults, results);
}
@Test
public void testTimeseriesWithTimeZone()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.intervals("2011-03-31T00:00:00-07:00/2011-04-02T00:00:00-07:00")
.aggregators(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory(
"idx",
"index"
)
)
)
.granularity(new PeriodGranularity(new Period("P1D"), null, DateTimeZone.forID("America/Los_Angeles")))
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2011-03-31", DateTimeZone.forID("America/Los_Angeles")),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 6619L)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-01T", DateTimeZone.forID("America/Los_Angeles")),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L)
)
)
);
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
runner.run(query),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, results);
}
@Test
public void testTimeseriesWithVaryingGran()