diff --git a/docs/development/extensions-core/kafka-extraction-namespace.md b/docs/development/extensions-core/kafka-extraction-namespace.md index c5f7020be6f..2c8a9c293b6 100644 --- a/docs/development/extensions-core/kafka-extraction-namespace.md +++ b/docs/development/extensions-core/kafka-extraction-namespace.md @@ -49,6 +49,10 @@ The consumer properties `group.id` and `auto.offset.reset` CANNOT be set in `kaf See [lookups](../../querying/lookups.md) for how to configure and use lookups. +## Tombstones and Deleting Records + +The Kafka lookup extractor treats `null` Kafka messages as tombstones. This means that a record on the input topic with a `null` message payload will remove the associated key from the lookup map, effectively deleting it. + ## Limitations Currently the Kafka lookup extractor feeds the entire Kafka stream into a local cache. If you are using on-heap caching, this can easily clobber your java heap if the Kafka stream spews a lot of unique keys. diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java index f3d2c1eb1cd..e0bd3e082c6 100644 --- a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java +++ b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java @@ -176,8 +176,15 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory for (final ConsumerRecord record : records) { final String key = record.key(); final String message = record.value(); - if (key == null || message == null) { - LOG.error("Bad key/message from topic [%s]: [%s]", topic, record); + if (key == null) { + LOG.error("Bad key from topic [%s]: [%s]", topic, record); + continue; + } + if (message == null) { 
LOG.trace("Removed key[%s] val[%s]", key, message); + doubleEventCount.incrementAndGet(); + map.remove(key); + doubleEventCount.incrementAndGet(); continue; } doubleEventCount.incrementAndGet(); diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java index 1b744195203..4903cbd3b2e 100644 --- a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java +++ b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java @@ -265,6 +265,78 @@ public class TestKafkaExtractionCluster } } + @Test(timeout = 60_000L) + public void testLookupWithTombstone() throws Exception + { + try (final Producer producer = new KafkaProducer(makeProducerProperties())) { + checkServer(); + + assertUpdated(null, "foo"); + assertReverseUpdated(ImmutableList.of(), "foo"); + + long events = factory.getCompletedEventCount(); + + log.info("------------------------- Sending foo bar -------------------------------"); + producer.send(new ProducerRecord<>(TOPIC_NAME, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar"))); + + long start = System.currentTimeMillis(); + while (events == factory.getCompletedEventCount()) { + Thread.sleep(10); + if (System.currentTimeMillis() > start + 60_000) { + throw new ISE("Took too long to update event"); + } + } + + log.info("------------------------- Checking foo bar -------------------------------"); + assertUpdated("bar", "foo"); + assertReverseUpdated(Collections.singletonList("foo"), "bar"); + + checkServer(); + events = factory.getCompletedEventCount(); + + log.info("----------------------- Sending foo tombstone -----------------------------"); + producer.send(new ProducerRecord<>(TOPIC_NAME, StringUtils.toUtf8("foo"), null)); + while (events == 
factory.getCompletedEventCount()) { + Thread.sleep(10); + if (System.currentTimeMillis() > start + 60_000) { + throw new ISE("Took too long to update event"); + } + } + + log.info("----------------------- Checking foo removed -----------------------------"); + assertUpdated(null, "foo"); + assertReverseUpdated(ImmutableList.of(), "foo"); + } + } + + @Test(timeout = 60_000L) + public void testLookupWithInitTombstone() throws Exception + { + try (final Producer producer = new KafkaProducer(makeProducerProperties())) { + checkServer(); + + assertUpdated(null, "foo"); + assertReverseUpdated(ImmutableList.of(), "foo"); + + long events = factory.getCompletedEventCount(); + + long start = System.currentTimeMillis(); + + log.info("----------------------- Sending foo tombstone -----------------------------"); + producer.send(new ProducerRecord<>(TOPIC_NAME, StringUtils.toUtf8("foo"), null)); + while (events == factory.getCompletedEventCount()) { + Thread.sleep(10); + if (System.currentTimeMillis() > start + 60_000) { + throw new ISE("Took too long to update event"); + } + } + + log.info("----------------------- Checking foo removed -----------------------------"); + assertUpdated(null, "foo"); + assertReverseUpdated(ImmutableList.of(), "foo"); + } + } + private void assertUpdated( String expected, String key