From 9235a28f82cece2eeb6de1b4767910d9b8bf8ddc Mon Sep 17 00:00:00 2001 From: ijokarumawak Date: Thu, 7 Apr 2016 14:56:41 +0900 Subject: [PATCH] NIFI-1736 Move kafka.StreamScanner to nifi-utils. This closes #333 --- .../apache/nifi/stream/io/util}/StreamScanner.java | 12 ++++++------ .../nifi/stream/io/util}/StreamScannerTests.java | 2 +- .../apache/nifi/processors/kafka/KafkaPublisher.java | 5 +++-- 3 files changed, 10 insertions(+), 9 deletions(-) rename {nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka => nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util}/StreamScanner.java (94%) rename {nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka => nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util}/StreamScannerTests.java (99%) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java similarity index 94% rename from nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java rename to nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java index 57bbbcf9a3..901f31a7e4 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.kafka; +package org.apache.nifi.stream.io.util; import java.io.BufferedInputStream; import java.io.IOException; @@ -24,7 +24,7 @@ import java.nio.ByteBuffer; /** * */ -class StreamScanner { +public class StreamScanner { private final static int EOF = -1; @@ -51,7 +51,7 @@ class StreamScanner { * that neither {@link InputStream} nor its individual chunks (if * delimiter is used) can ever be greater then this size. */ - StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize) { + public StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize) { this(is, delimiterBytes, maxDataSize, 8192); } @@ -74,7 +74,7 @@ class StreamScanner { * automatically as needed up to the Integer.MAX_VALUE; * */ - StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) { + public StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) { this.is = new BufferedInputStream(is); this.delimiterBytes = delimiterBytes; this.buffer = ByteBuffer.allocate(initialBufferSize); @@ -89,7 +89,7 @@ class StreamScanner { * false when it reaches the end of the stream after the last * element was retrieved via {@link #next()} operation. */ - boolean hasNext() { + public boolean hasNext() { int j = 0; int readVal = 0; while (this.data == null && readVal != EOF) { @@ -124,7 +124,7 @@ class StreamScanner { * @return byte array representing the next segment in the stream or the * whole stream if no delimiter is used */ - byte[] next() { + public byte[] next() { try { return this.data; } finally { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java similarity index 99% rename from nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java rename to nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java index 1ebc4c4958..2dc8f0b70f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.kafka; +package org.apache.nifi.stream.io.util; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java index ebdf5c8ad1..bcf10a4240 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java @@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.stream.io.util.StreamScanner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +64,7 @@ class KafkaPublisher implements AutoCloseable { KafkaPublisher(Properties kafkaProperties) { kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - this.producer = new KafkaProducer(kafkaProperties); + this.producer = new KafkaProducer<>(kafkaProperties); this.ackWaitTime = Long.parseLong(kafkaProperties.getProperty(ProducerConfig.TIMEOUT_CONFIG)) * 2; try { if (kafkaProperties.containsKey("partitioner.class")){ @@ -132,7 +133,7 @@ class KafkaPublisher implements AutoCloseable { partitionKey = this.getPartition(key, topicName); } if (prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.get(segmentCounter)) { - ProducerRecord message = new ProducerRecord(topicName, partitionKey, key, content); + ProducerRecord message = new ProducerRecord<>(topicName, partitionKey, key, content); sendFutures.add(this.toKafka(message)); } segmentCounter++;