NIFI-1736 Move kafka.StreamScanner to nifi-utils. This closes #333

This commit is contained in:
ijokarumawak 2016-04-07 14:56:41 +09:00 committed by Oleg Zhurakousky
parent 3adb45eafc
commit 9235a28f82
3 changed files with 10 additions and 9 deletions

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.kafka; package org.apache.nifi.stream.io.util;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.IOException; import java.io.IOException;
@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
/** /**
* *
*/ */
class StreamScanner { public class StreamScanner {
private final static int EOF = -1; private final static int EOF = -1;
@ -51,7 +51,7 @@ class StreamScanner {
* that neither {@link InputStream} nor its individual chunks (if * that neither {@link InputStream} nor its individual chunks (if
* delimiter is used) can ever be greater then this size. * delimiter is used) can ever be greater then this size.
*/ */
StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize) { public StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize) {
this(is, delimiterBytes, maxDataSize, 8192); this(is, delimiterBytes, maxDataSize, 8192);
} }
@ -74,7 +74,7 @@ class StreamScanner {
* automatically as needed up to the Integer.MAX_VALUE; * automatically as needed up to the Integer.MAX_VALUE;
* *
*/ */
StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) { public StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) {
this.is = new BufferedInputStream(is); this.is = new BufferedInputStream(is);
this.delimiterBytes = delimiterBytes; this.delimiterBytes = delimiterBytes;
this.buffer = ByteBuffer.allocate(initialBufferSize); this.buffer = ByteBuffer.allocate(initialBufferSize);
@ -89,7 +89,7 @@ class StreamScanner {
* <i>false</i> when it reaches the end of the stream after the last * <i>false</i> when it reaches the end of the stream after the last
* element was retrieved via {@link #next()} operation. * element was retrieved via {@link #next()} operation.
*/ */
boolean hasNext() { public boolean hasNext() {
int j = 0; int j = 0;
int readVal = 0; int readVal = 0;
while (this.data == null && readVal != EOF) { while (this.data == null && readVal != EOF) {
@ -124,7 +124,7 @@ class StreamScanner {
* @return byte array representing the next segment in the stream or the * @return byte array representing the next segment in the stream or the
* whole stream if no delimiter is used * whole stream if no delimiter is used
*/ */
byte[] next() { public byte[] next() {
try { try {
return this.data; return this.data;
} finally { } finally {

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.kafka; package org.apache.nifi.stream.io.util;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;

View File

@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.stream.io.util.StreamScanner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -63,7 +64,7 @@ class KafkaPublisher implements AutoCloseable {
KafkaPublisher(Properties kafkaProperties) { KafkaPublisher(Properties kafkaProperties) {
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
this.producer = new KafkaProducer<byte[], byte[]>(kafkaProperties); this.producer = new KafkaProducer<>(kafkaProperties);
this.ackWaitTime = Long.parseLong(kafkaProperties.getProperty(ProducerConfig.TIMEOUT_CONFIG)) * 2; this.ackWaitTime = Long.parseLong(kafkaProperties.getProperty(ProducerConfig.TIMEOUT_CONFIG)) * 2;
try { try {
if (kafkaProperties.containsKey("partitioner.class")){ if (kafkaProperties.containsKey("partitioner.class")){
@ -132,7 +133,7 @@ class KafkaPublisher implements AutoCloseable {
partitionKey = this.getPartition(key, topicName); partitionKey = this.getPartition(key, topicName);
} }
if (prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.get(segmentCounter)) { if (prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.get(segmentCounter)) {
ProducerRecord<byte[], byte[]> message = new ProducerRecord<byte[], byte[]>(topicName, partitionKey, key, content); ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(topicName, partitionKey, key, content);
sendFutures.add(this.toKafka(message)); sendFutures.add(this.toKafka(message));
} }
segmentCounter++; segmentCounter++;