NIFI-2444 NIFI-2445 fixed PublishKafka

- fixed the logging issue NIFI-2444 by ensuring the ProcessLog is added to KafkaPublisher
- fixed KafkaPublisher's isAllAcked operation to ensure that it properly reports that the flow file has failed.
- added additional test
This closes #758.
This commit is contained in:
Oleg Zhurakousky 2016-08-01 10:56:46 -04:00 committed by Mark Payne
parent 52d97f966d
commit 54549891e3
8 changed files with 96 additions and 73 deletions

View File

@ -35,8 +35,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.stream.io.util.StreamDemarcator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.producer.Partitioner;
@ -46,20 +44,18 @@ import kafka.producer.Partitioner;
*/
class KafkaPublisher implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);
private final Producer<byte[], byte[]> kafkaProducer;
private long ackWaitTime = 30000;
private ComponentLog processLog;
private final ComponentLog componentLog;
private final Partitioner partitioner;
private final int ackCheckSize;
KafkaPublisher(Properties kafkaProperties) {
this(kafkaProperties, 100);
KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
this(kafkaProperties, 100, componentLog);
}
/**
@ -71,7 +67,7 @@ class KafkaPublisher implements Closeable {
* instance of {@link Properties} used to bootstrap
* {@link KafkaProducer}
*/
KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) {
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
@ -85,6 +81,7 @@ class KafkaPublisher implements Closeable {
} catch (Exception e) {
throw new IllegalStateException("Failed to create partitioner", e);
}
this.componentLog = componentLog;
}
/**
@ -220,28 +217,14 @@ class KafkaPublisher implements Closeable {
this.kafkaProducer.close();
}
/**
* Will set {@link ComponentLog} as an additional logger to forward log
* messages to NiFi bulletin
*/
void setProcessLog(ComponentLog processLog) {
this.processLog = processLog;
}
/**
*
*/
private void warnOrError(String message, Exception e) {
if (e == null) {
logger.warn(message);
if (this.processLog != null) {
this.processLog.warn(message);
}
this.componentLog.warn(message);
} else {
logger.error(message, e);
if (this.processLog != null) {
this.processLog.error(message, e);
}
this.componentLog.error(message);
}
}
@ -262,7 +245,7 @@ class KafkaPublisher implements Closeable {
}
public boolean isAllAcked() {
return this.messagesSent - 1 == this.lastMessageAcked;
return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
}
@Override

View File

