Merge pull request #44 from metamx/segment-metadata-query

Barebones segment metadata query
This commit is contained in:
cheddar 2012-12-12 15:33:50 -08:00
commit 55652861da
6 changed files with 525 additions and 0 deletions

View File

@ -0,0 +1,78 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.metamx.druid.BaseQuery;
import com.metamx.druid.Query;
import com.metamx.druid.query.segment.QuerySegmentSpec;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SegmentMetadataResultValue;

import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;

import java.util.Map;
/**
 * Query type "segmentMetadata": reports storage statistics (per-dimension size
 * and cardinality, total size) for the segments covered by the given intervals.
 * Results are {@link Result}s wrapping a {@link SegmentMetadataResultValue}.
 * Instances are immutable; the with*() methods return modified copies.
 */
public class SegmentMetadataQuery extends BaseQuery<Result<SegmentMetadataResultValue>>
{
  // @JsonCreator is required for Jackson to deserialize via this constructor;
  // @JsonProperty on the parameters alone is not sufficient.
  @JsonCreator
  public SegmentMetadataQuery(
      @JsonProperty("dataSource") String dataSource,
      @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
      @JsonProperty("context") Map<String, String> context
  )
  {
    super(
        dataSource,
        querySegmentSpec,
        context
    );
  }

  @Override
  public boolean hasFilters()
  {
    // Segment metadata queries never carry a filter.
    return false;
  }

  @Override
  public String getType()
  {
    return "segmentMetadata";
  }

  @Override
  public Query<Result<SegmentMetadataResultValue>> withOverriddenContext(Map<String, String> contextOverride)
  {
    // Rebuild with the merged context; the receiver is left unchanged.
    return new SegmentMetadataQuery(
        getDataSource(),
        getQuerySegmentSpec(),
        computeOverridenContext(contextOverride)
    );
  }

  @Override
  public Query<Result<SegmentMetadataResultValue>> withQuerySegmentSpec(QuerySegmentSpec spec)
  {
    // Rebuild with a new segment spec; the receiver is left unchanged.
    return new SegmentMetadataQuery(
        getDataSource(),
        spec,
        getContext()
    );
  }
}

View File

@ -0,0 +1,106 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.metamx.common.guava.ConcatSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.query.CacheStrategy;
import com.metamx.druid.query.ConcatQueryRunner;
import com.metamx.druid.query.MetricManipulationFn;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SegmentMetadataResultValue;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.Interval;
import org.joda.time.Minutes;
public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<Result<SegmentMetadataResultValue>, SegmentMetadataQuery>
{
private static final TypeReference<Result<SegmentMetadataResultValue>> TYPE_REFERENCE = new TypeReference<Result<SegmentMetadataResultValue>>(){};
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> mergeResults(final QueryRunner<Result<SegmentMetadataResultValue>> runner)
{
return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(Sequences.simple(ImmutableList.of(runner)));
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(SegmentMetadataQuery query)
{
int numMinutes = 0;
for (Interval interval : query.getIntervals()) {
numMinutes += Minutes.minutesIn(interval).getMinutes();
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser4(query.getType())
.setUser5(Joiner.on(",").join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString());
}
@Override
public Sequence<Result<SegmentMetadataResultValue>> mergeSequences(Sequence<Sequence<Result<SegmentMetadataResultValue>>> seqOfSequences)
{
return new ConcatSequence<Result<SegmentMetadataResultValue>>(seqOfSequences);
}
@Override
public Function<Result<SegmentMetadataResultValue>, Result<SegmentMetadataResultValue>> makeMetricManipulatorFn(
SegmentMetadataQuery query, MetricManipulationFn fn
)
{
return Functions.identity();
}
@Override
public TypeReference<Result<SegmentMetadataResultValue>> getResultTypeReference()
{
return TYPE_REFERENCE;
}
@Override
public CacheStrategy<Result<SegmentMetadataResultValue>, SegmentMetadataQuery> getCacheStrategy(SegmentMetadataQuery query)
{
return null;
}
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> preMergeQueryDecoration(QueryRunner<Result<SegmentMetadataResultValue>> runner)
{
return runner;
}
@Override
public QueryRunner<Result<SegmentMetadataResultValue>> postMergeQueryDecoration(QueryRunner<Result<SegmentMetadataResultValue>> runner)
{
return runner;
}
}

View File

@ -0,0 +1,101 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.result;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import java.util.Map;
/**
 * Jackson-serializable value holding the metadata reported for one segment:
 * its id, per-dimension statistics, per-metric statistics, and total size.
 */
public class SegmentMetadataResultValue
{
  /** Statistics for a single dimension column. */
  public static class Dimension {
    // Estimated storage size of this dimension, in bytes.
    @JsonProperty public long size;
    // Number of distinct values in this dimension.
    @JsonProperty public int cardinality;

    @JsonCreator
    public Dimension(
        @JsonProperty("size") long size,
        @JsonProperty("cardinality") int cardinality
    )
    {
      this.size = size;
      this.cardinality = cardinality;
    }
  }

  /** Statistics for a single metric column. */
  public static class Metric {
    // Type name of the metric column (e.g. its value type).
    @JsonProperty public String type;
    // Estimated storage size of this metric, in bytes.
    @JsonProperty public long size;

    @JsonCreator
    public Metric(
        @JsonProperty("type") String type,
        @JsonProperty("size") long size
    )
    {
      this.type = type;
      this.size = size;
    }
  }

  private final String id;
  private final Map<String, Dimension> dimensions;
  private final Map<String, Metric> metrics;
  private final long size;

  @JsonCreator
  public SegmentMetadataResultValue(
      @JsonProperty("id") String id,
      @JsonProperty("dimensions") Map<String, Dimension> dimensions,
      @JsonProperty("metrics") Map<String, Metric> metrics,
      @JsonProperty("size") long size
  )
  {
    this.id = id;
    this.dimensions = dimensions;
    this.metrics = metrics;
    this.size = size;
  }

  /** Segment identifier this metadata describes. */
  @JsonProperty
  public String getId()
  {
    return id;
  }

  /** Per-dimension statistics, keyed by dimension name. */
  @JsonProperty
  public Map<String, Dimension> getDimensions()
  {
    return dimensions;
  }

  /** Per-metric statistics, keyed by metric name. */
  @JsonProperty
  public Map<String, Metric> getMetrics()
  {
    return metrics;
  }

  /** Total estimated segment size, in bytes. */
  @JsonProperty
  public long getSize()
  {
    return size;
  }
}

View File

@ -41,6 +41,8 @@ import com.metamx.druid.loading.StorageAdapterLoader;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.group.GroupByQueryRunnerFactory;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.query.search.SearchQueryRunnerFactory;
import com.metamx.druid.query.timeboundary.TimeBoundaryQuery;
@ -144,6 +146,7 @@ public class ServerInit
);
queryRunners.put(SearchQuery.class, new SearchQueryRunnerFactory());
queryRunners.put(TimeBoundaryQuery.class, new TimeBoundaryQueryRunnerFactory());
queryRunners.put(SegmentMetadataQuery.class, new SegmentMetadataQueryRunnerFactory());
return queryRunners;
}

