mirror of https://github.com/apache/druid.git
Merge pull request #2542 from guobingkun/fix_multiple_specs
[Fix Bug] Realtime Node may double count query results when multiple spec files are specified.
This commit is contained in:
commit
55c9320671
|
@ -53,7 +53,7 @@ import org.joda.time.Interval;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -67,9 +67,9 @@ public class RealtimeManager implements QuerySegmentWalker
|
||||||
private final QueryRunnerFactoryConglomerate conglomerate;
|
private final QueryRunnerFactoryConglomerate conglomerate;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* key=data source name,value=FireChiefs of all partition of that data source
|
* key=data source name,value=mappings of partition number to FireChief
|
||||||
*/
|
*/
|
||||||
private final Map<String, List<FireChief>> chiefs;
|
private final Map<String, Map<Integer, FireChief>> chiefs;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public RealtimeManager(
|
public RealtimeManager(
|
||||||
|
@ -83,19 +83,30 @@ public class RealtimeManager implements QuerySegmentWalker
|
||||||
this.chiefs = Maps.newHashMap();
|
this.chiefs = Maps.newHashMap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RealtimeManager(
|
||||||
|
List<FireDepartment> fireDepartments,
|
||||||
|
QueryRunnerFactoryConglomerate conglomerate,
|
||||||
|
Map<String, Map<Integer, FireChief>> chiefs
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.fireDepartments = fireDepartments;
|
||||||
|
this.conglomerate = conglomerate;
|
||||||
|
this.chiefs = chiefs;
|
||||||
|
}
|
||||||
|
|
||||||
@LifecycleStart
|
@LifecycleStart
|
||||||
public void start() throws IOException
|
public void start() throws IOException
|
||||||
{
|
{
|
||||||
for (final FireDepartment fireDepartment : fireDepartments) {
|
for (final FireDepartment fireDepartment : fireDepartments) {
|
||||||
DataSchema schema = fireDepartment.getDataSchema();
|
final DataSchema schema = fireDepartment.getDataSchema();
|
||||||
|
|
||||||
final FireChief chief = new FireChief(fireDepartment);
|
final FireChief chief = new FireChief(fireDepartment, conglomerate);
|
||||||
List<FireChief> chiefs = this.chiefs.get(schema.getDataSource());
|
Map<Integer, FireChief> partitionChiefs = chiefs.get(schema.getDataSource());
|
||||||
if (chiefs == null) {
|
if (partitionChiefs == null) {
|
||||||
chiefs = new ArrayList<FireChief>();
|
partitionChiefs = new HashMap<>();
|
||||||
this.chiefs.put(schema.getDataSource(), chiefs);
|
chiefs.put(schema.getDataSource(), partitionChiefs);
|
||||||
}
|
}
|
||||||
chiefs.add(chief);
|
partitionChiefs.put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief);
|
||||||
|
|
||||||
chief.setName(
|
chief.setName(
|
||||||
String.format(
|
String.format(
|
||||||
|
@ -112,8 +123,8 @@ public class RealtimeManager implements QuerySegmentWalker
|
||||||
@LifecycleStop
|
@LifecycleStop
|
||||||
public void stop()
|
public void stop()
|
||||||
{
|
{
|
||||||
for (Iterable<FireChief> chiefs : this.chiefs.values()) {
|
for (Map<Integer, FireChief> chiefs : this.chiefs.values()) {
|
||||||
for (FireChief chief : chiefs) {
|
for (FireChief chief : chiefs.values()) {
|
||||||
CloseQuietly.close(chief);
|
CloseQuietly.close(chief);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -121,12 +132,12 @@ public class RealtimeManager implements QuerySegmentWalker
|
||||||
|
|
||||||
public FireDepartmentMetrics getMetrics(String datasource)
|
public FireDepartmentMetrics getMetrics(String datasource)
|
||||||
{
|
{
|
||||||
List<FireChief> chiefs = this.chiefs.get(datasource);
|
Map<Integer, FireChief> chiefs = this.chiefs.get(datasource);
|
||||||
if (chiefs == null) {
|
if (chiefs == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
FireDepartmentMetrics snapshot = null;
|
FireDepartmentMetrics snapshot = null;
|
||||||
for (FireChief chief : chiefs) {
|
for (FireChief chief : chiefs.values()) {
|
||||||
if (snapshot == null) {
|
if (snapshot == null) {
|
||||||
snapshot = chief.getMetrics().snapshot();
|
snapshot = chief.getMetrics().snapshot();
|
||||||
} else {
|
} else {
|
||||||
|
@ -138,27 +149,22 @@ public class RealtimeManager implements QuerySegmentWalker
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, Iterable<Interval> intervals)
|
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, Iterable<Interval> intervals)
|
||||||
{
|
|
||||||
return getQueryRunnerForSegments(query, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> specs)
|
|
||||||
{
|
{
|
||||||
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
|
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
|
||||||
|
final Map<Integer, FireChief> partitionChiefs = chiefs.get(Iterables.getOnlyElement(query.getDataSource()
|
||||||
|
.getNames()));
|
||||||
|
|
||||||
Iterable<FireChief> chiefsOfDataSource = chiefs.get(Iterables.getOnlyElement(query.getDataSource().getNames()));
|
return partitionChiefs == null ? new NoopQueryRunner<T>() : factory.getToolchest().mergeResults(
|
||||||
return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults(
|
|
||||||
factory.mergeRunners(
|
factory.mergeRunners(
|
||||||
MoreExecutors.sameThreadExecutor(),
|
MoreExecutors.sameThreadExecutor(),
|
||||||
// Chaining query runners which wait on submitted chain query runners can make executor pools deadlock
|
// Chaining query runners which wait on submitted chain query runners can make executor pools deadlock
|
||||||
Iterables.transform(
|
Iterables.transform(
|
||||||
chiefsOfDataSource, new Function<FireChief, QueryRunner<T>>()
|
partitionChiefs.values(), new Function<FireChief, QueryRunner<T>>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public QueryRunner<T> apply(FireChief input)
|
public QueryRunner<T> apply(FireChief fireChief)
|
||||||
{
|
{
|
||||||
return input.getQueryRunner(query);
|
return fireChief.getQueryRunner(query);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -166,22 +172,50 @@ public class RealtimeManager implements QuerySegmentWalker
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private class FireChief extends Thread implements Closeable
|
@Override
|
||||||
|
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
|
||||||
|
{
|
||||||
|
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
|
||||||
|
final Map<Integer, FireChief> partitionChiefs = chiefs.get(Iterables.getOnlyElement(query.getDataSource()
|
||||||
|
.getNames()));
|
||||||
|
|
||||||
|
return partitionChiefs == null
|
||||||
|
? new NoopQueryRunner<T>()
|
||||||
|
: factory.getToolchest().mergeResults(
|
||||||
|
factory.mergeRunners(
|
||||||
|
MoreExecutors.sameThreadExecutor(),
|
||||||
|
Iterables.transform(
|
||||||
|
specs,
|
||||||
|
new Function<SegmentDescriptor, QueryRunner<T>>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public QueryRunner<T> apply(SegmentDescriptor spec)
|
||||||
|
{
|
||||||
|
final FireChief retVal = partitionChiefs.get(spec.getPartitionNumber());
|
||||||
|
return retVal == null ? new NoopQueryRunner<T>() : retVal.getQueryRunner(query);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
static class FireChief extends Thread implements Closeable
|
||||||
{
|
{
|
||||||
private final FireDepartment fireDepartment;
|
private final FireDepartment fireDepartment;
|
||||||
private final FireDepartmentMetrics metrics;
|
private final FireDepartmentMetrics metrics;
|
||||||
private final RealtimeTuningConfig config;
|
private final RealtimeTuningConfig config;
|
||||||
|
private final QueryRunnerFactoryConglomerate conglomerate;
|
||||||
|
|
||||||
private volatile Firehose firehose = null;
|
private volatile Firehose firehose = null;
|
||||||
private volatile FirehoseV2 firehoseV2 = null;
|
private volatile FirehoseV2 firehoseV2 = null;
|
||||||
private volatile Plumber plumber = null;
|
private volatile Plumber plumber = null;
|
||||||
private volatile boolean normalExit = true;
|
private volatile boolean normalExit = true;
|
||||||
|
|
||||||
public FireChief(
|
public FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate conglomerate)
|
||||||
FireDepartment fireDepartment
|
|
||||||
)
|
|
||||||
{
|
{
|
||||||
this.fireDepartment = fireDepartment;
|
this.fireDepartment = fireDepartment;
|
||||||
|
this.conglomerate = conglomerate;
|
||||||
this.config = fireDepartment.getTuningConfig();
|
this.config = fireDepartment.getTuningConfig();
|
||||||
this.metrics = fireDepartment.getMetrics();
|
this.metrics = fireDepartment.getMetrics();
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,10 +22,14 @@ package io.druid.segment.realtime;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Stopwatch;
|
import com.google.common.base.Stopwatch;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
|
import com.google.common.base.Suppliers;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.metamx.common.Granularity;
|
import com.metamx.common.Granularity;
|
||||||
import com.metamx.common.ISE;
|
import com.metamx.common.ISE;
|
||||||
import com.metamx.common.parsers.ParseException;
|
import com.metamx.common.parsers.ParseException;
|
||||||
|
import io.druid.collections.StupidPool;
|
||||||
import io.druid.data.input.Committer;
|
import io.druid.data.input.Committer;
|
||||||
import io.druid.data.input.Firehose;
|
import io.druid.data.input.Firehose;
|
||||||
import io.druid.data.input.FirehoseFactory;
|
import io.druid.data.input.FirehoseFactory;
|
||||||
|
@ -38,8 +42,23 @@ import io.druid.granularity.QueryGranularity;
|
||||||
import io.druid.jackson.DefaultObjectMapper;
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
import io.druid.query.Query;
|
import io.druid.query.Query;
|
||||||
import io.druid.query.QueryRunner;
|
import io.druid.query.QueryRunner;
|
||||||
|
import io.druid.query.QueryRunnerFactory;
|
||||||
|
import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||||
|
import io.druid.query.QueryRunnerTestHelper;
|
||||||
|
import io.druid.query.SegmentDescriptor;
|
||||||
|
import io.druid.query.TestQueryRunners;
|
||||||
import io.druid.query.aggregation.AggregatorFactory;
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||||
|
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||||
|
import io.druid.query.dimension.DimensionSpec;
|
||||||
|
import io.druid.query.groupby.GroupByQuery;
|
||||||
|
import io.druid.query.groupby.GroupByQueryConfig;
|
||||||
|
import io.druid.query.groupby.GroupByQueryEngine;
|
||||||
|
import io.druid.query.groupby.GroupByQueryQueryToolChest;
|
||||||
|
import io.druid.query.groupby.GroupByQueryRunnerFactory;
|
||||||
|
import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
|
||||||
|
import io.druid.segment.TestHelper;
|
||||||
import io.druid.segment.incremental.IndexSizeExceededException;
|
import io.druid.segment.incremental.IndexSizeExceededException;
|
||||||
import io.druid.segment.indexing.DataSchema;
|
import io.druid.segment.indexing.DataSchema;
|
||||||
import io.druid.segment.indexing.RealtimeIOConfig;
|
import io.druid.segment.indexing.RealtimeIOConfig;
|
||||||
|
@ -48,18 +67,23 @@ import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||||
import io.druid.segment.realtime.plumber.Plumber;
|
import io.druid.segment.realtime.plumber.Plumber;
|
||||||
import io.druid.segment.realtime.plumber.PlumberSchool;
|
import io.druid.segment.realtime.plumber.PlumberSchool;
|
||||||
import io.druid.segment.realtime.plumber.Sink;
|
import io.druid.segment.realtime.plumber.Sink;
|
||||||
|
import io.druid.timeline.partition.LinearShardSpec;
|
||||||
import io.druid.utils.Runnables;
|
import io.druid.utils.Runnables;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
import org.joda.time.Period;
|
import org.joda.time.Period;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -68,10 +92,13 @@ public class RealtimeManagerTest
|
||||||
{
|
{
|
||||||
private RealtimeManager realtimeManager;
|
private RealtimeManager realtimeManager;
|
||||||
private RealtimeManager realtimeManager2;
|
private RealtimeManager realtimeManager2;
|
||||||
|
private RealtimeManager realtimeManager3;
|
||||||
private DataSchema schema;
|
private DataSchema schema;
|
||||||
private DataSchema schema2;
|
private DataSchema schema2;
|
||||||
private TestPlumber plumber;
|
private TestPlumber plumber;
|
||||||
private TestPlumber plumber2;
|
private TestPlumber plumber2;
|
||||||
|
private QueryRunnerFactory factory;
|
||||||
|
private CountDownLatch chiefStartedLatch;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception
|
public void setUp() throws Exception
|
||||||
|
@ -180,6 +207,134 @@ public class RealtimeManagerTest
|
||||||
),
|
),
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
factory = initFactory();
|
||||||
|
|
||||||
|
RealtimeIOConfig ioConfig3 = new RealtimeIOConfig(
|
||||||
|
new FirehoseFactory()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Firehose connect(InputRowParser parser) throws IOException
|
||||||
|
{
|
||||||
|
return new InfiniteTestFirehose();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
new PlumberSchool()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Plumber findPlumber(
|
||||||
|
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return plumber;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
null
|
||||||
|
);
|
||||||
|
|
||||||
|
RealtimeTuningConfig tuningConfig_0 = new RealtimeTuningConfig(
|
||||||
|
1,
|
||||||
|
new Period("P1Y"),
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
new LinearShardSpec(0),
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
|
||||||
|
RealtimeTuningConfig tuningConfig_1 = new RealtimeTuningConfig(
|
||||||
|
1,
|
||||||
|
new Period("P1Y"),
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
new LinearShardSpec(1),
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
|
||||||
|
DataSchema schema2 = new DataSchema(
|
||||||
|
"testing",
|
||||||
|
null,
|
||||||
|
new AggregatorFactory[]{new CountAggregatorFactory("ignore")},
|
||||||
|
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null),
|
||||||
|
jsonMapper
|
||||||
|
);
|
||||||
|
|
||||||
|
FireDepartment department_0 = new FireDepartment(schema2, ioConfig3, tuningConfig_0);
|
||||||
|
FireDepartment department_1 = new FireDepartment(schema2, ioConfig3, tuningConfig_1);
|
||||||
|
|
||||||
|
QueryRunnerFactoryConglomerate conglomerate = new QueryRunnerFactoryConglomerate()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query)
|
||||||
|
{
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
chiefStartedLatch = new CountDownLatch(2);
|
||||||
|
|
||||||
|
RealtimeManager.FireChief fireChief_0 = new RealtimeManager.FireChief(department_0, conglomerate)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
super.initPlumber();
|
||||||
|
chiefStartedLatch.countDown();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
RealtimeManager.FireChief fireChief_1 = new RealtimeManager.FireChief(department_1, conglomerate)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
super.initPlumber();
|
||||||
|
chiefStartedLatch.countDown();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
realtimeManager3 = new RealtimeManager(
|
||||||
|
Arrays.asList(department_0, department_1),
|
||||||
|
conglomerate,
|
||||||
|
ImmutableMap.<String, Map<Integer, RealtimeManager.FireChief>>of(
|
||||||
|
"testing",
|
||||||
|
ImmutableMap.of(
|
||||||
|
0,
|
||||||
|
fireChief_0,
|
||||||
|
1,
|
||||||
|
fireChief_1
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
startFireChiefWithPartitionNum(fireChief_0, 0);
|
||||||
|
startFireChiefWithPartitionNum(fireChief_1, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startFireChiefWithPartitionNum(RealtimeManager.FireChief fireChief, int partitionNum)
|
||||||
|
{
|
||||||
|
fireChief.setName(
|
||||||
|
String.format(
|
||||||
|
"chief-%s[%s]",
|
||||||
|
"testing",
|
||||||
|
partitionNum
|
||||||
|
)
|
||||||
|
);
|
||||||
|
fireChief.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -224,6 +379,176 @@ public class RealtimeManagerTest
|
||||||
Assert.assertEquals(0, plumber2.getPersistCount());
|
Assert.assertEquals(0, plumber2.getPersistCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 5_000L)
|
||||||
|
public void testQueryWithInterval() throws IOException, InterruptedException
|
||||||
|
{
|
||||||
|
List<Row> expectedResults = Arrays.asList(
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 270L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 236L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 316L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 240L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 5740L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 242L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 5800L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 156L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 238L),
|
||||||
|
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 2L, "idx", 294L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 2L, "idx", 224L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 332L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 2L, "idx", 226L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 6L, "idx", 4894L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 2L, "idx", 228L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 6L, "idx", 5010L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 2L, "idx", 194L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 252L)
|
||||||
|
);
|
||||||
|
|
||||||
|
chiefStartedLatch.await();
|
||||||
|
|
||||||
|
for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) {
|
||||||
|
plumber.setRunner(runner);
|
||||||
|
GroupByQuery query = GroupByQuery
|
||||||
|
.builder()
|
||||||
|
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||||
|
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
|
||||||
|
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
|
||||||
|
.setAggregatorSpecs(
|
||||||
|
Arrays.asList(
|
||||||
|
QueryRunnerTestHelper.rowsCount,
|
||||||
|
new LongSumAggregatorFactory("idx", "index")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setGranularity(QueryRunnerTestHelper.dayGran)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(
|
||||||
|
factory,
|
||||||
|
realtimeManager3.getQueryRunnerForIntervals(
|
||||||
|
query,
|
||||||
|
QueryRunnerTestHelper.firstToThird.getIntervals()
|
||||||
|
),
|
||||||
|
query
|
||||||
|
);
|
||||||
|
|
||||||
|
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 5_000L)
|
||||||
|
public void testQueryWithSegmentSpec() throws IOException, InterruptedException
|
||||||
|
{
|
||||||
|
List<Row> expectedResults = Arrays.asList(
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),
|
||||||
|
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
|
||||||
|
);
|
||||||
|
|
||||||
|
chiefStartedLatch.await();
|
||||||
|
|
||||||
|
for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) {
|
||||||
|
plumber.setRunner(runner);
|
||||||
|
GroupByQuery query = GroupByQuery
|
||||||
|
.builder()
|
||||||
|
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||||
|
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
|
||||||
|
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
|
||||||
|
.setAggregatorSpecs(
|
||||||
|
Arrays.asList(
|
||||||
|
QueryRunnerTestHelper.rowsCount,
|
||||||
|
new LongSumAggregatorFactory("idx", "index")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setGranularity(QueryRunnerTestHelper.dayGran)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(
|
||||||
|
factory,
|
||||||
|
realtimeManager3.getQueryRunnerForSegments(
|
||||||
|
query,
|
||||||
|
ImmutableList.<SegmentDescriptor>of(
|
||||||
|
new SegmentDescriptor(
|
||||||
|
new Interval("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"),
|
||||||
|
"ver",
|
||||||
|
0
|
||||||
|
))
|
||||||
|
),
|
||||||
|
query
|
||||||
|
);
|
||||||
|
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
||||||
|
|
||||||
|
results = GroupByQueryRunnerTestHelper.runQuery(
|
||||||
|
factory,
|
||||||
|
realtimeManager3.getQueryRunnerForSegments(
|
||||||
|
query,
|
||||||
|
ImmutableList.<SegmentDescriptor>of(
|
||||||
|
new SegmentDescriptor(
|
||||||
|
new Interval("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"),
|
||||||
|
"ver",
|
||||||
|
1
|
||||||
|
))
|
||||||
|
),
|
||||||
|
query
|
||||||
|
);
|
||||||
|
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private GroupByQueryRunnerFactory initFactory()
|
||||||
|
{
|
||||||
|
final ObjectMapper mapper = new DefaultObjectMapper();
|
||||||
|
final StupidPool<ByteBuffer> pool = new StupidPool<>(
|
||||||
|
new Supplier<ByteBuffer>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public ByteBuffer get()
|
||||||
|
{
|
||||||
|
return ByteBuffer.allocate(1024 * 1024);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
final GroupByQueryConfig config = new GroupByQueryConfig();
|
||||||
|
config.setMaxIntermediateRows(10000);
|
||||||
|
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
|
||||||
|
final GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, pool);
|
||||||
|
return new GroupByQueryRunnerFactory(
|
||||||
|
engine,
|
||||||
|
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
|
||||||
|
configSupplier,
|
||||||
|
new GroupByQueryQueryToolChest(
|
||||||
|
configSupplier, mapper, engine, TestQueryRunners.pool,
|
||||||
|
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||||
|
),
|
||||||
|
TestQueryRunners.pool
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception
|
||||||
|
{
|
||||||
|
realtimeManager.stop();
|
||||||
|
realtimeManager2.stop();
|
||||||
|
realtimeManager3.stop();
|
||||||
|
}
|
||||||
|
|
||||||
private TestInputRowHolder makeRow(final long timestamp)
|
private TestInputRowHolder makeRow(final long timestamp)
|
||||||
{
|
{
|
||||||
return new TestInputRowHolder(timestamp, null);
|
return new TestInputRowHolder(timestamp, null);
|
||||||
|
@ -304,6 +629,35 @@ public class RealtimeManagerTest
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class InfiniteTestFirehose implements Firehose
|
||||||
|
{
|
||||||
|
private boolean hasMore = true;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasMore()
|
||||||
|
{
|
||||||
|
return hasMore;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InputRow nextRow()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Runnable commit()
|
||||||
|
{
|
||||||
|
return Runnables.getNoopRunnable();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
hasMore = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static class TestFirehose implements Firehose
|
private static class TestFirehose implements Firehose
|
||||||
{
|
{
|
||||||
private final Iterator<TestInputRowHolder> rows;
|
private final Iterator<TestInputRowHolder> rows;
|
||||||
|
@ -419,6 +773,8 @@ public class RealtimeManagerTest
|
||||||
private volatile boolean finishedJob = false;
|
private volatile boolean finishedJob = false;
|
||||||
private volatile int persistCount = 0;
|
private volatile int persistCount = 0;
|
||||||
|
|
||||||
|
private QueryRunner runner;
|
||||||
|
|
||||||
private TestPlumber(Sink sink)
|
private TestPlumber(Sink sink)
|
||||||
{
|
{
|
||||||
this.sink = sink;
|
this.sink = sink;
|
||||||
|
@ -473,8 +829,11 @@ public class RealtimeManagerTest
|
||||||
@Override
|
@Override
|
||||||
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
|
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
|
||||||
{
|
{
|
||||||
|
if (runner == null) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
return runner;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void persist(Committer committer)
|
public void persist(Committer committer)
|
||||||
|
@ -487,5 +846,11 @@ public class RealtimeManagerTest
|
||||||
{
|
{
|
||||||
finishedJob = true;
|
finishedJob = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setRunner(QueryRunner runner)
|
||||||
|
{
|
||||||
|
this.runner = runner;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue