mirror of https://github.com/apache/druid.git
segment metadata query
This commit is contained in:
parent
f7a6ac31e3
commit
bfc9b63931
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.query.metadata;
|
||||
|
||||
import com.metamx.druid.BaseQuery;
|
||||
import com.metamx.druid.Query;
|
||||
import com.metamx.druid.query.segment.QuerySegmentSpec;
|
||||
import com.metamx.druid.result.Result;
|
||||
import com.metamx.druid.result.SegmentMetadataResultValue;
|
||||
import org.codehaus.jackson.annotate.JsonProperty;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class SegmentMetadataQuery extends BaseQuery<Result<SegmentMetadataResultValue>>
|
||||
{
|
||||
|
||||
public SegmentMetadataQuery(
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
|
||||
@JsonProperty("context") Map<String, String> context
|
||||
)
|
||||
{
|
||||
super(
|
||||
dataSource,
|
||||
querySegmentSpec,
|
||||
context
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasFilters()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType()
|
||||
{
|
||||
return "segmentMetadata";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Query<Result<SegmentMetadataResultValue>> withOverriddenContext(Map<String, String> contextOverride)
|
||||
{
|
||||
return new SegmentMetadataQuery(
|
||||
getDataSource(),
|
||||
getQuerySegmentSpec(),
|
||||
computeOverridenContext(contextOverride)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Query<Result<SegmentMetadataResultValue>> withQuerySegmentSpec(QuerySegmentSpec spec)
|
||||
{
|
||||
return new SegmentMetadataQuery(
|
||||
getDataSource(),
|
||||
spec,
|
||||
getContext()
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.query.metadata;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Functions;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.metamx.common.guava.ConcatSequence;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.druid.query.CacheStrategy;
|
||||
import com.metamx.druid.query.ConcatQueryRunner;
|
||||
import com.metamx.druid.query.MetricManipulationFn;
|
||||
import com.metamx.druid.query.QueryRunner;
|
||||
import com.metamx.druid.query.QueryToolChest;
|
||||
import com.metamx.druid.result.Result;
|
||||
import com.metamx.druid.result.SegmentMetadataResultValue;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import org.codehaus.jackson.type.TypeReference;
|
||||
import org.joda.time.Interval;
|
||||
import org.joda.time.Minutes;
|
||||
|
||||
|
||||
public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<Result<SegmentMetadataResultValue>, SegmentMetadataQuery>
|
||||
{
|
||||
|
||||
private static final TypeReference<Result<SegmentMetadataResultValue>> TYPE_REFERENCE = new TypeReference<Result<SegmentMetadataResultValue>>(){};
|
||||
|
||||
@Override
|
||||
public QueryRunner<Result<SegmentMetadataResultValue>> mergeResults(final QueryRunner<Result<SegmentMetadataResultValue>> runner)
|
||||
{
|
||||
return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(Sequences.simple(ImmutableList.of(runner)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceMetricEvent.Builder makeMetricBuilder(SegmentMetadataQuery query)
|
||||
{
|
||||
int numMinutes = 0;
|
||||
for (Interval interval : query.getIntervals()) {
|
||||
numMinutes += Minutes.minutesIn(interval).getMinutes();
|
||||
}
|
||||
|
||||
return new ServiceMetricEvent.Builder()
|
||||
.setUser2(query.getDataSource())
|
||||
.setUser4(query.getType())
|
||||
.setUser5(Joiner.on(",").join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
.setUser9(Minutes.minutes(numMinutes).toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Result<SegmentMetadataResultValue>> mergeSequences(Sequence<Sequence<Result<SegmentMetadataResultValue>>> seqOfSequences)
|
||||
{
|
||||
return new ConcatSequence<Result<SegmentMetadataResultValue>>(seqOfSequences);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Function<Result<SegmentMetadataResultValue>, Result<SegmentMetadataResultValue>> makeMetricManipulatorFn(
|
||||
SegmentMetadataQuery query, MetricManipulationFn fn
|
||||
)
|
||||
{
|
||||
return Functions.identity();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TypeReference<Result<SegmentMetadataResultValue>> getResultTypeReference()
|
||||
{
|
||||
return TYPE_REFERENCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CacheStrategy<Result<SegmentMetadataResultValue>, SegmentMetadataQuery> getCacheStrategy(SegmentMetadataQuery query)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryRunner<Result<SegmentMetadataResultValue>> preMergeQueryDecoration(QueryRunner<Result<SegmentMetadataResultValue>> runner)
|
||||
{
|
||||
return runner;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryRunner<Result<SegmentMetadataResultValue>> postMergeQueryDecoration(QueryRunner<Result<SegmentMetadataResultValue>> runner)
|
||||
{
|
||||
return runner;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.result;
|
||||
|
||||
import org.codehaus.jackson.annotate.JsonCreator;
|
||||
import org.codehaus.jackson.annotate.JsonProperty;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class SegmentMetadataResultValue
|
||||
{
|
||||
public static class Dimension {
|
||||
@JsonProperty public long size;
|
||||
@JsonProperty public int cardinality;
|
||||
|
||||
@JsonCreator
|
||||
public Dimension(
|
||||
@JsonProperty("size") long size,
|
||||
@JsonProperty("cardinality") int cardinality
|
||||
)
|
||||
{
|
||||
this.size = size;
|
||||
this.cardinality = cardinality;
|
||||
}
|
||||
}
|
||||
public static class Metric {
|
||||
@JsonProperty public String type;
|
||||
@JsonProperty public long size;
|
||||
|
||||
@JsonCreator
|
||||
public Metric(
|
||||
@JsonProperty("type") String type,
|
||||
@JsonProperty("size") long size
|
||||
)
|
||||
{
|
||||
this.type = type;
|
||||
this.size = size;
|
||||
}
|
||||
}
|
||||
|
||||
private final String id;
|
||||
private final Map<String, Dimension> dimensions;
|
||||
private final Map<String, Metric> metrics;
|
||||
private final long size;
|
||||
|
||||
@JsonCreator
|
||||
public SegmentMetadataResultValue(
|
||||
@JsonProperty("id") String id,
|
||||
@JsonProperty("dimensions") Map<String, Dimension> dimensions,
|
||||
@JsonProperty("metrics") Map<String, Metric> metrics,
|
||||
@JsonProperty("size") long size
|
||||
|
||||
)
|
||||
{
|
||||
this.id = id;
|
||||
this.dimensions = dimensions;
|
||||
this.metrics = metrics;
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getId()
|
||||
{
|
||||
return id;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Map<String, Dimension> getDimensions()
|
||||
{
|
||||
return dimensions;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Map<String, Metric> getMetrics()
|
||||
{
|
||||
return metrics;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long getSize()
|
||||
{
|
||||
return size;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.query.metadata;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.common.guava.SimpleSequence;
|
||||
import com.metamx.druid.BaseStorageAdapter;
|
||||
import com.metamx.druid.StorageAdapter;
|
||||
import com.metamx.druid.index.v1.SegmentIdAttachedStorageAdapter;
|
||||
import com.metamx.druid.kv.Indexed;
|
||||
import com.metamx.druid.result.Result;
|
||||
import com.metamx.druid.result.SegmentMetadataResultValue;
|
||||
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
|
||||
public class SegmentMetadataQueryEngine
|
||||
{
|
||||
public Sequence<Result<SegmentMetadataResultValue>> process(
|
||||
final SegmentMetadataQuery query,
|
||||
StorageAdapter storageAdapter
|
||||
)
|
||||
{
|
||||
final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
|
||||
if (intervals.size() != 1) {
|
||||
throw new IAE("Should only have one interval, got[%s]", intervals);
|
||||
}
|
||||
|
||||
if(!(storageAdapter instanceof SegmentIdAttachedStorageAdapter) ||
|
||||
!(((SegmentIdAttachedStorageAdapter)storageAdapter).getDelegate() instanceof BaseStorageAdapter)) {
|
||||
return Sequences.empty();
|
||||
}
|
||||
|
||||
final BaseStorageAdapter adapter = (BaseStorageAdapter)
|
||||
((SegmentIdAttachedStorageAdapter) storageAdapter).getDelegate();
|
||||
|
||||
Function<String, SegmentMetadataResultValue.Dimension> sizeDimension = new Function<String, SegmentMetadataResultValue.Dimension>()
|
||||
{
|
||||
@Override
|
||||
public SegmentMetadataResultValue.Dimension apply(@Nullable String input)
|
||||
{
|
||||
long size = 0;
|
||||
final Indexed<String> lookup = adapter.getDimValueLookup(input);
|
||||
for (String dimVal : lookup) {
|
||||
ImmutableConciseSet index = adapter.getInvertedIndex(input, dimVal);
|
||||
size += index.size() * Charsets.UTF_8.encode(dimVal).capacity();
|
||||
}
|
||||
return new SegmentMetadataResultValue.Dimension(
|
||||
size,
|
||||
adapter.getDimensionCardinality(input)
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: add metric storage size
|
||||
|
||||
long totalSize = 0;
|
||||
|
||||
HashMap<String, SegmentMetadataResultValue.Dimension> dimensions = Maps.newHashMap();
|
||||
for(String input : adapter.getAvailableDimensions()) {
|
||||
SegmentMetadataResultValue.Dimension d = sizeDimension.apply(input);
|
||||
dimensions.put(input, d);
|
||||
totalSize += d.size;
|
||||
}
|
||||
|
||||
return new SimpleSequence<Result<SegmentMetadataResultValue>>(
|
||||
ImmutableList.of(
|
||||
new Result<SegmentMetadataResultValue>(
|
||||
adapter.getMinTime(),
|
||||
new SegmentMetadataResultValue(
|
||||
storageAdapter.getSegmentIdentifier(),
|
||||
dimensions,
|
||||
ImmutableMap.<String, SegmentMetadataResultValue.Metric>of(),
|
||||
totalSize
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,128 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.query.metadata;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.ExecutorExecutingSequence;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.druid.Query;
|
||||
import com.metamx.druid.StorageAdapter;
|
||||
import com.metamx.druid.query.ConcatQueryRunner;
|
||||
import com.metamx.druid.query.QueryRunner;
|
||||
import com.metamx.druid.query.QueryRunnerFactory;
|
||||
import com.metamx.druid.query.QueryToolChest;
|
||||
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
|
||||
import com.metamx.druid.query.metadata.SegmentMetadataQueryEngine;
|
||||
import com.metamx.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
|
||||
import com.metamx.druid.result.SegmentMetadataResultValue;
|
||||
import com.metamx.druid.result.Result;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Result<SegmentMetadataResultValue>, SegmentMetadataQuery>
|
||||
{
|
||||
private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest()
|
||||
{
|
||||
@Override
|
||||
public QueryRunner<Result<SegmentMetadataResultValue>> mergeResults(QueryRunner<Result<SegmentMetadataResultValue>> runner)
|
||||
{
|
||||
return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(Sequences.simple(ImmutableList.of(runner)));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@Override
|
||||
public QueryRunner<Result<SegmentMetadataResultValue>> createRunner(final StorageAdapter adapter)
|
||||
{
|
||||
return new QueryRunner<Result<SegmentMetadataResultValue>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<Result<SegmentMetadataResultValue>> run(Query<Result<SegmentMetadataResultValue>> query)
|
||||
{
|
||||
if (!(query instanceof SegmentMetadataQuery)) {
|
||||
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), SegmentMetadataQuery.class);
|
||||
}
|
||||
return new SegmentMetadataQueryEngine().process((SegmentMetadataQuery) query, adapter);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryRunner<Result<SegmentMetadataResultValue>> mergeRunners(
|
||||
final ExecutorService queryExecutor, Iterable<QueryRunner<Result<SegmentMetadataResultValue>>> queryRunners
|
||||
)
|
||||
{
|
||||
return new ConcatQueryRunner<Result<SegmentMetadataResultValue>>(
|
||||
Sequences.map(
|
||||
Sequences.simple(queryRunners),
|
||||
new Function<QueryRunner<Result<SegmentMetadataResultValue>>, QueryRunner<Result<SegmentMetadataResultValue>>>()
|
||||
{
|
||||
@Override
|
||||
public QueryRunner<Result<SegmentMetadataResultValue>> apply(final QueryRunner<Result<SegmentMetadataResultValue>> input)
|
||||
{
|
||||
return new QueryRunner<Result<SegmentMetadataResultValue>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<Result<SegmentMetadataResultValue>> run(final Query<Result<SegmentMetadataResultValue>> query)
|
||||
{
|
||||
|
||||
Future<Sequence<Result<SegmentMetadataResultValue>>> future = queryExecutor.submit(
|
||||
new Callable<Sequence<Result<SegmentMetadataResultValue>>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<Result<SegmentMetadataResultValue>> call() throws Exception
|
||||
{
|
||||
return new ExecutorExecutingSequence<Result<SegmentMetadataResultValue>>(
|
||||
input.run(query),
|
||||
queryExecutor
|
||||
);
|
||||
}
|
||||
}
|
||||
);
|
||||
try {
|
||||
return future.get();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
catch (ExecutionException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryToolChest getToolchest()
|
||||
{
|
||||
return toolChest;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue