mirror of https://github.com/apache/druid.git

commit 0629be622c (parent a9c09ec8a7)

    remove unnecessary changes & fix index closing subquery
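Taken together, the hunks below do two things. First, they back out in-progress buffer-pool plumbing: the @Global StupidPool<ByteBuffer> parameter disappears from TaskToolbox/TaskToolboxFactory and the realtime plumber classes, the matching OffheapBufferPool arguments are dropped from the tests, and a conflicted copy of GroupByParallelQueryRunner plus an ad-hoc HyperUniqueBufferAggregatorTest are deleted outright. Second, in GroupByQueryQueryToolChest they fix the subquery path so that the intermediate incremental index built from the subquery's rows is closed once the outer query's index has been materialized, instead of being leaked.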
@@ -28,7 +28,6 @@ import com.google.common.collect.Multimaps;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.metrics.MonitorScheduler;
 import io.druid.client.FilteredServerView;
-import io.druid.collections.StupidPool;
 import io.druid.indexing.common.actions.SegmentInsertAction;
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
@@ -47,7 +46,6 @@ import org.joda.time.Interval;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -212,5 +210,4 @@ public class TaskToolbox
   {
     return taskWorkDir;
   }
-
 }

@@ -24,8 +24,6 @@ import com.google.inject.Inject;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.metrics.MonitorScheduler;
 import io.druid.client.FilteredServerView;
-import io.druid.collections.StupidPool;
-import io.druid.guice.annotations.Global;
 import io.druid.guice.annotations.Processing;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskConfig;
@@ -38,7 +36,6 @@ import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.server.coordination.DataSegmentAnnouncer;
 
 import java.io.File;
-import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -76,9 +73,7 @@ public class TaskToolboxFactory
       @Processing ExecutorService queryExecutorService,
       MonitorScheduler monitorScheduler,
       SegmentLoaderFactory segmentLoaderFactory,
-      ObjectMapper objectMapper,
-      //TODO: have a separate index pool
-      @Global StupidPool<ByteBuffer> bufferPool
+      ObjectMapper objectMapper
   )
   {
     this.config = config;

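The signature change above means every construction site drops its trailing buffer-pool argument, which is exactly what the two test fixtures later in this diff do. A compilable miniature of the shape of the change, using stand-in types rather than Druid's real classes:

// Stand-in types, illustrative only; not Druid's API.
class Mapper {}

class ToolboxFactory
{
  private final Mapper objectMapper;

  // Before this commit the constructor also took a pooled-buffer argument
  // (a @Global StupidPool<ByteBuffer> in the real class); it is now gone.
  ToolboxFactory(Mapper objectMapper)
  {
    this.objectMapper = objectMapper;
  }
}

class Caller
{
  public static void main(String[] args)
  {
    // previously: new ToolboxFactory(new Mapper(), /* bufferPool */ ...)
    ToolboxFactory factory = new ToolboxFactory(new Mapper());
    System.out.println("constructed: " + factory);
  }
}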
@@ -31,16 +31,13 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.metamx.common.Granularity;
 import com.metamx.common.logger.Logger;
-import io.druid.collections.StupidPool;
 import io.druid.data.input.InputRow;
-import io.druid.guice.annotations.Global;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMerger;
 import io.druid.segment.QueryableIndex;
 import io.druid.segment.SegmentUtils;
-import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.loading.DataSegmentPusher;
@@ -52,11 +49,9 @@ import io.druid.segment.realtime.plumber.Sink;
 import io.druid.timeline.DataSegment;
 import org.apache.commons.io.FileUtils;
 import org.joda.time.Interval;
-import sun.misc.JavaNioAccess;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
 
@@ -216,13 +211,14 @@ public class YeOldePlumberSchool implements PlumberSchool
       log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist);
 
       try {
-        final IncrementalIndex index = indexToPersist.getIndex();
         IndexMerger.persist(
             indexToPersist.getIndex(),
             dirToPersist
         );
 
         indexToPersist.swapSegment(null);
 
         metrics.incrementRowOutputCount(rowsToPersist);
 
         spilled.add(dirToPersist);

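Besides the import cleanup (including a stray import sun.misc.JavaNioAccess;), the spill path drops an unused local, since IndexMerger.persist(indexToPersist.getIndex(), dirToPersist) already reads the index directly.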
@@ -43,7 +43,6 @@ import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.data.input.impl.InputRowParser;
 import io.druid.granularity.QueryGranularity;
-import io.druid.offheap.OffheapBufferPool;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.indexing.common.SegmentLoaderFactory;
 import io.druid.indexing.common.TaskLock;
@@ -206,8 +205,7 @@ public class TaskLifecycleTest
             }
           )
         ),
-        new DefaultObjectMapper(),
-        new OffheapBufferPool(1024 * 1024)
+        new DefaultObjectMapper()
     );
     tr = new ThreadPoolTaskRunner(tb);
     tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter);

@@ -38,7 +38,6 @@ import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
 import io.druid.indexing.overlord.ThreadPoolTaskRunner;
 import io.druid.indexing.worker.config.WorkerConfig;
 import io.druid.jackson.DefaultObjectMapper;
-import io.druid.offheap.OffheapBufferPool;
 import io.druid.segment.loading.DataSegmentPuller;
 import io.druid.segment.loading.LocalDataSegmentPuller;
 import io.druid.segment.loading.OmniSegmentLoader;
@@ -139,8 +138,7 @@ public class WorkerTaskMonitorTest
               }
             }
           )
-        ), jsonMapper,
-        new OffheapBufferPool(1024 * 1024)
+        ), jsonMapper
       )
     ),
     new WorkerConfig().setCapacity(1)

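Both test fixtures track the TaskToolboxFactory change: the trailing new OffheapBufferPool(1024 * 1024) argument is gone, so the object mapper becomes the final constructor argument and loses its comma.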
@@ -1,195 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2014  Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
-package io.druid.query;
-
-import com.google.common.base.Function;
-import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.metamx.common.Pair;
-import com.metamx.common.guava.Accumulator;
-import com.metamx.common.guava.ResourceClosingSequence;
-import com.metamx.common.guava.Sequence;
-import com.metamx.common.guava.Sequences;
-import com.metamx.common.logger.Logger;
-import io.druid.collections.StupidPool;
-import io.druid.data.input.Row;
-import io.druid.query.groupby.GroupByQuery;
-import io.druid.query.groupby.GroupByQueryConfig;
-import io.druid.query.groupby.GroupByQueryHelper;
-import io.druid.segment.incremental.IncrementalIndex;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-
-public class GroupByParallelQueryRunner implements QueryRunner<Row>
-{
-  private static final Logger log = new Logger(GroupByParallelQueryRunner.class);
-  private final Iterable<QueryRunner<Row>> queryables;
-  private final ListeningExecutorService exec;
-  private final Ordering<Row> ordering;
-  private final Supplier<GroupByQueryConfig> configSupplier;
-<<<<<<< HEAD
-  private final StupidPool<ByteBuffer> bufferPool;
-=======
-  private final QueryWatcher queryWatcher;
->>>>>>> master
-
-
-  public GroupByParallelQueryRunner(
-      ExecutorService exec,
-      Ordering<Row> ordering,
-      Supplier<GroupByQueryConfig> configSupplier,
-<<<<<<< HEAD
-      StupidPool<ByteBuffer> bufferPool,
-      QueryRunner<Row>... queryables
-  )
-  {
-    this(exec, ordering, configSupplier, bufferPool, Arrays.asList(queryables));
-=======
-      QueryWatcher queryWatcher,
-      QueryRunner<Row>... queryables
-  )
-  {
-    this(exec, ordering, configSupplier, queryWatcher, Arrays.asList(queryables));
->>>>>>> master
-  }
-
-  public GroupByParallelQueryRunner(
-      ExecutorService exec,
-<<<<<<< HEAD
-      Ordering<Row> ordering,
-      Supplier<GroupByQueryConfig> configSupplier,
-      StupidPool<ByteBuffer> bufferPool,
-=======
-      Ordering<Row> ordering, Supplier<GroupByQueryConfig> configSupplier,
-      QueryWatcher queryWatcher,
->>>>>>> master
-      Iterable<QueryRunner<Row>> queryables
-  )
-  {
-    this.exec = MoreExecutors.listeningDecorator(exec);
-    this.ordering = ordering;
-    this.queryWatcher = queryWatcher;
-    this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
-    this.configSupplier = configSupplier;
-    this.bufferPool = bufferPool;
-  }
-
-  @Override
-  public Sequence<Row> run(final Query<Row> queryParam)
-  {
-
-    final GroupByQuery query = (GroupByQuery) queryParam;
-    final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
-        query,
-        configSupplier.get(),
-        bufferPool
-    );
-    final int priority = query.getContextPriority(0);
-
-    if (Iterables.isEmpty(queryables)) {
-      log.warn("No queryables found.");
-    }
-    ListenableFuture<List<Boolean>> futures = Futures.allAsList(
-        Lists.newArrayList(
-            Iterables.transform(
-                queryables,
-                new Function<QueryRunner<Row>, ListenableFuture<Boolean>>()
-                {
-                  @Override
-                  public ListenableFuture<Boolean> apply(final QueryRunner<Row> input)
-                  {
-                    return exec.submit(
-                        new AbstractPrioritizedCallable<Boolean>(priority)
-                        {
-                          @Override
-                          public Boolean call() throws Exception
-                          {
-                            try {
-                              input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
-                              return true;
-                            }
-                            catch (QueryInterruptedException e) {
-                              throw Throwables.propagate(e);
-                            }
-                            catch (Exception e) {
-                              log.error(e, "Exception with one of the sequences!");
-                              throw Throwables.propagate(e);
-                            }
-                          }
-                        }
-                    );
-                  }
-                }
-            )
-        )
-    );
-
-    // Let the runners complete
-    try {
-      queryWatcher.registerQuery(query, futures);
-      final Number timeout = query.getContextValue("timeout", (Number) null);
-      if(timeout == null) {
-        futures.get();
-      } else {
-        futures.get(timeout.longValue(), TimeUnit.MILLISECONDS);
-      }
-    }
-    catch (InterruptedException e) {
-      log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
-      futures.cancel(true);
-      throw new QueryInterruptedException("Query interrupted");
-    }
-    catch(CancellationException e) {
-      throw new QueryInterruptedException("Query cancelled");
-    }
-    catch(TimeoutException e) {
-      log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
-      futures.cancel(true);
-      throw new QueryInterruptedException("Query timeout");
-    }
-    catch (ExecutionException e) {
-      throw Throwables.propagate(e.getCause());
-    }
-
-    return new ResourceClosingSequence<Row>(
-        Sequences.simple(
-            indexAccumulatorPair.lhs
-                .iterableWithPostAggregations(null)
-        ), indexAccumulatorPair.lhs
-    );
-  }
-
-}

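Note that the deleted GroupByParallelQueryRunner above still carries unresolved merge-conflict markers (<<<<<<< HEAD / ======= / >>>>>>> master) between the buffer-pool variant and master's QueryWatcher variant; removing this non-compiling leftover is presumably the "remove unnecessary changes" half of the commit message.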
@@ -103,9 +103,9 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
 
   private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
   {
-    Sequence<Row> result;
     // If there's a subquery, merge subquery results and then apply the aggregator
     DataSource dataSource = query.getDataSource();
+    final IncrementalIndex index;
     if (dataSource instanceof QueryDataSource) {
       GroupByQuery subquery;
       try {
@@ -115,13 +115,15 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
         throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
       }
       Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner);
-      IncrementalIndexStorageAdapter adapter
-          = new IncrementalIndexStorageAdapter(makeIncrementalIndex(subquery, subqueryResult));
-      result = engine.process(query, adapter);
+      final IncrementalIndex subQueryResultIndex = makeIncrementalIndex(subquery, subqueryResult);
+      Sequence<Row> result = engine.process(query, new IncrementalIndexStorageAdapter(subQueryResultIndex));
+      index = makeIncrementalIndex(query, result);
+      subQueryResultIndex.close();
     } else {
-      result = runner.run(query);
+      index = makeIncrementalIndex(query, runner.run(query));
+
     }
-    final IncrementalIndex index = makeIncrementalIndex(query, result);
     return new ResourceClosingSequence<Row>(postAggregate(query, index), index);
   }
 

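This is the "fix index closing subquery" half of the commit. Before, the incremental index materialized from the subquery's rows was wrapped in a storage adapter, processed, and then abandoned; only the final index was released, via the ResourceClosingSequence. Now the intermediate index is closed as soon as the outer query's index has been built from it. A self-contained sketch of that lifecycle, with stand-in types rather than Druid's real API:

import java.io.Closeable;
import java.util.Arrays;
import java.util.List;

// Stand-in types, illustrative only: FakeIndex plays the role of IncrementalIndex,
// and close() models releasing whatever buffers the index holds.
class FakeIndex implements Closeable
{
  private final List<String> rows;
  private boolean closed;

  FakeIndex(List<String> rows) { this.rows = rows; }

  List<String> rows()
  {
    if (closed) {
      throw new IllegalStateException("index already closed");
    }
    return rows;
  }

  @Override
  public void close() { closed = true; }
}

class SubqueryMergeSketch
{
  // Mirrors the fixed mergeGroupByResults flow: derive the outer result from the
  // subquery's index, build the final index, then close the intermediate index.
  static FakeIndex mergeSubquery(FakeIndex subQueryResultIndex)
  {
    List<String> result = subQueryResultIndex.rows();   // ~ engine.process(query, adapter)
    FakeIndex index = new FakeIndex(result);            // ~ makeIncrementalIndex(query, result)
    subQueryResultIndex.close();                        // the fix: intermediate index no longer leaked
    return index;                                       // caller closes it, as ResourceClosingSequence does
  }

  public static void main(String[] args) throws Exception
  {
    FakeIndex intermediate = new FakeIndex(Arrays.asList("row1", "row2"));
    try (FakeIndex fin = mergeSubquery(intermediate)) {
      System.out.println(fin.rows());
    }
  }
}

The ordering matters: the outer index must be fully materialized from result before subQueryResultIndex.close() runs, since the result sequence reads through the adapter over the intermediate index.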
@@ -58,6 +58,7 @@ public class GreaterThanHavingSpec implements HavingSpec
   public boolean eval(Row row)
   {
     float metricValue = row.getFloatMetric(aggregationName);
+
     return Float.compare(metricValue, value.floatValue()) > 0;
   }
 

@@ -1,93 +0,0 @@
-package io.druid.query.aggregation.hyperloglog;
-
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import io.druid.segment.ObjectColumnSelector;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-
-/**
- * Created with IntelliJ IDEA.
- * User: neo
- * Date: 05/06/14
- * Time: 3:14 PM
- * To change this template use File | Settings | File Templates.
- */
-public class HyperUniqueBufferAggregatorTest
-{
-  private final HashFunction fn = Hashing.murmur3_128();
-  private volatile HyperLogLogCollector collector;
-
-  @Test
-  public void testAggregation()
-  {
-    final HyperUniquesBufferAggregator agg = new HyperUniquesBufferAggregator(
-        new ObjectColumnSelector()
-        {
-          @Override
-          public Class classOfObject()
-          {
-            return HyperLogLogCollector.class;
-          }
-
-          @Override
-          public Object get()
-          {
-            return collector;
-          }
-        }
-    );
-    ByteBuffer byteBuffer = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage());
-
-    for (int i = 0; i < 1000; i++) {
-      collector = HyperLogLogCollector.makeLatestCollector();
-      collector.add(fn.hashInt(i).asBytes());
-      agg.aggregate(byteBuffer, 0);
-    }
-
-    final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(
-        ((HyperLogLogCollector) agg.get(
-            byteBuffer,
-            0
-        )).toByteBuffer()
-    );
-    System.out.println(collector.estimateCardinality());
-
-  }
-
-  @Test
-  public void testAggregation2()
-  {
-    final HyperUniquesAggregator agg = new HyperUniquesAggregator(
-        "abc",
-        new ObjectColumnSelector()
-        {
-          @Override
-          public Class classOfObject()
-          {
-            return HyperLogLogCollector.class;
-          }
-
-          @Override
-          public Object get()
-          {
-            return collector;
-          }
-        }
-    );
-
-    for (int i = 0; i < 1000; i++) {
-      collector = HyperLogLogCollector.makeLatestCollector();
-      collector.add(fn.hashInt(i).asBytes());
-      agg.aggregate();
-    }
-
-    final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(
-        ((HyperLogLogCollector) agg.get(
-        )).toByteBuffer()
-    );
-    System.out.println(collector.estimateCardinality());
-
-  }
-}

@@ -25,7 +25,6 @@ import com.metamx.common.Granularity;
 import com.metamx.common.concurrent.ScheduledExecutors;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceEmitter;
-import io.druid.collections.StupidPool;
 import io.druid.common.guava.ThreadRenamingCallable;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.indexing.DataSchema;
@@ -35,7 +34,6 @@ import io.druid.server.coordination.DataSegmentAnnouncer;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;

@@ -25,8 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.metamx.common.Granularity;
 import com.metamx.emitter.service.ServiceEmitter;
-import io.druid.collections.StupidPool;
-import io.druid.guice.annotations.Global;
 import io.druid.guice.annotations.Processing;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.indexing.DataSchema;
@@ -37,7 +35,6 @@ import org.joda.time.Duration;
 import org.joda.time.Period;
 
 import java.io.File;
-import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;
 
 /**

@@ -718,7 +718,6 @@ public class RealtimePlumber implements Plumber
               indexToPersist.getIndex(),
               new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
           );
-          IncrementalIndex index = indexToPersist.getIndex();
           indexToPersist.swapSegment(
               new QueryableIndexSegment(
                   indexToPersist.getSegment().getIdentifier(),

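As in the YeOldePlumberSchool hunk earlier, this removes an IncrementalIndex local that nothing read, apparently left over from the backed-out changes; the persisted index is still swapped in as a QueryableIndexSegment.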
@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
 import com.metamx.common.Granularity;
 import com.metamx.emitter.service.ServiceEmitter;
 import io.druid.client.FilteredServerView;
+import io.druid.client.ServerView;
 import io.druid.guice.annotations.Processing;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.indexing.DataSchema;
@@ -50,6 +51,7 @@ public class RealtimePlumberSchool implements PlumberSchool
   private final SegmentPublisher segmentPublisher;
   private final FilteredServerView serverView;
   private final ExecutorService queryExecutorService;
+
   // Backwards compatible
   private final Period windowPeriod;
   private final File basePersistDirectory;
@@ -160,5 +162,4 @@ public class RealtimePlumberSchool implements PlumberSchool
     Preconditions.checkNotNull(serverView, "must specify a serverView to do this action.");
     Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action.");
   }
-
 }

@@ -27,12 +27,11 @@ import io.druid.data.input.impl.StringInputRowParser;
 import io.druid.data.input.impl.TimestampSpec;
 import io.druid.granularity.QueryGranularity;
 import io.druid.jackson.DefaultObjectMapper;
-import io.druid.query.TestQueryRunners;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.segment.indexing.DataSchema;
-import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
 import junit.framework.Assert;

@@ -31,12 +31,11 @@ import io.druid.data.input.impl.InputRowParser;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
-import io.druid.query.TestQueryRunners;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.segment.indexing.DataSchema;
-import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.realtime.plumber.Plumber;
 import io.druid.segment.realtime.plumber.PlumberSchool;

@@ -31,16 +31,15 @@ import com.metamx.emitter.service.ServiceEmitter;
 import io.druid.client.FilteredServerView;
 import io.druid.client.ServerView;
 import io.druid.data.input.InputRow;
-import io.druid.data.input.impl.ParseSpec;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
 import io.druid.data.input.impl.JSONParseSpec;
+import io.druid.data.input.impl.ParseSpec;
 import io.druid.data.input.impl.TimestampSpec;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import io.druid.query.Query;
 import io.druid.query.QueryRunnerFactory;
-import io.druid.query.TestQueryRunners;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.segment.indexing.DataSchema;

@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
 import com.metamx.common.Granularity;
 import io.druid.data.input.InputRow;
 import io.druid.granularity.QueryGranularity;
-import io.druid.query.TestQueryRunners;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.segment.indexing.DataSchema;