diff --git a/codestyle/checkstyle.xml b/codestyle/checkstyle.xml
index d42b7053789..04e18b5fe27 100644
--- a/codestyle/checkstyle.xml
+++ b/codestyle/checkstyle.xml
@@ -291,5 +291,10 @@
        "/>
+
+    <!-- Reconstructed from context: enforce lowercase-first (lowerCamelCase) method
+         names, consistent with the renames in this patch. Checkstyle's default
+         MethodName format is ^[a-z][a-zA-Z0-9]*$. -->
+    <module name="MethodName"/>
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
index 1690ef9b389..da70643ae40 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
@@ -274,7 +274,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
null,
null
),
- BaseCalciteQueryTest.NOT(BaseCalciteQueryTest.SELECTOR("dim2", "", null))
+ BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("dim2", "", null))
),
new HllSketchBuildAggregatorFactory(
"a3",
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
index 65d5717890b..a29f6493dab 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
@@ -342,7 +342,7 @@ public class DoublesSketchAggregatorTest
}
@Test
- public void QueryingDataWithFieldNameValueAsFloatInsteadOfSketch() throws Exception
+ public void queryingDataWithFieldNameValueAsFloatInsteadOfSketch() throws Exception
{
Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()),
@@ -418,7 +418,7 @@ public class DoublesSketchAggregatorTest
}
@Test
- public void TimeSeriesQueryInputAsFloat() throws Exception
+ public void timeSeriesQueryInputAsFloat() throws Exception
{
Sequence<Row> seq = timeSeriesHelper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()),
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
index 919a59618a0..939e396950a 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
@@ -278,7 +278,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
null,
null
),
- BaseCalciteQueryTest.NOT(BaseCalciteQueryTest.SELECTOR("dim2", "", null))
+ BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("dim2", "", null))
),
new SketchMergeAggregatorFactory(
"a3",
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java
index 0b44cf182ee..335599adf63 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java
@@ -114,12 +114,12 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
- .intervals(QSS(Filtration.eternity()))
+ .intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.filters(
new BloomDimFilter("dim1", BloomKFilterHolder.fromBloomKFilter(filter), null)
)
- .aggregators(AGGS(new CountAggregatorFactory("a0")))
+ .aggregators(aggregators(new CountAggregatorFactory("a0")))
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
@@ -146,7 +146,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
- .intervals(QSS(Filtration.eternity()))
+ .intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.virtualColumns()
.filters(
@@ -155,7 +155,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
createExprMacroTable()
)
)
- .aggregators(AGGS(new CountAggregatorFactory("a0")))
+ .aggregators(aggregators(new CountAggregatorFactory("a0")))
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
@@ -178,7 +178,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
- .intervals(QSS(Filtration.eternity()))
+ .intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.virtualColumns()
.filters(
@@ -187,7 +187,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
createExprMacroTable()
)
)
- .aggregators(AGGS(new CountAggregatorFactory("a0")))
+ .aggregators(aggregators(new CountAggregatorFactory("a0")))
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
@@ -214,7 +214,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
- .intervals(QSS(Filtration.eternity()))
+ .intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.filters(
new OrDimFilter(
@@ -222,7 +222,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
new BloomDimFilter("dim2", BloomKFilterHolder.fromBloomKFilter(filter2), null)
)
)
- .aggregators(AGGS(new CountAggregatorFactory("a0")))
+ .aggregators(aggregators(new CountAggregatorFactory("a0")))
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
index 89f5ce8b0e7..5b609132af2 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
@@ -28,10 +28,10 @@ import java.util.Map;
public class KafkaDataSourceMetadataTest
{
- private static final KafkaDataSourceMetadata KM0 = KM("foo", ImmutableMap.of());
- private static final KafkaDataSourceMetadata KM1 = KM("foo", ImmutableMap.of(0, 2L, 1, 3L));
- private static final KafkaDataSourceMetadata KM2 = KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L));
- private static final KafkaDataSourceMetadata KM3 = KM("foo", ImmutableMap.of(0, 2L, 2, 5L));
+ private static final KafkaDataSourceMetadata KM0 = km("foo", ImmutableMap.of());
+ private static final KafkaDataSourceMetadata KM1 = km("foo", ImmutableMap.of(0, 2L, 1, 3L));
+ private static final KafkaDataSourceMetadata KM2 = km("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L));
+ private static final KafkaDataSourceMetadata KM3 = km("foo", ImmutableMap.of(0, 2L, 2, 5L));
@Test
public void testMatches()
@@ -70,27 +70,27 @@ public class KafkaDataSourceMetadataTest
public void testPlus()
{
Assert.assertEquals(
- KM("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
+ km("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
KM1.plus(KM3)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
+ km("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
KM0.plus(KM2)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
+ km("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
KM1.plus(KM2)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
+ km("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
KM2.plus(KM1)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
+ km("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
KM2.plus(KM2)
);
}
@@ -99,32 +99,32 @@ public class KafkaDataSourceMetadataTest
public void testMinus()
{
Assert.assertEquals(
- KM("foo", ImmutableMap.of(1, 3L)),
+ km("foo", ImmutableMap.of(1, 3L)),
KM1.minus(KM3)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of()),
+ km("foo", ImmutableMap.of()),
KM0.minus(KM2)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of()),
+ km("foo", ImmutableMap.of()),
KM1.minus(KM2)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of(2, 5L)),
+ km("foo", ImmutableMap.of(2, 5L)),
KM2.minus(KM1)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of()),
+ km("foo", ImmutableMap.of()),
KM2.minus(KM2)
);
}
- private static KafkaDataSourceMetadata KM(String topic, Map<Integer, Long> offsets)
+ private static KafkaDataSourceMetadata km(String topic, Map<Integer, Long> offsets)
{
return new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, offsets));
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 892349f755f..b19bf8ad371 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -277,21 +277,21 @@ public class KafkaIndexTaskTest
private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
{
return ImmutableList.of(
- new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2011", "e", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
new ProducerRecord<>(topic, 0, null, null),
- new ProducerRecord<>(topic, 0, null, JB("2013", "f", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "notanumber", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", "notanumber", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", "20.0", "notanumber")),
- new ProducerRecord<>(topic, 1, null, JB("2012", "g", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 1, null, JB("2011", "h", "y", "10", "20.0", "1.0"))
+ new ProducerRecord<>(topic, 0, null, jb("2013", "f", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "10", "notanumber", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "10", "20.0", "notanumber")),
+ new ProducerRecord<>(topic, 1, null, jb("2012", "g", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 1, null, jb("2011", "h", "y", "10", "20.0", "1.0"))
);
}
@@ -411,8 +411,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
@@ -461,8 +461,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
@@ -570,13 +570,13 @@ public class KafkaIndexTaskTest
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
- SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc4 = SD(task, "2011/P1D", 0);
- SegmentDescriptor desc5 = SD(task, "2011/P1D", 1);
- SegmentDescriptor desc6 = SD(task, "2012/P1D", 0);
- SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
+ SegmentDescriptor desc3 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc4 = sd(task, "2011/P1D", 0);
+ SegmentDescriptor desc5 = sd(task, "2011/P1D", 1);
+ SegmentDescriptor desc6 = sd(task, "2012/P1D", 0);
+ SegmentDescriptor desc7 = sd(task, "2013/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))),
@@ -696,6 +696,7 @@ public class KafkaIndexTaskTest
}
final Map<Integer, Long> nextOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+
Assert.assertTrue(checkpoint2.getPartitionSequenceNumberMap().equals(nextOffsets));
task.getRunner().setEndOffsets(nextOffsets, false);
@@ -727,15 +728,15 @@ public class KafkaIndexTaskTest
Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
-
+
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
- SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc4 = SD(task, "2011/P1D", 0);
- SegmentDescriptor desc5 = SD(task, "2011/P1D", 1);
- SegmentDescriptor desc6 = SD(task, "2012/P1D", 0);
- SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
+ SegmentDescriptor desc3 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc4 = sd(task, "2011/P1D", 0);
+ SegmentDescriptor desc5 = sd(task, "2011/P1D", 1);
+ SegmentDescriptor desc6 = sd(task, "2012/P1D", 0);
+ SegmentDescriptor desc7 = sd(task, "2013/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))),
@@ -844,8 +850,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L, 1, 0L))),
@@ -865,13 +871,13 @@ public class KafkaIndexTaskTest
}
List<ProducerRecord<byte[], byte[]>> records = ImmutableList.of(
- new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2011", "D", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2012", "e", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10", "20.0", "1.0"))
+ new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0"))
);
final String baseSequenceName = "sequence0";
@@ -973,8 +979,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
@@ -1023,9 +1029,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
- SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
+ SegmentDescriptor desc3 = sd(task, "2010/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
@@ -1083,7 +1089,7 @@ public class KafkaIndexTaskTest
Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2009/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
@@ -1164,8 +1170,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
@@ -1212,8 +1218,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
@@ -1307,10 +1313,10 @@ public class KafkaIndexTaskTest
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
- SegmentDescriptor desc3 = SD(task, "2013/P1D", 0);
- SegmentDescriptor desc4 = SD(task, "2049/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
+ SegmentDescriptor desc3 = sd(task, "2013/P1D", 0);
+ SegmentDescriptor desc4 = sd(task, "2049/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 13L))),
@@ -1463,8 +1469,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
- SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
@@ -1528,8 +1534,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata, should all be from the first task
- SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
@@ -1581,8 +1587,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
// Check published segments & metadata
- SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
@@ -1599,8 +1605,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
- SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1);
- SegmentDescriptor desc4 = SD(task2, "2013/P1D", 0);
+ SegmentDescriptor desc3 = sd(task2, "2011/P1D", 1);
+ SegmentDescriptor desc4 = sd(task2, "2013/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
@@ -1643,11 +1649,11 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
// desc3 will not be created in KafkaIndexTask (0.12.x) as it does not create per Kafka partition Druid segments
- SegmentDescriptor desc3 = SD(task, "2011/P1D", 1);
- SegmentDescriptor desc4 = SD(task, "2012/P1D", 0);
+ SegmentDescriptor desc3 = sd(task, "2011/P1D", 1);
+ SegmentDescriptor desc4 = sd(task, "2012/P1D", 0);
Assert.assertEquals(isIncrementalHandoffSupported
? ImmutableSet.of(desc1, desc2, desc4)
: ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
@@ -1722,9 +1728,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
- SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
- SegmentDescriptor desc3 = SD(task2, "2012/P1D", 0);
+ SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
+ SegmentDescriptor desc3 = sd(task2, "2012/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L, 1, 1L))),
@@ -1820,8 +1826,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
- SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L))),
@@ -1909,8 +1915,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L))),
@@ -2037,8 +2043,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
@@ -2178,10 +2184,10 @@ public class KafkaIndexTaskTest
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
- SegmentDescriptor desc3 = SD(task, "2013/P1D", 0);
- SegmentDescriptor desc4 = SD(task, "2049/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
+ SegmentDescriptor desc3 = sd(task, "2013/P1D", 0);
+ SegmentDescriptor desc4 = sd(task, "2049/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 13L))),
@@ -2669,7 +2675,7 @@ public class KafkaIndexTaskTest
return results.isEmpty() ? 0L : DimensionHandlerUtils.nullToZero(results.get(0).getValue().getLongMetric("rows"));
}
- private static byte[] JB(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
+ private static byte[] jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
{
try {
return new ObjectMapper().writeValueAsBytes(
@@ -2688,7 +2694,7 @@ public class KafkaIndexTaskTest
}
}
- private SegmentDescriptor SD(final Task task, final String intervalString, final int partitionNum)
+ private SegmentDescriptor sd(final Task task, final String intervalString, final int partitionNum)
{
final Interval interval = Intervals.of(intervalString);
return new SegmentDescriptor(interval, getLock(task, interval).getVersion(), partitionNum);
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index f944bf04610..a5e75c917c6 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -66,25 +66,25 @@ public class KafkaRecordSupplierTest
private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
{
return ImmutableList.of(
- new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2011", "e", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
new ProducerRecord<>(topic, 0, null, null),
- new ProducerRecord<>(topic, 0, null, JB("2013", "f", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "notanumber", "20.0", "1.0")),
- new ProducerRecord<>(topic, 1, null, JB("2049", "f", "y", "10", "notanumber", "1.0")),
- new ProducerRecord<>(topic, 1, null, JB("2049", "f", "y", "10", "20.0", "notanumber")),
- new ProducerRecord<>(topic, 1, null, JB("2012", "g", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 1, null, JB("2011", "h", "y", "10", "20.0", "1.0"))
+ new ProducerRecord<>(topic, 0, null, jb("2013", "f", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 1, null, jb("2049", "f", "y", "10", "notanumber", "1.0")),
+ new ProducerRecord<>(topic, 1, null, jb("2049", "f", "y", "10", "20.0", "notanumber")),
+ new ProducerRecord<>(topic, 1, null, jb("2012", "g", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 1, null, jb("2011", "h", "y", "10", "20.0", "1.0"))
);
}
- private static byte[] JB(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
+ private static byte[] jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
{
try {
return new ObjectMapper().writeValueAsBytes(
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java
index f1e3b0fca65..2c5bce1744b 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java
@@ -29,10 +29,10 @@ import java.util.Map;
public class KinesisDataSourceMetadataTest
{
- private static final KinesisDataSourceMetadata KM0 = KM("foo", ImmutableMap.of());
- private static final KinesisDataSourceMetadata KM1 = KM("foo", ImmutableMap.of("0", "2L", "1", "3L"));
- private static final KinesisDataSourceMetadata KM2 = KM("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L"));
- private static final KinesisDataSourceMetadata KM3 = KM("foo", ImmutableMap.of("0", "2L", "2", "5L"));
+ private static final KinesisDataSourceMetadata KM0 = km("foo", ImmutableMap.of());
+ private static final KinesisDataSourceMetadata KM1 = km("foo", ImmutableMap.of("0", "2L", "1", "3L"));
+ private static final KinesisDataSourceMetadata KM2 = km("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L"));
+ private static final KinesisDataSourceMetadata KM3 = km("foo", ImmutableMap.of("0", "2L", "2", "5L"));
@Test
public void testMatches()
@@ -71,27 +71,27 @@ public class KinesisDataSourceMetadataTest
public void testPlus()
{
Assert.assertEquals(
- KM("foo", ImmutableMap.of("0", "2L", "1", "3L", "2", "5L")),
+ km("foo", ImmutableMap.of("0", "2L", "1", "3L", "2", "5L")),
KM1.plus(KM3)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L")),
+ km("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L")),
KM0.plus(KM2)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L")),
+ km("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L")),
KM1.plus(KM2)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of("0", "2L", "1", "3L", "2", "5L")),
+ km("foo", ImmutableMap.of("0", "2L", "1", "3L", "2", "5L")),
KM2.plus(KM1)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L")),
+ km("foo", ImmutableMap.of("0", "2L", "1", "4L", "2", "5L")),
KM2.plus(KM2)
);
}
@@ -100,32 +100,32 @@ public class KinesisDataSourceMetadataTest
public void testMinus()
{
Assert.assertEquals(
- KM("foo", ImmutableMap.of("1", "3L")),
+ km("foo", ImmutableMap.of("1", "3L")),
KM1.minus(KM3)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of()),
+ km("foo", ImmutableMap.of()),
KM0.minus(KM2)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of()),
+ km("foo", ImmutableMap.of()),
KM1.minus(KM2)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of("2", "5L")),
+ km("foo", ImmutableMap.of("2", "5L")),
KM2.minus(KM1)
);
Assert.assertEquals(
- KM("foo", ImmutableMap.of()),
+ km("foo", ImmutableMap.of()),
KM2.minus(KM2)
);
}
- private static KinesisDataSourceMetadata KM(String stream, Map<String, String> sequences)
+ private static KinesisDataSourceMetadata km(String stream, Map<String, String> sequences)
{
return new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, sequences));
}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 52941166c00..78ce481980b 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -198,26 +198,26 @@ public class KinesisIndexTaskTest extends EasyMockSupport
private static String shardId0 = "0";
private static KinesisRecordSupplier recordSupplier;
private static List<OrderedPartitionableRecord<String, String>> records = ImmutableList.of(
- new OrderedPartitionableRecord<>(stream, "1", "0", JB("2008", "a", "y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "1", JB("2009", "b", "y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "2", JB("2010", "c", "y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "3", JB("2011", "d", "y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "4", JB("2011", "e", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(
stream,
"1",
"5",
- JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
+ jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
),
new OrderedPartitionableRecord<>(stream, "1", "6", Collections.singletonList(StringUtils.toUtf8("unparseable"))),
new OrderedPartitionableRecord<>(stream, "1", "7", Collections.singletonList(StringUtils.toUtf8("unparseable2"))),
new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}"))),
- new OrderedPartitionableRecord<>(stream, "1", "9", JB("2013", "f", "y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "10", JB("2049", "f", "y", "notanumber", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "11", JB("2049", "f", "y", "10", "notanumber", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "12", JB("2049", "f", "y", "10", "20.0", "notanumber")),
- new OrderedPartitionableRecord<>(stream, "0", "0", JB("2012", "g", "y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "0", "1", JB("2011", "h", "y", "10", "20.0", "1.0"))
+ new OrderedPartitionableRecord<>(stream, "1", "9", jb("2013", "f", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "10", jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "11", jb("2049", "f", "y", "10", "notanumber", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "12", jb("2049", "f", "y", "10", "20.0", "notanumber")),
+ new OrderedPartitionableRecord<>(stream, "0", "0", jb("2012", "g", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "0", "1", jb("2011", "h", "y", "10", "20.0", "1.0"))
);
private static ServiceEmitter emitter;
@@ -404,8 +404,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(
@@ -484,8 +484,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2011/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2012/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2011/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2012/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(
@@ -609,13 +609,13 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
- SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc4 = SD(task, "2011/P1D", 0);
- SegmentDescriptor desc5 = SD(task, "2011/P1D", 1);
- SegmentDescriptor desc6 = SD(task, "2012/P1D", 0);
- SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
+ SegmentDescriptor desc3 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc4 = sd(task, "2011/P1D", 0);
+ SegmentDescriptor desc5 = sd(task, "2011/P1D", 1);
+ SegmentDescriptor desc6 = sd(task, "2012/P1D", 0);
+ SegmentDescriptor desc7 = sd(task, "2013/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
@@ -770,12 +770,12 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
- SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc4 = SD(task, "2011/P1D", 0);
- SegmentDescriptor desc5 = SD(task, "2049/P1D", 0);
- SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
+ SegmentDescriptor desc3 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc4 = sd(task, "2011/P1D", 0);
+ SegmentDescriptor desc5 = sd(task, "2049/P1D", 0);
+ SegmentDescriptor desc7 = sd(task, "2013/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc7), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
@@ -857,8 +857,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(
@@ -940,9 +940,9 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
- SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
+ SegmentDescriptor desc3 = sd(task, "2010/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(
@@ -1033,7 +1033,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2009/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(
@@ -1171,8 +1171,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
@@ -1248,8 +1248,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
@@ -1395,10 +1395,10 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getUnparseable());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
- SegmentDescriptor desc3 = SD(task, "2013/P1D", 0);
- SegmentDescriptor desc4 = SD(task, "2049/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
+ SegmentDescriptor desc3 = sd(task, "2013/P1D", 0);
+ SegmentDescriptor desc4 = sd(task, "2049/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(
@@ -1620,8 +1620,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
- SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(
@@ -1731,8 +1731,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata, should all be from the first task
- SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(
@@ -1829,8 +1829,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
// Check published segments & metadata
- SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
@@ -1849,8 +1849,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
- SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1);
- SegmentDescriptor desc4 = SD(task2, "2013/P1D", 0);
+ SegmentDescriptor desc3 = sd(task2, "2011/P1D", 1);
+ SegmentDescriptor desc4 = sd(task2, "2013/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
@@ -1928,9 +1928,9 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
- SegmentDescriptor desc4 = SD(task, "2012/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
+ SegmentDescriptor desc4 = sd(task, "2012/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc4), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
@@ -2043,10 +2043,10 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
- SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
- SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1);
- SegmentDescriptor desc4 = SD(task2, "2012/P1D", 0);
+ SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
+ SegmentDescriptor desc3 = sd(task2, "2011/P1D", 1);
+ SegmentDescriptor desc4 = sd(task2, "2012/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
Assert.assertEquals(
@@ -2196,8 +2196,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
- SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task1, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task1, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(
@@ -2317,8 +2317,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(
@@ -2407,8 +2407,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
- SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+ SegmentDescriptor desc1 = sd(task, "2010/P1D", 0);
+ SegmentDescriptor desc2 = sd(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
@@ -2877,7 +2877,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
return results.isEmpty() ? 0L : DimensionHandlerUtils.nullToZero(results.get(0).getValue().getLongMetric("rows"));
}
- private static List<byte[]> JB(
+ private static List<byte[]> jb(
String timestamp,
String dim1,
String dim2,
@@ -2903,7 +2903,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
}
}
- private SegmentDescriptor SD(final Task task, final String intervalString, final int partitionNum)
+ private SegmentDescriptor sd(final Task task, final String intervalString, final int partitionNum)
{
final Interval interval = Intervals.of(intervalString);
return new SegmentDescriptor(interval, getLock(task, interval).getVersion(), partitionNum);
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index 166678cb7eb..dd999208077 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -76,20 +76,20 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
private static Shard shard1;
private static KinesisRecordSupplier recordSupplier;
private static List<Record> shard1Records = ImmutableList.of(
- new Record().withData(JB("2011", "d", "y", "10", "20.0", "1.0")).withSequenceNumber("0"),
- new Record().withData(JB("2011", "e", "y", "10", "20.0", "1.0")).withSequenceNumber("1"),
- new Record().withData(JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")).withSequenceNumber("2"),
+ new Record().withData(jb("2011", "d", "y", "10", "20.0", "1.0")).withSequenceNumber("0"),
+ new Record().withData(jb("2011", "e", "y", "10", "20.0", "1.0")).withSequenceNumber("1"),
+ new Record().withData(jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")).withSequenceNumber("2"),
new Record().withData(ByteBuffer.wrap(StringUtils.toUtf8("unparseable"))).withSequenceNumber("3"),
new Record().withData(ByteBuffer.wrap(StringUtils.toUtf8("unparseable2"))).withSequenceNumber("4"),
new Record().withData(ByteBuffer.wrap(StringUtils.toUtf8("{}"))).withSequenceNumber("5"),
- new Record().withData(JB("2013", "f", "y", "10", "20.0", "1.0")).withSequenceNumber("6"),
- new Record().withData(JB("2049", "f", "y", "notanumber", "20.0", "1.0")).withSequenceNumber("7"),
- new Record().withData(JB("2012", "g", "y", "10", "20.0", "1.0")).withSequenceNumber("8"),
- new Record().withData(JB("2011", "h", "y", "10", "20.0", "1.0")).withSequenceNumber("9")
+ new Record().withData(jb("2013", "f", "y", "10", "20.0", "1.0")).withSequenceNumber("6"),
+ new Record().withData(jb("2049", "f", "y", "notanumber", "20.0", "1.0")).withSequenceNumber("7"),
+ new Record().withData(jb("2012", "g", "y", "10", "20.0", "1.0")).withSequenceNumber("8"),
+ new Record().withData(jb("2011", "h", "y", "10", "20.0", "1.0")).withSequenceNumber("9")
);
private static List<Record> shard0Records = ImmutableList.of(
- new Record().withData(JB("2008", "a", "y", "10", "20.0", "1.0")).withSequenceNumber("0"),
- new Record().withData(JB("2009", "b", "y", "10", "20.0", "1.0")).withSequenceNumber("1")
+ new Record().withData(jb("2008", "a", "y", "10", "20.0", "1.0")).withSequenceNumber("0"),
+ new Record().withData(jb("2009", "b", "y", "10", "20.0", "1.0")).withSequenceNumber("1")
);
private static List