mirror of https://github.com/apache/druid.git
Added checkstyle for "Methods starting with Capital Letters" (#7118)
* Added checkstyle for "Methods starting with Capital Letters" and changed the method names violating this. * Un-abbreviate the method names in the calcite tests * Fixed checkstyle errors * Changed asserts position in the code
This commit is contained in:
parent
1c2753ab90
commit
8b803cbc22
|
@ -291,5 +291,10 @@ codestyle/checkstyle.xml. "/>
|
|||
<property name="illegalPattern" value="true"/>
|
||||
<property name="message" value="Duplicate line"/>
|
||||
</module>
|
||||
|
||||
<!-- Added as per the issue #6936 - Prohibit method names starting with capital letters -->
|
||||
<module name="MethodName">
|
||||
<property name = "format" value = "^[a-z_]*[a-z0-9][a-zA-Z0-9_]*$"/>
|
||||
</module>
|
||||
</module>
|
||||
</module>
|
||||
|
|
|
@ -274,7 +274,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
|
|||
null,
|
||||
null
|
||||
),
|
||||
BaseCalciteQueryTest.NOT(BaseCalciteQueryTest.SELECTOR("dim2", "", null))
|
||||
BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("dim2", "", null))
|
||||
),
|
||||
new HllSketchBuildAggregatorFactory(
|
||||
"a3",
|
||||
|
|
|
@ -342,7 +342,7 @@ public class DoublesSketchAggregatorTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void QueryingDataWithFieldNameValueAsFloatInsteadOfSketch() throws Exception
|
||||
public void queryingDataWithFieldNameValueAsFloatInsteadOfSketch() throws Exception
|
||||
{
|
||||
Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
|
||||
new File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()),
|
||||
|
@ -418,7 +418,7 @@ public class DoublesSketchAggregatorTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void TimeSeriesQueryInputAsFloat() throws Exception
|
||||
public void timeSeriesQueryInputAsFloat() throws Exception
|
||||
{
|
||||
Sequence<Row> seq = timeSeriesHelper.createIndexAndRunQueryOnSegment(
|
||||
new File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()),
|
||||
|
|
|
@ -278,7 +278,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
|
|||
null,
|
||||
null
|
||||
),
|
||||
BaseCalciteQueryTest.NOT(BaseCalciteQueryTest.SELECTOR("dim2", "", null))
|
||||
BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("dim2", "", null))
|
||||
),
|
||||
new SketchMergeAggregatorFactory(
|
||||
"a3",
|
||||
|
|
|
@ -114,12 +114,12 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
|
|||
ImmutableList.of(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(CalciteTests.DATASOURCE1)
|
||||
.intervals(QSS(Filtration.eternity()))
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.granularity(Granularities.ALL)
|
||||
.filters(
|
||||
new BloomDimFilter("dim1", BloomKFilterHolder.fromBloomKFilter(filter), null)
|
||||
)
|
||||
.aggregators(AGGS(new CountAggregatorFactory("a0")))
|
||||
.aggregators(aggregators(new CountAggregatorFactory("a0")))
|
||||
.context(TIMESERIES_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
|
@ -146,7 +146,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
|
|||
ImmutableList.of(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(CalciteTests.DATASOURCE1)
|
||||
.intervals(QSS(Filtration.eternity()))
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.granularity(Granularities.ALL)
|
||||
.virtualColumns()
|
||||
.filters(
|
||||
|
@ -155,7 +155,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
|
|||
createExprMacroTable()
|
||||
)
|
||||
)
|
||||
.aggregators(AGGS(new CountAggregatorFactory("a0")))
|
||||
.aggregators(aggregators(new CountAggregatorFactory("a0")))
|
||||
.context(TIMESERIES_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
|
@ -178,7 +178,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
|
|||
ImmutableList.of(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(CalciteTests.DATASOURCE1)
|
||||
.intervals(QSS(Filtration.eternity()))
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.granularity(Granularities.ALL)
|
||||
.virtualColumns()
|
||||
.filters(
|
||||
|
@ -187,7 +187,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
|
|||
createExprMacroTable()
|
||||
)
|
||||
)
|
||||
.aggregators(AGGS(new CountAggregatorFactory("a0")))
|
||||
.aggregators(aggregators(new CountAggregatorFactory("a0")))
|
||||
.context(TIMESERIES_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
|
@ -214,7 +214,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
|
|||
ImmutableList.of(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(CalciteTests.DATASOURCE1)
|
||||
.intervals(QSS(Filtration.eternity()))
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.granularity(Granularities.ALL)
|
||||
.filters(
|
||||
new OrDimFilter(
|
||||
|
@ -222,7 +222,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
|
|||
new BloomDimFilter("dim2", BloomKFilterHolder.fromBloomKFilter(filter2), null)
|
||||
)
|
||||
)
|
||||
.aggregators(AGGS(new CountAggregatorFactory("a0")))
|
||||
.aggregators(aggregators(new CountAggregatorFactory("a0")))
|
||||
.context(TIMESERIES_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
|
|
|
@ -28,10 +28,10 @@ import java.util.Map;
|
|||
|
||||
public class KafkaDataSourceMetadataTest
|
||||
{
|
||||
private static final KafkaDataSourceMetadata KM0 = KM("foo", ImmutableMap.of());
|
||||
private static final KafkaDataSourceMetadata KM1 = KM("foo", ImmutableMap.of(0, 2L, 1, 3L));
|
||||
private static final KafkaDataSourceMetadata KM2 = KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L));
|
||||
private static final KafkaDataSourceMetadata KM3 = KM("foo", ImmutableMap.of(0, 2L, 2, 5L));
|
||||
private static final KafkaDataSourceMetadata KM0 = km("foo", ImmutableMap.of());
|
||||
private static final KafkaDataSourceMetadata KM1 = km("foo", ImmutableMap.of(0, 2L, 1, 3L));
|
||||
private static final KafkaDataSourceMetadata KM2 = km("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L));
|
||||
private static final KafkaDataSourceMetadata KM3 = km("foo", ImmutableMap.of(0, 2L, 2, 5L));
|
||||
|
||||
@Test
|
||||
public void testMatches()
|
||||
|
@ -70,27 +70,27 @@ public class KafkaDataSourceMetadataTest
|
|||
public void testPlus()
|
||||
{
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
|
||||
km("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
|
||||
KM1.plus(KM3)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
|
||||
km("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
|
||||
KM0.plus(KM2)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
|
||||
km("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
|
||||
KM1.plus(KM2)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
|
||||
km("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
|
||||
KM2.plus(KM1)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
|
||||
km("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
|
||||
KM2.plus(KM2)
|
||||
);
|
||||
}
|
||||
|
@ -99,32 +99,32 @@ public class KafkaDataSourceMetadataTest
|
|||
public void testMinus()
|
||||
{
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of(1, 3L)),
|
||||
km("foo", ImmutableMap.of(1, 3L)),
|
||||
KM1.minus(KM3)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of()),
|
||||
km("foo", ImmutableMap.of()),
|
||||
KM0.minus(KM2)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of()),
|
||||
km("foo", ImmutableMap.of()),
|
||||
KM1.minus(KM2)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of(2, 5L)),
|
||||
km("foo", ImmutableMap.of(2, 5L)),
|
||||
KM2.minus(KM1)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of()),
|
||||
km("foo", ImmutableMap.of()),
|
||||
KM2.minus(KM2)
|
||||
);
|
||||
}
|
||||
|
||||
private static KafkaDataSourceMetadata KM(String topic, Map<Integer, Long> offsets)
|
||||
private static KafkaDataSourceMetadata km(String topic, Map<Integer, Long> offsets)
|
||||
{
|
||||
return new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, offsets));
|
||||
}
|
||||
|
|
|
@ -277,21 +277,21 @@ public class KafkaIndexTaskTest
|
|||
private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
|
||||
{
|
||||
return ImmutableList.of(
|
||||
new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2011", "e", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
|
||||
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
|
||||
new ProducerRecord<>(topic, 0, null, null),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2013", "f", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "notanumber", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", "notanumber", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", "20.0", "notanumber")),
|
||||
new ProducerRecord<>(topic, 1, null, JB("2012", "g", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 1, null, JB("2011", "h", "y", "10", "20.0", "1.0"))
|
||||
new ProducerRecord<>(topic, 0, null, jb("2013", "f", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "10", "notanumber", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "10", "20.0", "notanumber")),
|
||||
new ProducerRecord<>(topic, 1, null, jb("2012", "g", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 1, null, jb("2011", "h", "y", "10", "20.0", "1.0"))
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -411,8 +411,8 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
|
||||
|
@ -461,8 +461,8 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
|
||||
|
@ -570,13 +570,13 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc4 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc5 = SD(task, "2011/P1D", 1);
|
||||
SegmentDescriptor desc6 = SD(task, "2012/P1D", 0);
|
||||
SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc4 = sd(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc5 = sd(task, "2011/P1D", 1);
|
||||
SegmentDescriptor desc6 = sd(task, "2012/P1D", 0);
|
||||
SegmentDescriptor desc7 = sd(task, "2013/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))),
|
||||
|
@ -696,6 +696,7 @@ public class KafkaIndexTaskTest
|
|||
}
|
||||
final Map<Integer, Long> nextOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
|
||||
|
||||
|
||||
Assert.assertTrue(checkpoint2.getPartitionSequenceNumberMap().equals(nextOffsets));
|
||||
task.getRunner().setEndOffsets(nextOffsets, false);
|
||||
|
||||
|
@ -729,13 +730,18 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc4 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc5 = SD(task, "2011/P1D", 1);
|
||||
SegmentDescriptor desc6 = SD(task, "2012/P1D", 0);
|
||||
SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc4 = sd(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc5 = sd(task, "2011/P1D", 1);
|
||||
SegmentDescriptor desc6 = sd(task, "2012/P1D", 0);
|
||||
SegmentDescriptor desc7 = sd(task, "2013/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))),
|
||||
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
|
||||
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))),
|
||||
|
@ -844,8 +850,8 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L, 1, 0L))),
|
||||
|
@ -865,13 +871,13 @@ public class KafkaIndexTaskTest
|
|||
}
|
||||
|
||||
List<ProducerRecord<byte[], byte[]>> records = ImmutableList.of(
|
||||
new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2011", "D", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2012", "e", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10", "20.0", "1.0"))
|
||||
new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0"))
|
||||
);
|
||||
|
||||
final String baseSequenceName = "sequence0";
|
||||
|
@ -973,8 +979,8 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
|
||||
|
@ -1023,9 +1029,9 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task, "2010/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
|
||||
|
@ -1083,7 +1089,7 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2009/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
|
||||
|
@ -1164,8 +1170,8 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
|
||||
|
@ -1212,8 +1218,8 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
|
||||
|
@ -1307,10 +1313,10 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc3 = SD(task, "2013/P1D", 0);
|
||||
SegmentDescriptor desc4 = SD(task, "2049/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task, "2013/P1D", 0);
|
||||
SegmentDescriptor desc4 = sd(task, "2049/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 13L))),
|
||||
|
@ -1463,8 +1469,8 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published segments & metadata
|
||||
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
|
||||
|
@ -1528,8 +1534,8 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published segments & metadata, should all be from the first task
|
||||
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
|
||||
|
@ -1581,8 +1587,8 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
|
||||
|
||||
// Check published segments & metadata
|
||||
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
|
||||
|
||||
|
@ -1599,8 +1605,8 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published segments & metadata
|
||||
SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1);
|
||||
SegmentDescriptor desc4 = SD(task2, "2013/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task2, "2011/P1D", 1);
|
||||
SegmentDescriptor desc4 = sd(task2, "2013/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
|
||||
Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
|
||||
|
||||
|
@ -1643,11 +1649,11 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published segments & metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
// desc3 will not be created in KafkaIndexTask (0.12.x) as it does not create per Kafka partition Druid segments
|
||||
SegmentDescriptor desc3 = SD(task, "2011/P1D", 1);
|
||||
SegmentDescriptor desc4 = SD(task, "2012/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task, "2011/P1D", 1);
|
||||
SegmentDescriptor desc4 = sd(task, "2012/P1D", 0);
|
||||
Assert.assertEquals(isIncrementalHandoffSupported
|
||||
? ImmutableSet.of(desc1, desc2, desc4)
|
||||
: ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
|
||||
|
@ -1722,9 +1728,9 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published segments & metadata
|
||||
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
|
||||
SegmentDescriptor desc3 = SD(task2, "2012/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task2, "2012/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L, 1, 1L))),
|
||||
|
@ -1820,8 +1826,8 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published segments & metadata
|
||||
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L))),
|
||||
|
@ -1909,8 +1915,8 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L))),
|
||||
|
@ -2037,8 +2043,8 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
|
||||
|
@ -2178,10 +2184,10 @@ public class KafkaIndexTaskTest
|
|||
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = SD(task, "2013/P1D", 0);
|
||||
SegmentDescriptor desc4 = SD(task, "2049/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task, "2013/P1D", 0);
|
||||
SegmentDescriptor desc4 = sd(task, "2049/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 13L))),
|
||||
|
@ -2669,7 +2675,7 @@ public class KafkaIndexTaskTest
|
|||
return results.isEmpty() ? 0L : DimensionHandlerUtils.nullToZero(results.get(0).getValue().getLongMetric("rows"));
|
||||
}
|
||||
|
||||
private static byte[] JB(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
|
||||
private static byte[] jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
|
||||
{
|
||||
try {
|
||||
return new ObjectMapper().writeValueAsBytes(
|
||||
|
@ -2688,7 +2694,7 @@ public class KafkaIndexTaskTest
|
|||
}
|
||||
}
|
||||
|
||||
private SegmentDescriptor SD(final Task task, final String intervalString, final int partitionNum)
|
||||
private SegmentDescriptor sd(final Task task, final String intervalString, final int partitionNum)
|
||||
{
|
||||
final Interval interval = Intervals.of(intervalString);
|
||||
return new SegmentDescriptor(interval, getLock(task, interval).getVersion(), partitionNum);
|
||||
|
|
|
@ -66,25 +66,25 @@ public class KafkaRecordSupplierTest
|
|||
private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
|
||||
{
|
||||
return ImmutableList.of(
|
||||
new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2011", "e", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
|
||||
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
|
||||
new ProducerRecord<>(topic, 0, null, null),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2013", "f", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "notanumber", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 1, null, JB("2049", "f", "y", "10", "notanumber", "1.0")),
|
||||
new ProducerRecord<>(topic, 1, null, JB("2049", "f", "y", "10", "20.0", "notanumber")),
|
||||
new ProducerRecord<>(topic, 1, null, JB("2012", "g", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 1, null, JB("2011", "h", "y", "10", "20.0", "1.0"))
|
||||
new ProducerRecord<>(topic, 0, null, jb("2013", "f", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 1, null, jb("2049", "f", "y", "10", "notanumber", "1.0")),
|
||||
new ProducerRecord<>(topic, 1, null, jb("2049", "f", "y", "10", "20.0", "notanumber")),
|
||||
new ProducerRecord<>(topic, 1, null, jb("2012", "g", "y", "10", "20.0", "1.0")),
|
||||
new ProducerRecord<>(topic, 1, null, jb("2011", "h", "y", "10", "20.0", "1.0"))
|
||||
);
|
||||
}
|
||||
|
||||
private static byte[] JB(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
|
||||
private static byte[] jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
|
||||
{
|
||||
try {
|
||||
return new ObjectMapper().writeValueAsBytes(
|
||||
|
|
|
@ -29,10 +29,10 @@ import java.util.Map;
|
|||
|
||||
public class KinesisDataSourceMetadataTest
|
||||
{
|
||||
private static final KinesisDataSourceMetadata KM0 = KM("foo", ImmutableMap.of());
|
||||
private static final KinesisDataSourceMetadata KM1 = KM("foo", ImmutableMap.of("0", "2L", "1", "3L"));
|
||||
private static final KinesisDataSourceMetadata KM2 = KM("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L"));
|
||||
private static final KinesisDataSourceMetadata KM3 = KM("foo", ImmutableMap.of("0", "2L", "2", "5L"));
|
||||
private static final KinesisDataSourceMetadata KM0 = km("foo", ImmutableMap.of());
|
||||
private static final KinesisDataSourceMetadata KM1 = km("foo", ImmutableMap.of("0", "2L", "1", "3L"));
|
||||
private static final KinesisDataSourceMetadata KM2 = km("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L"));
|
||||
private static final KinesisDataSourceMetadata KM3 = km("foo", ImmutableMap.of("0", "2L", "2", "5L"));
|
||||
|
||||
@Test
|
||||
public void testMatches()
|
||||
|
@ -71,27 +71,27 @@ public class KinesisDataSourceMetadataTest
|
|||
public void testPlus()
|
||||
{
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of("0", "2L", "1", "3L", "2", "5L")),
|
||||
km("foo", ImmutableMap.of("0", "2L", "1", "3L", "2", "5L")),
|
||||
KM1.plus(KM3)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L")),
|
||||
km("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L")),
|
||||
KM0.plus(KM2)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L")),
|
||||
km("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L")),
|
||||
KM1.plus(KM2)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of("0", "2L", "1", "3L", "2", "5L")),
|
||||
km("foo", ImmutableMap.of("0", "2L", "1", "3L", "2", "5L")),
|
||||
KM2.plus(KM1)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L")),
|
||||
km("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L")),
|
||||
KM2.plus(KM2)
|
||||
);
|
||||
}
|
||||
|
@ -100,32 +100,32 @@ public class KinesisDataSourceMetadataTest
|
|||
public void testMinus()
|
||||
{
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of("1", "3L")),
|
||||
km("foo", ImmutableMap.of("1", "3L")),
|
||||
KM1.minus(KM3)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of()),
|
||||
km("foo", ImmutableMap.of()),
|
||||
KM0.minus(KM2)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of()),
|
||||
km("foo", ImmutableMap.of()),
|
||||
KM1.minus(KM2)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of("2", "5L")),
|
||||
km("foo", ImmutableMap.of("2", "5L")),
|
||||
KM2.minus(KM1)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
KM("foo", ImmutableMap.of()),
|
||||
km("foo", ImmutableMap.of()),
|
||||
KM2.minus(KM2)
|
||||
);
|
||||
}
|
||||
|
||||
private static KinesisDataSourceMetadata KM(String stream, Map<String, String> sequences)
|
||||
private static KinesisDataSourceMetadata km(String stream, Map<String, String> sequences)
|
||||
{
|
||||
return new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, sequences));
|
||||
}
|
||||
|
|
|
@ -198,26 +198,26 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
private static String shardId0 = "0";
|
||||
private static KinesisRecordSupplier recordSupplier;
|
||||
private static List<OrderedPartitionableRecord<String, String>> records = ImmutableList.of(
|
||||
new OrderedPartitionableRecord<>(stream, "1", "0", JB("2008", "a", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "1", JB("2009", "b", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "2", JB("2010", "c", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "3", JB("2011", "d", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "4", JB("2011", "e", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(
|
||||
stream,
|
||||
"1",
|
||||
"5",
|
||||
JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
|
||||
jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
|
||||
),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "6", Collections.singletonList(StringUtils.toUtf8("unparseable"))),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "7", Collections.singletonList(StringUtils.toUtf8("unparseable2"))),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}"))),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "9", JB("2013", "f", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "10", JB("2049", "f", "y", "notanumber", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "11", JB("2049", "f", "y", "10", "notanumber", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "12", JB("2049", "f", "y", "10", "20.0", "notanumber")),
|
||||
new OrderedPartitionableRecord<>(stream, "0", "0", JB("2012", "g", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "0", "1", JB("2011", "h", "y", "10", "20.0", "1.0"))
|
||||
new OrderedPartitionableRecord<>(stream, "1", "9", jb("2013", "f", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "10", jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "11", jb("2049", "f", "y", "10", "notanumber", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "1", "12", jb("2049", "f", "y", "10", "20.0", "notanumber")),
|
||||
new OrderedPartitionableRecord<>(stream, "0", "0", jb("2012", "g", "y", "10", "20.0", "1.0")),
|
||||
new OrderedPartitionableRecord<>(stream, "0", "1", jb("2011", "h", "y", "10", "20.0", "1.0"))
|
||||
);
|
||||
|
||||
private static ServiceEmitter emitter;
|
||||
|
@ -404,8 +404,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(
|
||||
|
@ -484,8 +484,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2012/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2012/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(
|
||||
|
@ -609,13 +609,13 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc4 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc5 = SD(task, "2011/P1D", 1);
|
||||
SegmentDescriptor desc6 = SD(task, "2012/P1D", 0);
|
||||
SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc4 = sd(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc5 = sd(task, "2011/P1D", 1);
|
||||
SegmentDescriptor desc6 = sd(task, "2012/P1D", 0);
|
||||
SegmentDescriptor desc7 = sd(task, "2013/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
|
||||
|
@ -770,12 +770,12 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc4 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc5 = SD(task, "2049/P1D", 0);
|
||||
SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc4 = sd(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc5 = sd(task, "2049/P1D", 0);
|
||||
SegmentDescriptor desc7 = sd(task, "2013/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc7), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
|
||||
|
@ -857,8 +857,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(
|
||||
|
@ -940,9 +940,9 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task, "2010/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(
|
||||
|
@ -1033,7 +1033,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2009/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(
|
||||
|
@ -1171,8 +1171,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
|
||||
|
@ -1248,8 +1248,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
|
||||
|
@ -1395,10 +1395,10 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getUnparseable());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc3 = SD(task, "2013/P1D", 0);
|
||||
SegmentDescriptor desc4 = SD(task, "2049/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task, "2013/P1D", 0);
|
||||
SegmentDescriptor desc4 = sd(task, "2049/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(
|
||||
|
@ -1620,8 +1620,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published segments & metadata
|
||||
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(
|
||||
|
@ -1731,8 +1731,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published segments & metadata, should all be from the first task
|
||||
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(
|
||||
|
@ -1829,8 +1829,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
|
||||
|
||||
// Check published segments & metadata
|
||||
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
|
||||
|
||||
|
@ -1849,8 +1849,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published segments & metadata
|
||||
SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1);
|
||||
SegmentDescriptor desc4 = SD(task2, "2013/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task2, "2011/P1D", 1);
|
||||
SegmentDescriptor desc4 = sd(task2, "2013/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
|
||||
Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
|
||||
|
||||
|
@ -1928,9 +1928,9 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published segments & metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc4 = SD(task, "2012/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc4 = sd(task, "2012/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc4), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
|
||||
|
@ -2043,10 +2043,10 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published segments & metadata
|
||||
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
|
||||
SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1);
|
||||
SegmentDescriptor desc4 = SD(task2, "2012/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
|
||||
SegmentDescriptor desc3 = sd(task2, "2011/P1D", 1);
|
||||
SegmentDescriptor desc4 = sd(task2, "2012/P1D", 0);
|
||||
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
|
@ -2196,8 +2196,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published segments & metadata
|
||||
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(
|
||||
|
@ -2317,8 +2317,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(
|
||||
|
@ -2407,8 +2407,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
|
||||
|
||||
// Check published metadata
|
||||
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
|
||||
SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
|
||||
SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
|
||||
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
|
||||
Assert.assertEquals(
|
||||
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
|
||||
|
@ -2877,7 +2877,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
return results.isEmpty() ? 0L : DimensionHandlerUtils.nullToZero(results.get(0).getValue().getLongMetric("rows"));
|
||||
}
|
||||
|
||||
private static List<byte[]> JB(
|
||||
private static List<byte[]> jb(
|
||||
String timestamp,
|
||||
String dim1,
|
||||
String dim2,
|
||||
|
@ -2903,7 +2903,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
|
|||
}
|
||||
}
|
||||
|
||||
private SegmentDescriptor SD(final Task task, final String intervalString, final int partitionNum)
|
||||
private SegmentDescriptor sd(final Task task, final String intervalString, final int partitionNum)
|
||||
{
|
||||
final Interval interval = Intervals.of(intervalString);
|
||||
return new SegmentDescriptor(interval, getLock(task, interval).getVersion(), partitionNum);
|
||||
|
|
|
@ -76,20 +76,20 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
|
|||
private static Shard shard1;
|
||||
private static KinesisRecordSupplier recordSupplier;
|
||||
private static List<Record> shard1Records = ImmutableList.of(
|
||||
new Record().withData(JB("2011", "d", "y", "10", "20.0", "1.0")).withSequenceNumber("0"),
|
||||
new Record().withData(JB("2011", "e", "y", "10", "20.0", "1.0")).withSequenceNumber("1"),
|
||||
new Record().withData(JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")).withSequenceNumber("2"),
|
||||
new Record().withData(jb("2011", "d", "y", "10", "20.0", "1.0")).withSequenceNumber("0"),
|
||||
new Record().withData(jb("2011", "e", "y", "10", "20.0", "1.0")).withSequenceNumber("1"),
|
||||
new Record().withData(jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")).withSequenceNumber("2"),
|
||||
new Record().withData(ByteBuffer.wrap(StringUtils.toUtf8("unparseable"))).withSequenceNumber("3"),
|
||||
new Record().withData(ByteBuffer.wrap(StringUtils.toUtf8("unparseable2"))).withSequenceNumber("4"),
|
||||
new Record().withData(ByteBuffer.wrap(StringUtils.toUtf8("{}"))).withSequenceNumber("5"),
|
||||
new Record().withData(JB("2013", "f", "y", "10", "20.0", "1.0")).withSequenceNumber("6"),
|
||||
new Record().withData(JB("2049", "f", "y", "notanumber", "20.0", "1.0")).withSequenceNumber("7"),
|
||||
new Record().withData(JB("2012", "g", "y", "10", "20.0", "1.0")).withSequenceNumber("8"),
|
||||
new Record().withData(JB("2011", "h", "y", "10", "20.0", "1.0")).withSequenceNumber("9")
|
||||
new Record().withData(jb("2013", "f", "y", "10", "20.0", "1.0")).withSequenceNumber("6"),
|
||||
new Record().withData(jb("2049", "f", "y", "notanumber", "20.0", "1.0")).withSequenceNumber("7"),
|
||||
new Record().withData(jb("2012", "g", "y", "10", "20.0", "1.0")).withSequenceNumber("8"),
|
||||
new Record().withData(jb("2011", "h", "y", "10", "20.0", "1.0")).withSequenceNumber("9")
|
||||
);
|
||||
private static List<Record> shard0Records = ImmutableList.of(
|
||||
new Record().withData(JB("2008", "a", "y", "10", "20.0", "1.0")).withSequenceNumber("0"),
|
||||
new Record().withData(JB("2009", "b", "y", "10", "20.0", "1.0")).withSequenceNumber("1")
|
||||
new Record().withData(jb("2008", "a", "y", "10", "20.0", "1.0")).withSequenceNumber("0"),
|
||||
new Record().withData(jb("2009", "b", "y", "10", "20.0", "1.0")).withSequenceNumber("1")
|
||||
);
|
||||
private static List<Object> allRecords = ImmutableList.builder()
|
||||
.addAll(shard0Records.stream()
|
||||
|
@ -120,7 +120,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
|
|||
.toList()))
|
||||
.build();
|
||||
|
||||
private static ByteBuffer JB(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
|
||||
private static ByteBuffer jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
|
||||
{
|
||||
try {
|
||||
return ByteBuffer.wrap(new ObjectMapper().writeValueAsBytes(
|
||||
|
|
|
@ -3560,7 +3560,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
|
|||
);
|
||||
}
|
||||
|
||||
private static List<byte[]> JB(
|
||||
private static List<byte[]> jb(
|
||||
String timestamp,
|
||||
String dim1,
|
||||
String dim2,
|
||||
|
|
|
@ -152,7 +152,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
|
|||
FileUtils.deleteDirectory(tmpDir);
|
||||
}
|
||||
|
||||
private static TestCase TC(
|
||||
private static TestCase tc(
|
||||
String intervalString,
|
||||
int expectedCount,
|
||||
long expectedSum,
|
||||
|
@ -174,7 +174,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
|
|||
);
|
||||
}
|
||||
|
||||
private static DataSegmentMaker DS(
|
||||
private static DataSegmentMaker ds(
|
||||
String intervalString,
|
||||
String version,
|
||||
int partitionNum,
|
||||
|
@ -184,7 +184,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
|
|||
return new DataSegmentMaker(Intervals.of(intervalString), version, partitionNum, Arrays.asList(rows));
|
||||
}
|
||||
|
||||
private static InputRow IR(String timeString, long metricValue)
|
||||
private static InputRow ir(String timeString, long metricValue)
|
||||
{
|
||||
return new MapBasedInputRow(
|
||||
DateTimes.of(timeString).getMillis(),
|
||||
|
@ -236,34 +236,34 @@ public class IngestSegmentFirehoseFactoryTimelineTest
|
|||
public static Collection<Object[]> constructorFeeder()
|
||||
{
|
||||
final List<TestCase> testCases = ImmutableList.of(
|
||||
TC(
|
||||
tc(
|
||||
"2000/2000T02", 3, 7,
|
||||
DS("2000/2000T01", "v1", 0, IR("2000", 1), IR("2000T00:01", 2)),
|
||||
DS("2000T01/2000T02", "v1", 0, IR("2000T01", 4))
|
||||
ds("2000/2000T01", "v1", 0, ir("2000", 1), ir("2000T00:01", 2)),
|
||||
ds("2000T01/2000T02", "v1", 0, ir("2000T01", 4))
|
||||
) /* Adjacent segments */,
|
||||
TC(
|
||||
tc(
|
||||
"2000/2000T02", 3, 7,
|
||||
DS("2000/2000T02", "v1", 0, IR("2000", 1), IR("2000T00:01", 2), IR("2000T01", 8)),
|
||||
DS("2000T01/2000T02", "v2", 0, IR("2000T01:01", 4))
|
||||
ds("2000/2000T02", "v1", 0, ir("2000", 1), ir("2000T00:01", 2), ir("2000T01", 8)),
|
||||
ds("2000T01/2000T02", "v2", 0, ir("2000T01:01", 4))
|
||||
) /* 1H segment overlaid on top of 2H segment */,
|
||||
TC(
|
||||
tc(
|
||||
"2000/2000-01-02", 4, 23,
|
||||
DS("2000/2000-01-02", "v1", 0, IR("2000", 1), IR("2000T00:01", 2), IR("2000T01", 8), IR("2000T02", 16)),
|
||||
DS("2000T01/2000T02", "v2", 0, IR("2000T01:01", 4))
|
||||
ds("2000/2000-01-02", "v1", 0, ir("2000", 1), ir("2000T00:01", 2), ir("2000T01", 8), ir("2000T02", 16)),
|
||||
ds("2000T01/2000T02", "v2", 0, ir("2000T01:01", 4))
|
||||
) /* 1H segment overlaid on top of 1D segment */,
|
||||
TC(
|
||||
tc(
|
||||
"2000/2000T02", 4, 15,
|
||||
DS("2000/2000T02", "v1", 0, IR("2000", 1), IR("2000T00:01", 2), IR("2000T01", 8)),
|
||||
DS("2000/2000T02", "v1", 1, IR("2000T01:01", 4))
|
||||
ds("2000/2000T02", "v1", 0, ir("2000", 1), ir("2000T00:01", 2), ir("2000T01", 8)),
|
||||
ds("2000/2000T02", "v1", 1, ir("2000T01:01", 4))
|
||||
) /* Segment set with two segments for the same interval */,
|
||||
TC(
|
||||
tc(
|
||||
"2000T01/2000T02", 1, 2,
|
||||
DS("2000/2000T03", "v1", 0, IR("2000", 1), IR("2000T01", 2), IR("2000T02", 4))
|
||||
ds("2000/2000T03", "v1", 0, ir("2000", 1), ir("2000T01", 2), ir("2000T02", 4))
|
||||
) /* Segment wider than desired interval */,
|
||||
TC(
|
||||
tc(
|
||||
"2000T02/2000T04", 2, 12,
|
||||
DS("2000/2000T03", "v1", 0, IR("2000", 1), IR("2000T01", 2), IR("2000T02", 4)),
|
||||
DS("2000T03/2000T04", "v1", 0, IR("2000T03", 8))
|
||||
ds("2000/2000T03", "v1", 0, ir("2000", 1), ir("2000T01", 2), ir("2000T02", 4)),
|
||||
ds("2000T03/2000T04", "v1", 0, ir("2000T03", 8))
|
||||
) /* Segment intersecting desired interval */
|
||||
);
|
||||
|
||||
|
|
|
@ -194,16 +194,16 @@ public class TaskLifecycleTest
|
|||
private static DateTime now = DateTimes.nowUtc();
|
||||
|
||||
private static final Iterable<InputRow> realtimeIdxTaskInputRows = ImmutableList.of(
|
||||
IR(now.toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 1.0f),
|
||||
IR(now.plus(new Period(Hours.ONE)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 2.0f),
|
||||
IR(now.plus(new Period(Hours.TWO)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 3.0f)
|
||||
ir(now.toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 1.0f),
|
||||
ir(now.plus(new Period(Hours.ONE)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 2.0f),
|
||||
ir(now.plus(new Period(Hours.TWO)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 3.0f)
|
||||
);
|
||||
|
||||
private static final Iterable<InputRow> IdxTaskInputRows = ImmutableList.of(
|
||||
IR("2010-01-01T01", "x", "y", 1),
|
||||
IR("2010-01-01T01", "x", "z", 1),
|
||||
IR("2010-01-02T01", "a", "b", 2),
|
||||
IR("2010-01-02T01", "a", "c", 1)
|
||||
ir("2010-01-01T01", "x", "y", 1),
|
||||
ir("2010-01-01T01", "x", "z", 1),
|
||||
ir("2010-01-02T01", "a", "b", 2),
|
||||
ir("2010-01-02T01", "a", "c", 1)
|
||||
);
|
||||
|
||||
@Rule
|
||||
|
@ -240,7 +240,7 @@ public class TaskLifecycleTest
|
|||
return new NoopServiceEmitter();
|
||||
}
|
||||
|
||||
private static InputRow IR(String dt, String dim1, String dim2, float met)
|
||||
private static InputRow ir(String dt, String dim1, String dim2, float met)
|
||||
{
|
||||
return new MapBasedInputRow(
|
||||
DateTimes.of(dt).getMillis(),
|
||||
|
|
|
@ -394,7 +394,7 @@ public class GroupByLimitPushDownInsufficientBufferTest
|
|||
strategySelector,
|
||||
new GroupByQueryQueryToolChest(
|
||||
strategySelector,
|
||||
NoopIntervalChunkingQueryRunnerDecorator()
|
||||
noopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
);
|
||||
|
||||
|
@ -402,7 +402,7 @@ public class GroupByLimitPushDownInsufficientBufferTest
|
|||
tooSmallStrategySelector,
|
||||
new GroupByQueryQueryToolChest(
|
||||
tooSmallStrategySelector,
|
||||
NoopIntervalChunkingQueryRunnerDecorator()
|
||||
noopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -679,7 +679,7 @@ public class GroupByLimitPushDownInsufficientBufferTest
|
|||
}
|
||||
};
|
||||
|
||||
public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
|
||||
public static IntervalChunkingQueryRunnerDecorator noopIntervalChunkingQueryRunnerDecorator()
|
||||
{
|
||||
return new IntervalChunkingQueryRunnerDecorator(null, null, null)
|
||||
{
|
||||
|
|
|
@ -426,7 +426,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
|
|||
strategySelector,
|
||||
new GroupByQueryQueryToolChest(
|
||||
strategySelector,
|
||||
NoopIntervalChunkingQueryRunnerDecorator()
|
||||
noopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
);
|
||||
|
||||
|
@ -434,7 +434,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
|
|||
strategySelector2,
|
||||
new GroupByQueryQueryToolChest(
|
||||
strategySelector2,
|
||||
NoopIntervalChunkingQueryRunnerDecorator()
|
||||
noopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -780,7 +780,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
|
|||
}
|
||||
};
|
||||
|
||||
public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
|
||||
public static IntervalChunkingQueryRunnerDecorator noopIntervalChunkingQueryRunnerDecorator()
|
||||
{
|
||||
return new IntervalChunkingQueryRunnerDecorator(null, null, null)
|
||||
{
|
||||
|
|
|
@ -287,7 +287,7 @@ public class GroupByMultiSegmentTest
|
|||
strategySelector,
|
||||
new GroupByQueryQueryToolChest(
|
||||
strategySelector,
|
||||
NoopIntervalChunkingQueryRunnerDecorator()
|
||||
noopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -419,7 +419,7 @@ public class GroupByMultiSegmentTest
|
|||
}
|
||||
};
|
||||
|
||||
public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
|
||||
public static IntervalChunkingQueryRunnerDecorator noopIntervalChunkingQueryRunnerDecorator()
|
||||
{
|
||||
return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
|
||||
@Override
|
||||
|
|
|
@ -36,7 +36,7 @@ public class LookupConfigTest
|
|||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
@Test
|
||||
public void TestSerDesr() throws IOException
|
||||
public void testSerDesr() throws IOException
|
||||
{
|
||||
LookupConfig lookupConfig = new LookupConfig(temporaryFolder.newFile().getAbsolutePath());
|
||||
Assert.assertEquals(
|
||||
|
|
|
@ -117,22 +117,22 @@ public class ExpressionFilterTest extends BaseFilterTest
|
|||
@Test
|
||||
public void testOneSingleValuedStringColumn()
|
||||
{
|
||||
assertFilterMatches(EDF("dim3 == ''"), ImmutableList.of("0"));
|
||||
assertFilterMatches(EDF("dim3 == '1'"), ImmutableList.of("3", "4", "6"));
|
||||
assertFilterMatches(EDF("dim3 == 'a'"), ImmutableList.of("7"));
|
||||
assertFilterMatches(EDF("dim3 == 1"), ImmutableList.of("3", "4", "6"));
|
||||
assertFilterMatches(EDF("dim3 == 1.0"), ImmutableList.of("3", "4", "6"));
|
||||
assertFilterMatches(EDF("dim3 == 1.234"), ImmutableList.of("9"));
|
||||
assertFilterMatches(EDF("dim3 < '2'"), ImmutableList.of("0", "1", "3", "4", "6", "9"));
|
||||
assertFilterMatches(edf("dim3 == ''"), ImmutableList.of("0"));
|
||||
assertFilterMatches(edf("dim3 == '1'"), ImmutableList.of("3", "4", "6"));
|
||||
assertFilterMatches(edf("dim3 == 'a'"), ImmutableList.of("7"));
|
||||
assertFilterMatches(edf("dim3 == 1"), ImmutableList.of("3", "4", "6"));
|
||||
assertFilterMatches(edf("dim3 == 1.0"), ImmutableList.of("3", "4", "6"));
|
||||
assertFilterMatches(edf("dim3 == 1.234"), ImmutableList.of("9"));
|
||||
assertFilterMatches(edf("dim3 < '2'"), ImmutableList.of("0", "1", "3", "4", "6", "9"));
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
assertFilterMatches(EDF("dim3 < 2"), ImmutableList.of("0", "3", "4", "6", "7", "9"));
|
||||
assertFilterMatches(EDF("dim3 < 2.0"), ImmutableList.of("0", "3", "4", "6", "7", "9"));
|
||||
assertFilterMatches(edf("dim3 < 2"), ImmutableList.of("0", "3", "4", "6", "7", "9"));
|
||||
assertFilterMatches(edf("dim3 < 2.0"), ImmutableList.of("0", "3", "4", "6", "7", "9"));
|
||||
} else {
|
||||
// Empty String and "a" will not match
|
||||
assertFilterMatches(EDF("dim3 < 2"), ImmutableList.of("3", "4", "6", "9"));
|
||||
assertFilterMatches(EDF("dim3 < 2.0"), ImmutableList.of("3", "4", "6", "9"));
|
||||
assertFilterMatches(edf("dim3 < 2"), ImmutableList.of("3", "4", "6", "9"));
|
||||
assertFilterMatches(edf("dim3 < 2.0"), ImmutableList.of("3", "4", "6", "9"));
|
||||
}
|
||||
assertFilterMatches(EDF("like(dim3, '1%')"), ImmutableList.of("1", "3", "4", "6", "9"));
|
||||
assertFilterMatches(edf("like(dim3, '1%')"), ImmutableList.of("1", "3", "4", "6", "9"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -141,124 +141,124 @@ public class ExpressionFilterTest extends BaseFilterTest
|
|||
// Expressions currently treat multi-valued arrays as nulls.
|
||||
// This test is just documenting the current behavior, not necessarily saying it makes sense.
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
assertFilterMatches(EDF("dim4 == ''"), ImmutableList.of("0", "1", "2", "4", "5", "6", "7", "8"));
|
||||
assertFilterMatches(edf("dim4 == ''"), ImmutableList.of("0", "1", "2", "4", "5", "6", "7", "8"));
|
||||
} else {
|
||||
assertFilterMatches(EDF("dim4 == ''"), ImmutableList.of("2"));
|
||||
assertFilterMatches(edf("dim4 == ''"), ImmutableList.of("2"));
|
||||
// AS per SQL standard null == null returns false.
|
||||
assertFilterMatches(EDF("dim4 == null"), ImmutableList.of());
|
||||
assertFilterMatches(edf("dim4 == null"), ImmutableList.of());
|
||||
}
|
||||
assertFilterMatches(EDF("dim4 == '1'"), ImmutableList.of());
|
||||
assertFilterMatches(EDF("dim4 == '3'"), ImmutableList.of("3"));
|
||||
assertFilterMatches(edf("dim4 == '1'"), ImmutableList.of());
|
||||
assertFilterMatches(edf("dim4 == '3'"), ImmutableList.of("3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOneLongColumn()
|
||||
{
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
assertFilterMatches(EDF("dim1 == ''"), ImmutableList.of("0"));
|
||||
assertFilterMatches(edf("dim1 == ''"), ImmutableList.of("0"));
|
||||
} else {
|
||||
// A long does not match empty string
|
||||
assertFilterMatches(EDF("dim1 == ''"), ImmutableList.of());
|
||||
assertFilterMatches(edf("dim1 == ''"), ImmutableList.of());
|
||||
}
|
||||
assertFilterMatches(EDF("dim1 == '1'"), ImmutableList.of("1"));
|
||||
assertFilterMatches(EDF("dim1 == 2"), ImmutableList.of("2"));
|
||||
assertFilterMatches(EDF("dim1 < '2'"), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(EDF("dim1 < 2"), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(EDF("dim1 < 2.0"), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(EDF("like(dim1, '1%')"), ImmutableList.of("1"));
|
||||
assertFilterMatches(edf("dim1 == '1'"), ImmutableList.of("1"));
|
||||
assertFilterMatches(edf("dim1 == 2"), ImmutableList.of("2"));
|
||||
assertFilterMatches(edf("dim1 < '2'"), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(edf("dim1 < 2"), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(edf("dim1 < 2.0"), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(edf("like(dim1, '1%')"), ImmutableList.of("1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOneFloatColumn()
|
||||
{
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
assertFilterMatches(EDF("dim2 == ''"), ImmutableList.of("0"));
|
||||
assertFilterMatches(edf("dim2 == ''"), ImmutableList.of("0"));
|
||||
} else {
|
||||
// A float does not match empty string
|
||||
assertFilterMatches(EDF("dim2 == ''"), ImmutableList.of());
|
||||
assertFilterMatches(edf("dim2 == ''"), ImmutableList.of());
|
||||
}
|
||||
assertFilterMatches(EDF("dim2 == '1'"), ImmutableList.of("1"));
|
||||
assertFilterMatches(EDF("dim2 == 2"), ImmutableList.of("2"));
|
||||
assertFilterMatches(EDF("dim2 < '2'"), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(EDF("dim2 < 2"), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(EDF("dim2 < 2.0"), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(EDF("like(dim2, '1%')"), ImmutableList.of("1"));
|
||||
assertFilterMatches(edf("dim2 == '1'"), ImmutableList.of("1"));
|
||||
assertFilterMatches(edf("dim2 == 2"), ImmutableList.of("2"));
|
||||
assertFilterMatches(edf("dim2 < '2'"), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(edf("dim2 < 2"), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(edf("dim2 < 2.0"), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(edf("like(dim2, '1%')"), ImmutableList.of("1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConstantExpression()
|
||||
{
|
||||
assertFilterMatches(EDF("1 + 1"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"));
|
||||
assertFilterMatches(EDF("0 + 0"), ImmutableList.of());
|
||||
assertFilterMatches(edf("1 + 1"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"));
|
||||
assertFilterMatches(edf("0 + 0"), ImmutableList.of());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompareColumns()
|
||||
{
|
||||
// String vs string
|
||||
assertFilterMatches(EDF("dim0 == dim3"), ImmutableList.of("2", "5", "8"));
|
||||
assertFilterMatches(edf("dim0 == dim3"), ImmutableList.of("2", "5", "8"));
|
||||
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
// String vs long
|
||||
assertFilterMatches(EDF("dim1 == dim3"), ImmutableList.of("0", "2", "5", "8"));
|
||||
assertFilterMatches(edf("dim1 == dim3"), ImmutableList.of("0", "2", "5", "8"));
|
||||
|
||||
// String vs float
|
||||
assertFilterMatches(EDF("dim2 == dim3"), ImmutableList.of("0", "2", "5", "8"));
|
||||
assertFilterMatches(edf("dim2 == dim3"), ImmutableList.of("0", "2", "5", "8"));
|
||||
} else {
|
||||
// String vs long
|
||||
assertFilterMatches(EDF("dim1 == dim3"), ImmutableList.of("2", "5", "8"));
|
||||
assertFilterMatches(edf("dim1 == dim3"), ImmutableList.of("2", "5", "8"));
|
||||
|
||||
// String vs float
|
||||
assertFilterMatches(EDF("dim2 == dim3"), ImmutableList.of("2", "5", "8"));
|
||||
assertFilterMatches(edf("dim2 == dim3"), ImmutableList.of("2", "5", "8"));
|
||||
}
|
||||
|
||||
// String vs. multi-value string
|
||||
// Expressions currently treat multi-valued arrays as nulls.
|
||||
// This test is just documenting the current behavior, not necessarily saying it makes sense.
|
||||
assertFilterMatches(EDF("dim0 == dim4"), ImmutableList.of("3"));
|
||||
assertFilterMatches(edf("dim0 == dim4"), ImmutableList.of("3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingColumn()
|
||||
{
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
assertFilterMatches(EDF("missing == ''"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"));
|
||||
assertFilterMatches(edf("missing == ''"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"));
|
||||
} else {
|
||||
// AS per SQL standard null == null returns false.
|
||||
assertFilterMatches(EDF("missing == null"), ImmutableList.of());
|
||||
assertFilterMatches(edf("missing == null"), ImmutableList.of());
|
||||
}
|
||||
assertFilterMatches(EDF("missing == '1'"), ImmutableList.of());
|
||||
assertFilterMatches(EDF("missing == 2"), ImmutableList.of());
|
||||
assertFilterMatches(edf("missing == '1'"), ImmutableList.of());
|
||||
assertFilterMatches(edf("missing == 2"), ImmutableList.of());
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
// missing equivaluent to 0
|
||||
assertFilterMatches(EDF("missing < '2'"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"));
|
||||
assertFilterMatches(EDF("missing < 2"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"));
|
||||
assertFilterMatches(EDF("missing < 2.0"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"));
|
||||
assertFilterMatches(edf("missing < '2'"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"));
|
||||
assertFilterMatches(edf("missing < 2"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"));
|
||||
assertFilterMatches(edf("missing < 2.0"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"));
|
||||
} else {
|
||||
// missing equivalent to null
|
||||
assertFilterMatches(EDF("missing < '2'"), ImmutableList.of());
|
||||
assertFilterMatches(EDF("missing < 2"), ImmutableList.of());
|
||||
assertFilterMatches(EDF("missing < 2.0"), ImmutableList.of());
|
||||
assertFilterMatches(edf("missing < '2'"), ImmutableList.of());
|
||||
assertFilterMatches(edf("missing < 2"), ImmutableList.of());
|
||||
assertFilterMatches(edf("missing < 2.0"), ImmutableList.of());
|
||||
}
|
||||
assertFilterMatches(EDF("missing > '2'"), ImmutableList.of());
|
||||
assertFilterMatches(EDF("missing > 2"), ImmutableList.of());
|
||||
assertFilterMatches(EDF("missing > 2.0"), ImmutableList.of());
|
||||
assertFilterMatches(EDF("like(missing, '1%')"), ImmutableList.of());
|
||||
assertFilterMatches(edf("missing > '2'"), ImmutableList.of());
|
||||
assertFilterMatches(edf("missing > 2"), ImmutableList.of());
|
||||
assertFilterMatches(edf("missing > 2.0"), ImmutableList.of());
|
||||
assertFilterMatches(edf("like(missing, '1%')"), ImmutableList.of());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetRequiredColumn()
|
||||
{
|
||||
Assert.assertEquals(EDF("like(dim1, '1%')").getRequiredColumns(), Sets.newHashSet("dim1"));
|
||||
Assert.assertEquals(EDF("dim2 == '1'").getRequiredColumns(), Sets.newHashSet("dim2"));
|
||||
Assert.assertEquals(EDF("dim3 < '2'").getRequiredColumns(), Sets.newHashSet("dim3"));
|
||||
Assert.assertEquals(EDF("dim4 == ''").getRequiredColumns(), Sets.newHashSet("dim4"));
|
||||
Assert.assertEquals(EDF("1 + 1").getRequiredColumns(), new HashSet<>());
|
||||
Assert.assertEquals(EDF("dim0 == dim3").getRequiredColumns(), Sets.newHashSet("dim0", "dim3"));
|
||||
Assert.assertEquals(EDF("missing == ''").getRequiredColumns(), Sets.newHashSet("missing"));
|
||||
Assert.assertEquals(edf("like(dim1, '1%')").getRequiredColumns(), Sets.newHashSet("dim1"));
|
||||
Assert.assertEquals(edf("dim2 == '1'").getRequiredColumns(), Sets.newHashSet("dim2"));
|
||||
Assert.assertEquals(edf("dim3 < '2'").getRequiredColumns(), Sets.newHashSet("dim3"));
|
||||
Assert.assertEquals(edf("dim4 == ''").getRequiredColumns(), Sets.newHashSet("dim4"));
|
||||
Assert.assertEquals(edf("1 + 1").getRequiredColumns(), new HashSet<>());
|
||||
Assert.assertEquals(edf("dim0 == dim3").getRequiredColumns(), Sets.newHashSet("dim0", "dim3"));
|
||||
Assert.assertEquals(edf("missing == ''").getRequiredColumns(), Sets.newHashSet("missing"));
|
||||
}
|
||||
|
||||
private static ExpressionDimFilter EDF(final String expression)
|
||||
private static ExpressionDimFilter edf(final String expression)
|
||||
{
|
||||
return new ExpressionDimFilter(expression, TestExprMacroTable.INSTANCE);
|
||||
}
|
||||
|
|
|
@ -100,8 +100,8 @@ public class AppenderatorPlumberTest
|
|||
// getDataSource
|
||||
Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
|
||||
|
||||
InputRow[] rows = new InputRow[] {AppenderatorTest.IR("2000", "foo", 1),
|
||||
AppenderatorTest.IR("2000", "bar", 2), AppenderatorTest.IR("2000", "qux", 4)};
|
||||
InputRow[] rows = new InputRow[] {AppenderatorTest.ir("2000", "foo", 1),
|
||||
AppenderatorTest.ir("2000", "bar", 2), AppenderatorTest.ir("2000", "qux", 4)};
|
||||
// add
|
||||
Assert.assertEquals(1, plumber.add(rows[0], null).getRowCount());
|
||||
|
||||
|
|
|
@ -58,9 +58,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
public class AppenderatorTest
|
||||
{
|
||||
private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
|
||||
SI("2000/2001", "A", 0),
|
||||
SI("2000/2001", "A", 1),
|
||||
SI("2001/2002", "A", 0)
|
||||
si("2000/2001", "A", 0),
|
||||
si("2000/2001", "A", 1),
|
||||
si("2001/2002", "A", 0)
|
||||
);
|
||||
|
||||
@Test
|
||||
|
@ -83,21 +83,21 @@ public class AppenderatorTest
|
|||
commitMetadata.put("x", "1");
|
||||
Assert.assertEquals(
|
||||
1,
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier)
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier)
|
||||
.getNumRowsInSegment()
|
||||
);
|
||||
|
||||
commitMetadata.put("x", "2");
|
||||
Assert.assertEquals(
|
||||
2,
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bar", 2), committerSupplier)
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar", 2), committerSupplier)
|
||||
.getNumRowsInSegment()
|
||||
);
|
||||
|
||||
commitMetadata.put("x", "3");
|
||||
Assert.assertEquals(
|
||||
1,
|
||||
appenderator.add(IDENTIFIERS.get(1), IR("2000", "qux", 4), committerSupplier)
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 4), committerSupplier)
|
||||
.getNumRowsInSegment()
|
||||
);
|
||||
|
||||
|
@ -173,14 +173,14 @@ public class AppenderatorTest
|
|||
};
|
||||
|
||||
appenderator.startJob();
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
|
||||
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 10 (dimsKeySize) = 138 + 1 byte when null handling is enabled
|
||||
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
|
||||
Assert.assertEquals(
|
||||
138 + nullHandlingOverhead,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
|
||||
);
|
||||
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
|
||||
Assert.assertEquals(
|
||||
138 + nullHandlingOverhead,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
|
||||
|
@ -216,11 +216,11 @@ public class AppenderatorTest
|
|||
};
|
||||
|
||||
appenderator.startJob();
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
|
||||
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 10 (dimsKeySize) = 138
|
||||
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
|
||||
Assert.assertEquals(138 + nullHandlingOverhead, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
|
||||
Assert.assertEquals(
|
||||
276 + 2 * nullHandlingOverhead,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
|
@ -258,7 +258,7 @@ public class AppenderatorTest
|
|||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.startJob();
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
|
||||
//we still calculate the size even when ignoring it to make persist decision
|
||||
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
|
||||
Assert.assertEquals(
|
||||
|
@ -266,7 +266,7 @@ public class AppenderatorTest
|
|||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
|
||||
);
|
||||
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
|
||||
Assert.assertEquals(
|
||||
276 + 2 * nullHandlingOverhead,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
|
@ -310,17 +310,17 @@ public class AppenderatorTest
|
|||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.startJob();
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
|
||||
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
|
||||
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
|
||||
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "baz", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "baz", 1), committerSupplier);
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(1), IR("2000", "qux", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 1), committerSupplier);
|
||||
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier);
|
||||
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.persistAll(committerSupplier.get());
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
|
@ -356,17 +356,17 @@ public class AppenderatorTest
|
|||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.startJob();
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier, false);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier, false);
|
||||
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier, false);
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier, false);
|
||||
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier, false);
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier, false);
|
||||
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "baz", 1), committerSupplier, false);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "baz", 1), committerSupplier, false);
|
||||
Assert.assertEquals(3, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(1), IR("2000", "qux", 1), committerSupplier, false);
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 1), committerSupplier, false);
|
||||
Assert.assertEquals(4, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier, false);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier, false);
|
||||
Assert.assertEquals(5, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.persistAll(committerSupplier.get());
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
|
@ -409,15 +409,15 @@ public class AppenderatorTest
|
|||
|
||||
appenderator.startJob();
|
||||
eventCount.incrementAndGet();
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
|
||||
eventCount.incrementAndGet();
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bar", 2), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar", 2), committerSupplier);
|
||||
eventCount.incrementAndGet();
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "baz", 3), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "baz", 3), committerSupplier);
|
||||
eventCount.incrementAndGet();
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "qux", 4), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "qux", 4), committerSupplier);
|
||||
eventCount.incrementAndGet();
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 5), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 5), committerSupplier);
|
||||
appenderator.close();
|
||||
|
||||
try (final AppenderatorTester tester2 = new AppenderatorTester(
|
||||
|
@ -445,9 +445,9 @@ public class AppenderatorTest
|
|||
Assert.assertEquals(0, appenderator.getTotalRowCount());
|
||||
appenderator.startJob();
|
||||
Assert.assertEquals(0, appenderator.getTotalRowCount());
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
|
||||
Assert.assertEquals(1, appenderator.getTotalRowCount());
|
||||
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
|
||||
Assert.assertEquals(2, appenderator.getTotalRowCount());
|
||||
|
||||
appenderator.persistAll(committerSupplier.get()).get();
|
||||
|
@ -457,13 +457,13 @@ public class AppenderatorTest
|
|||
appenderator.drop(IDENTIFIERS.get(1)).get();
|
||||
Assert.assertEquals(0, appenderator.getTotalRowCount());
|
||||
|
||||
appenderator.add(IDENTIFIERS.get(2), IR("2001", "bar", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(2), ir("2001", "bar", 1), committerSupplier);
|
||||
Assert.assertEquals(1, appenderator.getTotalRowCount());
|
||||
appenderator.add(IDENTIFIERS.get(2), IR("2001", "baz", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(2), ir("2001", "baz", 1), committerSupplier);
|
||||
Assert.assertEquals(2, appenderator.getTotalRowCount());
|
||||
appenderator.add(IDENTIFIERS.get(2), IR("2001", "qux", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(2), ir("2001", "qux", 1), committerSupplier);
|
||||
Assert.assertEquals(3, appenderator.getTotalRowCount());
|
||||
appenderator.add(IDENTIFIERS.get(2), IR("2001", "bob", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(2), ir("2001", "bob", 1), committerSupplier);
|
||||
Assert.assertEquals(4, appenderator.getTotalRowCount());
|
||||
|
||||
appenderator.persistAll(committerSupplier.get()).get();
|
||||
|
@ -483,13 +483,13 @@ public class AppenderatorTest
|
|||
final Appenderator appenderator = tester.getAppenderator();
|
||||
|
||||
appenderator.startJob();
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 2), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(1), IR("2000", "foo", 4), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), IR("2001", "foo", 8), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), IR("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), IR("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), IR("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "foo", 4), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil()));
|
||||
|
||||
// Query1: 2000/2001
|
||||
final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
|
||||
|
@ -619,13 +619,13 @@ public class AppenderatorTest
|
|||
final Appenderator appenderator = tester.getAppenderator();
|
||||
|
||||
appenderator.startJob();
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 2), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(1), IR("2000", "foo", 4), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), IR("2001", "foo", 8), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), IR("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), IR("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), IR("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "foo", 4), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil()));
|
||||
|
||||
// Query1: segment #2
|
||||
final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
|
||||
|
@ -742,7 +742,7 @@ public class AppenderatorTest
|
|||
}
|
||||
}
|
||||
|
||||
private static SegmentIdWithShardSpec SI(String interval, String version, int partitionNum)
|
||||
private static SegmentIdWithShardSpec si(String interval, String version, int partitionNum)
|
||||
{
|
||||
return new SegmentIdWithShardSpec(
|
||||
AppenderatorTester.DATASOURCE,
|
||||
|
@ -752,7 +752,7 @@ public class AppenderatorTest
|
|||
);
|
||||
}
|
||||
|
||||
static InputRow IR(String ts, String dim, long met)
|
||||
static InputRow ir(String ts, String dim, long met)
|
||||
{
|
||||
return new MapBasedInputRow(
|
||||
DateTimes.of(ts).getMillis(),
|
||||
|
|
|
@ -167,9 +167,9 @@ public class DefaultOfflineAppenderatorFactoryTest
|
|||
new LinearShardSpec(0)
|
||||
);
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(identifier, AppenderatorTest.IR("2000", "bar", 1), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(identifier, AppenderatorTest.ir("2000", "bar", 1), Suppliers.ofInstance(Committers.nil()));
|
||||
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(identifier, AppenderatorTest.IR("2000", "baz", 1), Suppliers.ofInstance(Committers.nil()));
|
||||
appenderator.add(identifier, AppenderatorTest.ir("2000", "baz", 1), Suppliers.ofInstance(Committers.nil()));
|
||||
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.close();
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
|
|
|
@ -362,7 +362,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
final DatabaseMetaData metaData = client.getMetaData();
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(
|
||||
ROW(Pair.of("TABLE_CAT", "druid"))
|
||||
row(Pair.of("TABLE_CAT", "druid"))
|
||||
),
|
||||
getRows(metaData.getCatalogs())
|
||||
);
|
||||
|
@ -374,7 +374,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
final DatabaseMetaData metaData = client.getMetaData();
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(
|
||||
ROW(Pair.of("TABLE_CATALOG", "druid"), Pair.of("TABLE_SCHEM", "druid"))
|
||||
row(Pair.of("TABLE_CATALOG", "druid"), Pair.of("TABLE_SCHEM", "druid"))
|
||||
),
|
||||
getRows(metaData.getSchemas(null, "druid"))
|
||||
);
|
||||
|
@ -386,19 +386,19 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
final DatabaseMetaData metaData = client.getMetaData();
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_CAT", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE1),
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_TYPE", "TABLE")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_CAT", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE2),
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_TYPE", "TABLE")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_CAT", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE3),
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
|
@ -418,25 +418,25 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
final DatabaseMetaData metaData = superuserClient.getMetaData();
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_CAT", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE1),
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_TYPE", "TABLE")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_CAT", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE2),
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_TYPE", "TABLE")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_CAT", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.FORBIDDEN_DATASOURCE),
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_TYPE", "TABLE")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_CAT", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE3),
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
|
@ -456,7 +456,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
final DatabaseMetaData metaData = client.getMetaData();
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", "foo"),
|
||||
Pair.of("COLUMN_NAME", "__time"),
|
||||
|
@ -464,7 +464,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
Pair.of("TYPE_NAME", "TIMESTAMP"),
|
||||
Pair.of("IS_NULLABLE", "NO")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", "foo"),
|
||||
Pair.of("COLUMN_NAME", "cnt"),
|
||||
|
@ -472,7 +472,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
Pair.of("TYPE_NAME", "BIGINT"),
|
||||
Pair.of("IS_NULLABLE", "NO")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", "foo"),
|
||||
Pair.of("COLUMN_NAME", "dim1"),
|
||||
|
@ -480,7 +480,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
Pair.of("TYPE_NAME", "VARCHAR"),
|
||||
Pair.of("IS_NULLABLE", "YES")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", "foo"),
|
||||
Pair.of("COLUMN_NAME", "dim2"),
|
||||
|
@ -488,7 +488,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
Pair.of("TYPE_NAME", "VARCHAR"),
|
||||
Pair.of("IS_NULLABLE", "YES")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", "foo"),
|
||||
Pair.of("COLUMN_NAME", "dim3"),
|
||||
|
@ -496,7 +496,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
Pair.of("TYPE_NAME", "VARCHAR"),
|
||||
Pair.of("IS_NULLABLE", "YES")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", "foo"),
|
||||
Pair.of("COLUMN_NAME", "m1"),
|
||||
|
@ -504,7 +504,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
Pair.of("TYPE_NAME", "FLOAT"),
|
||||
Pair.of("IS_NULLABLE", "NO")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", "foo"),
|
||||
Pair.of("COLUMN_NAME", "m2"),
|
||||
|
@ -512,7 +512,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
Pair.of("TYPE_NAME", "DOUBLE"),
|
||||
Pair.of("IS_NULLABLE", "NO")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", "foo"),
|
||||
Pair.of("COLUMN_NAME", "unique_dim1"),
|
||||
|
@ -547,7 +547,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
final DatabaseMetaData metaData = superuserClient.getMetaData();
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.FORBIDDEN_DATASOURCE),
|
||||
Pair.of("COLUMN_NAME", "__time"),
|
||||
|
@ -555,7 +555,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
Pair.of("TYPE_NAME", "TIMESTAMP"),
|
||||
Pair.of("IS_NULLABLE", "NO")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.FORBIDDEN_DATASOURCE),
|
||||
Pair.of("COLUMN_NAME", "cnt"),
|
||||
|
@ -563,7 +563,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
Pair.of("TYPE_NAME", "BIGINT"),
|
||||
Pair.of("IS_NULLABLE", "NO")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.FORBIDDEN_DATASOURCE),
|
||||
Pair.of("COLUMN_NAME", "dim1"),
|
||||
|
@ -571,7 +571,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
Pair.of("TYPE_NAME", "VARCHAR"),
|
||||
Pair.of("IS_NULLABLE", "YES")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.FORBIDDEN_DATASOURCE),
|
||||
Pair.of("COLUMN_NAME", "dim2"),
|
||||
|
@ -579,7 +579,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
Pair.of("TYPE_NAME", "VARCHAR"),
|
||||
Pair.of("IS_NULLABLE", "YES")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.FORBIDDEN_DATASOURCE),
|
||||
Pair.of("COLUMN_NAME", "m1"),
|
||||
|
@ -587,7 +587,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
Pair.of("TYPE_NAME", "FLOAT"),
|
||||
Pair.of("IS_NULLABLE", "NO")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.FORBIDDEN_DATASOURCE),
|
||||
Pair.of("COLUMN_NAME", "m2"),
|
||||
|
@ -595,7 +595,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
Pair.of("TYPE_NAME", "DOUBLE"),
|
||||
Pair.of("IS_NULLABLE", "NO")
|
||||
),
|
||||
ROW(
|
||||
row(
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_NAME", CalciteTests.FORBIDDEN_DATASOURCE),
|
||||
Pair.of("COLUMN_NAME", "unique_dim1"),
|
||||
|
@ -928,7 +928,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
|
|||
}
|
||||
}
|
||||
|
||||
private static Map<String, Object> ROW(final Pair<String, ?>... entries)
|
||||
private static Map<String, Object> row(final Pair<String, ?>... entries)
|
||||
{
|
||||
final Map<String, Object> m = new HashMap<>();
|
||||
for (Pair<String, ?> entry : entries) {
|
||||
|
|
|
@ -243,70 +243,70 @@ public class BaseCalciteQueryTest extends CalciteTestBase
|
|||
}
|
||||
|
||||
// Generate timestamps for expected results
|
||||
public static long T(final String timeString)
|
||||
public static long t(final String timeString)
|
||||
{
|
||||
return Calcites.jodaToCalciteTimestamp(DateTimes.of(timeString), DateTimeZone.UTC);
|
||||
}
|
||||
|
||||
// Generate timestamps for expected results
|
||||
public static long T(final String timeString, final String timeZoneString)
|
||||
public static long t(final String timeString, final String timeZoneString)
|
||||
{
|
||||
final DateTimeZone timeZone = DateTimes.inferTzFromString(timeZoneString);
|
||||
return Calcites.jodaToCalciteTimestamp(new DateTime(timeString, timeZone), timeZone);
|
||||
}
|
||||
|
||||
// Generate day numbers for expected results
|
||||
public static int D(final String dayString)
|
||||
public static int d(final String dayString)
|
||||
{
|
||||
return (int) (Intervals.utc(T("1970"), T(dayString)).toDurationMillis() / (86400L * 1000L));
|
||||
return (int) (Intervals.utc(t("1970"), t(dayString)).toDurationMillis() / (86400L * 1000L));
|
||||
}
|
||||
|
||||
public static QuerySegmentSpec QSS(final Interval... intervals)
|
||||
public static QuerySegmentSpec querySegmentSpec(final Interval... intervals)
|
||||
{
|
||||
return new MultipleIntervalSegmentSpec(Arrays.asList(intervals));
|
||||
}
|
||||
|
||||
public static AndDimFilter AND(DimFilter... filters)
|
||||
public static AndDimFilter and(DimFilter... filters)
|
||||
{
|
||||
return new AndDimFilter(Arrays.asList(filters));
|
||||
}
|
||||
|
||||
public static OrDimFilter OR(DimFilter... filters)
|
||||
public static OrDimFilter or(DimFilter... filters)
|
||||
{
|
||||
return new OrDimFilter(Arrays.asList(filters));
|
||||
}
|
||||
|
||||
public static NotDimFilter NOT(DimFilter filter)
|
||||
public static NotDimFilter not(DimFilter filter)
|
||||
{
|
||||
return new NotDimFilter(filter);
|
||||
}
|
||||
|
||||
public static InDimFilter IN(String dimension, List<String> values, ExtractionFn extractionFn)
|
||||
public static InDimFilter in(String dimension, List<String> values, ExtractionFn extractionFn)
|
||||
{
|
||||
return new InDimFilter(dimension, values, extractionFn);
|
||||
}
|
||||
|
||||
public static SelectorDimFilter SELECTOR(final String fieldName, final String value, final ExtractionFn extractionFn)
|
||||
public static SelectorDimFilter selector(final String fieldName, final String value, final ExtractionFn extractionFn)
|
||||
{
|
||||
return new SelectorDimFilter(fieldName, value, extractionFn);
|
||||
}
|
||||
|
||||
public static ExpressionDimFilter EXPRESSION_FILTER(final String expression)
|
||||
public static ExpressionDimFilter expressionFilter(final String expression)
|
||||
{
|
||||
return new ExpressionDimFilter(expression, CalciteTests.createExprMacroTable());
|
||||
}
|
||||
|
||||
public static DimFilter NUMERIC_SELECTOR(
|
||||
public static DimFilter numeric_Selector(
|
||||
final String fieldName,
|
||||
final String value,
|
||||
final ExtractionFn extractionFn
|
||||
)
|
||||
{
|
||||
// We use Bound filters for numeric equality to achieve "10.0" = "10"
|
||||
return BOUND(fieldName, value, value, false, false, extractionFn, StringComparators.NUMERIC);
|
||||
return bound(fieldName, value, value, false, false, extractionFn, StringComparators.NUMERIC);
|
||||
}
|
||||
|
||||
public static BoundDimFilter BOUND(
|
||||
public static BoundDimFilter bound(
|
||||
final String fieldName,
|
||||
final String lower,
|
||||
final String upper,
|
||||
|
@ -319,7 +319,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
|
|||
return new BoundDimFilter(fieldName, lower, upper, lowerStrict, upperStrict, null, extractionFn, comparator);
|
||||
}
|
||||
|
||||
public static BoundDimFilter TIME_BOUND(final Object intervalObj)
|
||||
public static BoundDimFilter timeBound(final Object intervalObj)
|
||||
{
|
||||
final Interval interval = new Interval(intervalObj, ISOChronology.getInstanceUTC());
|
||||
return new BoundDimFilter(
|
||||
|
@ -334,27 +334,27 @@ public class BaseCalciteQueryTest extends CalciteTestBase
|
|||
);
|
||||
}
|
||||
|
||||
public static CascadeExtractionFn CASCADE(final ExtractionFn... fns)
|
||||
public static CascadeExtractionFn cascade(final ExtractionFn... fns)
|
||||
{
|
||||
return new CascadeExtractionFn(fns);
|
||||
}
|
||||
|
||||
public static List<DimensionSpec> DIMS(final DimensionSpec... dimensionSpecs)
|
||||
public static List<DimensionSpec> dimensionSpec(final DimensionSpec... dimensionSpecs)
|
||||
{
|
||||
return Arrays.asList(dimensionSpecs);
|
||||
}
|
||||
|
||||
public static List<AggregatorFactory> AGGS(final AggregatorFactory... aggregators)
|
||||
public static List<AggregatorFactory> aggregators(final AggregatorFactory... aggregators)
|
||||
{
|
||||
return Arrays.asList(aggregators);
|
||||
}
|
||||
|
||||
public static DimFilterHavingSpec HAVING(final DimFilter filter)
|
||||
public static DimFilterHavingSpec having(final DimFilter filter)
|
||||
{
|
||||
return new DimFilterHavingSpec(filter, true);
|
||||
}
|
||||
|
||||
public static ExpressionVirtualColumn EXPRESSION_VIRTUAL_COLUMN(
|
||||
public static ExpressionVirtualColumn expression_Virtual_Column(
|
||||
final String name,
|
||||
final String expression,
|
||||
final ValueType outputType
|
||||
|
@ -363,7 +363,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
|
|||
return new ExpressionVirtualColumn(name, expression, outputType, CalciteTests.createExprMacroTable());
|
||||
}
|
||||
|
||||
public static ExpressionPostAggregator EXPRESSION_POST_AGG(final String name, final String expression)
|
||||
public static ExpressionPostAggregator expresionPostAgg(final String name, final String expression)
|
||||
{
|
||||
return new ExpressionPostAggregator(name, expression, null, CalciteTests.createExprMacroTable());
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue