Remove kafka lookup records when a record is tombstoned (#12819)

* remove kafka lookup records from factory when record tombstoned

* update kafka lookup docs to include tombstone behaviour

* change test wait time down to 10ms

Co-authored-by: David Palmer <david.palmer@adscale.co.nz>
This commit is contained in:
Hamish Ball 2022-08-09 17:12:51 +12:00 committed by GitHub
parent a7e89de610
commit abd7a9748d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 85 additions and 2 deletions

View File

@ -49,6 +49,10 @@ The consumer properties `group.id` and `auto.offset.reset` CANNOT be set in `kaf
See [lookups](../../querying/lookups.md) for how to configure and use lookups.
## Tombstones and Deleting Records
The Kafka lookup extractor treats `null` Kafka messages as tombstones. This means that a record on the input topic with a `null` message payload on Kafka will remove the associated key from the lookup map, effectively deleting it.
## Limitations
Currently the Kafka lookup extractor feeds the entire Kafka stream into a local cache. If you are using on-heap caching, this can easily clobber your java heap if the Kafka stream spews a lot of unique keys.

View File

@ -176,8 +176,15 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
for (final ConsumerRecord<String, String> record : records) {
final String key = record.key();
final String message = record.value();
if (key == null || message == null) {
LOG.error("Bad key/message from topic [%s]: [%s]", topic, record);
if (key == null) {
LOG.error("Bad key from topic [%s]: [%s]", topic, record);
continue;
}
if (message == null) {
LOG.trace("Removed key[%s] val[%s]", key, message);
doubleEventCount.incrementAndGet();
map.remove(key);
doubleEventCount.incrementAndGet();
continue;
}
doubleEventCount.incrementAndGet();

View File

@ -265,6 +265,78 @@ public class TestKafkaExtractionCluster
}
}
@Test(timeout = 60_000L)
public void testLookupWithTombstone() throws Exception
{
try (final Producer<byte[], byte[]> producer = new KafkaProducer(makeProducerProperties())) {
checkServer();
assertUpdated(null, "foo");
assertReverseUpdated(ImmutableList.of(), "foo");
long events = factory.getCompletedEventCount();
log.info("------------------------- Sending foo bar -------------------------------");
producer.send(new ProducerRecord<>(TOPIC_NAME, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
long start = System.currentTimeMillis();
while (events == factory.getCompletedEventCount()) {
Thread.sleep(10);
if (System.currentTimeMillis() > start + 60_000) {
throw new ISE("Took too long to update event");
}
}
log.info("------------------------- Checking foo bar -------------------------------");
assertUpdated("bar", "foo");
assertReverseUpdated(Collections.singletonList("foo"), "bar");
checkServer();
events = factory.getCompletedEventCount();
log.info("----------------------- Sending foo tombstone -----------------------------");
producer.send(new ProducerRecord<>(TOPIC_NAME, StringUtils.toUtf8("foo"), null));
while (events == factory.getCompletedEventCount()) {
Thread.sleep(10);
if (System.currentTimeMillis() > start + 60_000) {
throw new ISE("Took too long to update event");
}
}
log.info("----------------------- Checking foo removed -----------------------------");
assertUpdated(null, "foo");
assertReverseUpdated(ImmutableList.of(), "foo");
}
}
@Test(timeout = 60_000L)
public void testLookupWithInitTombstone() throws Exception
{
try (final Producer<byte[], byte[]> producer = new KafkaProducer(makeProducerProperties())) {
checkServer();
assertUpdated(null, "foo");
assertReverseUpdated(ImmutableList.of(), "foo");
long events = factory.getCompletedEventCount();
long start = System.currentTimeMillis();
log.info("----------------------- Sending foo tombstone -----------------------------");
producer.send(new ProducerRecord<>(TOPIC_NAME, StringUtils.toUtf8("foo"), null));
while (events == factory.getCompletedEventCount()) {
Thread.sleep(10);
if (System.currentTimeMillis() > start + 60_000) {
throw new ISE("Took too long to update event");
}
}
log.info("----------------------- Checking foo removed -----------------------------");
assertUpdated(null, "foo");
assertReverseUpdated(ImmutableList.of(), "foo");
}
}
private void assertUpdated(
String expected,
String key