View File

@ -0,0 +1,109 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.IAE;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.SimpleSequence;
import com.metamx.druid.BaseStorageAdapter;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.index.v1.SegmentIdAttachedStorageAdapter;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SegmentMetadataResultValue;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
/**
 * Computes the metadata for a single segment: per-dimension estimated storage
 * size and cardinality, plus the total across dimensions.
 */
public class SegmentMetadataQueryEngine
{
  /**
   * Runs the query against one storage adapter.  Returns a single-element
   * sequence with the segment's metadata, or an empty sequence if the adapter
   * does not expose the low-level accessors needed for inspection.
   *
   * @throws IAE if the query does not specify exactly one interval
   */
  public Sequence<Result<SegmentMetadataResultValue>> process(
      final SegmentMetadataQuery query,
      StorageAdapter storageAdapter
  )
  {
    final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
    if (intervals.size() != 1) {
      throw new IAE("Should only have one interval, got[%s]", intervals);
    }

    // Only adapters that delegate to a BaseStorageAdapter expose the
    // dimension-value lookup and inverted indexes; anything else yields an
    // empty result rather than an error.
    if (!(storageAdapter instanceof SegmentIdAttachedStorageAdapter) ||
        !(((SegmentIdAttachedStorageAdapter) storageAdapter).getDelegate() instanceof BaseStorageAdapter)) {
      return Sequences.empty();
    }

    final BaseStorageAdapter adapter =
        (BaseStorageAdapter) ((SegmentIdAttachedStorageAdapter) storageAdapter).getDelegate();

    // TODO: add metric storage size
    long totalSize = 0;
    final HashMap<String, SegmentMetadataResultValue.Dimension> dimensions = Maps.newHashMap();
    for (String dimensionName : adapter.getAvailableDimensions()) {
      final SegmentMetadataResultValue.Dimension dimension = analyzeDimension(adapter, dimensionName);
      dimensions.put(dimensionName, dimension);
      totalSize += dimension.size;
    }

    return new SimpleSequence<Result<SegmentMetadataResultValue>>(
        ImmutableList.of(
            new Result<SegmentMetadataResultValue>(
                adapter.getMinTime(),
                new SegmentMetadataResultValue(
                    storageAdapter.getSegmentIdentifier(),
                    dimensions,
                    ImmutableMap.<String, SegmentMetadataResultValue.Metric>of(),
                    totalSize
                )
            )
        )
    );
  }

  /**
   * Estimates one dimension's storage footprint: for each distinct value, the
   * UTF-8 byte length of the value times the number of rows containing it.
   */
  private SegmentMetadataResultValue.Dimension analyzeDimension(BaseStorageAdapter adapter, String dimensionName)
  {
    long size = 0;
    final Indexed<String> lookup = adapter.getDimValueLookup(dimensionName);
    for (String dimVal : lookup) {
      final ImmutableConciseSet index = adapter.getInvertedIndex(dimensionName, dimVal);
      // Use remaining(), not capacity(): Charset.encode() returns a flipped
      // buffer whose backing capacity may exceed the encoded content, so
      // capacity() overestimates the byte length.  Cast to long to avoid
      // int overflow in the multiplication.
      size += (long) index.size() * Charsets.UTF_8.encode(dimVal).remaining();
    }
    return new SegmentMetadataResultValue.Dimension(
        size,
        adapter.getDimensionCardinality(dimensionName)
    );
  }
}

View File

@ -0,0 +1,128 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.metadata;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.metamx.common.ISE;
import com.metamx.common.guava.ExecutorExecutingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.Query;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.query.ConcatQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQueryEngine;
import com.metamx.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
import com.metamx.druid.result.SegmentMetadataResultValue;
import com.metamx.druid.result.Result;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
 * QueryRunnerFactory for "segmentMetadata" queries.  Per-segment runners
 * delegate to {@link SegmentMetadataQueryEngine}; merged runners concatenate
 * per-segment results, each computed on the supplied executor.
 */
public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Result<SegmentMetadataResultValue>, SegmentMetadataQuery>
{
  // Overrides mergeResults to a plain concatenation of the single runner.
  private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest()
  {
    @Override
    public QueryRunner<Result<SegmentMetadataResultValue>> mergeResults(QueryRunner<Result<SegmentMetadataResultValue>> runner)
    {
      return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(Sequences.simple(ImmutableList.of(runner)));
    }
  };

  @Override
  public QueryRunner<Result<SegmentMetadataResultValue>> createRunner(final StorageAdapter adapter)
  {
    return new QueryRunner<Result<SegmentMetadataResultValue>>()
    {
      @Override
      public Sequence<Result<SegmentMetadataResultValue>> run(Query<Result<SegmentMetadataResultValue>> query)
      {
        if (!(query instanceof SegmentMetadataQuery)) {
          throw new ISE("Got a [%s] which isn't a %s", query.getClass(), SegmentMetadataQuery.class);
        }
        return new SegmentMetadataQueryEngine().process((SegmentMetadataQuery) query, adapter);
      }
    };
  }

  @Override
  public QueryRunner<Result<SegmentMetadataResultValue>> mergeRunners(
      final ExecutorService queryExecutor, Iterable<QueryRunner<Result<SegmentMetadataResultValue>>> queryRunners
  )
  {
    // Wrap each runner so its work is submitted to queryExecutor, then
    // concatenate the per-runner sequences.
    return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(
        Sequences.map(
            Sequences.simple(queryRunners),
            new Function<QueryRunner<Result<SegmentMetadataResultValue>>, QueryRunner<Result<SegmentMetadataResultValue>>>()
            {
              @Override
              public QueryRunner<Result<SegmentMetadataResultValue>> apply(final QueryRunner<Result<SegmentMetadataResultValue>> input)
              {
                return new QueryRunner<Result<SegmentMetadataResultValue>>()
                {
                  @Override
                  public Sequence<Result<SegmentMetadataResultValue>> run(final Query<Result<SegmentMetadataResultValue>> query)
                  {
                    Future<Sequence<Result<SegmentMetadataResultValue>>> future = queryExecutor.submit(
                        new Callable<Sequence<Result<SegmentMetadataResultValue>>>()
                        {
                          @Override
                          public Sequence<Result<SegmentMetadataResultValue>> call() throws Exception
                          {
                            return new ExecutorExecutingSequence<Result<SegmentMetadataResultValue>>(
                                input.run(query),
                                queryExecutor
                            );
                          }
                        }
                    );
                    try {
                      return future.get();
                    }
                    catch (InterruptedException e) {
                      // Restore the interrupt flag before propagating so code
                      // further up the stack can still observe the interrupt.
                      Thread.currentThread().interrupt();
                      throw Throwables.propagate(e);
                    }
                    catch (ExecutionException e) {
                      throw Throwables.propagate(e);
                    }
                  }
                };
              }
            }
        )
    );
  }

  @Override
  public QueryToolChest getToolchest()
  {
    return toolChest;
  }
}