1) Make SegmentMetadataQuery work

Eric Tschetter 2013-01-29 11:29:26 -06:00
parent 5f18f368e3
commit f2cce28297
37 changed files with 875 additions and 1036 deletions

View File

@@ -22,6 +22,7 @@ package com.metamx.druid;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.segment.QuerySegmentSpec;
import com.metamx.druid.query.segment.QuerySegmentWalker;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -72,7 +73,12 @@ public abstract class BaseQuery<T> implements Query<T>
@Override
public Sequence<T> run(QuerySegmentWalker walker)
{
return querySegmentSpec.lookup(this, walker).run(this);
return run(querySegmentSpec.lookup(this, walker));
}
public Sequence<T> run(QueryRunner<T> runner)
{
return runner.run(this);
}
@Override
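Net effect of the hunk above: Query now has two entry points. run(QuerySegmentWalker) resolves the segment spec into a QueryRunner and hands off to the new run(QueryRunner) overload, so code that already holds a runner can skip segment resolution entirely. A minimal sketch of the two call paths (the query, walker, and runner instances are assumed):

  Sequence<SegmentAnalysis> viaWalker = query.run(walker);  // spec lookup picks the runner, then runs
  Sequence<SegmentAnalysis> viaRunner = query.run(runner);  // runs directly against a known runner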

View File

@@ -20,6 +20,7 @@
package com.metamx.druid;
import com.metamx.common.guava.Sequence;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.search.SearchQuery;
@@ -57,6 +58,8 @@ public interface Query<T>
public Sequence<T> run(QuerySegmentWalker walker);
public Sequence<T> run(QueryRunner<T> runner);
public List<Interval> getIntervals();
public Duration getDuration();

View File

@@ -0,0 +1,31 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
/**
*/
public class AllColumnIncluderator implements ColumnIncluderator
{
@Override
public boolean include(String columnName)
{
return true;
}
}

View File

@@ -0,0 +1,119 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.google.common.base.Preconditions;
import com.metamx.druid.index.column.ValueType;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
/**
*/
public class ColumnAnalysis
{
private static final String ERROR_PREFIX = "error:";
public static ColumnAnalysis error(String reason)
{
return new ColumnAnalysis(ERROR_PREFIX + reason, -1, null);
}
private final String type;
private final long size;
private final Integer cardinality;
@JsonCreator
public ColumnAnalysis(
@JsonProperty("type") ValueType type,
@JsonProperty("size") long size,
@JsonProperty("cardinality") Integer cardinality
)
{
this(type.name(), size, cardinality);
}
private ColumnAnalysis(
String type,
long size,
Integer cardinality
)
{
this.type = type;
this.size = size;
this.cardinality = cardinality;
}
@JsonProperty
public String getType()
{
return type;
}
@JsonProperty
public long getSize()
{
return size;
}
@JsonProperty
public Integer getCardinality()
{
return cardinality;
}
public boolean isError()
{
return type.startsWith(ERROR_PREFIX);
}
public ColumnAnalysis fold(ColumnAnalysis rhs)
{
if (rhs == null) {
return this;
}
if (!type.equals(rhs.getType())) {
return ColumnAnalysis.error("cannot_merge_diff_types");
}
Integer cardinality = getCardinality();
final Integer rhsCardinality = rhs.getCardinality();
if (cardinality == null) {
cardinality = rhsCardinality;
}
else {
if (rhsCardinality != null) {
cardinality = Math.max(cardinality, rhsCardinality);
}
}
return new ColumnAnalysis(type, size + rhs.getSize(), cardinality);
}
@Override
public String toString()
{
return "ColumnAnalysis{" +
"type='" + type + '\'' +
", size=" + size +
", cardinality=" + cardinality +
'}';
}
}
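fold() is the per-column half of the merge path wired up in the toolchest later in this commit: sizes add, cardinality takes the max of the two (the true cardinality of the union is unknowable from the parts, so the max is a floor), and a type mismatch degrades to an error marker rather than throwing. A hedged example of the expected behavior (values invented):

  ColumnAnalysis a = new ColumnAnalysis(ValueType.STRING, 1000L, 50);
  ColumnAnalysis b = new ColumnAnalysis(ValueType.STRING, 2000L, 80);
  ColumnAnalysis merged = a.fold(b);  // type "STRING", size 3000, cardinality 80

  ColumnAnalysis c = new ColumnAnalysis(ValueType.LONG, 800L, null);
  a.fold(c).isError();                // true; type becomes "error:cannot_merge_diff_types"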

View File

@@ -0,0 +1,36 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonTypeInfo;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "none", value= NoneColumnIncluderator.class),
@JsonSubTypes.Type(name = "all", value= AllColumnIncluderator.class),
@JsonSubTypes.Type(name = "list", value= ListColumnIncluderator.class)
})
public interface ColumnIncluderator
{
public boolean include(String columnName);
}
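Given the @JsonTypeInfo/@JsonSubTypes mapping above, an includerator arrives inside query JSON as a tagged object. A sketch of the three wire forms (column names invented):

  {"type": "none"}
  {"type": "all"}
  {"type": "list", "columns": ["page", "language"]}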

View File

@@ -0,0 +1,56 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.google.common.collect.Sets;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
*/
public class ListColumnIncluderator implements ColumnIncluderator
{
private final Set<String> columns;
@JsonCreator
public ListColumnIncluderator(
@JsonProperty("columns") List<String> columns
)
{
this.columns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
this.columns.addAll(columns);
}
@JsonProperty
public Set<String> getColumns()
{
return Collections.unmodifiableSet(columns);
}
@Override
public boolean include(String columnName)
{
return columns.contains(columnName);
}
}
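Because the backing set is a TreeSet built on String.CASE_INSENSITIVE_ORDER, membership checks ignore case. A quick sketch (java.util.Arrays assumed imported):

  ColumnIncluderator inc = new ListColumnIncluderator(Arrays.asList("Page", "USER"));
  inc.include("page");   // true
  inc.include("user");   // true
  inc.include("added");  // false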

View File

@@ -0,0 +1,31 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
/**
*/
public class NoneColumnIncluderator implements ColumnIncluderator
{
@Override
public boolean include(String columnName)
{
return false;
}
}

View File

@@ -17,61 +17,34 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.result;
package com.metamx.druid.query.metadata;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
public class SegmentMetadataResultValue
public class SegmentAnalysis
{
public static class Dimension {
@JsonProperty public long size;
@JsonProperty public int cardinality;
@JsonCreator
public Dimension(
@JsonProperty("size") long size,
@JsonProperty("cardinality") int cardinality
)
{
this.size = size;
this.cardinality = cardinality;
}
}
public static class Metric {
@JsonProperty public String type;
@JsonProperty public long size;
@JsonCreator
public Metric(
@JsonProperty("type") String type,
@JsonProperty("size") long size
)
{
this.type = type;
this.size = size;
}
}
private final String id;
private final Map<String, Dimension> dimensions;
private final Map<String, Metric> metrics;
private final List<Interval> interval;
private final Map<String, ColumnAnalysis> columns;
private final long size;
@JsonCreator
public SegmentMetadataResultValue(
public SegmentAnalysis(
@JsonProperty("id") String id,
@JsonProperty("dimensions") Map<String, Dimension> dimensions,
@JsonProperty("metrics") Map<String, Metric> metrics,
@JsonProperty("intervals") List<Interval> interval,
@JsonProperty("columns") Map<String, ColumnAnalysis> columns,
@JsonProperty("size") long size
)
{
this.id = id;
this.dimensions = dimensions;
this.metrics = metrics;
this.interval = interval;
this.columns = columns;
this.size = size;
}
@@ -82,15 +55,15 @@ public class SegmentMetadataResultValue
}
@JsonProperty
public Map<String, Dimension> getDimensions()
public List<Interval> getIntervals()
{
return dimensions;
return interval;
}
@JsonProperty
public Map<String, Metric> getMetrics()
public Map<String, ColumnAnalysis> getColumns()
{
return metrics;
return columns;
}
@JsonProperty
@@ -98,4 +71,24 @@ public class SegmentMetadataResultValue
{
return size;
}
public String toDetailedString()
{
return "SegmentAnalysis{" +
"id='" + id + '\'' +
", interval=" + interval +
", columns=" + columns +
", size=" + size +
'}';
}
@Override
public String toString()
{
return "SegmentAnalysis{" +
"id='" + id + '\'' +
", interval=" + interval +
", size=" + size +
'}';
}
}
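Note that toString() deliberately omits the potentially large columns map that toDetailedString() includes. Given the @JsonProperty getters, a single (unmerged) analysis would serialize roughly as follows; the id and all numbers are invented:

  {
    "id": "wikipedia_2013-01-01_2013-01-02_v1",
    "intervals": ["2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z"],
    "columns": {
      "page": {"type": "STRING", "size": 3100, "cardinality": 2}
    },
    "size": 5000
  }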

View File

@@ -22,26 +22,40 @@ package com.metamx.druid.query.metadata;
import com.metamx.druid.BaseQuery;
import com.metamx.druid.Query;
import com.metamx.druid.query.segment.QuerySegmentSpec;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SegmentMetadataResultValue;
import org.codehaus.jackson.annotate.JsonProperty;
import java.util.Map;
public class SegmentMetadataQuery extends BaseQuery<Result<SegmentMetadataResultValue>>
public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
{
private final ColumnIncluderator toInclude;
private final boolean merge;
public SegmentMetadataQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("toInclude") ColumnIncluderator toInclude,
@JsonProperty("merge") Boolean merge,
@JsonProperty("context") Map<String, String> context
)
{
super(
dataSource,
querySegmentSpec,
context
);
super(dataSource, querySegmentSpec, context);
this.toInclude = toInclude == null ? new AllColumnIncluderator() : toInclude;
this.merge = merge == null ? false : merge;
}
@JsonProperty
public ColumnIncluderator getToInclude()
{
return toInclude;
}
@JsonProperty
public boolean isMerge()
{
return merge;
}
@Override
@@ -57,22 +71,16 @@ public class SegmentMetadataQuery extends BaseQuery<Result<SegmentMetadataResult
}
@Override
public Query<Result<SegmentMetadataResultValue>> withOverriddenContext(Map<String, String> contextOverride)
public Query<SegmentAnalysis> withOverriddenContext(Map<String, String> contextOverride)
{
return new SegmentMetadataQuery(
getDataSource(),
getQuerySegmentSpec(),
computeOverridenContext(contextOverride)
getDataSource(), getQuerySegmentSpec(), toInclude, merge, computeOverridenContext(contextOverride)
);
}
@Override
public Query<Result<SegmentMetadataResultValue>> withQuerySegmentSpec(QuerySegmentSpec spec)
public Query<SegmentAnalysis> withQuerySegmentSpec(QuerySegmentSpec spec)
{
return new SegmentMetadataQuery(
getDataSource(),
spec,
getContext()
);
return new SegmentMetadataQuery(getDataSource(), spec, toInclude, merge, getContext());
}
}
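Per the constructor above, omitting toInclude defaults to "all" and omitting merge defaults to false. A full query posted as JSON would then look roughly like this (a hedged sketch: the "segmentMetadata" queryType name and the dataSource are assumptions, not shown in this diff):

  {
    "queryType": "segmentMetadata",
    "dataSource": "wikipedia",
    "intervals": ["2013-01-01/2013-02-01"],
    "toInclude": {"type": "list", "columns": ["page"]},
    "merge": true
  }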

View File

@@ -22,32 +22,116 @@ package com.metamx.druid.query.metadata;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.metamx.common.guava.ConcatSequence;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.druid.Query;
import com.metamx.druid.collect.OrderedMergeSequence;
import com.metamx.druid.query.CacheStrategy;
import com.metamx.druid.query.ConcatQueryRunner;
import com.metamx.druid.query.MetricManipulationFn;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SegmentMetadataResultValue;
import com.metamx.druid.query.ResultMergeQueryRunner;
import com.metamx.druid.utils.JodaUtils;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<Result<SegmentMetadataResultValue>, SegmentMetadataQuery>
public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
{
private static final TypeReference<Result<SegmentMetadataResultValue>> TYPE_REFERENCE = new TypeReference<Result<SegmentMetadataResultValue>>(){};
private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>(){};
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> mergeResults(final QueryRunner<Result<SegmentMetadataResultValue>> runner)
public QueryRunner<SegmentAnalysis> mergeResults(final QueryRunner<SegmentAnalysis> runner)
{
return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(Sequences.simple(ImmutableList.of(runner)));
return new ResultMergeQueryRunner<SegmentAnalysis>(runner)
{
@Override
protected Ordering<SegmentAnalysis> makeOrdering(Query<SegmentAnalysis> query)
{
if (((SegmentMetadataQuery) query).isMerge()) {
// Merge everything always
return new Ordering<SegmentAnalysis>()
{
@Override
public int compare(
@Nullable SegmentAnalysis left, @Nullable SegmentAnalysis right
)
{
return 0;
}
};
}
return getOrdering(); // No two elements should be equal, so it should never merge
}
@Override
protected BinaryFn<SegmentAnalysis, SegmentAnalysis, SegmentAnalysis> createMergeFn(final Query<SegmentAnalysis> inQ)
{
return new BinaryFn<SegmentAnalysis, SegmentAnalysis, SegmentAnalysis>()
{
private final SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
@Override
public SegmentAnalysis apply(SegmentAnalysis arg1, SegmentAnalysis arg2)
{
if (arg1 == null) {
return arg2;
}
if (arg2 == null) {
return arg1;
}
if (!query.isMerge()) {
throw new ISE("Merging when a merge isn't supposed to happen[%s], [%s]", arg1, arg2);
}
List<Interval> newIntervals = JodaUtils.condenseIntervals(
Iterables.concat(arg1.getIntervals(), arg2.getIntervals())
);
final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns();
final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns();
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
Set<String> rightColumnNames = Sets.newHashSet(rightColumns.keySet());
for (Map.Entry<String, ColumnAnalysis> entry : leftColumns.entrySet()) {
final String columnName = entry.getKey();
columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName)));
rightColumnNames.remove(columnName);
}
for (String columnName : rightColumnNames) {
columns.put(columnName, rightColumns.get(columnName));
}
return new SegmentAnalysis("merged", newIntervals, columns, arg1.getSize() + arg2.getSize());
}
};
}
};
}
@Override
public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
{
return new OrderedMergeSequence<SegmentAnalysis>(getOrdering(), seqOfSequences);
}
@Override
@@ -67,13 +151,7 @@ public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<Result
}
@Override
public Sequence<Result<SegmentMetadataResultValue>> mergeSequences(Sequence<Sequence<Result<SegmentMetadataResultValue>>> seqOfSequences)
{
return new ConcatSequence<Result<SegmentMetadataResultValue>>(seqOfSequences);
}
@Override
public Function<Result<SegmentMetadataResultValue>, Result<SegmentMetadataResultValue>> makeMetricManipulatorFn(
public Function<SegmentAnalysis, SegmentAnalysis> makeMetricManipulatorFn(
SegmentMetadataQuery query, MetricManipulationFn fn
)
{
@@ -81,26 +159,38 @@ public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<Result
}
@Override
public TypeReference<Result<SegmentMetadataResultValue>> getResultTypeReference()
public TypeReference<SegmentAnalysis> getResultTypeReference()
{
return TYPE_REFERENCE;
}
@Override
public CacheStrategy<Result<SegmentMetadataResultValue>, SegmentMetadataQuery> getCacheStrategy(SegmentMetadataQuery query)
public CacheStrategy<SegmentAnalysis, SegmentMetadataQuery> getCacheStrategy(SegmentMetadataQuery query)
{
return null;
}
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> preMergeQueryDecoration(QueryRunner<Result<SegmentMetadataResultValue>> runner)
public QueryRunner<SegmentAnalysis> preMergeQueryDecoration(QueryRunner<SegmentAnalysis> runner)
{
return runner;
}
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> postMergeQueryDecoration(QueryRunner<Result<SegmentMetadataResultValue>> runner)
public QueryRunner<SegmentAnalysis> postMergeQueryDecoration(QueryRunner<SegmentAnalysis> runner)
{
return runner;
}
private Ordering<SegmentAnalysis> getOrdering()
{
return new Ordering<SegmentAnalysis>()
{
@Override
public int compare(SegmentAnalysis left, SegmentAnalysis right)
{
return left.getId().compareTo(right.getId());
}
};
}
}
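The ordering trick above is the whole merge mechanism: ResultMergeQueryRunner folds adjacent results that its ordering considers equal, so the constant-zero comparator (merge=true) collapses the entire stream into one analysis, while the by-id ordering (merge=false) keeps every segment distinct and the ISE guard in the BinaryFn should never fire. A hedged illustration of what the merge function produces (all values invented; Interval is org.joda.time.Interval, ImmutableMap is Guava's):

  SegmentAnalysis left = new SegmentAnalysis(
      "segment_a",
      Arrays.asList(new Interval("2013-01-01/2013-01-02")),
      ImmutableMap.of("page", new ColumnAnalysis(ValueType.STRING, 100L, 10)),
      100L
  );
  SegmentAnalysis right = new SegmentAnalysis(
      "segment_b",
      Arrays.asList(new Interval("2013-01-02/2013-01-03")),
      ImmutableMap.of("page", new ColumnAnalysis(ValueType.STRING, 250L, 12)),
      250L
  );
  // merge=true reduces [left, right] to a single result:
  //   id "merged", intervals condensed to [2013-01-01/2013-01-03],
  //   columns {page -> (STRING, size 350, cardinality 12)}, size 350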

View File

@@ -0,0 +1,45 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.segment;
import org.joda.time.Interval;
import java.util.Arrays;
import java.util.List;
/**
*/
public class QuerySegmentSpecs
{
public static QuerySegmentSpec create(String isoInterval)
{
return new LegacySegmentSpec(isoInterval);
}
public static QuerySegmentSpec create(Interval interval)
{
return create(Arrays.asList(interval));
}
public static QuerySegmentSpec create(List<Interval> intervals)
{
return new MultipleIntervalSegmentSpec(intervals);
}
}
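A small factory unifying the three ways of naming the queried intervals. Sketch:

  QuerySegmentSpec a = QuerySegmentSpecs.create("2013-01-01/2013-02-01");               // LegacySegmentSpec
  QuerySegmentSpec b = QuerySegmentSpecs.create(new Interval("2013-01-01/2013-02-01")); // single interval
  QuerySegmentSpec c = QuerySegmentSpecs.create(Arrays.asList(new Interval("2013-01-01/2013-02-01")));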

View File

@@ -102,10 +102,7 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
@Override
public Sequence<Result<TimeseriesResultValue>> mergeSequences(Sequence<Sequence<Result<TimeseriesResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<Result<TimeseriesResultValue>>(
getOrdering(),
seqOfSequences
);
return new OrderedMergeSequence<Result<TimeseriesResultValue>>(getOrdering(), seqOfSequences);
}
@Override

View File

@@ -25,5 +25,7 @@ import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
*/
public interface BitmapIndex
{
public int getCardinality();
public String getValue(int index);
public ImmutableConciseSet getConciseSet(String value);
}
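getCardinality() and getValue(int) let a caller walk a string column's dictionary without touching any rows; SegmentAnalyzer, added later in this commit, uses exactly this surface. A sketch of the access pattern (a bitmapIndex instance is assumed):

  for (int i = 0; i < bitmapIndex.getCardinality(); ++i) {
    final String value = bitmapIndex.getValue(i);  // i-th dictionary entry
    final int rows = value == null ? 0 : bitmapIndex.getConciseSet(value).size();  // rows containing it
  }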

View File

@@ -25,7 +25,7 @@ import com.metamx.druid.kv.IndexedInts;
*/
public interface DictionaryEncodedColumn
{
public int size();
public int length();
public boolean hasMultipleValues();
public int getSingleValueRow(int rowNum);
public IndexedInts getMultiValueRow(int rowNum);

View File

@@ -29,7 +29,7 @@ import java.io.Closeable;
*/
public interface GenericColumn extends Closeable
{
public int size();
public int length();
public ValueType getType();
public boolean hasMultipleValues();

View File

@@ -38,7 +38,7 @@ public class IndexedFloatsGenericColumn implements GenericColumn
}
@Override
public int size()
public int length()
{
return column.size();
}

View File

@@ -38,7 +38,7 @@ public class IndexedLongsGenericColumn implements GenericColumn
}
@Override
public int size()
public int length()
{
return column.size();
}

View File

@@ -62,7 +62,7 @@ class SimpleColumn implements Column
GenericColumn column = null;
try {
column = genericColumn.get();
return column.size();
return column.length();
}
finally {
Closeables.closeQuietly(column);

View File

@@ -44,7 +44,7 @@ public class SimpleDictionaryEncodedColumn implements DictionaryEncodedColumn
}
@Override
public int size()
public int length()
{
return hasMultipleValues() ? multiValueColumn.size() : column.size();
}

View File

@@ -67,7 +67,7 @@ public class StringMultiValueColumn extends AbstractColumn
return new DictionaryEncodedColumn()
{
@Override
public int size()
public int length()
{
return column.size();
}

View File

@@ -22,6 +22,7 @@ package com.metamx.druid.index.serde;
import com.google.common.base.Supplier;
import com.metamx.druid.index.column.BitmapIndex;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.Indexed;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
/**
@@ -46,6 +47,18 @@ public class BitmapIndexColumnPartSupplier implements Supplier<BitmapIndex>
{
return new BitmapIndex()
{
@Override
public int getCardinality()
{
return dictionary.size();
}
@Override
public String getValue(int index)
{
return dictionary.get(index);
}
@Override
public ImmutableConciseSet getConciseSet(String value)
{

View File

@@ -19,6 +19,7 @@
package com.metamx.druid.index.v1.serde;
import com.google.common.base.Function;
import com.metamx.druid.index.column.ColumnBuilder;
import com.metamx.druid.index.serde.ColumnPartSerde;
import com.metamx.druid.kv.ObjectStrategy;
@@ -27,10 +28,10 @@ import java.nio.ByteBuffer;
/**
*/
public interface ComplexMetricSerde
public abstract class ComplexMetricSerde
{
public String getTypeName();
public ComplexMetricExtractor getExtractor();
public abstract String getTypeName();
public abstract ComplexMetricExtractor getExtractor();
/**
* Deserializes a ByteBuffer and adds it to the ColumnBuilder. This method allows for the ComplexMetricSerde
@@ -42,7 +43,7 @@ public interface ComplexMetricSerde
* @param buffer the buffer to deserialize
* @return a ColumnPartSerde that can serialize out the object that was read from the buffer to the builder
*/
public ColumnPartSerde deserializeColumn(ByteBuffer buffer, ColumnBuilder builder);
public abstract ColumnPartSerde deserializeColumn(ByteBuffer buffer, ColumnBuilder builder);
/**
* This is deprecated because its usage is going to be removed from the code.
@@ -55,5 +56,20 @@
* @return an ObjectStrategy as used by GenericIndexed
*/
@Deprecated
public ObjectStrategy getObjectStrategy();
public abstract ObjectStrategy getObjectStrategy();
/**
* Returns a function that can convert the Object provided by the ComplexColumn created through deserializeColumn
* into a number of expected input bytes to produce that object.
*
* This is used to approximate the size of the input data via the SegmentMetadataQuery and does not need to be
* overridden if you do not care about the query.
*
* @return A function that can compute the size of the complex object or null if you cannot/do not want to compute it
*/
public Function<Object, Long> inputSizeFn()
{
return null;
}
}
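Turning the interface into an abstract class keeps every existing serde compiling while making size estimation an opt-in override. A hedged sketch of what a concrete serde's override might return (purely illustrative; a real serde would measure its own value type, and Function/Charsets are Guava's):

  @Override
  public Function<Object, Long> inputSizeFn()
  {
    return new Function<Object, Long>()
    {
      @Override
      public Long apply(Object o)
      {
        // Hypothetical: approximate each row's input as its UTF-8 text length.
        return o == null ? 0L : (long) o.toString().getBytes(Charsets.UTF_8).length;
      }
    };
  }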

View File

@@ -33,7 +33,6 @@ import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.Query;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerView;
@@ -44,9 +43,6 @@ import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.MMappedIndex;
import com.metamx.druid.index.v1.MMappedIndexQueryableIndex;
import com.metamx.druid.index.v1.MMappedIndexStorageAdapter;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.query.MetricsEmittingQueryRunner;
import com.metamx.druid.query.QueryRunner;

View File

@@ -29,7 +29,6 @@ import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.collect.CountingMap;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.SegmentIdAttachedStorageAdapter;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.StorageAdapterLoadingException;
import com.metamx.druid.partition.PartitionChunk;
@@ -330,7 +329,7 @@ public class ServerManager implements QuerySegmentWalker
}
},
new BySegmentQueryRunner<T>(
adapter.getSegmentIdentifier(),
adapter.getIdentifier(),
adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
)

View File

@@ -38,7 +38,7 @@ public class IncrementalIndexSegment implements Segment
}
@Override
public String getSegmentIdentifier()
public String getIdentifier()
{
throw new UnsupportedOperationException();
}

View File

@@ -37,7 +37,7 @@ public class QueryableIndexSegment implements Segment
}
@Override
public String getSegmentIdentifier()
public String getIdentifier()
{
return identifier;
}

View File

@@ -26,7 +26,7 @@ import org.joda.time.Interval;
*/
public interface Segment
{
public String getSegmentIdentifier();
public String getIdentifier();
public Interval getDataInterval();
public QueryableIndex asQueryableIndex();
public StorageAdapter asStorageAdapter();

View File

@@ -1,103 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.v1;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.index.column.ComplexColumnImpl;
import com.metamx.druid.index.column.FloatColumn;
import com.metamx.druid.index.column.LongColumn;
import com.metamx.druid.index.column.StringMultiValueColumn;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.VSizeIndexed;
import org.joda.time.Interval;
/**
*/
public class MMappedIndexQueryableIndex implements QueryableIndex
{
private final MMappedIndex index;
public MMappedIndexQueryableIndex(
MMappedIndex index
)
{
this.index = index;
}
public MMappedIndex getIndex()
{
return index;
}
@Override
public Interval getDataInterval()
{
return index.getDataInterval();
}
@Override
public int getNumRows()
{
return index.getTimestamps().size();
}
@Override
public Indexed<String> getColumnNames()
{
return null;
}
@Override
public Indexed<String> getAvailableDimensions()
{
return index.getAvailableDimensions();
}
@Override
public Column getTimeColumn()
{
return new LongColumn(index.timestamps);
}
@Override
public Column getColumn(String columnName)
{
final MetricHolder metricHolder = index.getMetricHolder(columnName);
if (metricHolder == null) {
final VSizeIndexed dimColumn = index.getDimColumn(columnName);
if (dimColumn == null) {
return null;
}
return new StringMultiValueColumn(
index.getDimValueLookup(columnName),
dimColumn,
index.getInvertedIndexes().get(columnName)
);
}
else if (metricHolder.getType() == MetricHolder.MetricType.FLOAT) {
return new FloatColumn(metricHolder.floatType);
}
else {
return new ComplexColumnImpl(metricHolder.getTypeName(), metricHolder.getComplexType());
}
}
}

View File

@@ -1,666 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.metamx.common.collect.MoreIterators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.FunctionalIterator;
import com.metamx.druid.BaseStorageAdapter;
import com.metamx.druid.Capabilities;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.index.brita.BitmapIndexSelector;
import com.metamx.druid.index.brita.Filter;
import com.metamx.druid.index.v1.processing.Cursor;
import com.metamx.druid.index.v1.processing.DimensionSelector;
import com.metamx.druid.index.v1.processing.Offset;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedFloats;
import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.kv.IndexedLongs;
import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Map;
/**
*/
public class MMappedIndexStorageAdapter extends BaseStorageAdapter
{
private final MMappedIndex index;
public MMappedIndexStorageAdapter(
MMappedIndex index
)
{
this.index = index;
}
public MMappedIndex getIndex()
{
return index;
}
@Override
public String getSegmentIdentifier()
{
throw new UnsupportedOperationException();
}
@Override
public Interval getInterval()
{
return index.getDataInterval();
}
@Override
public int getDimensionCardinality(String dimension)
{
final Indexed<String> dimValueLookup = index.getDimValueLookup(dimension.toLowerCase());
if (dimValueLookup == null) {
return 0;
}
return dimValueLookup.size();
}
@Override
public DateTime getMinTime()
{
final IndexedLongs timestamps = index.getReadOnlyTimestamps();
final DateTime retVal = new DateTime(timestamps.get(0));
Closeables.closeQuietly(timestamps);
return retVal;
}
@Override
public DateTime getMaxTime()
{
final IndexedLongs timestamps = index.getReadOnlyTimestamps();
final DateTime retVal = new DateTime(timestamps.get(timestamps.size() - 1));
Closeables.closeQuietly(timestamps);
return retVal;
}
@Override
public Capabilities getCapabilities()
{
return Capabilities.builder().dimensionValuesSorted(true).build();
}
@Override
public Iterable<Cursor> makeCursors(Filter filter, Interval interval, QueryGranularity gran)
{
Interval actualInterval = interval;
if (!actualInterval.overlaps(index.dataInterval)) {
return ImmutableList.of();
}
if (actualInterval.getStart().isBefore(index.dataInterval.getStart())) {
actualInterval = actualInterval.withStart(index.dataInterval.getStart());
}
if (actualInterval.getEnd().isAfter(index.dataInterval.getEnd())) {
actualInterval = actualInterval.withEnd(index.dataInterval.getEnd());
}
final Iterable<Cursor> iterable;
if (filter == null) {
iterable = new NoFilterCursorIterable(index, actualInterval, gran);
} else {
Offset offset = new ConciseOffset(filter.goConcise(new MMappedBitmapIndexSelector(index)));
iterable = new CursorIterable(index, actualInterval, gran, offset);
}
return FunctionalIterable.create(iterable).keep(Functions.<Cursor>identity());
}
@Override
public Indexed<String> getAvailableDimensions()
{
return index.getAvailableDimensions();
}
@Override
public Indexed<String> getDimValueLookup(String dimension)
{
return index.getDimValueLookup(dimension.toLowerCase());
}
@Override
public ImmutableConciseSet getInvertedIndex(String dimension, String dimVal)
{
return index.getInvertedIndex(dimension.toLowerCase(), dimVal);
}
@Override
public Offset getFilterOffset(Filter filter)
{
return new ConciseOffset(
filter.goConcise(
new MMappedBitmapIndexSelector(index)
)
);
}
private static class CursorIterable implements Iterable<Cursor>
{
private final MMappedIndex index;
private final Interval interval;
private final QueryGranularity gran;
private final Offset offset;
public CursorIterable(
MMappedIndex index,
Interval interval,
QueryGranularity gran,
Offset offset
)
{
this.index = index;
this.interval = interval;
this.gran = gran;
this.offset = offset;
}
@Override
public Iterator<Cursor> iterator()
{
final Offset baseOffset = offset.clone();
final Map<String, Object> metricHolderCache = Maps.newHashMap();
final IndexedLongs timestamps = index.getReadOnlyTimestamps();
final FunctionalIterator<Cursor> retVal = FunctionalIterator
.create(gran.iterable(interval.getStartMillis(), interval.getEndMillis()).iterator())
.transform(
new Function<Long, Cursor>()
{
@Override
public Cursor apply(final Long input)
{
final long timeStart = Math.max(interval.getStartMillis(), input);
while (baseOffset.withinBounds()
&& timestamps.get(baseOffset.getOffset()) < timeStart) {
baseOffset.increment();
}
final Offset offset = new TimestampCheckingOffset(
baseOffset, timestamps, Math.min(interval.getEndMillis(), gran.next(timeStart))
);
return new Cursor()
{
private final Offset initOffset = offset.clone();
private final DateTime myBucket = gran.toDateTime(input);
private Offset cursorOffset = offset;
@Override
public DateTime getTime()
{
return myBucket;
}
@Override
public void advance()
{
cursorOffset.increment();
}
@Override
public boolean isDone()
{
return !cursorOffset.withinBounds();
}
@Override
public void reset()
{
cursorOffset = initOffset.clone();
}
@Override
public DimensionSelector makeDimensionSelector(String dimension)
{
final String dimensionName = dimension.toLowerCase();
final Indexed<? extends IndexedInts> rowVals = index.getDimColumn(dimensionName);
final Indexed<String> dimValueLookup = index.getDimValueLookup(dimensionName);
if (rowVals == null) {
return null;
}
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
return rowVals.get(cursorOffset.getOffset());
}
@Override
public int getValueCardinality()
{
return dimValueLookup.size();
}
@Override
public String lookupName(int id)
{
final String retVal = dimValueLookup.get(id);
return retVal == null ? "" : retVal;
}
@Override
public int lookupId(String name)
{
return ("".equals(name)) ? dimValueLookup.indexOf(null) : dimValueLookup.indexOf(name);
}
};
}
@Override
public FloatMetricSelector makeFloatMetricSelector(String metric)
{
final String metricName = metric.toLowerCase();
IndexedFloats cachedMetricVals = (IndexedFloats) metricHolderCache.get(metricName);
if (cachedMetricVals == null) {
MetricHolder holder = index.getMetricHolder(metricName);
if (holder != null) {
cachedMetricVals = holder.getFloatType();
metricHolderCache.put(metricName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return new FloatMetricSelector()
{
@Override
public float get()
{
return 0.0f;
}
};
}
final IndexedFloats metricVals = cachedMetricVals;
return new FloatMetricSelector()
{
@Override
public float get()
{
return metricVals.get(cursorOffset.getOffset());
}
};
}
@Override
public ComplexMetricSelector makeComplexMetricSelector(String metric)
{
final String metricName = metric.toLowerCase();
Indexed cachedMetricVals = (Indexed) metricHolderCache.get(metricName);
if (cachedMetricVals == null) {
MetricHolder holder = index.getMetricHolder(metricName);
if (holder != null) {
cachedMetricVals = holder.getComplexType();
metricHolderCache.put(metricName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return null;
}
final Indexed metricVals = cachedMetricVals;
return new ComplexMetricSelector()
{
@Override
public Class classOfObject()
{
return metricVals.getClazz();
}
@Override
public Object get()
{
return metricVals.get(cursorOffset.getOffset());
}
};
}
};
}
}
);
// This after call is not perfect, if there is an exception during processing, it will never get called,
// but it's better than nothing and doing this properly all the time requires a lot more fixerating
return MoreIterators.after(
retVal,
new Runnable()
{
@Override
public void run()
{
Closeables.closeQuietly(timestamps);
for (Object object : metricHolderCache.values()) {
if (object instanceof Closeable) {
Closeables.closeQuietly((Closeable) object);
}
}
}
}
);
}
}
private static class TimestampCheckingOffset implements Offset
{
private final Offset baseOffset;
private final IndexedLongs timestamps;
private final long threshold;
public TimestampCheckingOffset(
Offset baseOffset,
IndexedLongs timestamps,
long threshold
)
{
this.baseOffset = baseOffset;
this.timestamps = timestamps;
this.threshold = threshold;
}
@Override
public int getOffset()
{
return baseOffset.getOffset();
}
@Override
public Offset clone()
{
return new TimestampCheckingOffset(baseOffset.clone(), timestamps, threshold);
}
@Override
public boolean withinBounds()
{
return baseOffset.withinBounds() && timestamps.get(baseOffset.getOffset()) < threshold;
}
@Override
public void increment()
{
baseOffset.increment();
}
}
private static class NoFilterCursorIterable implements Iterable<Cursor>
{
private final MMappedIndex index;
private final Interval interval;
private final QueryGranularity gran;
public NoFilterCursorIterable(
MMappedIndex index,
Interval interval,
QueryGranularity gran
)
{
this.index = index;
this.interval = interval;
this.gran = gran;
}
/**
* This produces iterators of Cursor objects that must be fully processed (until isDone() returns true) before the
* next Cursor is processed. It is *not* safe to pass these cursors off to another thread for parallel processing
*
* @return
*/
@Override
public Iterator<Cursor> iterator()
{
final Map<String, Object> metricCacheMap = Maps.newHashMap();
final IndexedLongs timestamps = index.getReadOnlyTimestamps();
final FunctionalIterator<Cursor> retVal = FunctionalIterator
.create(gran.iterable(interval.getStartMillis(), interval.getEndMillis()).iterator())
.transform(
new Function<Long, Cursor>()
{
private int currRow = 0;
@Override
public Cursor apply(final Long input)
{
final long timeStart = Math.max(interval.getStartMillis(), input);
while (currRow < timestamps.size() && timestamps.get(currRow) < timeStart) {
++currRow;
}
return new Cursor()
{
private final DateTime myBucket = gran.toDateTime(input);
private final long nextBucket = Math.min(gran.next(myBucket.getMillis()), interval.getEndMillis());
private final int initRow = currRow;
@Override
public DateTime getTime()
{
return myBucket;
}
@Override
public void advance()
{
++currRow;
}
@Override
public boolean isDone()
{
return currRow >= timestamps.size() || timestamps.get(currRow) >= nextBucket;
}
@Override
public void reset()
{
currRow = initRow;
}
@Override
public DimensionSelector makeDimensionSelector(final String dimension)
{
final String dimensionName = dimension.toLowerCase();
final Indexed<? extends IndexedInts> rowVals = index.getDimColumn(dimensionName);
final Indexed<String> dimValueLookup = index.getDimValueLookup(dimensionName);
if (rowVals == null) {
return null;
}
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
return rowVals.get(currRow);
}
@Override
public int getValueCardinality()
{
return dimValueLookup.size();
}
@Override
public String lookupName(int id)
{
final String retVal = dimValueLookup.get(id);
return retVal == null ? "" : retVal;
}
@Override
public int lookupId(String name)
{
return ("".equals(name)) ? dimValueLookup.indexOf(null) : dimValueLookup.indexOf(name);
}
};
}
@Override
public FloatMetricSelector makeFloatMetricSelector(String metric)
{
final String metricName = metric.toLowerCase();
IndexedFloats cachedMetricVals = (IndexedFloats) metricCacheMap.get(metricName);
if (cachedMetricVals == null) {
final MetricHolder metricHolder = index.getMetricHolder(metricName);
if (metricHolder != null) {
cachedMetricVals = metricHolder.getFloatType();
if (cachedMetricVals != null) {
metricCacheMap.put(metricName, cachedMetricVals);
}
}
}
if (cachedMetricVals == null) {
return new FloatMetricSelector()
{
@Override
public float get()
{
return 0.0f;
}
};
}
final IndexedFloats metricVals = cachedMetricVals;
return new FloatMetricSelector()
{
@Override
public float get()
{
return metricVals.get(currRow);
}
};
}
@Override
public ComplexMetricSelector makeComplexMetricSelector(String metric)
{
final String metricName = metric.toLowerCase();
Indexed cachedMetricVals = (Indexed) metricCacheMap.get(metricName);
if (cachedMetricVals == null) {
final MetricHolder metricHolder = index.getMetricHolder(metricName);
if (metricHolder != null) {
cachedMetricVals = metricHolder.getComplexType();
if (cachedMetricVals != null) {
metricCacheMap.put(metricName, cachedMetricVals);
}
}
}
if (cachedMetricVals == null) {
return null;
}
final Indexed metricVals = cachedMetricVals;
return new ComplexMetricSelector()
{
@Override
public Class classOfObject()
{
return metricVals.getClazz();
}
@Override
public Object get()
{
return metricVals.get(currRow);
}
};
}
};
}
}
);
return MoreIterators.after(
retVal,
new Runnable()
{
@Override
public void run()
{
Closeables.closeQuietly(timestamps);
for (Object object : metricCacheMap.values()) {
if (object instanceof Closeable) {
Closeables.closeQuietly((Closeable) object);
}
}
}
}
);
}
}
private static class MMappedBitmapIndexSelector implements BitmapIndexSelector
{
private final MMappedIndex index;
public MMappedBitmapIndexSelector(final MMappedIndex index)
{
this.index = index;
}
@Override
public Indexed<String> getDimensionValues(String dimension)
{
return index.getDimValueLookup(dimension.toLowerCase());
}
@Override
public int getNumRows()
{
return index.getReadOnlyTimestamps().size();
}
@Override
public ImmutableConciseSet getConciseInvertedIndex(String dimension, String value)
{
return index.getInvertedIndex(dimension.toLowerCase(), value);
}
}
}

View File

@@ -113,7 +113,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
GenericColumn column = null;
try {
column = index.getTimeColumn().getGenericColumn();
return new DateTime(column.getLongSingleValueRow(column.size() - 1));
return new DateTime(column.getLongSingleValueRow(column.length() - 1));
}
finally {
Closeables.closeQuietly(column);
@@ -572,7 +572,7 @@
public Cursor apply(final Long input)
{
final long timeStart = Math.max(interval.getStartMillis(), input);
while (currRow < timestamps.size() && timestamps.getLongSingleValueRow(currRow) < timeStart) {
while (currRow < timestamps.length() && timestamps.getLongSingleValueRow(currRow) < timeStart) {
++currRow;
}
@@ -597,7 +597,7 @@
@Override
public boolean isDone()
{
return currRow >= timestamps.size() || timestamps.getLongSingleValueRow(currRow) >= nextBucket;
return currRow >= timestamps.length() || timestamps.getLongSingleValueRow(currRow) >= nextBucket;
}
@Override
@@ -848,7 +848,7 @@
GenericColumn column = null;
try {
column = index.getTimeColumn().getGenericColumn();
return column.size();
return column.length();
}
finally {
Closeables.closeQuietly(column);

View File

@@ -0,0 +1,160 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.primitives.Floats;
import com.google.common.primitives.Longs;
import com.metamx.common.logger.Logger;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.column.BitmapIndex;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.index.column.ColumnCapabilities;
import com.metamx.druid.index.column.ComplexColumn;
import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics;
import java.util.Map;
public class SegmentAnalyzer
{
private static final Logger log = new Logger(SegmentAnalyzer.class);
/**
* This is based on the minimum size of a timestamp (POSIX seconds). An ISO timestamp will actually be more like 24+
*/
private static final int NUM_BYTES_IN_TIMESTAMP = 10;
/**
* This is based on assuming 6 units of precision, one decimal point and a single value left of the decimal
*/
private static final int NUM_BYTES_IN_TEXT_FLOAT = 8;
public Map<String, ColumnAnalysis> analyze(QueryableIndex index)
{
Preconditions.checkNotNull(index, "Index cannot be null");
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
for (String columnName : index.getColumnNames()) {
final Column column = index.getColumn(columnName);
final ColumnCapabilities capabilities = column.getCapabilities();
final ColumnAnalysis analysis;
final ValueType type = capabilities.getType();
switch(type) {
case LONG:
analysis = analyzeLongColumn(column);
break;
case FLOAT:
analysis = analyzeFloatColumn(column);
break;
case STRING:
analysis = analyzeStringColumn(column);
break;
case COMPLEX:
analysis = analyzeComplexColumn(column);
break;
default:
log.warn("Unknown column type[%s].", type);
analysis = ColumnAnalysis.error(String.format("unknown_type_%s", type));
}
columns.put(columnName, analysis);
}
columns.put("__time", lengthBasedAnalysis(index.getTimeColumn(), NUM_BYTES_IN_TIMESTAMP));
return columns;
}
public ColumnAnalysis analyzeLongColumn(Column column)
{
return lengthBasedAnalysis(column, Longs.BYTES);
}
public ColumnAnalysis analyzeFloatColumn(Column column)
{
return lengthBasedAnalysis(column, NUM_BYTES_IN_TEXT_FLOAT);
}
private ColumnAnalysis lengthBasedAnalysis(Column column, final int numBytes)
{
final ColumnCapabilities capabilities = column.getCapabilities();
if (capabilities.hasMultipleValues()) {
return ColumnAnalysis.error("multi_value");
}
return new ColumnAnalysis(capabilities.getType(), column.getLength() * numBytes, null);
}
public ColumnAnalysis analyzeStringColumn(Column column)
{
final ColumnCapabilities capabilities = column.getCapabilities();
if (capabilities.hasBitmapIndexes()) {
final BitmapIndex bitmapIndex = column.getBitmapIndex();
int cardinality = bitmapIndex.getCardinality();
long size = 0;
for (int i = 0; i < cardinality; ++i) {
String value = bitmapIndex.getValue(i);
if (value != null) {
size += value.getBytes(Charsets.UTF_8).length * bitmapIndex.getConciseSet(value).size();
}
}
return new ColumnAnalysis(capabilities.getType(), size, cardinality);
}
return ColumnAnalysis.error("string_no_bitmap");
}
public ColumnAnalysis analyzeComplexColumn(Column column)
{
final ColumnCapabilities capabilities = column.getCapabilities();
final ComplexColumn complexColumn = column.getComplexColumn();
final String typeName = complexColumn.getTypeName();
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
return ColumnAnalysis.error(String.format("unknown_complex_%s", typeName));
}
final Function<Object, Long> inputSizeFn = serde.inputSizeFn();
if (inputSizeFn == null) {
return ColumnAnalysis.error("noSizeFn");
}
final int length = column.getLength();
long size = 0;
for (int i = 0; i < length; ++i) {
size += inputSizeFn.apply(complexColumn.getRowValue(i));
}
return new ColumnAnalysis(capabilities.getType(), size, null);
}
}
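The long, float, and timestamp estimates above are simply row count times a fixed per-value byte width (8, 8, and 10 respectively); the string estimate is "bytes of raw input": each dictionary value's UTF-8 length times the number of rows containing it (the concise set's size), summed across the dictionary. A worked example with invented numbers:

  // dictionary: "chrome" appears in 400 rows, "firefox" in 100 rows
  long size = 0;
  size += "chrome".getBytes(Charsets.UTF_8).length * 400;   // 6 * 400 = 2400
  size += "firefox".getBytes(Charsets.UTF_8).length * 100;  // 7 * 100 = 700
  // reported ColumnAnalysis: size == 3100, cardinality == 2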

View File

@@ -1,109 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.IAE;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.SimpleSequence;
import com.metamx.druid.BaseStorageAdapter;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.index.v1.SegmentIdAttachedStorageAdapter;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SegmentMetadataResultValue;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
public class SegmentMetadataQueryEngine
{
public Sequence<Result<SegmentMetadataResultValue>> process(
final SegmentMetadataQuery query,
StorageAdapter storageAdapter
)
{
final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
if (intervals.size() != 1) {
throw new IAE("Should only have one interval, got[%s]", intervals);
}
if(!(storageAdapter instanceof SegmentIdAttachedStorageAdapter) ||
!(((SegmentIdAttachedStorageAdapter)storageAdapter).getDelegate() instanceof BaseStorageAdapter)) {
return Sequences.empty();
}
final BaseStorageAdapter adapter = (BaseStorageAdapter)
((SegmentIdAttachedStorageAdapter) storageAdapter).getDelegate();
Function<String, SegmentMetadataResultValue.Dimension> sizeDimension = new Function<String, SegmentMetadataResultValue.Dimension>()
{
@Override
public SegmentMetadataResultValue.Dimension apply(@Nullable String input)
{
long size = 0;
final Indexed<String> lookup = adapter.getDimValueLookup(input);
for (String dimVal : lookup) {
ImmutableConciseSet index = adapter.getInvertedIndex(input, dimVal);
size += (dimVal == null) ? 0 : index.size() * Charsets.UTF_8.encode(dimVal).capacity();
}
return new SegmentMetadataResultValue.Dimension(
size,
adapter.getDimensionCardinality(input)
);
}
};
// TODO: add metric storage size
long totalSize = 0;
HashMap<String, SegmentMetadataResultValue.Dimension> dimensions = Maps.newHashMap();
for(String input : adapter.getAvailableDimensions()) {
SegmentMetadataResultValue.Dimension d = sizeDimension.apply(input);
dimensions.put(input, d);
totalSize += d.size;
}
return new SimpleSequence<Result<SegmentMetadataResultValue>>(
ImmutableList.of(
new Result<SegmentMetadataResultValue>(
adapter.getMinTime(),
new SegmentMetadataResultValue(
storageAdapter.getSegmentIdentifier(),
dimensions,
ImmutableMap.<String, SegmentMetadataResultValue.Metric>of(),
totalSize
)
)
)
);
}
}

View File

@@ -21,83 +21,105 @@ package com.metamx.druid.query.metadata;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.metamx.common.ISE;
import com.google.common.collect.Maps;
import com.metamx.common.guava.ExecutorExecutingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.Query;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.Segment;
import com.metamx.druid.query.ConcatQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQueryEngine;
import com.metamx.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
import com.metamx.druid.result.SegmentMetadataResultValue;
import com.metamx.druid.result.Result;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Result<SegmentMetadataResultValue>, SegmentMetadataQuery>
public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<SegmentAnalysis, SegmentMetadataQuery>
{
private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest()
{
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> mergeResults(QueryRunner<Result<SegmentMetadataResultValue>> runner)
{
return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(Sequences.simple(ImmutableList.of(runner)));
}
};
private static final SegmentAnalyzer analyzer = new SegmentAnalyzer();
private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest();
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> createRunner(final Segment adapter)
public QueryRunner<SegmentAnalysis> createRunner(final Segment segment)
{
return new QueryRunner<Result<SegmentMetadataResultValue>>()
return new QueryRunner<SegmentAnalysis>()
{
@Override
public Sequence<Result<SegmentMetadataResultValue>> run(Query<Result<SegmentMetadataResultValue>> query)
public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ)
{
if (!(query instanceof SegmentMetadataQuery)) {
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), SegmentMetadataQuery.class);
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
final QueryableIndex index = segment.asQueryableIndex();
if (index == null) {
return Sequences.empty();
}
return new SegmentMetadataQueryEngine().process((SegmentMetadataQuery) query, adapter.asStorageAdapter());
final Map<String, ColumnAnalysis> analyzedColumns = analyzer.analyze(index);
// Initialize with the size of the whitespace, 1 byte per row per column
long totalSize = analyzedColumns.size() * index.getNumRows();
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
ColumnIncluderator includerator = query.getToInclude();
for (Map.Entry<String, ColumnAnalysis> entry : analyzedColumns.entrySet()) {
final String columnName = entry.getKey();
final ColumnAnalysis column = entry.getValue();
if (!column.isError()) {
totalSize += column.getSize();
}
if (includerator.include(columnName)) {
columns.put(columnName, column);
}
}
return Sequences.simple(
Arrays.asList(
new SegmentAnalysis(
segment.getIdentifier(),
Arrays.asList(segment.getDataInterval()),
columns,
totalSize
)
)
);
}
};
}
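Note that the includerator only controls which ColumnAnalysis entries are reported back; totalSize is accumulated over every non-error column before the include check runs. A toy includerator in the spirit of the interface, assuming (as AllColumnIncluderator suggests) that include(String) is the only method a ColumnIncluderator must supply; the column names in the usage comment are hypothetical:

package com.metamx.druid.query.metadata;

import java.util.Set;

public class SetColumnIncluderator implements ColumnIncluderator
{
  private final Set<String> columnNames;

  public SetColumnIncluderator(Set<String> columnNames)
  {
    this.columnNames = columnNames;
  }

  @Override
  public boolean include(String columnName)
  {
    return columnNames.contains(columnName);
  }
}

// e.g. new SetColumnIncluderator(ImmutableSet.of("quality", "index")) as the
// query's toInclude would report analyses for just those two columns, while
// totalSize would still cover the whole segment.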
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> mergeRunners(
final ExecutorService queryExecutor, Iterable<QueryRunner<Result<SegmentMetadataResultValue>>> queryRunners
public QueryRunner<SegmentAnalysis> mergeRunners(
final ExecutorService queryExecutor, Iterable<QueryRunner<SegmentAnalysis>> queryRunners
)
{
return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(
return new ConcatQueryRunner<SegmentAnalysis>(
Sequences.map(
Sequences.simple(queryRunners),
new Function<QueryRunner<Result<SegmentMetadataResultValue>>, QueryRunner<Result<SegmentMetadataResultValue>>>()
new Function<QueryRunner<SegmentAnalysis>, QueryRunner<SegmentAnalysis>>()
{
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> apply(final QueryRunner<Result<SegmentMetadataResultValue>> input)
public QueryRunner<SegmentAnalysis> apply(final QueryRunner<SegmentAnalysis> input)
{
return new QueryRunner<Result<SegmentMetadataResultValue>>()
return new QueryRunner<SegmentAnalysis>()
{
@Override
public Sequence<Result<SegmentMetadataResultValue>> run(final Query<Result<SegmentMetadataResultValue>> query)
public Sequence<SegmentAnalysis> run(final Query<SegmentAnalysis> query)
{
Future<Sequence<Result<SegmentMetadataResultValue>>> future = queryExecutor.submit(
new Callable<Sequence<Result<SegmentMetadataResultValue>>>()
Future<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
new Callable<Sequence<SegmentAnalysis>>()
{
@Override
public Sequence<Result<SegmentMetadataResultValue>> call() throws Exception
public Sequence<SegmentAnalysis> call() throws Exception
{
return new ExecutorExecutingSequence<Result<SegmentMetadataResultValue>>(
return new ExecutorExecutingSequence<SegmentAnalysis>(
input.run(query),
queryExecutor
);
@ -122,7 +144,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Result<SegmentMetadataResultValue>, SegmentMetadataQuery>
}
@Override
public QueryToolChest getToolchest()
public QueryToolChest<SegmentAnalysis, SegmentMetadataQuery> getToolchest()
{
return toolChest;
}
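The mergeRunners above deliberately concatenates per-segment results rather than merging them; what it adds is the hand-off of each runner to queryExecutor, with ExecutorExecutingSequence keeping consumption of the resulting sequence on that pool. Stripped of the Druid types, the Future-based hand-off reduces to the following shape (a hedged sketch; runOn, the pool setup, and the sample payload are illustrative, not from the commit):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorHandOffSketch
{
  // Submit the work to the pool, then block the caller until the pool
  // thread produces the result, as the Future in mergeRunners does.
  static <T> T runOn(ExecutorService exec, Callable<T> work) throws Exception
  {
    final Future<T> future = exec.submit(work);
    return future.get();
  }

  public static void main(String[] args) throws Exception
  {
    final ExecutorService exec = Executors.newFixedThreadPool(2);
    try {
      System.out.println(
          runOn(
              exec,
              new Callable<String>()
              {
                @Override
                public String call()
                {
                  return "analysis computed on " + Thread.currentThread().getName();
                }
              }
          )
      );
    }
    finally {
      exec.shutdown();
    }
  }
}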

View File

@ -328,7 +328,7 @@ public class ServerManagerTest
}
@Override
public String getSegmentIdentifier()
public String getIdentifier()
{
return version;
}

View File

@ -35,7 +35,7 @@ public class NoopSegmentLoader implements SegmentLoader
return new Segment()
{
@Override
public String getSegmentIdentifier()
public String getIdentifier()
{
return segment.getIdentifier();
}

View File

@ -22,7 +22,6 @@ package com.metamx.druid.query;
import com.google.common.collect.Lists;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
@ -35,13 +34,6 @@ import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IncrementalIndexStorageAdapter;
import com.metamx.druid.index.v1.Index;
import com.metamx.druid.index.v1.IndexStorageAdapter;
import com.metamx.druid.index.v1.MMappedIndex;
import com.metamx.druid.index.v1.MMappedIndexQueryableIndex;
import com.metamx.druid.index.v1.MMappedIndexStorageAdapter;
import com.metamx.druid.index.v1.QueryableIndexStorageAdapter;
import com.metamx.druid.index.v1.TestIndex;
import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec;
import com.metamx.druid.query.segment.QuerySegmentSpec;
@ -132,7 +124,7 @@ public class QueryRunnerTestHelper
);
}
private static <T> QueryRunner<T> makeQueryRunner(
public static <T> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
Segment adapter
)

View File

@ -0,0 +1,102 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.index.IncrementalIndexSegment;
import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.index.v1.TestIndex;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerTestHelper;
import com.metamx.druid.query.segment.QuerySegmentSpecs;
import junit.framework.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
/**
*/
public class SegmentAnalyzerTest
{
@Test
public void testIncrementalDoesNotWork() throws Exception
{
final List<SegmentAnalysis> results = getSegmentAnalysises(
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex())
);
Assert.assertEquals(0, results.size());
}
@Test
public void testMappedWorks() throws Exception
{
final List<SegmentAnalysis> results = getSegmentAnalysises(
new QueryableIndexSegment("test_1", TestIndex.getMMappedTestIndex())
);
Assert.assertEquals(1, results.size());
final SegmentAnalysis analysis = results.get(0);
Assert.assertEquals("test_1", analysis.getId());
final Map<String,ColumnAnalysis> columns = analysis.getColumns();
Assert.assertEquals(TestIndex.COLUMNS.length, columns.size()); // All columns including time
for (String dimension : TestIndex.DIMENSIONS) {
final ColumnAnalysis columnAnalysis = columns.get(dimension.toLowerCase());
Assert.assertEquals(dimension, ValueType.STRING.name(), columnAnalysis.getType());
Assert.assertTrue(dimension, columnAnalysis.getSize() > 0);
Assert.assertTrue(dimension, columnAnalysis.getCardinality() > 0);
}
for (String metric : TestIndex.METRICS) {
final ColumnAnalysis columnAnalysis = columns.get(metric.toLowerCase());
Assert.assertEquals(metric, ValueType.FLOAT.name(), columnAnalysis.getType());
Assert.assertTrue(metric, columnAnalysis.getSize() > 0);
Assert.assertNull(metric, columnAnalysis.getCardinality());
}
}
/**
* *Awesome* method name auto-generated by IntelliJ! I love IntelliJ!
*
* @param index the Segment to analyze
* @return the list of SegmentAnalysis results from running a SegmentMetadataQuery over the segment
*/
private List<SegmentAnalysis> getSegmentAnalysises(Segment index)
{
final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
(QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(), index
);
final SegmentMetadataQuery query = new SegmentMetadataQuery(
"test", QuerySegmentSpecs.create("2011/2012"), null, null, null
);
return Sequences.toList(query.run(runner), Lists.<SegmentAnalysis>newArrayList());
}
}
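Putting the pieces together, a hedged usage sketch of the same helper code outside JUnit. It assumes SegmentAnalysis exposes getSize() for its totalSize field; getId(), getColumns(), and the ColumnAnalysis accessors are the ones the test above already exercises.

package com.metamx.druid.query.metadata;

import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.v1.TestIndex;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerTestHelper;
import com.metamx.druid.query.segment.QuerySegmentSpecs;

import java.util.Map;

public class SegmentAnalysisDump
{
  public static void main(String[] args) throws Exception
  {
    // Same raw-cast pattern as the test above.
    final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
        (QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(),
        new QueryableIndexSegment("test_1", TestIndex.getMMappedTestIndex())
    );
    final SegmentMetadataQuery query = new SegmentMetadataQuery(
        "test", QuerySegmentSpecs.create("2011/2012"), null, null, null
    );

    for (SegmentAnalysis analysis : Sequences.toList(query.run(runner), Lists.<SegmentAnalysis>newArrayList())) {
      System.out.println(analysis.getId() + " totalSize=" + analysis.getSize()); // getSize() is an assumption
      for (Map.Entry<String, ColumnAnalysis> e : analysis.getColumns().entrySet()) {
        final ColumnAnalysis c = e.getValue();
        System.out.println(
            "  " + e.getKey() + ": type=" + c.getType()
            + " size=" + c.getSize() + " cardinality=" + c.getCardinality()
        );
      }
    }
  }
}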