@ -388,8 +388,7 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
@Override
protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
throws ProcessException {
KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context));
kafkaPublisher.setProcessLog(this.getLogger());
KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context), this.getLogger());
return kafkaPublisher;
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.kafka;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
@ -29,6 +30,7 @@ import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult;
import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
@ -70,7 +72,7 @@ public class KafkaPublisherTest {
String topicName = "validateSuccessfulSendAsWhole";
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
KafkaPublisherResult result = publisher.publish(publishingContext);
@ -96,7 +98,7 @@ public class KafkaPublisherTest {
String topicName = "validateSuccessfulSendAsDelimited";
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
@ -132,7 +134,7 @@ public class KafkaPublisherTest {
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
// simulates the first re-try
int lastAckedMessageIndex = 1;
@ -179,7 +181,7 @@ public class KafkaPublisherTest {
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
// simulates the first re-try
int lastAckedMessageIndex = 3;
@ -221,7 +223,7 @@ public class KafkaPublisherTest {
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
publisher.publish(publishingContext);

View File

@ -33,27 +33,22 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.stream.io.util.StreamDemarcator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor
* with sending contents of the {@link FlowFile}s to Kafka.
*/
class KafkaPublisher implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);
private final Producer<byte[], byte[]> kafkaProducer;
private volatile long ackWaitTime = 30000;
private volatile ComponentLog processLog;
private final ComponentLog componentLog;
private final int ackCheckSize;
KafkaPublisher(Properties kafkaProperties) {
this(kafkaProperties, 100);
KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
this(kafkaProperties, 100, componentLog);
}
/**
@ -65,9 +60,10 @@ class KafkaPublisher implements Closeable {
* instance of {@link Properties} used to bootstrap
* {@link KafkaProducer}
*/
KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) {
this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
this.ackCheckSize = ackCheckSize;
this.componentLog = componentLog;
}
/**
@ -199,28 +195,14 @@ class KafkaPublisher implements Closeable {
this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS);
}
/**
* Will set {@link ComponentLog} as an additional logger to forward log
* messages to NiFi bulletin
*/
void setProcessLog(ComponentLog processLog) {
this.processLog = processLog;
}
/**
*
*/
private void warnOrError(String message, Exception e) {
if (e == null) {
logger.warn(message);
if (this.processLog != null) {
this.processLog.warn(message);
}
this.componentLog.warn(message);
} else {
logger.error(message, e);
if (this.processLog != null) {
this.processLog.error(message, e);
}
this.componentLog.error(message, e);
}
}
@ -244,7 +226,7 @@ class KafkaPublisher implements Closeable {
}
public boolean isAllAcked() {
return this.messagesSent - 1 == this.lastMessageAcked;
return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
}
@Override

View File

@ -228,7 +228,8 @@ public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> {
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
return new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, this.getLogger());
return publisher;
}
/**

View File

@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
@ -32,6 +33,7 @@ import java.util.Properties;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
@ -71,7 +73,7 @@ public class KafkaPublisherTest {
String topicName = "validateSuccessfulSendAsWhole";
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
KafkaPublisherResult result = publisher.publish(publishingContext);
@ -97,7 +99,7 @@ public class KafkaPublisherTest {
String topicName = "validateSuccessfulSendAsDelimited";
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
@ -133,7 +135,7 @@ public class KafkaPublisherTest {
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
// simulates the first re-try
int lastAckedMessageIndex = 1;
@ -180,7 +182,7 @@ public class KafkaPublisherTest {
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
// simulates the first re-try
int lastAckedMessageIndex = 3;
@ -221,7 +223,7 @@ public class KafkaPublisherTest {
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
publisher.publish(publishingContext);
@ -240,7 +242,7 @@ public class KafkaPublisherTest {
Properties kafkaProperties = this.buildProducerProperties();
kafkaProperties.setProperty("partitioner.class", TestPartitioner.class.getName());
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8));

View File

@ -153,7 +153,7 @@ public class PublishKafkaTest {
runner.setProperty(PublishKafka.KEY, "key1");
runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
runner.setProperty(PublishKafka.META_WAIT_TIME, "3000 millis");
final String text = "Hello World\nGoodbye\nfail\n2";
runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
@ -164,6 +164,7 @@ public class PublishKafkaTest {
Producer<byte[], byte[]> producer = putKafka.getProducer();
verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
runner.shutdown();
putKafka.destroy();
}
@SuppressWarnings("unchecked")
@ -191,6 +192,35 @@ public class PublishKafkaTest {
runner.shutdown();
}
@SuppressWarnings("unchecked")
@Test
public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception {
String topicName = "validateSendFailureAndThenResendSuccess";
StubPublishKafka putKafka = new StubPublishKafka(100);
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PublishKafka.TOPIC, topicName);
runner.setProperty(PublishKafka.CLIENT_ID, "foo");
runner.setProperty(PublishKafka.KEY, "key1");
runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
final String text = "futurefail\nHello World\nGoodbye\n2";
runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
runner.run(1, false);
MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0);
assertNotNull(ff);
runner.enqueue(ff);
runner.run(1, false);
assertEquals(0, runner.getQueueSize().getObjectCount());
Producer<byte[], byte[]> producer = putKafka.getProducer();
// 6 sends due to duplication
verify(producer, times(5)).send(Mockito.any(ProducerRecord.class));
runner.shutdown();
}
@SuppressWarnings("unchecked")
@Test
public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception {

View File

@ -22,14 +22,19 @@ import static org.mockito.Mockito.when;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
@ -45,6 +50,8 @@ public class StubPublishKafka extends PublishKafka {
private final int ackCheckSize;
private final ExecutorService executor = Executors.newCachedThreadPool();
StubPublishKafka(int ackCheckSize) {
this.ackCheckSize = ackCheckSize;
}
@ -53,6 +60,10 @@ public class StubPublishKafka extends PublishKafka {
return producer;
}
public void destroy() {
this.executor.shutdownNow();
}
@SuppressWarnings("unchecked")
@Override
protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
@ -66,12 +77,17 @@ public class StubPublishKafka extends PublishKafka {
f.setAccessible(true);
f.set(this, context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue());
publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
publisher.setAckWaitTime(15000);
producer = mock(Producer.class);
this.instrumentProducer(producer, false);
Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
kf.setAccessible(true);
kf.set(publisher, producer);
Field componentLogF = KafkaPublisher.class.getDeclaredField("componentLog");
componentLogF.setAccessible(true);
componentLogF.set(publisher, mock(ComponentLog.class));
Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize");
ackCheckSizeField.setAccessible(true);
ackCheckSizeField.set(publisher, this.ackCheckSize);
@ -84,8 +100,8 @@ public class StubPublishKafka extends PublishKafka {
@SuppressWarnings("unchecked")
private void instrumentProducer(Producer<byte[], byte[]> producer, boolean failRandomly) {
when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
@SuppressWarnings("rawtypes")
@Override
public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
ProducerRecord<byte[], byte[]> record = (ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
@ -94,11 +110,19 @@ public class StubPublishKafka extends PublishKafka {
StubPublishKafka.this.failed = true;
throw new RuntimeException("intentional");
}
Future future = mock(Future.class);
if ("futurefail".equals(value) && !StubPublishKafka.this.failed) {
StubPublishKafka.this.failed = true;
when(future.get(Mockito.anyLong(), Mockito.any())).thenThrow(ExecutionException.class);
}
Future<RecordMetadata> future = executor.submit(new Callable<RecordMetadata>() {
@Override
public RecordMetadata call() throws Exception {
if ("futurefail".equals(value) && !StubPublishKafka.this.failed) {
StubPublishKafka.this.failed = true;
throw new TopicAuthorizationException("Unauthorized");
} else {
TopicPartition partition = new TopicPartition("foo", 0);
RecordMetadata meta = new RecordMetadata(partition, 0, 0);
return meta;
}
}
});
return future;
}
});