Fix flaky KafkaIndexTaskTest. (#12657)

* Fix flaky KafkaIndexTaskTest.

The testRunTransactionModeRollback case had many race conditions. Most notably,
it would commit a transaction and then immediately check that the results
were *not* indexed. This was racy because it relied on the indexing thread being
slower than the test thread.

Now, the case waits for the transaction to be processed by the indexing thread
before checking the results (see the sketch after this commit message).

* Changes from review.
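
In code terms, the fix orders the test thread behind the indexing thread: rather than asserting right after committing a transaction (or busy-waiting on an event count), the test polls the task's per-partition consumed offsets and asserts only once the indexing thread has read past a known offset. Below is a minimal, self-contained sketch of that wait-then-assert pattern; the class name, the ConcurrentMap standing in for the task runner's current offsets, and the simulated indexer thread are hypothetical stand-ins, while the real helper added by this commit is the awaitConsumedOffsets method in the test diff further down.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AwaitOffsetsSketch
{
  /** Poll until every partition has read past its target offset (inclusive). */
  static void awaitConsumedOffsets(
      final ConcurrentMap<Integer, Long> currentOffsets,
      final Map<Integer, Long> targetOffsets
  ) throws InterruptedException
  {
    while (true) {
      boolean allDone = true;
      for (final Map.Entry<Integer, Long> target : targetOffsets.entrySet()) {
        final Long current = currentOffsets.get(target.getKey());
        // Kafka-style "current" offsets point at the next record to read (last read + 1),
        // so a partition is caught up once current > target.
        if (current == null || current <= target.getValue()) {
          allDone = false;
          break;
        }
      }
      if (allDone) {
        return;
      }
      Thread.sleep(5); // back off briefly, then poll again
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    final ConcurrentMap<Integer, Long> offsets = new ConcurrentHashMap<>();

    // Simulated indexing thread: after a delay it has consumed offsets 0 and 1 of partition 0.
    final Thread indexer = new Thread(() -> {
      try {
        Thread.sleep(100);
        offsets.put(0, 2L); // next offset to read after consuming 0 and 1
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    indexer.start();

    // Test thread: block until the indexer has read past offset 1, then check results.
    awaitConsumedOffsets(offsets, Map.of(0, 1L));
    System.out.println("offsets 0..1 consumed; checks after this point no longer race the indexer");
    indexer.join();
  }
}

Waiting on consumed offsets rather than on row counts also keeps the wait condition meaningful at every step of the test, since offsets advance even for messages that never become rows (transaction control markers, aborted batches, unparseable records).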
Author: Gian Merlino, 2022-06-24 13:53:51 -07:00 (committed by GitHub)
parent 6ddb828c7a
commit d5abd06b96
3 changed files with 114 additions and 78 deletions

KafkaIndexTask.java

@@ -38,7 +38,6 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
{
private static final String TYPE = "index_kafka";
private final KafkaIndexTaskIOConfig ioConfig;
private final ObjectMapper configMapper;
// This value can be tuned in some tests
@@ -65,7 +64,6 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
getFormattedGroupId(dataSchema.getDataSource(), TYPE)
);
this.configMapper = configMapper;
this.ioConfig = ioConfig;
Preconditions.checkArgument(
ioConfig.getStartSequenceNumbers().getExclusivePartitions().isEmpty(),

KafkaIndexTaskTest.java

@@ -96,6 +96,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -124,7 +125,6 @@ import org.apache.druid.query.scan.ScanQueryConfig;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanQueryQueryToolChest;
import org.apache.druid.query.scan.ScanQueryRunnerFactory;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
@@ -141,7 +141,6 @@ import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
@@ -181,6 +180,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -252,11 +252,9 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
private Long maxTotalRows = null;
private Period intermediateHandoffPeriod = null;
private AppenderatorsManager appenderatorsManager;
private String topic;
private List<ProducerRecord<byte[], byte[]>> records;
private final Set<Integer> checkpointRequestsHash = new HashSet<>();
private RowIngestionMetersFactory rowIngestionMetersFactory;
private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
{
@@ -356,7 +354,6 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
topic = getTopicName();
records = generateRecords(topic);
reportsFile = File.createTempFile("KafkaIndexTaskTestReports-" + System.currentTimeMillis(), "json");
appenderatorsManager = new TestAppenderatorsManager();
makeToolboxFactory();
}
@@ -1289,7 +1286,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
new StringDimensionSchema("kafka.topic"),
new LongDimensionSchema("kafka.offset"),
new StringDimensionSchema("kafka.header.encoding")
)
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
@@ -1324,13 +1321,12 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
final QuerySegmentSpec interval = OBJECT_MAPPER.readValue(
"\"2008/2012\"", QuerySegmentSpec.class
);
List<ScanResultValue> scanResultValues = scanData(task, interval);
List<Map<String, Object>> scanResultValues = scanData(task, interval);
//verify that there are no records indexed in the rollbacked time period
Assert.assertEquals(3, Iterables.size(scanResultValues));
int i = 0;
for (ScanResultValue result : scanResultValues) {
final Map<String, Object> event = ((List<Map<String, Object>>) result.getEvents()).get(0);
for (Map<String, Object> event : scanResultValues) {
Assert.assertEquals((long) i++, event.get("kafka.offset"));
Assert.assertEquals(topic, event.get("kafka.topic"));
Assert.assertEquals("application/json", event.get("kafka.header.encoding"));
@@ -1401,13 +1397,11 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
final QuerySegmentSpec interval = OBJECT_MAPPER.readValue(
"\"2008/2012\"", QuerySegmentSpec.class
);
List<ScanResultValue> scanResultValues = scanData(task, interval);
//verify that there are no records indexed in the rollbacked time period
List<Map<String, Object>> scanResultValues = scanData(task, interval);
Assert.assertEquals(3, Iterables.size(scanResultValues));
int i = 0;
for (ScanResultValue result : scanResultValues) {
final Map<String, Object> event = ((List<Map<String, Object>>) result.getEvents()).get(0);
for (Map<String, Object> event : scanResultValues) {
Assert.assertEquals("application/json", event.get("kafka.testheader.encoding"));
Assert.assertEquals("y", event.get("dim2"));
}
@@ -2572,7 +2566,9 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 13L)),
// End offset is one after 12 real messages + 2 txn control messages (last seen message: offset 13).
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 14L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
@@ -2594,62 +2590,65 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
kafkaProducer.commitTransaction();
}
while (countEvents(task) != 2) {
Thread.sleep(25);
}
awaitConsumedOffsets(task, ImmutableMap.of(0, 1L)); // Consume two real messages
Assert.assertEquals(2, countEvents(task));
Assert.assertEquals(Status.READING, task.getRunner().getStatus());
//verify the 2 indexed records
final QuerySegmentSpec firstInterval = OBJECT_MAPPER.readValue(
"\"2008/2010\"", QuerySegmentSpec.class
);
Iterable<ScanResultValue> scanResultValues = scanData(task, firstInterval);
final QuerySegmentSpec firstInterval = OBJECT_MAPPER.readValue("\"2008/2010\"", QuerySegmentSpec.class);
Iterable<Map<String, Object>> scanResultValues = scanData(task, firstInterval);
Assert.assertEquals(2, Iterables.size(scanResultValues));
// Insert 3 more records and rollback
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (ProducerRecord<byte[], byte[]> record : Iterables.limit(Iterables.skip(records, 2), 3)) {
for (ProducerRecord<byte[], byte[]> record : Iterables.skip(Iterables.limit(records, 5), 2)) {
kafkaProducer.send(record).get();
}
kafkaProducer.flush();
kafkaProducer.abortTransaction();
}
Assert.assertEquals(2, countEvents(task));
Assert.assertEquals(Status.READING, task.getRunner().getStatus());
final QuerySegmentSpec rollbackedInterval = OBJECT_MAPPER.readValue(
"\"2010/2012\"", QuerySegmentSpec.class
);
scanResultValues = scanData(task, rollbackedInterval);
//verify that there are no records indexed in the rollbacked time period
Assert.assertEquals(0, Iterables.size(scanResultValues));
// Insert remaining data
// Insert up through first 8 items
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 5)) {
for (ProducerRecord<byte[], byte[]> record : Iterables.skip(Iterables.limit(records, 8), 5)) {
kafkaProducer.send(record).get();
}
kafkaProducer.commitTransaction();
}
final QuerySegmentSpec endInterval = OBJECT_MAPPER.readValue(
"\"2008/2049\"", QuerySegmentSpec.class
);
Iterable<ScanResultValue> scanResultValues1 = scanData(task, endInterval);
awaitConsumedOffsets(task, ImmutableMap.of(0, 9L)); // Consume 8 real messages + 2 txn controls
Assert.assertEquals(2, countEvents(task));
final QuerySegmentSpec rollbackedInterval = OBJECT_MAPPER.readValue("\"2010/2012\"", QuerySegmentSpec.class);
scanResultValues = scanData(task, rollbackedInterval);
//verify that there are no records indexed in the rollbacked time period
Assert.assertEquals(0, Iterables.size(scanResultValues));
final QuerySegmentSpec endInterval = OBJECT_MAPPER.readValue("\"2008/2049\"", QuerySegmentSpec.class);
Iterable<Map<String, Object>> scanResultValues1 = scanData(task, endInterval);
Assert.assertEquals(2, Iterables.size(scanResultValues1));
// Insert all remaining messages. One will get picked up.
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 8)) {
kafkaProducer.send(record).get();
}
kafkaProducer.commitTransaction();
}
// Wait for task to exit and publish
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets());
// Check metrics
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getProcessedWithError());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
@@ -2664,7 +2663,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
publishedDescriptors()
);
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 13L))),
new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 14L))),
newDataSchemaMetadata()
);
}
@@ -2824,11 +2823,44 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(task, task1);
}
private List<ScanResultValue> scanData(final Task task, QuerySegmentSpec spec)
/**
* Wait for a task to consume certain offsets (inclusive).
*/
private void awaitConsumedOffsets(final KafkaIndexTask task, final Map<Integer, Long> targetOffsets)
throws InterruptedException
{
ScanQuery query = new Druids.ScanQueryBuilder().dataSource(
NEW_DATA_SCHEMA.getDataSource()).intervals(spec).build();
return task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList();
while (true) {
final ConcurrentMap<Integer, Long> currentOffsets = task.getRunner().getCurrentOffsets();
// For Kafka, currentOffsets are the last read offsets plus one.
boolean allDone = true;
for (final Map.Entry<Integer, Long> entry : targetOffsets.entrySet()) {
final Long currentOffset = currentOffsets.get(entry.getKey());
if (currentOffset == null || currentOffset <= entry.getValue()) {
allDone = false;
break;
}
}
if (allDone) {
return;
} else {
Thread.sleep(5);
}
}
}
private List<Map<String, Object>> scanData(final Task task, QuerySegmentSpec spec)
{
ScanQuery query = new Druids.ScanQueryBuilder().dataSource(NEW_DATA_SCHEMA.getDataSource())
.intervals(spec)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.build();
return task.getQueryRunner(query)
.run(QueryPlus.wrap(query))
.flatMap(result -> Sequences.simple((List<Map<String, Object>>) result.getEvents()))
.toList();
}
private void insertData() throws ExecutionException, InterruptedException
@@ -2836,7 +2868,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
insertData(records);
}
private void insertData(Iterable<ProducerRecord<byte[], byte[]>> records) throws ExecutionException, InterruptedException
private void insertData(Iterable<ProducerRecord<byte[], byte[]>> records)
throws ExecutionException, InterruptedException
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
@@ -2943,28 +2976,28 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
{
return new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
.put(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(),
(query, future) -> {
// do nothing
}
)
)
.put(
ScanQuery.class,
new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(
new ScanQueryConfig(),
new DefaultGenericQueryMetricsFactory()
),
new ScanQueryEngine(),
new ScanQueryConfig()
)
)
.build()
.put(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(),
(query, future) -> {
// do nothing
}
)
)
.put(
ScanQuery.class,
new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(
new ScanQueryConfig(),
new DefaultGenericQueryMetricsFactory()
),
new ScanQueryEngine(),
new ScanQueryConfig()
)
)
.build()
);
}
@@ -2972,7 +3005,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
{
directory = tempFolder.newFolder();
final TestUtils testUtils = new TestUtils();
rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
RowIngestionMetersFactory rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
for (Module module : new KafkaIndexTaskModule().getJacksonModules()) {
@@ -3135,13 +3168,16 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
//multiple objects in one Kafka record will yield 2 rows in druid
String wellformed = toJsonString(true, "2049", "d2", "y", "10", "22.0", "2.0") +
toJsonString(true, "2049", "d3", "y", "10", "23.0", "3.0");
toJsonString(true, "2049", "d3", "y", "10", "23.0", "3.0");
//multiple objects in one Kafka record but some objects are in ill-formed format
//as a result, the whole ProducerRecord will be discarded
String illformed = "{\"timestamp\":2049, \"dim1\": \"d4\", \"dim2\":\"x\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"2.0\" }" +
"{\"timestamp\":2049, \"dim1\": \"d5\", \"dim2\":\"y\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":invalidFormat }" +
"{\"timestamp\":2049, \"dim1\": \"d6\", \"dim2\":\"z\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"3.0\" }";
String malformed =
"{\"timestamp\":2049, \"dim1\": \"d4\", \"dim2\":\"x\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"2.0\" }"
+
"{\"timestamp\":2049, \"dim1\": \"d5\", \"dim2\":\"y\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":invalidFormat }"
+
"{\"timestamp\":2049, \"dim1\": \"d6\", \"dim2\":\"z\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"3.0\" }";
ProducerRecord<byte[], byte[]>[] producerRecords = new ProducerRecord[]{
// pretty formatted
@@ -3149,7 +3185,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
//well-formed
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(wellformed)),
//ill-formed
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(illformed)),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(malformed)),
//a well-formed record after ill-formed to demonstrate that the ill-formed can be successfully skipped
new ProducerRecord<>(topic, 0, null, jbb(true, "2049", "d7", "y", "10", "20.0", "1.0"))
};

SeekableStreamIndexTaskTestBase.java

@@ -348,7 +348,7 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
List<SegmentDescriptor> actualDescriptors
) throws IOException
{
Assert.assertEquals(expectedDescriptors.size(), actualDescriptors.size());
Assert.assertEquals("number of segments", expectedDescriptors.size(), actualDescriptors.size());
final Comparator<SegmentDescriptor> comparator = (s1, s2) -> {
final int intervalCompare = Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval());
if (intervalCompare == 0) {
@@ -379,7 +379,9 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
if (expectedDesc.expectedDim1Values.isEmpty()) {
continue; // Treating empty expectedDim1Values as a signal that checking the dim1 column value is not needed.
}
Assertions.assertThat(readSegmentColumn("dim1", actualDesc)).isIn(expectedDesc.expectedDim1Values);
Assertions.assertThat(readSegmentColumn("dim1", actualDesc))
.describedAs("dim1 values")
.isIn(expectedDesc.expectedDim1Values);
}
}
@@ -447,7 +449,7 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
new LongSumAggregatorFactory("rows", "rows")
)
).granularity(Granularities.ALL)
.intervals("0000/3000")
.intervals(Intervals.ONLY_ETERNITY)
.build();
List<Result<TimeseriesResultValue>> results = task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList();