NIFI-2192: Fixed OOM issue in KafkaPublisher

This closes #618.

Signed-off-by: Mark Payne <markap14@hotmail.com>
Oleg Zhurakousky, 2016-07-08 08:22:24 -04:00 (committed by Mark Payne)
parent 22f72c3d2e
commit 7a901952b5
4 changed files with 122 additions and 18 deletions
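Why this fixes the OOM: publish() used to collect a Future for every kafkaProducer.send(...) call and only processed acknowledgements after the entire incoming stream had been tokenized, so a large demarcated FlowFile accumulated an unbounded list of futures on the heap. The change drains and clears that list every ackCheckSize messages (default 100) and stops early if a batch comes back partially unacknowledged. A minimal, self-contained sketch of the pattern, not the NiFi code itself (the class name and the completed-future stand-ins are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    // Sketch of the fix's core pattern: drain and clear the pending-ack buffer
    // every ackCheckSize sends instead of holding one Future per message for
    // the whole stream. CompletableFuture stands in for producer.send(...).
    public class BoundedAckBufferSketch {

        public static void main(String[] args) throws Exception {
            final int ackCheckSize = 100;
            final List<Future<Integer>> resultFutures = new ArrayList<>();

            for (int token = 0; token < 1_000_000; token++) {
                resultFutures.add(CompletableFuture.completedFuture(token));
                if (token % ackCheckSize == 0) {
                    for (Future<Integer> f : resultFutures) {
                        f.get(); // block until the broker acks
                    }
                    resultFutures.clear(); // memory stays bounded by ackCheckSize
                }
            }
            for (Future<Integer> f : resultFutures) {
                f.get(); // drain whatever is left after the loop
            }
            System.out.println("done without accumulating a million futures");
        }
    }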

KafkaPublisher.java

@@ -56,6 +56,12 @@ class KafkaPublisher implements Closeable {
     private final Partitioner partitioner;

+    private final int ackCheckSize;
+
+    KafkaPublisher(Properties kafkaProperties) {
+        this(kafkaProperties, 100);
+    }
+
     /**
      * Creates an instance of this class as well as the instance of the
      * corresponding Kafka {@link KafkaProducer} using provided Kafka
@@ -65,10 +71,11 @@ class KafkaPublisher implements Closeable {
      *            instance of {@link Properties} used to bootstrap
      *            {@link KafkaProducer}
      */
-    KafkaPublisher(Properties kafkaProperties) {
+    KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
+        this.ackCheckSize = ackCheckSize;
         try {
             if (kafkaProperties.containsKey("partitioner.class")) {
                 this.partitioner = (Partitioner) Class.forName(kafkaProperties.getProperty("partitioner.class")).newInstance();
@@ -117,7 +124,9 @@ class KafkaPublisher implements Closeable {
         byte[] messageBytes;
         int tokenCounter = 0;
-        for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
+        boolean continueSending = true;
+        KafkaPublisherResult result = null;
+        for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
             if (prevLastAckedMessageIndex < tokenCounter) {
                 Integer partitionId = publishingContext.getPartitionId();
                 if (partitionId == null && publishingContext.getKeyBytes() != null) {
@@ -126,11 +135,25 @@ class KafkaPublisher implements Closeable {
                 ProducerRecord<byte[], byte[]> message =
                         new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getPartitionId(), publishingContext.getKeyBytes(), messageBytes);
                 resultFutures.add(this.kafkaProducer.send(message));
+
+                if (tokenCounter % this.ackCheckSize == 0) {
+                    int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+                    resultFutures.clear();
+                    if (lastAckedMessageIndex % this.ackCheckSize != 0) {
+                        continueSending = false;
+                        result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+                    }
+                    prevLastAckedMessageIndex = lastAckedMessageIndex;
+                }
             }
         }

-        int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
-        return new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+        if (result == null) {
+            int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+            resultFutures.clear();
+            result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+        }
+        return result;
     }

     /**
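Two details of the hunk above are easy to miss. The diff's usage implies processAcks(...) returns the index of the last contiguously acknowledged message, and the inner check lastAckedMessageIndex % this.ackCheckSize != 0 leans on that: when the loop pauses at a multiple of ackCheckSize and the last acked index is not itself such a multiple, some send in the window went unacknowledged, so the loop stops and the partial KafkaPublisherResult lets the processor retry the remainder. A toy trace under that reading (the stub processAcks and the failure index are hypothetical):

    // Toy trace of the early-stop check, assuming processAcks returns the last
    // contiguously-acked message index. With ackCheckSize = 3 and the ack for
    // message 5 failing, the miss is detected at token 6 and sending stops.
    public class EarlyStopTrace {

        static final int FIRST_UNACKED = 5; // hypothetical failing message

        static int processAcks(int lastSent) {
            return FIRST_UNACKED <= lastSent ? FIRST_UNACKED - 1 : lastSent;
        }

        public static void main(String[] args) {
            final int ackCheckSize = 3;
            boolean continueSending = true;
            for (int token = 0; continueSending && token < 10; token++) {
                // message number `token` is "sent" here, then acks are checked
                if (token % ackCheckSize == 0) {
                    int lastAcked = processAcks(token);
                    System.out.println("token=" + token + ", lastAcked=" + lastAcked);
                    if (lastAcked % ackCheckSize != 0) {
                        continueSending = false; // partial result; caller retries the rest
                    }
                }
            }
            // prints: token=0, lastAcked=0 / token=3, lastAcked=3 / token=6, lastAcked=4
        }
    }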

KafkaPublisher.java (pubsub variant)

@@ -50,6 +50,12 @@ class KafkaPublisher implements Closeable {
     private volatile ComponentLog processLog;

+    private final int ackCheckSize;
+
+    KafkaPublisher(Properties kafkaProperties) {
+        this(kafkaProperties, 100);
+    }
+
     /**
      * Creates an instance of this class as well as the instance of the
      * corresponding Kafka {@link KafkaProducer} using provided Kafka
@@ -59,8 +65,9 @@ class KafkaPublisher implements Closeable {
      *            instance of {@link Properties} used to bootstrap
      *            {@link KafkaProducer}
      */
-    KafkaPublisher(Properties kafkaProperties) {
+    KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
         this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
+        this.ackCheckSize = ackCheckSize;
     }

     /**
@@ -100,15 +107,31 @@ class KafkaPublisher implements Closeable {
         byte[] messageBytes;
         int tokenCounter = 0;
-        for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
+        boolean continueSending = true;
+        KafkaPublisherResult result = null;
+        for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
             if (prevLastAckedMessageIndex < tokenCounter) {
                 ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes);
                 resultFutures.add(this.kafkaProducer.send(message));
+
+                if (tokenCounter % this.ackCheckSize == 0) {
+                    int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+                    resultFutures.clear();
+                    if (lastAckedMessageIndex % this.ackCheckSize != 0) {
+                        continueSending = false;
+                        result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+                    }
+                    prevLastAckedMessageIndex = lastAckedMessageIndex;
+                }
             }
         }

-        int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
-        return new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+        if (result == null) {
+            int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+            resultFutures.clear();
+            result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+        }
+        return result;
     }

     /**

PublishKafkaTest.java

@@ -81,7 +81,7 @@ public class PublishKafkaTest {
     @Test
     public void validateSingleCharacterDemarcatedMessages() {
         String topicName = "validateSingleCharacterDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
         runner.setProperty(PublishKafka.CLIENT_ID, "foo");
@@ -99,9 +99,9 @@ public class PublishKafkaTest {
     @SuppressWarnings("unchecked")
     @Test
-    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitioner() {
+    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerA() {
         String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
         runner.setProperty(PublishKafka.CLIENT_ID, "foo");
@@ -121,9 +121,56 @@ public class PublishKafkaTest {
     @SuppressWarnings("unchecked")
     @Test
-    public void validateOnSendFailureAndThenResendSuccess() throws Exception {
+    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerB() {
+        String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
+        StubPublishKafka putKafka = new StubPublishKafka(1);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.KEY, "key1");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "foo");
+
+        runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateOnSendFailureAndThenResendSuccessA() throws Exception {
         String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.KEY, "key1");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
+        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
+
+        final String text = "Hello World\nGoodbye\nfail\n2";
+        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateOnSendFailureAndThenResendSuccessB() throws Exception {
+        String topicName = "validateSendFailureAndThenResendSuccess";
+        StubPublishKafka putKafka = new StubPublishKafka(1);
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
@@ -148,7 +195,7 @@ public class PublishKafkaTest {
     @Test
     public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception {
         String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
@@ -177,7 +224,7 @@ public class PublishKafkaTest {
     @Test
     public void validateDemarcationIntoEmptyMessages() {
         String topicName = "validateDemarcationIntoEmptyMessages";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
         final TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
         runner.setProperty(PublishKafka.KEY, "key1");
@@ -197,7 +244,7 @@ public class PublishKafkaTest {
     @Test
     public void validateComplexRightPartialDemarcatedMessages() {
         String topicName = "validateComplexRightPartialDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
         runner.setProperty(PublishKafka.CLIENT_ID, "foo");
@@ -216,7 +263,7 @@ public class PublishKafkaTest {
     @Test
     public void validateComplexLeftPartialDemarcatedMessages() {
         String topicName = "validateComplexLeftPartialDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
         runner.setProperty(PublishKafka.CLIENT_ID, "foo");
@@ -236,7 +283,7 @@ public class PublishKafkaTest {
     @Test
     public void validateComplexPartialMatchDemarcatedMessages() {
         String topicName = "validateComplexPartialMatchDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
         runner.setProperty(PublishKafka.CLIENT_ID, "foo");
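A quick sanity check of the times(7) expectation in the new validateMultiCharacterDemarcatedMessagesAndCustomPartitionerB test above: splitting the enqueued payload on the "foo" demarcator yields exactly seven messages, so even with ackCheckSize = 1 the publisher should invoke send(...) seven times. String.split is only an approximation of the stream tokenizer the processor uses, but for this payload the token count matches:

    // Confirms the expected send count: the payload demarcated by "foo"
    // tokenizes into 7 messages, matching verify(producer, times(7)).
    public class DemarcatorCountCheck {
        public static void main(String[] args) {
            String payload = "Hello WorldfooGoodbyefoo1foo2foo3foo4foo5";
            String[] messages = payload.split("foo");
            System.out.println(messages.length); // 7
            for (String m : messages) {
                System.out.println("message: [" + m + "]");
            }
        }
    }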

StubPublishKafka.java

@@ -43,6 +43,12 @@ public class StubPublishKafka extends PublishKafka {
     private volatile boolean failed;

+    private final int ackCheckSize;
+
+    StubPublishKafka(int ackCheckSize) {
+        this.ackCheckSize = ackCheckSize;
+    }
+
     public Producer<byte[], byte[]> getProducer() {
         return producer;
     }
@@ -65,7 +71,12 @@ public class StubPublishKafka extends PublishKafka {
             Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
             kf.setAccessible(true);
             kf.set(publisher, producer);
+
+            Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize");
+            ackCheckSizeField.setAccessible(true);
+            ackCheckSizeField.set(publisher, this.ackCheckSize);
         } catch (Exception e) {
+            e.printStackTrace();
             throw new IllegalStateException(e);
         }
         return publisher;
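For context on the last hunk: the stub forces a small ackCheckSize into the package-private KafkaPublisher via reflection because the field is final and normally set only by the constructor. A minimal standalone illustration of that injection trick (the Publisher class below is invented for the demo):

    import java.lang.reflect.Field;

    // Standalone illustration of the reflection trick used by StubPublishKafka:
    // overwrite a private final instance field so tests exercise the ack-check
    // path quickly.
    public class FieldInjectionDemo {

        static class Publisher {
            private final int ackCheckSize;

            Publisher(int ackCheckSize) {
                this.ackCheckSize = ackCheckSize;
            }

            int getAckCheckSize() {
                return ackCheckSize;
            }
        }

        public static void main(String[] args) throws Exception {
            Publisher publisher = new Publisher(100);
            Field f = Publisher.class.getDeclaredField("ackCheckSize");
            f.setAccessible(true); // also lifts the final restriction for instance fields
            f.set(publisher, 1);
            System.out.println(publisher.getAckCheckSize()); // 1
        }
    }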