NIFI-1645 refactored PutKafka

- used the newest API available in the 0.8.* version
- added PutKafka integration tests
- Kafka module code coverage is at 85%

NIFI-1645 polishing

NIFI-1645 PR comments round 1

NIFI-1645 PR comments round 2

NIFI-1645 change to use async Kafka producer

NIFI-1645 polishing

NIFI-1645 polishing

NIFI-1645 polishing

NIFI-1645 changed from java.util.Scanner to custom StreamScanner

NIFI-1645 polishing

NIFI-1645 final polish
Oleg Zhurakousky 2016-03-22 10:50:07 -04:00
parent 736896246c
commit e0e00ff282
8 changed files with 1172 additions and 1162 deletions


@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.producer.Partitioner;
/**
* Wrapper over {@link KafkaProducer} to assist the {@link PutKafka} processor
* with sending the content of {@link FlowFile}s to Kafka.
*/
public class KafkaPublisher implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);
private final KafkaProducer<byte[], byte[]> producer;
private final Partitioner partitioner;
private final long ackWaitTime;
private ProcessorLog processLog;
/**
* Creates an instance of this class as well as the corresponding Kafka
* {@link KafkaProducer} using the provided Kafka configuration properties.
*/
KafkaPublisher(Properties kafkaProperties) {
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
this.producer = new KafkaProducer<byte[], byte[]>(kafkaProperties);
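// the ack wait time (used in processAcks) is twice the configured 'timeout.ms' of the producer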
this.ackWaitTime = Long.parseLong(kafkaProperties.getProperty(ProducerConfig.TIMEOUT_CONFIG)) * 2;
try {
if (kafkaProperties.containsKey("partitioner.class")){
this.partitioner = (Partitioner) Class.forName(kafkaProperties.getProperty("partitioner.class")).newInstance();
} else {
this.partitioner = null;
}
} catch (Exception e) {
throw new IllegalStateException("Failed to create partitioner", e);
}
}
/**
* Sets the {@link ProcessorLog} so that failures while waiting for acks are
* also reported through the processor's own log.
*/
void setProcessLog(ProcessorLog processLog) {
this.processLog = processLog;
}
/**
* Publishes messages to Kafka topic. It supports three publishing
* mechanisms.
* <ul>
* <li>Sending the entire content stream as a single Kafka message.</li>
* <li>Splitting the incoming content stream into chunks and sending
* individual chunks as separate Kafka messages.</li>
* <li>Splitting the incoming content stream into chunks and sending only
* the chunks that have failed previously @see
* {@link SplittableMessageContext#getFailedSegments()}.</li>
* </ul>
* This method assumes content stream affinity: the content stream that
* represents the same Kafka message(s) must remain the same across possible
* retries. This matters specifically when a delimiter is used and a single
* content stream may represent multiple Kafka messages. The failed-segment
* list keeps the index of each content stream segment that failed to be
* sent to Kafka, so upon retry only the failed segments are sent.
*
* @param messageContext
* instance of {@link SplittableMessageContext} which holds
* context information about the message to be sent
* @param contentStream
* instance of an open {@link InputStream} carrying the content of
* the message(s) to be sent to Kafka
* @param partitionKey
* the value of the partition key. Only relevant if the user wishes
* to provide a custom partition key instead of relying on one of
* the provided {@link Partitioner}s
* @return a {@link BitSet} containing the indexes of the segments that
* failed to be sent to Kafka
*/
BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey) {
List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
BitSet prevFailedSegmentIndexes = messageContext.getFailedSegments();
int segmentCounter = 0;
StreamScanner scanner = new StreamScanner(contentStream, messageContext.getDelimiterPattern());
while (scanner.hasNext()) {
byte[] content = scanner.next();
if (content.length > 0){
byte[] key = messageContext.getKeyBytes();
String topicName = messageContext.getTopicName();
if (partitionKey == null && key != null) {
partitionKey = this.getPartition(key, topicName);
}
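// send the segment on a first attempt (no failed-segment list yet) or when it previously failed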
if (prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.get(segmentCounter)) {
ProducerRecord<byte[], byte[]> message = new ProducerRecord<byte[], byte[]>(topicName, partitionKey, key, content);
sendFutures.add(this.toKafka(message));
}
segmentCounter++;
}
}
scanner.close();
return this.processAcks(sendFutures);
}
/**
* Waits (up to the ack wait time) for the acknowledgment of each sent message
* and returns a {@link BitSet} marking the segments that failed.
*/
private BitSet processAcks(List<Future<RecordMetadata>> sendFutures) {
int segmentCounter = 0;
BitSet failedSegments = new BitSet();
for (Future<RecordMetadata> future : sendFutures) {
try {
future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
failedSegments.set(segmentCounter);
Thread.currentThread().interrupt();
logger.warn("Interrupted while waiting for acks from Kafka");
if (this.processLog != null) {
this.processLog.warn("Interrupted while waiting for acks from Kafka");
}
} catch (ExecutionException e) {
failedSegments.set(segmentCounter);
logger.error("Failed while waiting for acks from Kafka", e);
if (this.processLog != null) {
this.processLog.error("Failed while waiting for acks from Kafka", e);
}
} catch (TimeoutException e) {
failedSegments.set(segmentCounter);
logger.warn("Timed out while waiting for acks from Kafka");
if (this.processLog != null) {
this.processLog.warn("Timed out while waiting for acks from Kafka");
}
}
segmentCounter++;
}
return failedSegments;
}
/**
* Determines the target partition for the given key using the configured
* {@link Partitioner} and the topic's partition count.
*/
private int getPartition(Object key, String topicName) {
int partSize = this.producer.partitionsFor(topicName).size();
return this.partitioner.partition(key, partSize);
}
/**
* Closes {@link KafkaProducer}
*/
@Override
public void close() throws Exception {
this.producer.close();
}
/**
* Sends the provided {@link ProducerRecord} to Kafka asynchronously,
* returning the resulting {@link Future}.
*/
private Future<RecordMetadata> toKafka(ProducerRecord<byte[], byte[]> message) {
if (logger.isDebugEnabled()) {
logger.debug("Publishing message to '" + message.topic() + "' topic.");
}
return this.producer.send(message);
}
}
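For orientation, here is a minimal, hypothetical caller of the publisher above. The class name, topic, broker address, and content are illustrative only, and the sketch must live in the org.apache.nifi.processors.kafka package because the types are package-private:

package org.apache.nifi.processors.kafka;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.Properties;

public class KafkaPublisherSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative broker address
        props.setProperty("timeout.ms", "5000");                  // required: the ack wait time is derived from it
        try (KafkaPublisher publisher = new KafkaPublisher(props)) {
            // one content stream, newline-delimited, so it becomes three Kafka messages
            SplittableMessageContext ctx = new SplittableMessageContext("demo-topic", null, "\n");
            try (InputStream content = new ByteArrayInputStream(
                    "one\ntwo\nthree".getBytes(StandardCharsets.UTF_8))) {
                BitSet failed = publisher.publish(ctx, content, null);
                // a non-empty BitSet would be stored on the FlowFile so that only
                // those segments are re-sent on retry
                System.out.println("failed segments: " + failed);
            }
        }
    }
}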


@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka;
import java.util.Random;
import kafka.producer.Partitioner;
/**
* Collection of implementations of common Kafka {@link Partitioner}s.
*/
public final class Partitioners {
private Partitioners() {
}
/**
* {@link Partitioner} that implements 'round-robin' mechanism which evenly
* distributes load between all available partitions.
*/
public static class RoundRobinPartitioner implements Partitioner {
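// next partition index to hand out; volatile for cross-thread visibility, though the
// check-then-increment in next() is not atomic, so minor skew under contention is tolerated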
private volatile int index;
@Override
public int partition(Object key, int numberOfPartitions) {
int partitionIndex = this.next(numberOfPartitions);
return partitionIndex;
}
private int next(int numberOfPartitions) {
if (index >= numberOfPartitions) {
index = 0;
}
int indexToReturn = index++;
return indexToReturn;
}
}
/**
* {@link Partitioner} that implements 'random' mechanism which randomly
* distributes the load between all available partitions.
*/
public static class RandomPartitioner implements Partitioner {
private final Random random;
public RandomPartitioner() {
this.random = new Random();
}
@Override
public int partition(Object key, int numberOfPartitions) {
return this.random.nextInt(numberOfPartitions);
}
}
/**
* {@link Partitioner} that implements 'key hash' mechanism which
* distributes the load between all available partitions based on hashing
* the value of the key.
*/
public static class HashPartitioner implements Partitioner {
@Override
public int partition(Object key, int numberOfPartitions) {
if (key != null) {
return (key.hashCode() & Integer.MAX_VALUE) % numberOfPartitions;
}
return 0;
}
}
}
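For reference, a hedged sketch of how one of these partitioners behaves and how its class name would be handed to KafkaPublisher via the "partitioner.class" property; the partition count of 6 is made up:

package org.apache.nifi.processors.kafka;

public class PartitionersSketch {
    public static void main(String[] args) {
        // this is the value that would be supplied as the "partitioner.class" producer property
        String partitionerClass = Partitioners.RoundRobinPartitioner.class.getName();
        System.out.println("partitioner.class=" + partitionerClass);

        // KafkaPublisher instantiates the class reflectively and, for keyed messages,
        // calls partition(key, numberOfPartitions) to choose the target partition
        kafka.producer.Partitioner p = new Partitioners.RoundRobinPartitioner();
        for (int i = 0; i < 8; i++) {
            System.out.print(p.partition("any-key", 6) + " "); // 0 1 2 3 4 5 0 1
        }
    }
}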


@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka;
import java.util.BitSet;
import org.apache.nifi.flowfile.FlowFile;
/**
* Context object that serves as a bridge between the content of a FlowFile and
* Kafka message(s). It contains all necessary information to allow
* {@link KafkaPublisher} to determine how the content of a
* {@link FlowFile} must be sent to Kafka.
*/
final class SplittableMessageContext {
private final String topicName;
private final String delimiterPattern;
private final byte[] keyBytes;
private volatile BitSet failedSegments;
/**
* @param topicName
* the name of the Kafka topic
* @param keyBytes
* the instance of byte[] representing the key. Can be null.
* @param delimiterPattern
* the string representing the delimiter. Can be null; when null
* the end-of-stream marker "(\\W)\\Z" is used, which effectively
* treats the entire content as a single message.
*/
SplittableMessageContext(String topicName, byte[] keyBytes, String delimiterPattern) {
this.topicName = topicName;
this.keyBytes = keyBytes;
this.delimiterPattern = delimiterPattern != null ? delimiterPattern : "(\\W)\\Z";
}
/**
*
*/
@Override
public String toString() {
return "topic: '" + topicName + "'; delimiter: '" + delimiterPattern + "'";
}
/**
* Records the indexes of the segments that failed on a previous attempt so
* that only those segments are re-sent on retry.
*/
void setFailedSegments(int... failedSegments) {
this.failedSegments = new BitSet();
for (int failedSegment : failedSegments) {
this.failedSegments.set(failedSegment);
}
}
/**
* Restores the failed segments from the byte[] form of a {@link BitSet},
* typically read back from a FlowFile attribute.
*/
void setFailedSegmentsAsByteArray(byte[] failedSegments) {
this.failedSegments = BitSet.valueOf(failedSegments);
}
/**
* Returns the {@link BitSet} marking the segments (chunks) of the
* delimited content stream that failed to be sent to the Kafka topic.
*/
BitSet getFailedSegments() {
return this.failedSegments;
}
/**
* Returns the name of the Kafka topic
*/
String getTopicName() {
return this.topicName;
}
/**
* Returns the value of the delimiter regex pattern.
*/
String getDelimiterPattern() {
return this.delimiterPattern;
}
/**
* Returns the key bytes as a String, or null if no key was set
*/
String getKeyBytesAsString() {
return this.keyBytes == null ? null : new String(this.keyBytes);
}
/**
* Returns the key bytes
*/
byte[] getKeyBytes() {
return this.keyBytes;
}
}
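A small, hypothetical sketch of the failed-segment round trip this class supports; the topic name, key, and failing segment numbers are invented for illustration:

package org.apache.nifi.processors.kafka;

import java.nio.charset.StandardCharsets;
import java.util.BitSet;

public class SplittableMessageContextSketch {
    public static void main(String[] args) {
        // first attempt: suppose KafkaPublisher.publish(..) reported segments 1 and 3 as failed
        BitSet failed = new BitSet();
        failed.set(1);
        failed.set(3);
        byte[] storedOnFlowFile = failed.toByteArray(); // what would be written to a FlowFile attribute

        // retry: rebuild the context and restore the failed segments from the attribute bytes
        SplittableMessageContext retryCtx = new SplittableMessageContext(
                "demo-topic", "key1".getBytes(StandardCharsets.UTF_8), "\n");
        retryCtx.setFailedSegmentsAsByteArray(storedOnFlowFile);
        System.out.println(retryCtx.getFailedSegments()); // {1, 3} -> only these segments are re-sent
    }
}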


@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka;
import java.io.BufferedInputStream;
import java.io.InputStream;
import java.util.Arrays;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
/**
* Splits an {@link InputStream} into byte[] chunks on a literal byte
* delimiter. Instances are not thread-safe and are intended for a single
* pass over one stream.
*/
class StreamScanner {
private final InputStream is;
private final byte[] delimiter;
private final NonThreadSafeCircularBuffer buffer;
private final ByteArrayOutputStream baos;
private byte[] data;
private boolean eos;
/**
* @param is the content stream to scan
* @param delimiter the delimiter; it is matched as literal bytes (via
* {@link String#getBytes()}), not as a regular expression
*/
StreamScanner(InputStream is, String delimiter) {
this.is = new BufferedInputStream(is);
this.delimiter = delimiter.getBytes();
buffer = new NonThreadSafeCircularBuffer(this.delimiter);
baos = new ByteArrayOutputStream();
}
/**
* Reads up to the next delimiter (or the end of the stream) and buffers the
* chunk. Returns true if a chunk is available via {@link #next()}.
*/
boolean hasNext() {
this.data = null;
if (!this.eos) {
try {
boolean keepReading = true;
while (keepReading) {
int b = this.is.read(); // read as int so a 0xFF byte is not mistaken for EOF
if (b > -1) {
baos.write(b);
if (buffer.addAndCompare((byte) b)) {
this.data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiter.length);
keepReading = false;
}
} else {
this.data = baos.toByteArray();
keepReading = false;
this.eos = true;
}
}
baos.reset();
} catch (Exception e) {
throw new IllegalStateException("Failed while reading InputStream", e);
}
}
return this.data != null;
}
/**
* Returns the chunk buffered by the last successful call to {@link #hasNext()}.
*/
byte[] next() {
return this.data;
}
void close() {
this.baos.close();
}
}
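A minimal, hypothetical sketch of how the scanner is driven; the sample content and delimiter are made up, and KafkaPublisher.publish(..) uses the same hasNext()/next() loop:

package org.apache.nifi.processors.kafka;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

public class StreamScannerSketch {
    public static void main(String[] args) {
        byte[] content = "foo|bar|baz".getBytes(StandardCharsets.UTF_8);
        StreamScanner scanner = new StreamScanner(new ByteArrayInputStream(content), "|");
        while (scanner.hasNext()) {
            // hasNext() buffers the next chunk; next() returns it without the delimiter bytes
            System.out.println(new String(scanner.next(), StandardCharsets.UTF_8));
        }
        scanner.close(); // prints foo, bar, baz on separate lines
    }
}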


@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class KafkaPublisherTest {
private static final String sampleData = "The true sign of intelligence is not knowledge but imagination.\n"
+ "It's not that I'm so smart, it's just that I stay with problems longer.\n"
+ "The only source of knowledge is experience.\n"
+ "Only two things are infinite, the universe and human stupidity, and I'm not sure about the former.\n";
private static final String sampleData2 = "foo|bar|baz";
private static EmbeddedKafka kafkaLocal;
private static EmbeddedKafkaProducerHelper producerHelper;
@BeforeClass
public static void beforeClass() {
kafkaLocal = new EmbeddedKafka();
kafkaLocal.start();
producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
}
@AfterClass
public static void afterClass() throws Exception {
producerHelper.close();
kafkaLocal.stop();
}
@Test
public void validateSuccessfulSendAsWhole() throws Exception {
InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8));
String topicName = "validateSuccessfulSendAsWhole";
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, null);
publisher.publish(messageContext, fis, null);
fis.close();
publisher.close();
ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
assertNotNull(iter.next());
try {
iter.next();
fail();
} catch (ConsumerTimeoutException e) {
// that's OK since this is the Kafka mechanism to unblock
}
}
@Test
public void validateSuccessfulSendAsDelimited() throws Exception {
InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8));
String topicName = "validateSuccessfulSendAsDelimited";
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n");
publisher.publish(messageContext, fis, null);
publisher.close();
ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
assertNotNull(iter.next());
assertNotNull(iter.next());
assertNotNull(iter.next());
assertNotNull(iter.next());
try {
iter.next();
fail();
} catch (ConsumerTimeoutException e) {
// that's OK since this is the Kafka mechanism to unblock
}
}
@Test
public void validateSuccessfulSendAsDelimited2() throws Exception {
InputStream fis = new ByteArrayInputStream(sampleData2.getBytes(StandardCharsets.UTF_8));
String topicName = "validateSuccessfulSendAsDelimited2";
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "|");
publisher.publish(messageContext, fis, null);
publisher.close();
ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
assertNotNull(iter.next());
assertNotNull(iter.next());
assertNotNull(iter.next());
try {
iter.next();
fail();
} catch (ConsumerTimeoutException e) {
// that's OK since this is the Kafka mechanism to unblock
}
}
@Test
public void validateSuccessfulReSendOfFailedSegments() throws Exception {
InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8));
String topicName = "validateSuccessfulReSendOfFailedSegments";
Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n");
messageContext.setFailedSegments(1, 3);
publisher.publish(messageContext, fis, null);
publisher.close();
ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
String m1 = new String(iter.next().message());
String m2 = new String(iter.next().message());
assertEquals("It's not that I'm so smart, it's just that I stay with problems longer.", m1);
assertEquals("Only two things are infinite, the universe and human stupidity, and I'm not sure about the former.", m2);
try {
iter.next();
fail();
} catch (ConsumerTimeoutException e) {
// that's OK since this is the Kafka mechanism to unblock
}
}
private Properties buildProducerProperties() {
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "0.0.0.0:" + kafkaLocal.getKafkaPort());
kafkaProperties.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
kafkaProperties.setProperty("acks", "1");
kafkaProperties.put("auto.create.topics.enable", "true");
kafkaProperties.setProperty("partitioner.class", "org.apache.nifi.processors.kafka.Partitioners$RoundRobinPartitioner");
kafkaProperties.setProperty("timeout.ms", "5000");
return kafkaProperties;
}
private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:" + kafkaLocal.getZookeeperPort());
props.put("group.id", "test");
props.put("consumer.timeout.ms", "5000");
props.put("auto.offset.reset", "smallest");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<>(1);
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
return iter;
}
}


@@ -17,462 +17,189 @@
package org.apache.nifi.processors.kafka;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.Properties;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import kafka.common.FailedToSendMessageException;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class TestPutKafka {
@Test
public void testMultipleKeyValuePerFlowFile() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
private static EmbeddedKafka kafkaLocal;
runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes());
runner.run(2); // we have to run twice because the first iteration will result in data being added to a queue in the processor; the second onTrigger call will transfer FlowFiles.
private static EmbeddedKafkaProducerHelper producerHelper;
@BeforeClass
public static void beforeClass() {
kafkaLocal = new EmbeddedKafka();
kafkaLocal.start();
producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
}
@AfterClass
public static void afterClass() throws Exception {
producerHelper.close();
kafkaLocal.stop();
}
@Test
public void testDelimitedMessagesWithKey() {
String topicName = "testDelimitedMessagesWithKey";
PutKafka putKafka = new PutKafka();
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PutKafka.TOPIC, topicName);
runner.setProperty(PutKafka.CLIENT_NAME, "foo");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes());
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
assertEquals("Hello World", new String(consumer.next().message()));
assertEquals("Goodbye", new String(consumer.next().message()));
assertEquals("1", new String(consumer.next().message()));
assertEquals("2", new String(consumer.next().message()));
assertEquals("3", new String(consumer.next().message()));
assertEquals("4", new String(consumer.next().message()));
assertEquals("5", new String(consumer.next().message()));
final List<ProducerRecord<byte[], byte[]>> messages = ((MockProducer) proc.getProducer()).getMessages();
assertEquals(11, messages.size());
assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0).value()));
assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1).value()));
for (int i = 1; i <= 9; i++) {
assertTrue(Arrays.equals(String.valueOf(i).getBytes(StandardCharsets.UTF_8), messages.get(i + 1).value()));
}
runner.shutdown();
}
@Test
public void testWithImmediateFailure() {
final TestableProcessor proc = new TestableProcessor(0);
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
@Ignore
public void testWithFailureAndPartialResend() throws Exception {
String topicName = "testWithImmediateFailure";
PutKafka putKafka = new PutKafka();
final TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PutKafka.TOPIC, topicName);
runner.setProperty(PutKafka.CLIENT_NAME, "foo");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.SEED_BROKERS, "0.0.0.0:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9";
final String text = "Hello World\nGoodbye\n1\n2";
runner.enqueue(text.getBytes());
runner.run(2);
afterClass(); // kill Kafka right before send to ensure producer fails
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
mff.assertContentEquals(text);
}
MockFlowFile ff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
String failedSegmentsStr = ff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS);
BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes());
assertTrue(fs.get(0));
assertTrue(fs.get(1));
assertTrue(fs.get(2));
assertTrue(fs.get(3));
String delimiter = ff.getAttribute(PutKafka.ATTR_DELIMITER);
assertEquals("\n", delimiter);
String key = ff.getAttribute(PutKafka.ATTR_KEY);
assertEquals("key1", key);
String topic = ff.getAttribute(PutKafka.ATTR_TOPIC);
assertEquals(topicName, topic);
@Test
public void testPartialFailure() {
final TestableProcessor proc = new TestableProcessor(2); // fail after sending 2 messages.
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
beforeClass();
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
Map<String, String> attr = new HashMap<>(ff.getAttributes());
/*
* So here we are emulating partial success. Even though all 4 messages
* failed to be sent, by changing the ATTR_FAILED_SEGMENTS value we are
* essentially saying that only two failed and need to be resent.
*/
BitSet _fs = new BitSet();
_fs.set(1);
_fs.set(3);
attr.put(PutKafka.ATTR_FAILED_SEGMENTS, new String(_fs.toByteArray(), StandardCharsets.UTF_8));
ff.putAttributes(attr);
runner.enqueue(ff);
runner.run(1, false);
MockFlowFile sff = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
assertNull(sff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS));
assertNull(sff.getAttribute(PutKafka.ATTR_TOPIC));
assertNull(sff.getAttribute(PutKafka.ATTR_KEY));
assertNull(sff.getAttribute(PutKafka.ATTR_DELIMITER));
final byte[] bytes = "1\n2\n3\n4".getBytes();
runner.enqueue(bytes);
runner.run(2);
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
successFF.assertContentEquals("1\n2\n");
final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
failureFF.assertContentEquals("3\n4");
}
@Test
public void testPartialFailureWithSuccessBeforeAndAfter() {
final TestableProcessor proc = new TestableProcessor(2, 4); // fail after sending 2 messages, then stop failing after 4
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
final byte[] bytes = "1\n2\n3\n4\n5\n6".getBytes();
runner.enqueue(bytes);
runner.run(2);
runner.assertTransferCount(PutKafka.REL_SUCCESS, 2);
runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
final List<MockFlowFile> success = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
for (final MockFlowFile successFF : success) {
if ('1' == successFF.toByteArray()[0]) {
successFF.assertContentEquals("1\n2\n");
} else if ('5' == successFF.toByteArray()[0]) {
successFF.assertContentEquals("5\n6");
} else {
Assert.fail("Wrong content for FlowFile; contained " + new String(successFF.toByteArray()));
assertEquals("Goodbye", new String(consumer.next().message()));
assertEquals("2", new String(consumer.next().message()));
try {
consumer.next();
fail();
} catch (Exception e) {
// ignore
}
}
final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
failureFF.assertContentEquals("3\n4\n");
}
@Test
public void testWithEmptyMessages() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
String topicName = "testWithEmptyMessages";
PutKafka putKafka = new PutKafka();
final TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PutKafka.TOPIC, topicName);
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.CLIENT_NAME, "foo");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
runner.enqueue(bytes);
runner.run(2);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
final List<ProducerRecord<byte[], byte[]>> msgs = ((MockProducer) proc.getProducer()).getMessages();
assertEquals(4, msgs.size());
for (int i = 1; i <= 4; i++) {
assertTrue(Arrays.equals(String.valueOf(i).getBytes(), msgs.get(i - 1).value()));
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
assertNotNull(consumer.next());
assertNotNull(consumer.next());
assertNotNull(consumer.next());
assertNotNull(consumer.next());
try {
consumer.next();
fail();
} catch (Exception e) {
// ignore
}
}
@Test
public void testProvenanceReporterMessagesCount() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
runner.enqueue(bytes);
runner.run(2);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.SEND, event.getEventType());
assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri());
assertTrue(event.getDetails().startsWith("Sent 4 messages"));
}
@Test
public void testProvenanceReporterWithoutDelimiterMessagesCount() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
runner.enqueue(bytes);
runner.run(2);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.SEND, event.getEventType());
assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri());
}
@Test
public void testRoundRobinAcrossMultipleMessages() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING);
runner.enqueue("hello".getBytes());
runner.enqueue("there".getBytes());
runner.enqueue("how are you".getBytes());
runner.enqueue("today".getBytes());
runner.run(5);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
for (int i = 0; i < 3; i++) {
assertEquals(i + 1, records.get(i).partition().intValue());
}
assertEquals(1, records.get(3).partition().intValue());
}
@Test
public void testRoundRobinAcrossMultipleMessagesInSameFlowFile() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING);
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes());
runner.run(2);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
for (int i = 0; i < 3; i++) {
assertEquals(i + 1, records.get(i).partition().intValue());
}
assertEquals(1, records.get(3).partition().intValue());
}
@Test
public void testUserDefinedPartition() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING);
runner.setProperty(PutKafka.PARTITION, "${part}");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
final Map<String, String> attrs = new HashMap<>();
attrs.put("part", "3");
runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs);
runner.run(2);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
for (int i = 0; i < 4; i++) {
assertEquals(3, records.get(i).partition().intValue());
}
}
@Test
public void testUserDefinedPartitionWithInvalidValue() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING);
runner.setProperty(PutKafka.PARTITION, "${part}");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
final Map<String, String> attrs = new HashMap<>();
attrs.put("part", "bogus");
runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs);
runner.run(2);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
// should all be the same partition, regardless of what partition it is.
final int partition = records.get(0).partition().intValue();
for (int i = 0; i < 4; i++) {
assertEquals(partition, records.get(i).partition().intValue());
}
}
@Test
public void testFullBuffer() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "5 B");
proc.setMaxQueueSize(10L); // will take 4 bytes for key and 1 byte for value.
runner.enqueue("1\n2\n3\n4\n".getBytes());
runner.run(2);
runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0).assertContentEquals("1\n2\n");
runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0).assertContentEquals("3\n4\n");
}
/**
* Used to override the {@link #getProducer()} method so that we can enforce that our MockProducer is used
*/
private static class TestableProcessor extends PutKafka {
private final MockProducer producer;
public TestableProcessor() {
this(null);
}
public TestableProcessor(final Integer failAfter) {
this(failAfter, null);
}
public TestableProcessor(final Integer failAfter, final Integer stopFailingAfter) {
producer = new MockProducer();
producer.setFailAfter(failAfter);
producer.setStopFailingAfter(stopFailingAfter);
}
@Override
protected Producer<byte[], byte[]> getProducer() {
return producer;
}
public void setMaxQueueSize(final long bytes) {
producer.setMaxQueueSize(bytes);
}
}
/**
* We have our own Mock Producer, which is very similar to the Kafka-supplied one. However, with the Kafka-supplied
* Producer, we don't have the ability to tell it to fail after X number of messages; rather, we can only tell it
* to fail on the next message. Since we are sending multiple messages in a single onTrigger call for the Processor,
* this doesn't allow us to test failure conditions adequately.
*/
private static class MockProducer implements Producer<byte[], byte[]> {
private int sendCount = 0;
private Integer failAfter;
private Integer stopFailingAfter;
private long queueSize = 0L;
private long maxQueueSize = Long.MAX_VALUE;
private final List<ProducerRecord<byte[], byte[]>> messages = new ArrayList<>();
public MockProducer() {
}
public void setMaxQueueSize(final long bytes) {
this.maxQueueSize = bytes;
}
public List<ProducerRecord<byte[], byte[]>> getMessages() {
return messages;
}
public void setFailAfter(final Integer successCount) {
failAfter = successCount;
}
public void setStopFailingAfter(final Integer stopFailingAfter) {
this.stopFailingAfter = stopFailingAfter;
}
@Override
public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record) {
return send(record, null);
}
@Override
public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
sendCount++;
final ByteArraySerializer serializer = new ByteArraySerializer();
final int keyBytes = serializer.serialize(record.topic(), record.key()).length;
final int valueBytes = serializer.serialize(record.topic(), record.value()).length;
if (maxQueueSize - queueSize < keyBytes + valueBytes) {
throw new BufferExhaustedException("Queue size is " + queueSize + " but serialized message is " + (keyBytes + valueBytes));
}
queueSize += keyBytes + valueBytes;
if (failAfter != null && sendCount > failAfter && ((stopFailingAfter == null) || (sendCount < stopFailingAfter + 1))) {
final Exception e = new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
callback.onCompletion(null, e);
} else {
messages.add(record);
final RecordMetadata meta = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 1 : record.partition()), 0L, 0L);
callback.onCompletion(meta, null);
}
// we don't actually look at the Future in the processor, so we can just return null
return null;
}
@Override
public List<PartitionInfo> partitionsFor(String topic) {
final Node leader = new Node(1, "localhost", 1111);
final Node node2 = new Node(2, "localhost-2", 2222);
final Node node3 = new Node(3, "localhost-3", 3333);
final PartitionInfo partInfo1 = new PartitionInfo(topic, 1, leader, new Node[] {node2, node3}, new Node[0]);
final PartitionInfo partInfo2 = new PartitionInfo(topic, 2, leader, new Node[] {node2, node3}, new Node[0]);
final PartitionInfo partInfo3 = new PartitionInfo(topic, 3, leader, new Node[] {node2, node3}, new Node[0]);
final List<PartitionInfo> infos = new ArrayList<>(3);
infos.add(partInfo1);
infos.add(partInfo2);
infos.add(partInfo3);
return infos;
}
@Override
public Map<MetricName, ? extends Metric> metrics() {
return Collections.emptyMap();
}
@Override
public void close() {
}
private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
Properties props = new Properties();
props.put("zookeeper.connect", "0.0.0.0:" + kafkaLocal.getZookeeperPort());
props.put("group.id", "test");
props.put("consumer.timeout.ms", "5000");
props.put("auto.offset.reset", "smallest");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<>(1);
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
return iter;
}
}


@@ -12,12 +12,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootCategory=WARN, stdout
log4j.rootCategory=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n
log4j.category.org.apache.nifi.processors.kafka=INFO
log4j.category.kafka=ERROR
#log4j.category.org.apache.nifi.startup=INFO
log4j.category.org.apache.nifi.processors.kafka=DEBUG