NIFI-3863: Initial implementation of Lookup Services and the LookupRecord processor. This required refactoring the RecordSetWriter interface, so that interface and all of its implementations and references were updated as well

Mark Payne 2017-05-10 13:09:45 -04:00 committed by joewitt
parent 3b98abbf41
commit 9bd0246a96
66 changed files with 2619 additions and 430 deletions

View File

@ -16,7 +16,11 @@
*/
package org.apache.nifi.controller;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessorInitializationContext;
@ -28,6 +32,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
private ControllerServiceLookup serviceLookup;
private ComponentLog logger;
private StateManager stateManager;
private volatile ConfigurationContext configurationContext;
@Override
public final void initialize(final ControllerServiceInitializationContext context) throws InitializationException {
@ -75,4 +80,27 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
protected StateManager getStateManager() {
return stateManager;
}
@OnEnabled
public final void abstractStoreConfigContext(final ConfigurationContext configContext) {
this.configurationContext = configContext;
}
@OnDisabled
public final void abstractClearConfigContext() {
this.configurationContext = null;
}
protected ConfigurationContext getConfigurationContext() {
final ConfigurationContext context = this.configurationContext;
if (context == null) {
throw new IllegalStateException("No Configuration Context exists");
}
return context;
}
protected PropertyValue getProperty(final PropertyDescriptor descriptor) {
return getConfigurationContext().getProperty(descriptor);
}
}
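
The hooks above let a ControllerService implementation read its properties after it has been enabled without capturing the ConfigurationContext itself. A minimal sketch of how a subclass could use the new getProperty() helper follows; the class name and property are invented for illustration and are not part of this commit.

import java.util.Collections;
import java.util.List;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.processor.util.StandardValidators;
public class ExampleControllerService extends AbstractControllerService {
    // Hypothetical property, for illustration only
    static final PropertyDescriptor GREETING = new PropertyDescriptor.Builder()
        .name("greeting")
        .description("Prefix applied to every value returned by this example service")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .required(true)
        .build();
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return Collections.singletonList(GREETING);
    }
    public String greet(final String name) {
        // getProperty() delegates to the ConfigurationContext captured by the
        // new @OnEnabled callback, so the service never stores the context itself
        return getProperty(GREETING).getValue() + ", " + name;
    }
}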

View File

@ -196,6 +196,11 @@
<artifactId>nifi-html-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-services-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-poi-nar</artifactId>

View File

@ -103,7 +103,12 @@ public class StandardFieldValue implements FieldValue {
public void updateValue(final Object newValue) {
final Optional<Record> parentRecord = getParentRecord();
if (!parentRecord.isPresent()) {
throw new UnsupportedOperationException("Cannot update the field value because the value is not associated with any record");
if (value instanceof Record) {
((Record) value).setValue(getField().getFieldName(), newValue);
return;
} else {
throw new UnsupportedOperationException("Cannot update the field value because the value is not associated with any record");
}
}
parentRecord.get().setValue(getField().getFieldName(), newValue);

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Map;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
public abstract class AbstractRecordSetWriter implements RecordSetWriter {
private final OutputStream out;
private int recordCount = 0;
private boolean activeRecordSet = false;
public AbstractRecordSetWriter(final OutputStream out) {
this.out = out;
}
@Override
public void close() throws IOException {
this.out.close();
}
@Override
public WriteResult write(final RecordSet recordSet) throws IOException {
beginRecordSet();
Record record;
while ((record = recordSet.next()) != null) {
write(record);
recordCount++;
}
return finishRecordSet();
}
protected OutputStream getOutputStream() {
return out;
}
protected final int getRecordCount() {
return recordCount;
}
protected final boolean isRecordSetActive() {
return activeRecordSet;
}
@Override
public final void beginRecordSet() throws IOException {
if (activeRecordSet) {
throw new IllegalStateException("Cannot begin a RecordSet because a RecordSet has already begun");
}
activeRecordSet = true;
onBeginRecordSet();
}
@Override
public final WriteResult finishRecordSet() throws IOException {
if (!isRecordSetActive()) {
throw new IllegalStateException("Cannot finish RecordSet because no RecordSet has begun");
}
final Map<String, String> attributes = onFinishRecordSet();
return WriteResult.of(recordCount, attributes == null ? Collections.emptyMap() : attributes);
}
/**
* Method that is called as a result of {@link #beginRecordSet()} being called. This gives subclasses
* the chance to react to a new RecordSet beginning but prevents the subclass from changing how this
* implementation maintains its internal state. By default, this method does nothing.
*
* @throws IOException if unable to write the necessary data for a new RecordSet
*/
protected void onBeginRecordSet() throws IOException {
}
/**
* Method that is called by {@link #finishRecordSet()} when a RecordSet is finished. This gives subclasses
* the chance to react to a RecordSet being completed but prevents the subclass from changing how this
* implementation maintains its internal state.
*
* @return a Map of key/value pairs that should be added to the FlowFile as attributes
*/
protected Map<String, String> onFinishRecordSet() throws IOException {
return Collections.emptyMap();
}
}
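
Concrete writers extend this base class, write to the stream supplied at construction, and use the two hooks above to frame a RecordSet. A rough sketch of such a subclass follows; the line-oriented format and the class name are invented for illustration and do not appear in this commit.

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import org.apache.nifi.serialization.record.Record;
public class LineDelimitedRecordSetWriter extends AbstractRecordSetWriter {
    public LineDelimitedRecordSetWriter(final OutputStream out) {
        super(out);
    }
    @Override
    protected void onBeginRecordSet() throws IOException {
        // Called once by beginRecordSet(); write any header framing here
        getOutputStream().write("# begin record set\n".getBytes(StandardCharsets.UTF_8));
    }
    @Override
    protected Map<String, String> onFinishRecordSet() throws IOException {
        // Called once by finishRecordSet(); attributes returned here end up on the FlowFile
        getOutputStream().write("# end record set\n".getBytes(StandardCharsets.UTF_8));
        return Collections.singletonMap("line.delimited", "true");
    }
    @Override
    public WriteResult write(final Record record) throws IOException {
        // Illustrative serialization only: one record per line
        getOutputStream().write(record.toString().getBytes(StandardCharsets.UTF_8));
        getOutputStream().write('\n');
        return WriteResult.of(1, Collections.emptyMap());
    }
    @Override
    public String getMimeType() {
        return "text/plain";
    }
}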

View File

@ -37,9 +37,26 @@ public interface RecordSetWriter extends RecordWriter {
* Writes the given result set to the given output stream
*
* @param recordSet the record set to serialize
* @param out the OutputStream to write to
*
* @return the results of writing the data
* @throws IOException if unable to write to the given OutputStream
*/
WriteResult write(RecordSet recordSet, OutputStream out) throws IOException;
WriteResult write(RecordSet recordSet) throws IOException;
/**
* Begins a new RecordSet
*
* @throws IOException if unable to write to the underlying OutputStream
* @throws IllegalStateException if a RecordSet has already been started
*/
void beginRecordSet() throws IOException;
/**
* Finishes the currently active RecordSet and returns a WriteResult that includes information about what was written
*
* @return the results of writing the data
* @throws IOException if unable to write to the given OutputStream
* @throws IllegalStateException if a RecordSet has not been started via {@link #beginRecordSet()}
*/
WriteResult finishRecordSet() throws IOException;
}
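
From the caller's side, the refactored contract means a writer is now bound to its destination OutputStream at creation time, framed with beginRecordSet()/finishRecordSet(), and closed by the caller. The helper below is a hypothetical sketch of that call sequence as the updated processors in this commit use it; the class, method, and variable names are illustrative only.

import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
public class RecordCopyExample {
    public static Map<String, String> copyRecords(final ComponentLog logger, final RecordSetWriterFactory writerFactory,
            final RecordSchema schema, final FlowFile flowFile, final RecordReader reader, final OutputStream out) throws Exception {
        final Map<String, String> attributes = new HashMap<>();
        // The writer is created against the destination OutputStream and closed by the caller
        try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, out)) {
            writer.beginRecordSet();
            Record record;
            while ((record = reader.nextRecord()) != null) {
                writer.write(record);
            }
            final WriteResult result = writer.finishRecordSet();
            attributes.put("record.count", String.valueOf(result.getRecordCount()));
            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
            attributes.putAll(result.getAttributes());
        }
        return attributes;
    }
}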

View File

@ -17,21 +17,20 @@
package org.apache.nifi.serialization;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.nifi.serialization.record.Record;
public interface RecordWriter {
public interface RecordWriter extends Closeable {
/**
* Writes the given result set to the given output stream
*
* @param record the record set to serialize
* @param out the OutputStream to write to
* @return the results of writing the data
* @throws IOException if unable to write to the given OutputStream
*/
WriteResult write(Record record, OutputStream out) throws IOException;
WriteResult write(Record record) throws IOException;
/**
* @return the MIME Type that the Result Set Writer produces. This will be added to FlowFiles using

View File

@ -188,12 +188,14 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, new NullInputStream(0));
final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema);
final StopWatch stopWatch = new StopWatch(true);
// use a child FlowFile so that if any error occurs we can route the original untouched FlowFile to retry/failure
child = session.create(originalFlowFile);
final FlowFile writableFlowFile = child;
final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
child = session.write(child, (final OutputStream rawOut) -> {
try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
final HDFSRecordReader recordReader = createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
@ -212,7 +214,10 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
}
};
writeResult.set(recordSetWriter.write(recordSet, out));
try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, writableFlowFile, out)) {
writeResult.set(recordSetWriter.write(recordSet));
mimeTypeRef.set(recordSetWriter.getMimeType());
}
} catch (Exception e) {
exceptionHolder.set(e);
}
@ -230,7 +235,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes());
attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
successFlowFile = session.putAllAttributes(successFlowFile, attributes);
final URI uri = path.toUri();

View File

@ -42,7 +42,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
private final List<Object[]> records = new ArrayList<>();
private final List<RecordField> fields = new ArrayList<>();
private final int failAfterN;
private int failAfterN;
public MockRecordParser() {
this(-1);
@ -52,6 +52,9 @@ public class MockRecordParser extends AbstractControllerService implements Recor
this.failAfterN = failAfterN;
}
public void failAfter(final int failAfterN) {
this.failAfterN = failAfterN;
}
public void addSchemaField(final String fieldName, final RecordFieldType type) {
fields.add(new RecordField(fieldName, type.getDataType()));

View File

@ -36,6 +36,10 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
private final int failAfterN;
private final boolean quoteValues;
public MockRecordWriter() {
this(null);
}
public MockRecordWriter(final String header) {
this(header, true, -1);
}
@ -56,12 +60,16 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
return new RecordSetWriter() {
private int recordCount = 0;
@Override
public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
out.write(header.getBytes());
out.write("\n".getBytes());
public WriteResult write(final RecordSet rs) throws IOException {
if (header != null) {
out.write(header.getBytes());
out.write("\n".getBytes());
}
int recordCount = 0;
Record record = null;
@ -75,14 +83,14 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
int i = 0;
for (final String fieldName : record.getSchema().getFieldNames()) {
final String val = record.getAsString(fieldName);
if (quoteValues) {
out.write("\"".getBytes());
if (val != null) {
if (val != null) {
if (quoteValues) {
out.write("\"".getBytes());
out.write(val.getBytes());
out.write("\"".getBytes());
} else {
out.write(val.getBytes());
}
out.write("\"".getBytes());
} else if (val != null) {
out.write(val.getBytes());
}
if (i++ < numCols - 1) {
@ -101,20 +109,24 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
public WriteResult write(Record record, OutputStream out) throws IOException {
out.write(header.getBytes());
out.write("\n".getBytes());
public WriteResult write(Record record) throws IOException {
if (header != null) {
out.write(header.getBytes());
out.write("\n".getBytes());
}
final int numCols = record.getSchema().getFieldCount();
int i = 0;
for (final String fieldName : record.getSchema().getFieldNames()) {
final String val = record.getAsString(fieldName);
if (quoteValues) {
out.write("\"".getBytes());
out.write(val.getBytes());
out.write("\"".getBytes());
} else {
out.write(val.getBytes());
if (val != null) {
if (quoteValues) {
out.write("\"".getBytes());
out.write(val.getBytes());
out.write("\"".getBytes());
} else {
out.write(val.getBytes());
}
}
if (i++ < numCols - 1) {
@ -123,8 +135,24 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
out.write("\n".getBytes());
recordCount++;
return WriteResult.of(1, Collections.emptyMap());
}
@Override
public void close() throws IOException {
out.close();
}
@Override
public void beginRecordSet() throws IOException {
}
@Override
public WriteResult finishRecordSet() throws IOException {
return WriteResult.of(recordCount, Collections.emptyMap());
}
};
}
}

View File

@ -2460,19 +2460,21 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public void close() throws IOException {
if (closed) {
return;
}
closed = true;
writeRecursionSet.remove(sourceFlowFile);
final long bytesWritten = countingOut.getBytesWritten();
if (!closed) {
StandardProcessSession.this.bytesWritten += bytesWritten;
closed = true;
}
StandardProcessSession.this.bytesWritten += bytesWritten;
openOutputStreams.remove(sourceFlowFile);
flush();
removeTemporaryClaim(record);
flush();
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
.fromFlowFile(record.getCurrent())
.contentClaim(updatedClaim)

View File

@ -413,16 +413,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
FlowFile flowFile = session.create();
try {
final RecordSetWriter writer;
final RecordSchema schema;
try {
final RecordSchema schema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
writer = writerFactory.createWriter(logger, schema);
schema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
} catch (final Exception e) {
logger.error(
"Failed to obtain a Record Writer for serializing Kafka messages. This generally happens because the "
+ "Record Writer cannot obtain the appropriate Schema, due to failure to connect to a remote Schema Registry "
+ "or due to the Schema Access Strategy being dependent upon FlowFile Attributes that are not available. "
+ "Will roll back the Kafka message offsets.", e);
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
try {
rollback(topicPartition);
@ -436,6 +432,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final FlowFile ff = flowFile;
final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
flowFile = session.write(flowFile, rawOut -> {
final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
@ -479,15 +476,28 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
};
try (final OutputStream out = new BufferedOutputStream(rawOut)) {
writeResult.set(writer.write(recordSet, out));
try (final OutputStream out = new BufferedOutputStream(rawOut);
final RecordSetWriter writer = writerFactory.createWriter(logger, schema, ff, out)) {
writeResult.set(writer.write(recordSet));
mimeTypeRef.set(writer.getMimeType());
} catch (final Exception e) {
logger.error("Failed to write records to FlowFile. Will roll back the Kafka message offsets.", e);
try {
rollback(topicPartition);
} catch (final Exception rollbackException) {
logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
}
yield();
throw new ProcessException(e);
}
});
final WriteResult result = writeResult.get();
if (result.getRecordCount() > 0) {
final Map<String, String> attributes = new HashMap<>(result.getAttributes());
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
attributes.put("record.count", String.valueOf(result.getRecordCount()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));

View File

@ -58,7 +58,6 @@ import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.RecordWriter;
import org.apache.nifi.serialization.record.RecordSchema;
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.10.x"})
@ -324,14 +323,13 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
final RecordWriter writer;
try (final InputStream in = new BufferedInputStream(session.read(flowFile))) {
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSchema schema = writerFactory.getSchema(flowFile, in);
final RecordSchema schema;
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
writer = writerFactory.createWriter(getLogger(), schema);
try (final InputStream in = new BufferedInputStream(session.read(flowFile))) {
schema = writerFactory.getSchema(flowFile, in);
} catch (final Exception e) {
getLogger().error("Failed to create a Record Writer for {}; routing to failure", new Object[] {flowFile, e});
getLogger().error("Failed to determine Schema for writing messages to Kafka for {}; routing to failure", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
continue;
}
@ -342,7 +340,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
final RecordReader reader = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class).createRecordReader(flowFile, in, getLogger());
lease.publish(flowFile, reader, writer, messageKeyField, topic);
lease.publish(flowFile, reader, writerFactory, schema, messageKeyField, topic);
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException(e);
}

View File

@ -32,9 +32,12 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordWriter;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;
@ -93,7 +96,8 @@ public class PublisherLease implements Closeable {
}
}
void publish(final FlowFile flowFile, final RecordReader reader, final RecordWriter writer, final String messageKeyField, final String topic) throws IOException {
void publish(final FlowFile flowFile, final RecordReader reader, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
final String messageKeyField, final String topic) throws IOException {
if (tracker == null) {
tracker = new InFlightMessageTracker();
}
@ -104,11 +108,11 @@ public class PublisherLease implements Closeable {
final RecordSet recordSet = reader.createRecordSet();
int recordCount = 0;
try {
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, baos)) {
while ((record = recordSet.next()) != null) {
recordCount++;
baos.reset();
writer.write(record, baos);
writer.write(record);
final byte[] messageContent = baos.toByteArray();
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
@ -127,6 +131,8 @@ public class PublisherLease implements Closeable {
}
} catch (final TokenTooLargeException ttle) {
tracker.fail(flowFile, ttle);
} catch (final SchemaNotFoundException snfe) {
throw new IOException(snfe);
} catch (final Exception e) {
tracker.fail(flowFile, e);
poison();

View File

@ -44,8 +44,8 @@ import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.RecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -65,7 +65,8 @@ public class TestPublishKafkaRecord_0_10 {
public void setup() throws InitializationException, IOException {
mockPool = mock(PublisherPool.class);
mockLease = mock(PublisherLease.class);
Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), any(String.class), any(String.class));
Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class),
any(RecordSchema.class), any(String.class), any(String.class));
when(mockPool.obtainPublisher()).thenReturn(mockLease);
@ -103,7 +104,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@ -122,7 +123,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@ -137,7 +138,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@ -154,7 +155,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@ -176,7 +177,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
@ -206,7 +207,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@ -240,7 +241,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.assertTransferCount(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
runner.assertTransferCount(PublishKafkaRecord_0_10.REL_FAILURE, 2);
verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();

View File

@ -58,10 +58,10 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
return new RecordSetWriter() {
@Override
public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
public WriteResult write(final RecordSet rs) throws IOException {
out.write(header.getBytes());
out.write("\n".getBytes());
@ -102,7 +102,20 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
public WriteResult write(Record record, OutputStream out) throws IOException {
public WriteResult write(Record record) throws IOException {
return null;
}
@Override
public void close() throws IOException {
}
@Override
public void beginRecordSet() throws IOException {
}
@Override
public WriteResult finishRecordSet() throws IOException {
return null;
}
};

View File

@ -217,11 +217,11 @@ public class FetchParquetTest {
configure(proc);
final RecordSetWriter recordSetWriter = Mockito.mock(RecordSetWriter.class);
when(recordSetWriter.write(any(RecordSet.class), any(OutputStream.class))).thenThrow(new IOException("IOException"));
when(recordSetWriter.write(any(RecordSet.class))).thenThrow(new IOException("IOException"));
final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class);
when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory");
when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class))).thenReturn(recordSetWriter);
when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class), any(FlowFile.class), any(OutputStream.class))).thenReturn(recordSetWriter);
testRunner.addControllerService("mock-writer-factory", recordSetWriterFactory);
testRunner.enableControllerService(recordSetWriterFactory);

View File

@ -18,9 +18,11 @@
package org.apache.nifi.schemaregistry.services;
import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.serialization.record.SchemaIdentifier;
public class AvroSchemaValidator implements Validator {
@ -36,7 +38,8 @@ public class AvroSchemaValidator implements Validator {
}
try {
new Schema.Parser().parse(input);
final Schema avroSchema = new Schema.Parser().parse(input);
AvroTypeUtil.createSchema(avroSchema, input, SchemaIdentifier.EMPTY);
return new ValidationResult.Builder()
.input(input)

View File

@ -34,6 +34,7 @@ import javax.script.Invocable;
import javax.script.ScriptException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Collection;
import java.util.HashSet;
@ -52,15 +53,12 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
super.onEnabled(context);
}
public RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream in) throws SchemaNotFoundException, IOException {
return createWriter(logger, getSchema(flowFile, in));
}
@Override
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) throws SchemaNotFoundException, IOException {
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException {
if (recordFactory.get() != null) {
try {
return recordFactory.get().createWriter(logger, schema);
return recordFactory.get().createWriter(logger, schema, flowFile, out);
} catch (UndeclaredThrowableException ute) {
throw new IOException(ute.getCause());
}
@ -131,6 +129,7 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
}
} catch (final Exception ex) {
ex.printStackTrace();
final ComponentLog logger = getLogger();
final String message = "Unable to load script: " + ex.getLocalizedMessage();

View File

@ -104,7 +104,9 @@ class ScriptedRecordSetWriterTest {
InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)
def schema = recordSetWriterFactory.getSchema(mockFlowFile, inStream)
RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema)
ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema, mockFlowFile, outputStream)
assertNotNull(recordSetWriter)
def recordSchema = new SimpleRecordSchema(
@ -119,8 +121,7 @@ class ScriptedRecordSetWriterTest {
new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 300])
] as MapRecord[]
ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
recordSetWriter.write(RecordSet.of(recordSchema, records), outputStream)
recordSetWriter.write(RecordSet.of(recordSchema, records))
def xml = new XmlSlurper().parseText(outputStream.toString())
assertEquals('1', xml.record[0].id.toString())

View File

@ -34,9 +34,15 @@ import org.apache.nifi.stream.io.NonCloseableOutputStream
class GroovyRecordSetWriter implements RecordSetWriter {
private int recordCount = 0;
private final OutputStream out;
public GroovyRecordSetWriter(final OutputStream out) {
this.out = out;
}
@Override
WriteResult write(Record r, OutputStream out) throws IOException {
WriteResult write(Record r) throws IOException {
new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
new MarkupBuilder(osw).record {
r.schema.fieldNames.each {fieldName ->
@ -44,7 +50,9 @@ class GroovyRecordSetWriter implements RecordSetWriter {
}
}
}
WriteResult.of(0, [:])
recordCount++;
WriteResult.of(1, [:])
}
@Override
@ -53,10 +61,10 @@ class GroovyRecordSetWriter implements RecordSetWriter {
}
@Override
WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException {
WriteResult write(final RecordSet rs) throws IOException {
int count = 0
new OutputStreamWriter(new NonCloseableOutputStream(rawOut)).with {osw ->
new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
new MarkupBuilder(osw).recordSet {
Record r
@ -73,6 +81,18 @@ class GroovyRecordSetWriter implements RecordSetWriter {
}
WriteResult.of(count, [:])
}
public void beginRecordSet() throws IOException {
}
@Override
public WriteResult finishRecordSet() throws IOException {
return WriteResult.of(recordCount, [:]);
}
@Override
public void close() throws IOException {
}
}
class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
@ -83,9 +103,10 @@ class GroovyRecordSetWriterFactory extends AbstractControllerService implements
}
@Override
RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) throws SchemaNotFoundException, IOException {
return new GroovyRecordSetWriter()
RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException {
return new GroovyRecordSetWriter(out)
}
}
writer = new GroovyRecordSetWriterFactory()

View File

@ -40,6 +40,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-packager</artifactId>

View File

@ -27,7 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
@ -101,26 +101,27 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSetWriter writer;
final RecordSchema writeSchema;
try (final InputStream rawIn = session.read(flowFile);
final InputStream in = new BufferedInputStream(rawIn)) {
writeSchema = writerFactory.getSchema(flowFile, in);
writer = writerFactory.createWriter(getLogger(), writeSchema);
} catch (final Exception e) {
getLogger().error("Failed to convert records for {}; will route to failure", new Object[] {flowFile, e});
getLogger().error("Failed to process records for {}; will route to failure", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
final Map<String, String> attributes = new HashMap<>();
final AtomicInteger recordCount = new AtomicInteger();
final FlowFile original = flowFile;
try {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out);
final RecordSet recordSet = new RecordSet() {
@Override
@ -151,8 +152,11 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
}
};
final WriteResult writeResult = writer.write(recordSet, out);
writeResultRef.set(writeResult);
final WriteResult writeResult = writer.write(recordSet);
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
recordCount.set(writeResult.getRecordCount());
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("Could not parse incoming data", e);
@ -160,22 +164,17 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
}
});
} catch (final Exception e) {
getLogger().error("Failed to convert {}", new Object[] {flowFile, e});
getLogger().error("Failed to process {}", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
final WriteResult writeResult = writeResultRef.get();
final Map<String, String> attributes = new HashMap<>();
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("Records Processed", writeResult.getRecordCount(), false);
getLogger().info("Successfully converted {} records for {}", new Object[] {writeResult.getRecordCount(), flowFile});
final int count = recordCount.get();
session.adjustCounter("Records Processed", count, false);
getLogger().info("Successfully converted {} records for {}", new Object[] {count, flowFile});
}
protected abstract Record process(Record record, RecordSchema writeSchema, FlowFile flowFile, ProcessContext context);

View File

@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.Tuple;
public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for reading incoming data")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing out the records")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(true)
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If a FlowFile cannot be transformed from the configured input format to the configured output format, "
+ "the unchanged FlowFile will be routed to this relationship")
.build();
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("Once a FlowFile has been processed and any derivative FlowFiles have been transferred, the original FlowFile will be transferred to this relationship.")
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(RECORD_READER);
properties.add(RECORD_WRITER);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
if (isRouteOriginal()) {
relationships.add(REL_ORIGINAL);
}
relationships.add(REL_FAILURE);
return relationships;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final T flowFileContext = getFlowFileContext(flowFile, context);
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSchema writeSchema;
try (final InputStream rawIn = session.read(flowFile);
final InputStream in = new BufferedInputStream(rawIn)) {
writeSchema = writerFactory.getSchema(flowFile, in);
} catch (final Exception e) {
getLogger().error("Failed to process records for {}; will route to failure", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
final AtomicInteger numRecords = new AtomicInteger(0);
final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
final FlowFile original = flowFile;
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
Record record;
while ((record = reader.nextRecord()) != null) {
final Set<Relationship> relationships = route(record, writeSchema, original, context, flowFileContext);
numRecords.incrementAndGet();
for (final Relationship relationship : relationships) {
final RecordSetWriter recordSetWriter;
Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship);
if (tuple == null) {
FlowFile outFlowFile = session.create(original);
final OutputStream out = session.write(outFlowFile);
recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, original, out);
recordSetWriter.beginRecordSet();
tuple = new Tuple<>(outFlowFile, recordSetWriter);
writers.put(relationship, tuple);
} else {
recordSetWriter = tuple.getValue();
}
recordSetWriter.write(record);
}
}
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("Could not parse incoming data", e);
}
}
});
for (final Map.Entry<Relationship, Tuple<FlowFile, RecordSetWriter>> entry : writers.entrySet()) {
final Relationship relationship = entry.getKey();
final Tuple<FlowFile, RecordSetWriter> tuple = entry.getValue();
final RecordSetWriter writer = tuple.getValue();
FlowFile childFlowFile = tuple.getKey();
final WriteResult writeResult = writer.finishRecordSet();
try {
writer.close();
} catch (final IOException ioe) {
getLogger().warn("Failed to close Writer for {}", new Object[] {childFlowFile});
}
final Map<String, String> attributes = new HashMap<>();
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
childFlowFile = session.putAllAttributes(childFlowFile, attributes);
session.transfer(childFlowFile, relationship);
session.adjustCounter("Records Processed", writeResult.getRecordCount(), false);
session.adjustCounter("Records Routed to " + relationship.getName(), writeResult.getRecordCount(), false);
session.getProvenanceReporter().route(childFlowFile, relationship);
}
} catch (final Exception e) {
getLogger().error("Failed to process {}", new Object[] {flowFile, e});
for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
try {
tuple.getValue().close();
} catch (final Exception e1) {
getLogger().warn("Failed to close Writer for {}; some resources may not be cleaned up appropriately", new Object[] {tuple.getKey()});
}
session.remove(tuple.getKey());
}
session.transfer(flowFile, REL_FAILURE);
return;
} finally {
for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
final RecordSetWriter writer = tuple.getValue();
try {
writer.close();
} catch (final Exception e) {
getLogger().warn("Failed to close Record Writer for {}; some resources may not be properly cleaned up", new Object[] {tuple.getKey(), e});
}
}
}
if (isRouteOriginal()) {
flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(numRecords));
session.transfer(flowFile, REL_ORIGINAL);
} else {
session.remove(flowFile);
}
getLogger().info("Successfully processed {}, creating {} derivative FlowFiles and processing {} records", new Object[] {flowFile, writers.size(), numRecords});
}
protected abstract Set<Relationship> route(Record record, RecordSchema writeSchema, FlowFile flowFile, ProcessContext context, T flowFileContext);
protected abstract boolean isRouteOriginal();
protected abstract T getFlowFileContext(FlowFile flowFile, ProcessContext context);
}
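
LookupRecord, shown next, is the concrete subclass added by this commit. As a smaller illustration of the three extension points (route, isRouteOriginal, getFlowFileContext), a hypothetical subclass that sends every record to a single relationship might look like the following sketch; the class and relationship names are invented.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
public class RouteAllRecords extends AbstractRouteRecord<Void> {
    static final Relationship REL_ALL = new Relationship.Builder()
        .name("all")
        .description("Every record is routed to this relationship")
        .build();
    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>(super.getRelationships());
        relationships.add(REL_ALL);
        return relationships;
    }
    @Override
    protected Set<Relationship> route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile,
            final ProcessContext context, final Void flowFileContext) {
        // Every record goes to the same relationship in this trivial example
        return Collections.singleton(REL_ALL);
    }
    @Override
    protected boolean isRouteOriginal() {
        return true; // keep the incoming FlowFile and transfer it to 'original'
    }
    @Override
    protected Void getFlowFileContext(final FlowFile flowFile, final ProcessContext context) {
        return null; // no per-FlowFile state is needed for this example
    }
}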

View File

@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.Tuple;
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
@WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile")
})
@Tags({"lookup", "enrich", "route", "record", "csv", "json", "avro", "logs", "convert", "filter"})
@CapabilityDescription("Extracts a field from a Record and looks up its value in a LookupService. If a result is returned by the LookupService, "
+ "that result is optionally added to the Record. In this case, the processor functions as an Enrichment processor. Regardless, the Record is then "
+ "routed to either the 'matched' relationship or 'unmatched' relationship, indicating whether or not a result was returned by the LookupService, "
+ "allowing the processor to also function as a Routing processor. If any record in the incoming FlowFile has multiple fields match the configured "
+ "Lookup RecordPath or if no fields match, then that record will be routed to failure. If one or more fields match the Result RecordPath, all fields "
+ "that match will be updated.")
@SeeAlso({ConvertRecord.class, SplitRecord.class})
public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPath>> {
private volatile RecordPathCache recordPathCache = new RecordPathCache(25);
private volatile LookupService<?> lookupService;
static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder()
.name("lookup-service")
.displayName("Lookup Service")
.description("The Lookup Service to use in order to lookup a value in each Record")
.identifiesControllerService(LookupService.class)
.required(true)
.build();
static final PropertyDescriptor LOOKUP_RECORD_PATH = new PropertyDescriptor.Builder()
.name("lookup-record-path")
.displayName("Lookup RecordPath")
.description("A RecordPath that points to the field whose value will be looked up in the configured Lookup Service")
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(true)
.required(true)
.build();
static final PropertyDescriptor RESULT_RECORD_PATH = new PropertyDescriptor.Builder()
.name("result-record-path")
.displayName("Result RecordPath")
.description("A RecordPath that points to the field whose value should be updated with whatever value is returned from the Lookup Service. "
+ "If not specified, the value that is returned from the Lookup Service will be ignored, except for determining whether the FlowFile should "
+ "be routed to the 'matched' or 'unmatched' Relationship.")
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(true)
.required(false)
.build();
static final Relationship REL_MATCHED = new Relationship.Builder()
.name("matched")
.description("All records for which the lookup returns a value will be routed to this relationship")
.build();
static final Relationship REL_UNMATCHED = new Relationship.Builder()
.name("unmatched")
.description("All records for which the lookup does not have a matching value will be routed to this relationship")
.build();
private static final Set<Relationship> MATCHED_COLLECTION = Collections.singleton(REL_MATCHED);
private static final Set<Relationship> UNMATCHED_COLLECTION = Collections.singleton(REL_UNMATCHED);
private static final Set<Relationship> FAILURE_COLLECTION = Collections.singleton(REL_FAILURE);
@OnScheduled
public void onScheduled(final ProcessContext context) {
this.lookupService = context.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class);
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_MATCHED);
relationships.add(REL_UNMATCHED);
relationships.add(REL_FAILURE);
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.addAll(super.getSupportedPropertyDescriptors());
properties.add(LOOKUP_SERVICE);
properties.add(LOOKUP_RECORD_PATH);
properties.add(RESULT_RECORD_PATH);
return properties;
}
@Override
protected Set<Relationship> route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context,
final Tuple<RecordPath, RecordPath> flowFileContext) {
final RecordPathResult lookupPathResult = flowFileContext.getKey().evaluate(record);
final List<FieldValue> lookupFieldValues = lookupPathResult.getSelectedFields()
.filter(fieldVal -> fieldVal.getValue() != null)
.collect(Collectors.toList());
if (lookupFieldValues.isEmpty()) {
getLogger().error("Lookup RecordPath did not match any fields in a record for {}; routing record to failure", new Object[] {flowFile});
return FAILURE_COLLECTION;
}
if (lookupFieldValues.size() > 1) {
getLogger().error("Lookup RecordPath matched {} fields in a record for {}; routing record to failure", new Object[] {lookupFieldValues.size(), flowFile});
return FAILURE_COLLECTION;
}
final FieldValue fieldValue = lookupFieldValues.get(0);
final String lookupKey = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
final Optional<?> lookupValue;
try {
lookupValue = lookupService.lookup(lookupKey);
} catch (final Exception e) {
getLogger().error("Failed to lookup value '{}' in Lookup Service for a record in {}; routing record to failure", new Object[] {lookupKey, flowFile, e});
return Collections.singleton(REL_FAILURE);
}
if (!lookupValue.isPresent()) {
return UNMATCHED_COLLECTION;
}
// Ensure that the Record has the appropriate schema to account for the newly added values
final RecordPath resultPath = flowFileContext.getValue();
if (resultPath != null) {
record.incorporateSchema(writeSchema);
final Object replacementValue = lookupValue.get();
final RecordPathResult resultPathResult = flowFileContext.getValue().evaluate(record);
resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(replacementValue));
}
return MATCHED_COLLECTION;
}
@Override
protected boolean isRouteOriginal() {
return false;
}
@Override
protected Tuple<RecordPath, RecordPath> getFlowFileContext(final FlowFile flowFile, final ProcessContext context) {
final String lookupPathText = context.getProperty(LOOKUP_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath lookupRecordPath = recordPathCache.getCompiled(lookupPathText);
final RecordPath resultRecordPath;
if (context.getProperty(RESULT_RECORD_PATH).isSet()) {
final String resultPathText = context.getProperty(RESULT_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
resultRecordPath = recordPathCache.getCompiled(resultPathText);
} else {
resultRecordPath = null;
}
return new Tuple<>(lookupRecordPath, resultRecordPath);
}
}
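
The LookupService interface itself is not part of this excerpt. Based only on the call made in route() above (a lookup(String) that returns an Optional), a minimal in-memory service might look roughly like the sketch below; the class name is invented, and the real interface may declare additional methods (for example a value-type accessor or a checked lookup exception) that are not visible here.

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.lookup.LookupService;
// Hypothetical in-memory lookup; assumes LookupService exposes lookup(String) as used above
public class StaticMapLookupService extends AbstractControllerService implements LookupService<String> {
    private final Map<String, String> values = new ConcurrentHashMap<>();
    public void put(final String key, final String value) {
        values.put(key, value);
    }
    @Override
    public Optional<String> lookup(final String key) {
        // LookupRecord routes a record to 'matched' when this Optional is present
        // and to 'unmatched' when it is empty
        return Optional.ofNullable(values.get(key));
    }
}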

View File

@ -76,6 +76,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.StopWatch;
@ -248,15 +249,20 @@ public class QueryRecord extends AbstractProcessor {
final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>();
final Set<FlowFile> createdFlowFiles = new HashSet<>();
final RecordSchema recordSchema;
try (final InputStream rawIn = session.read(original);
final InputStream in = new BufferedInputStream(rawIn)) {
recordSchema = resultSetWriterFactory.getSchema(original, in);
} catch (final Exception e) {
getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e});
session.transfer(original, REL_FAILURE);
return;
}
int recordsRead = 0;
try {
final RecordSetWriter resultSetWriter;
try (final InputStream rawIn = session.read(original);
final InputStream in = new BufferedInputStream(rawIn)) {
resultSetWriter = resultSetWriterFactory.createWriter(getLogger(), resultSetWriterFactory.getSchema(original, in));
}
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
if (!descriptor.isDynamic()) {
continue;
@ -280,14 +286,17 @@ public class QueryRecord extends AbstractProcessor {
queryResult = query(session, original, sql, context, recordParserFactory);
}
final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
final FlowFile outFlowFile = transformed;
try {
final ResultSet rs = queryResult.getResultSet();
transformed = session.write(transformed, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try {
final ResultSetRecordSet recordSet = new ResultSetRecordSet(rs);
writeResultRef.set(resultSetWriter.write(recordSet, out));
try (final RecordSetWriter resultSetWriter = resultSetWriterFactory.createWriter(getLogger(), recordSchema, outFlowFile, out)) {
final ResultSetRecordSet resultSet = new ResultSetRecordSet(rs);
writeResultRef.set(resultSetWriter.write(resultSet));
mimeTypeRef.set(resultSetWriter.getMimeType());
} catch (final Exception e) {
throw new IOException(e);
}
@ -310,7 +319,7 @@ public class QueryRecord extends AbstractProcessor {
attributesToAdd.putAll(result.getAttributes());
}
attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), resultSetWriter.getMimeType());
attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
transformed = session.putAllAttributes(transformed, attributesToAdd);
transformedFlowFiles.put(transformed, relationship);

View File

@ -58,6 +58,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
@EventDriven
@ -133,10 +134,10 @@ public class SplitRecord extends AbstractProcessor {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSetWriter writer;
final RecordSchema schema;
try (final InputStream rawIn = session.read(original);
final InputStream in = new BufferedInputStream(rawIn)) {
writer = writerFactory.createWriter(getLogger(), writerFactory.getSchema(original, in));
schema = writerFactory.getSchema(original, in);
} catch (final Exception e) {
getLogger().error("Failed to create Record Writer for {}; routing to failure", new Object[] {original, e});
session.transfer(original, REL_FAILURE);
@ -159,28 +160,26 @@ public class SplitRecord extends AbstractProcessor {
FlowFile split = session.create(original);
try {
final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
split = session.write(split, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
final Map<String, String> attributes = new HashMap<>();
final WriteResult writeResult;
try (final OutputStream out = session.write(split);
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, split, out)) {
if (maxRecords == 1) {
final Record record = pushbackSet.next();
writeResultRef.set(writer.write(record, out));
writeResult = writer.write(record);
} else {
final RecordSet limitedSet = pushbackSet.limit(maxRecords);
writeResultRef.set(writer.write(limitedSet, out));
writeResult = writer.write(limitedSet);
}
}
});
final WriteResult writeResult = writeResultRef.get();
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
final Map<String, String> attributes = new HashMap<>();
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
session.adjustCounter("Records Split", writeResult.getRecordCount(), false);
}
session.adjustCounter("Records Split", writeResult.getRecordCount(), false);
split = session.putAllAttributes(split, attributes);
} finally {
splits.add(split);

View File

@ -59,6 +59,7 @@ org.apache.nifi.processors.standard.ListenUDP
org.apache.nifi.processors.standard.ListSFTP
org.apache.nifi.processors.standard.LogAttribute
org.apache.nifi.processors.standard.LogMessage
org.apache.nifi.processors.standard.LookupRecord
org.apache.nifi.processors.standard.MergeContent
org.apache.nifi.processors.standard.ModifyBytes
org.apache.nifi.processors.standard.MonitorActivity

View File

@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.lookup.StringLookupService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
public class TestLookupRecord {
private TestRunner runner;
private MapLookup lookupService;
private MockRecordParser recordReader;
private MockRecordWriter recordWriter;
@Before
public void setup() throws InitializationException {
recordReader = new MockRecordParser();
recordWriter = new MockRecordWriter(null, false);
lookupService = new MapLookup();
runner = TestRunners.newTestRunner(LookupRecord.class);
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
runner.addControllerService("writer", recordWriter);
runner.enableControllerService(recordWriter);
runner.addControllerService("lookup", lookupService);
runner.enableControllerService(lookupService);
runner.setProperty(LookupRecord.RECORD_READER, "reader");
runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/name");
runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
recordReader.addSchemaField("name", RecordFieldType.STRING);
recordReader.addSchemaField("age", RecordFieldType.INT);
recordReader.addSchemaField("sport", RecordFieldType.STRING);
recordReader.addRecord("John Doe", 48, null);
recordReader.addRecord("Jane Doe", 47, null);
recordReader.addRecord("Jimmy Doe", 14, null);
}
@Test
public void testAllMatch() throws InitializationException {
lookupService.addValue("John Doe", "Soccer");
lookupService.addValue("Jane Doe", "Basketball");
lookupService.addValue("Jimmy Doe", "Football");
runner.enqueue("");
runner.run();
runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
out.assertAttributeEquals("record.count", "3");
out.assertAttributeEquals("mime.type", "text/plain");
out.assertContentEquals("John Doe,48,Soccer\nJane Doe,47,Basketball\nJimmy Doe,14,Football\n");
}
@Test
public void testAllUnmatched() throws InitializationException {
runner.enqueue("");
runner.run();
runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).get(0);
out.assertAttributeEquals("record.count", "3");
out.assertAttributeEquals("mime.type", "text/plain");
out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
}
@Test
public void testMixtureOfMatch() throws InitializationException {
lookupService.addValue("John Doe", "Soccer");
lookupService.addValue("Jimmy Doe", "Football");
runner.enqueue("");
runner.run();
runner.assertTransferCount(LookupRecord.REL_FAILURE, 0);
runner.assertTransferCount(LookupRecord.REL_MATCHED, 1);
runner.assertTransferCount(LookupRecord.REL_UNMATCHED, 1);
final MockFlowFile matched = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
matched.assertAttributeEquals("record.count", "2");
matched.assertAttributeEquals("mime.type", "text/plain");
matched.assertContentEquals("John Doe,48,Soccer\nJimmy Doe,14,Football\n");
final MockFlowFile unmatched = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).get(0);
unmatched.assertAttributeEquals("record.count", "1");
unmatched.assertAttributeEquals("mime.type", "text/plain");
unmatched.assertContentEquals("Jane Doe,47,\n");
}
@Test
public void testResultPathNotFound() throws InitializationException {
runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/other");
lookupService.addValue("John Doe", "Soccer");
lookupService.addValue("Jane Doe", "Basketball");
lookupService.addValue("Jimmy Doe", "Football");
runner.enqueue("");
runner.run();
runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
out.assertAttributeEquals("record.count", "3");
out.assertAttributeEquals("mime.type", "text/plain");
out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
}
@Test
public void testLookupPathNotFound() throws InitializationException {
runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/other");
runner.enqueue("");
runner.run();
runner.assertAllFlowFilesTransferred(LookupRecord.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_FAILURE).get(0);
out.assertAttributeEquals("record.count", "3");
out.assertAttributeEquals("mime.type", "text/plain");
out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
}
@Test
public void testUnparseableData() throws InitializationException {
recordReader.failAfter(1);
runner.enqueue("");
runner.run();
runner.assertAllFlowFilesTransferred(LookupRecord.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_FAILURE).get(0);
out.assertAttributeNotExists("record.count");
out.assertContentEquals("");
}
@Test
public void testNoResultPath() throws InitializationException {
lookupService.addValue("John Doe", "Soccer");
lookupService.addValue("Jane Doe", "Basketball");
lookupService.addValue("Jimmy Doe", "Football");
runner.removeProperty(LookupRecord.RESULT_RECORD_PATH);
runner.enqueue("");
runner.run();
runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
out.assertAttributeEquals("record.count", "3");
out.assertAttributeEquals("mime.type", "text/plain");
out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
}
@Test
public void testMultipleLookupPaths() throws InitializationException {
lookupService.addValue("John Doe", "Soccer");
lookupService.addValue("Jane Doe", "Basketball");
lookupService.addValue("Jimmy Doe", "Football");
runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/*");
runner.enqueue("");
runner.run();
runner.assertAllFlowFilesTransferred(LookupRecord.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_FAILURE).get(0);
out.assertAttributeEquals("record.count", "3");
out.assertAttributeEquals("mime.type", "text/plain");
out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
}
private static class MapLookup extends AbstractControllerService implements StringLookupService {
private final Map<String, String> values = new HashMap<>();
public void addValue(final String key, final String value) {
values.put(key, value);
}
@Override
public Class<?> getValueType() {
return String.class;
}
@Override
public Optional<String> lookup(final String key) {
return Optional.ofNullable(values.get(key));
}
}
}

View File

@ -269,10 +269,10 @@ public class TestQueryRecord {
}
@Override
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
return new RecordSetWriter() {
@Override
public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
public WriteResult write(final RecordSet rs) throws IOException {
final int colCount = rs.getSchema().getFieldCount();
Assert.assertEquals(columnNames.size(), colCount);
@ -299,9 +299,23 @@ public class TestQueryRecord {
}
@Override
public WriteResult write(Record record, OutputStream out) throws IOException {
public WriteResult write(Record record) throws IOException {
return null;
}
@Override
public void close() throws IOException {
out.close();
}
@Override
public void beginRecordSet() throws IOException {
}
@Override
public WriteResult finishRecordSet() throws IOException {
return WriteResult.EMPTY;
}
};
}

View File

@ -0,0 +1,36 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services</artifactId>
<version>1.3.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-lookup-service-api</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup;
public class LookupFailureException extends Exception {
public LookupFailureException() {
super();
}
public LookupFailureException(final String message) {
super(message);
}
public LookupFailureException(final String message, final Throwable cause) {
super(message, cause);
}
public LookupFailureException(final Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup;
import java.util.Optional;
import org.apache.nifi.controller.ControllerService;
public interface LookupService<T> extends ControllerService {
/**
* Looks up a value that corresponds to the given key
*
* @param key the key to lookup
* @return a value that corresponds to the given key
*
* @throws LookupFailureException if unable to lookup a value for the given key
*/
Optional<T> lookup(String key) throws LookupFailureException;
/**
* @return the Class that represents the type of value that will be returned by {@link #lookup(String)}
*/
Class<?> getValueType();
}
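
For illustration only (not part of this commit), the sketch below shows one way a caller could consume the contract documented above; the helper class name and the fallback-to-default behavior are assumptions made for the example.

package org.apache.nifi.lookup;

import java.util.Optional;

// Hypothetical helper, not part of this commit; illustrates consuming the LookupService contract.
public class LookupExample {

    // Returns the looked-up value, or the caller-supplied default when the key is unmatched
    // or the lookup itself fails.
    public static <T> T lookupOrDefault(final LookupService<T> service, final String key, final T defaultValue) {
        try {
            final Optional<T> result = service.lookup(key);
            return result.orElse(defaultValue);
        } catch (final LookupFailureException lfe) {
            // A failure of the lookup itself, as opposed to a simple miss; a processor would
            // typically route the record to a failure relationship, but this sketch falls back.
            return defaultValue;
        }
    }
}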

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup;
import java.util.Optional;
import org.apache.nifi.serialization.record.Record;
public interface RecordLookupService extends LookupService<Record> {
/**
* Returns an Optional Record that corresponds to the given key
*
* @param key the key to lookup
* @return an Optional Record that corresponds to the given key
*
* @throws LookupFailureException if unable to lookup a value for the given key
*/
@Override
Optional<Record> lookup(String key) throws LookupFailureException;
@Override
default Class<?> getValueType() {
return Record.class;
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup;
import java.util.Optional;
public interface StringLookupService extends LookupService<String> {
/**
* Returns an Optional value that corresponds to the given key
*
* @param key the key to lookup
* @return an Optional String that represents the value for the given key
*/
@Override
Optional<String> lookup(String key);
@Override
default Class<?> getValueType() {
return String.class;
}
}

View File

@ -0,0 +1,35 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-services-bundle</artifactId>
<version>1.3.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-lookup-services-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-services</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,73 @@
nifi-lookup-services-nar
Copyright 2017 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2015 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Apache HttpComponents
The following NOTICE information applies:
Apache HttpClient
Copyright 1999-2014 The Apache Software Foundation
Apache HttpCore
Copyright 2005-2014 The Apache Software Foundation
This project contains annotations derived from JCIP-ANNOTATIONS
Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
(ASLv2) Apache Commons Codec
The following NOTICE information applies:
Apache Commons Codec
Copyright 2002-2014 The Apache Software Foundation
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
===============================================================================
The content of package org.apache.commons.codec.language.bm has been translated
from the original php source code available at http://stevemorse.org/phoneticinfo.htm
with permission from the original authors.
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
(ASLv2) Apache Commons Logging
The following NOTICE information applies:
Apache Commons Logging
Copyright 2003-2013 The Apache Software Foundation
(ASLv2) Apache Commons Net
The following NOTICE information applies:
Apache Commons Net
Copyright 2001-2016 The Apache Software Foundation
(ASLv2) GeoIP2 Java API
The following NOTICE information applies:
GeoIP2 Java API
This software is Copyright (c) 2013 by MaxMind, Inc.
************************
Creative Commons Attribution-ShareAlike 3.0
************************
The following binary components are provided under the Creative Commons Attribution-ShareAlike 3.0. See project link for details.
(CCAS 3.0) MaxMind DB (https://github.com/maxmind/MaxMind-DB)

View File

@ -0,0 +1,51 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF licenses
this file to You under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may obtain
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the License. -->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-services-bundle</artifactId>
<version>1.3.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-lookup-services</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
@Tags({"lookup", "enrich", "key", "value"})
@CapabilityDescription("Allows users to add key/value pairs as User-defined Properties. Each property that is added can be looked up by Property Name.")
public class SimpleKeyValueLookupService extends AbstractControllerService implements StringLookupService {
private volatile Map<String, String> lookupValues = new HashMap<>();
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.dynamic(true)
.addValidator(Validator.VALID)
.build();
}
@OnEnabled
public void cacheConfiguredValues(final ConfigurationContext context) {
lookupValues = context.getProperties().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().getName(), entry -> context.getProperty(entry.getKey()).getValue()));
}
@Override
public Optional<String> lookup(final String key) {
return Optional.ofNullable(lookupValues.get(key));
}
}
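
For illustration only (not part of this commit), the following unit-test sketch exercises the behavior described in the @CapabilityDescription above; the test class name, the key/value property names, and the use of LookupRecord purely as a host processor for the mock framework are assumptions.

package org.apache.nifi.lookup;

import static org.junit.Assert.assertEquals;

import java.util.Optional;

import org.apache.nifi.processors.standard.LookupRecord;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class SimpleKeyValueLookupServiceExampleTest {

    @Test
    public void testLookupOfUserDefinedProperties() throws Exception {
        final SimpleKeyValueLookupService service = new SimpleKeyValueLookupService();
        final TestRunner runner = TestRunners.newTestRunner(LookupRecord.class);

        runner.addControllerService("kv-lookup", service);
        // Each user-defined (dynamic) property becomes a key/value pair that can be looked up.
        runner.setProperty(service, "FR", "France");
        runner.setProperty(service, "DE", "Germany");
        runner.enableControllerService(service);

        assertEquals(Optional.of("France"), service.lookup("FR"));
        assertEquals(Optional.empty(), service.lookup("ES"));
    }
}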

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.maxmind;
import java.util.Arrays;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
public class AnonymousIpSchema {
static final RecordField ANONYMOUS = new RecordField("anonymous", RecordFieldType.BOOLEAN.getDataType());
static final RecordField ANONYMOUS_VPN = new RecordField("anonymousVpn", RecordFieldType.BOOLEAN.getDataType());
static final RecordField HOSTING_PROVIDER = new RecordField("hostingProvider", RecordFieldType.BOOLEAN.getDataType());
static final RecordField PUBLIC_PROXY = new RecordField("publicProxy", RecordFieldType.BOOLEAN.getDataType());
static final RecordField TOR_EXIT_NODE = new RecordField("torExitNode", RecordFieldType.BOOLEAN.getDataType());
static final RecordSchema ANONYMOUS_IP_SCHEMA = new SimpleRecordSchema(Arrays.asList(ANONYMOUS, ANONYMOUS_VPN, HOSTING_PROVIDER, PUBLIC_PROXY, TOR_EXIT_NODE));
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.maxmind;
import java.util.Arrays;
import java.util.List;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
public class CitySchema {
static final RecordField SUBDIVISION_NAME = new RecordField("name", RecordFieldType.STRING.getDataType());
static final RecordField SUBDIVISION_ISO = new RecordField("isoCode", RecordFieldType.STRING.getDataType());
private static final List<RecordField> SUBDIVISION_FIELDS = Arrays.asList(SUBDIVISION_NAME, SUBDIVISION_ISO);
static final RecordSchema SUBDIVISION_SCHEMA = new SimpleRecordSchema(SUBDIVISION_FIELDS);
static final DataType SUBDIVISION_DATA_TYPE = RecordFieldType.RECORD.getRecordDataType(SUBDIVISION_SCHEMA);
static final RecordField COUNTRY_NAME = new RecordField("name", RecordFieldType.STRING.getDataType());
static final RecordField COUNTRY_ISO = new RecordField("isoCode", RecordFieldType.STRING.getDataType());
private static final List<RecordField> COUNTRY_FIELDS = Arrays.asList(COUNTRY_NAME, COUNTRY_ISO);
static final RecordSchema COUNTRY_SCHEMA = new SimpleRecordSchema(COUNTRY_FIELDS);
static final RecordField CITY = new RecordField("city", RecordFieldType.STRING.getDataType());
static final RecordField ACCURACY = new RecordField("accuracy", RecordFieldType.INT.getDataType());
static final RecordField METRO_CODE = new RecordField("metroCode", RecordFieldType.INT.getDataType());
static final RecordField TIMEZONE = new RecordField("timeZone", RecordFieldType.STRING.getDataType());
static final RecordField LATITUDE = new RecordField("latitude", RecordFieldType.DOUBLE.getDataType());
static final RecordField LONGITUDE = new RecordField("longitude", RecordFieldType.DOUBLE.getDataType());
static final RecordField SUBDIVISIONS = new RecordField("subdivisions", RecordFieldType.ARRAY.getArrayDataType(SUBDIVISION_DATA_TYPE));
static final RecordField COUNTRY = new RecordField("country", RecordFieldType.RECORD.getRecordDataType(COUNTRY_SCHEMA));
static final RecordField CONTINENT = new RecordField("continent", RecordFieldType.STRING.getDataType());
static final RecordField POSTALCODE = new RecordField("postalCode", RecordFieldType.STRING.getDataType());
private static final List<RecordField> GEO_FIELDS = Arrays.asList(CITY, ACCURACY, METRO_CODE, TIMEZONE, LATITUDE, LONGITUDE, SUBDIVISIONS, COUNTRY, CONTINENT, POSTALCODE);
static final RecordSchema GEO_SCHEMA = new SimpleRecordSchema(GEO_FIELDS);
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.maxmind;
import java.util.Arrays;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
public class ContainerSchema {
static final RecordField GEO = new RecordField("geo", RecordFieldType.RECORD.getRecordDataType(CitySchema.GEO_SCHEMA));
static final RecordField ISP = new RecordField("isp", RecordFieldType.RECORD.getRecordDataType(IspSchema.ISP_SCHEMA));
static final RecordField DOMAIN_NAME = new RecordField("domainName", RecordFieldType.STRING.getDataType());
static final RecordField CONNECTION_TYPE = new RecordField("connectionType", RecordFieldType.STRING.getDataType());
static final RecordField ANONYMOUS_IP = new RecordField("anonymousIp", RecordFieldType.RECORD.getRecordDataType(AnonymousIpSchema.ANONYMOUS_IP_SCHEMA));
static final RecordSchema CONTAINER_SCHEMA = new SimpleRecordSchema(Arrays.asList(GEO, ISP, DOMAIN_NAME, CONNECTION_TYPE, ANONYMOUS_IP));
}

View File

@ -0,0 +1,252 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.maxmind;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.maxmind.db.Metadata;
import com.maxmind.db.Reader;
import com.maxmind.db.Reader.FileMode;
import com.maxmind.geoip2.GeoIp2Provider;
import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.AnonymousIpResponse;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.model.ConnectionTypeResponse;
import com.maxmind.geoip2.model.CountryResponse;
import com.maxmind.geoip2.model.DomainResponse;
import com.maxmind.geoip2.model.IspResponse;
/**
* <p>
* This class was copied from https://raw.githubusercontent.com/maxmind/GeoIP2-java/master/src/main/java/com/maxmind/geoip2/DatabaseReader.java. It was written by MaxMind and is available under
* the Apache Software License V2.
*
* The modification we're making to the code below is to stop using exceptions for mainline flow control. Specifically, we don't want to throw an exception simply because an address was not found.
* </p>
*
* Instances of this class provide a reader for the GeoIP2 database format. IP addresses can be looked up using the <code>get</code> method.
*/
public class DatabaseReader implements GeoIp2Provider, Closeable {
private final Reader reader;
private final ObjectMapper om;
private DatabaseReader(Builder builder) throws IOException {
if (builder.stream != null) {
this.reader = new Reader(builder.stream);
} else if (builder.database != null) {
this.reader = new Reader(builder.database, builder.mode);
} else {
// This should never happen. If it does, review the Builder class
// constructors for errors.
throw new IllegalArgumentException("Unsupported Builder configuration: expected either File or URL");
}
this.om = new ObjectMapper();
this.om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
this.om.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
InjectableValues inject = new InjectableValues.Std().addValue("locales", builder.locales);
this.om.setInjectableValues(inject);
}
/**
* <p>
* Constructs a Builder for the DatabaseReader. The file passed to it must be a valid GeoIP2 database file.
* </p>
* <p>
* <code>Builder</code> creates instances of <code>DatabaseReader</code> from values set by the methods.
* </p>
* <p>
* Only the values set in the <code>Builder</code> constructor are required.
* </p>
*/
public final static class Builder {
final File database;
final InputStream stream;
List<String> locales = Arrays.asList("en");
FileMode mode = FileMode.MEMORY_MAPPED;
/**
* @param stream the stream containing the GeoIP2 database to use.
*/
public Builder(InputStream stream) {
this.stream = stream;
this.database = null;
}
/**
* @param database the GeoIP2 database file to use.
*/
public Builder(File database) {
this.database = database;
this.stream = null;
}
/**
* @param val List of locale codes to use in name property from most preferred to least preferred.
* @return Builder object
*/
public Builder locales(List<String> val) {
this.locales = val;
return this;
}
/**
* @param val The file mode used to open the GeoIP2 database
* @return Builder object
* @throws java.lang.IllegalArgumentException if you initialized the Builder with an InputStream, which uses {@link FileMode#MEMORY}, but you provided a different FileMode to this method.
*/
public Builder fileMode(FileMode val) {
if (this.stream != null && !FileMode.MEMORY.equals(val)) {
throw new IllegalArgumentException("Only FileMode.MEMORY is supported when using an InputStream.");
}
this.mode = val;
return this;
}
/**
* @return an instance of <code>DatabaseReader</code> created from the fields set on this builder.
* @throws IOException if there is an error reading the database
*/
public DatabaseReader build() throws IOException {
return new DatabaseReader(this);
}
}
/**
* @param ipAddress IPv4 or IPv6 address to lookup.
* @return An object of type T with the data for the IP address or null if no information could be found for the given IP address
* @throws IOException if there is an error opening or reading from the file.
*/
private <T> T get(InetAddress ipAddress, Class<T> cls, boolean hasTraits, String type) throws IOException, AddressNotFoundException {
ObjectNode node = (ObjectNode) this.reader.get(ipAddress);
if (node == null) {
return null;
}
ObjectNode ipNode;
if (hasTraits) {
if (!node.has("traits")) {
node.set("traits", this.om.createObjectNode());
}
ipNode = (ObjectNode) node.get("traits");
} else {
ipNode = node;
}
ipNode.put("ip_address", ipAddress.getHostAddress());
return this.om.treeToValue(node, cls);
}
/**
* <p>
* Closes the database.
* </p>
* <p>
* If you are using <code>FileMode.MEMORY_MAPPED</code>, this will
* <em>not</em> unmap the underlying file due to a limitation in Java's <code>MappedByteBuffer</code>. It will however set the reference to the buffer to <code>null</code>, allowing the garbage
* collector to collect it.
* </p>
*
* @throws IOException if an I/O error occurs.
*/
@Override
public void close() throws IOException {
this.reader.close();
}
@Override
public CountryResponse country(InetAddress ipAddress) throws IOException, GeoIp2Exception {
return this.get(ipAddress, CountryResponse.class, true, "Country");
}
@Override
public CityResponse city(InetAddress ipAddress) throws IOException, GeoIp2Exception {
return this.get(ipAddress, CityResponse.class, true, "City");
}
/**
* Look up an IP address in a GeoIP2 Anonymous IP.
*
* @param ipAddress IPv4 or IPv6 address to lookup.
* @return an AnonymousIpResponse for the requested IP address.
* @throws GeoIp2Exception if there is an error looking up the IP
* @throws IOException if there is an IO error
*/
public AnonymousIpResponse anonymousIp(InetAddress ipAddress) throws IOException, GeoIp2Exception {
return this.get(ipAddress, AnonymousIpResponse.class, false, "GeoIP2-Anonymous-IP");
}
/**
* Look up an IP address in a GeoIP2 Connection Type database.
*
* @param ipAddress IPv4 or IPv6 address to lookup.
* @return a ConnectionTypeResponse for the requested IP address.
* @throws GeoIp2Exception if there is an error looking up the IP
* @throws IOException if there is an IO error
*/
public ConnectionTypeResponse connectionType(InetAddress ipAddress) throws IOException, GeoIp2Exception {
return this.get(ipAddress, ConnectionTypeResponse.class, false,
"GeoIP2-Connection-Type");
}
/**
* Look up an IP address in a GeoIP2 Domain database.
*
* @param ipAddress IPv4 or IPv6 address to lookup.
* @return a DomainResponse for the requested IP address.
* @throws GeoIp2Exception if there is an error looking up the IP
* @throws IOException if there is an IO error
*/
public DomainResponse domain(InetAddress ipAddress) throws IOException, GeoIp2Exception {
return this.get(ipAddress, DomainResponse.class, false, "GeoIP2-Domain");
}
/**
* Look up an IP address in a GeoIP2 ISP database.
*
* @param ipAddress IPv4 or IPv6 address to lookup.
* @return an IspResponse for the requested IP address.
* @throws GeoIp2Exception if there is an error looking up the IP
* @throws IOException if there is an IO error
*/
public IspResponse isp(InetAddress ipAddress) throws IOException, GeoIp2Exception {
return this.get(ipAddress, IspResponse.class, false, "GeoIP2-ISP");
}
/**
* @return the metadata for the open MaxMind DB file.
*/
public Metadata getMetadata() {
return this.reader.getMetadata();
}
}
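
A usage sketch for the reader above, not part of this commit: the database path and the sample address are assumptions, and the null check reflects the flow-control modification described in the class Javadoc (a miss returns null instead of throwing AddressNotFoundException).

package org.apache.nifi.lookup.maxmind;

import java.io.File;
import java.net.InetAddress;

import com.maxmind.geoip2.model.CityResponse;

public class DatabaseReaderExample {

    public static void main(final String[] args) throws Exception {
        final File database = new File("/var/lib/geoip/GeoLite2-City.mmdb"); // assumed path

        // try-with-resources closes the underlying MaxMind Reader when done.
        try (DatabaseReader reader = new DatabaseReader.Builder(database).build()) {
            final CityResponse response = reader.city(InetAddress.getByName("93.184.216.34"));

            if (response == null) {
                // Address not present in the database; this copy returns null rather than throwing.
                System.out.println("Address not found in the database");
            } else {
                System.out.println("City: " + response.getCity().getName());
            }
        }
    }
}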

View File

@ -0,0 +1,332 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.maxmind;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.RecordLookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StopWatch;
import com.maxmind.geoip2.model.AnonymousIpResponse;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.model.ConnectionTypeResponse;
import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType;
import com.maxmind.geoip2.model.DomainResponse;
import com.maxmind.geoip2.model.IspResponse;
import com.maxmind.geoip2.record.Country;
import com.maxmind.geoip2.record.Location;
import com.maxmind.geoip2.record.Subdivision;
@Tags({"lookup", "enrich", "ip", "geo", "ipgeo", "maxmind", "isp", "domain", "cellular", "anonymous", "tor"})
@CapabilityDescription("A lookup service that provides several types of enrichment information for IP addresses. The service is configured by providing a MaxMind "
+ "Database file and specifying which types of enrichment should be provided for an IP Address. Each type of enrichment is a separate lookup, so configuring the "
+ "service to provide all of the available enrichment data may be slower than returning only a portion of the available enrichments. View the Usage of this component "
+ "and choose to view Additional Details for more information, such as the Schema that pertains to the information that is returned.")
public class IPLookupService extends AbstractControllerService implements RecordLookupService {
private volatile DatabaseReader databaseReader = null;
static final PropertyDescriptor GEO_DATABASE_FILE = new PropertyDescriptor.Builder()
.name("database-file")
.displayName("MaxMind Database File")
.description("Path to Maxmind IP Enrichment Database File")
.required(true)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.build();
static final PropertyDescriptor LOOKUP_CITY = new PropertyDescriptor.Builder()
.name("lookup-city")
.displayName("Lookup Geo Enrichment")
.description("Specifies whether or not information about the geographic information, such as cities, corresponding to the IP address should be returned")
.allowableValues("true", "false")
.defaultValue("true")
.expressionLanguageSupported(false)
.required(true)
.build();
static final PropertyDescriptor LOOKUP_ISP = new PropertyDescriptor.Builder()
.name("lookup-isp")
.displayName("Lookup ISP")
.description("Specifies whether or not information about the Information Service Provider corresponding to the IP address should be returned")
.expressionLanguageSupported(false)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor LOOKUP_DOMAIN = new PropertyDescriptor.Builder()
.name("lookup-domain")
.displayName("Lookup Domain Name")
.description("Specifies whether or not information about the Domain Name corresponding to the IP address should be returned. "
+ "If true, the lookup will contain second-level domain information, such as foo.com but will not contain bar.foo.com")
.expressionLanguageSupported(false)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor LOOKUP_CONNECTION_TYPE = new PropertyDescriptor.Builder()
.name("lookup-connection-type")
.displayName("Lookup Connection Type")
.description("Specifies whether or not information about the Connection Type corresponding to the IP address should be returned. "
+ "If true, the lookup will contain a 'connectionType' field that (if populated) will contain a value of 'Dialup', 'Cable/DSL', 'Corporate', or 'Cellular'")
.expressionLanguageSupported(false)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor LOOKUP_ANONYMOUS_IP_INFO = new PropertyDescriptor.Builder()
.name("lookup-anonymous-ip")
.displayName("Lookup Anonymous IP Information")
.description("Specifies whether or not information about whether or not the IP address belongs to an anonymous network should be returned.")
.expressionLanguageSupported(false)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(GEO_DATABASE_FILE);
properties.add(LOOKUP_CITY);
properties.add(LOOKUP_ISP);
properties.add(LOOKUP_DOMAIN);
properties.add(LOOKUP_CONNECTION_TYPE);
properties.add(LOOKUP_ANONYMOUS_IP_INFO);
return properties;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws IOException {
final String dbFileString = context.getProperty(GEO_DATABASE_FILE).getValue();
final File dbFile = new File(dbFileString);
final StopWatch stopWatch = new StopWatch(true);
final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build();
stopWatch.stop();
getLogger().info("Completed loading of Maxmind Database. Elapsed time was {} milliseconds.", new Object[] {stopWatch.getDuration(TimeUnit.MILLISECONDS)});
databaseReader = reader;
}
@OnStopped
public void closeReader() throws IOException {
final DatabaseReader reader = databaseReader;
if (reader != null) {
reader.close();
}
}
@Override
public Optional<Record> lookup(final String key) throws LookupFailureException {
if (key == null) {
return Optional.empty();
}
final InetAddress inetAddress;
try {
inetAddress = InetAddress.getByName(key);
} catch (final IOException ioe) {
getLogger().warn("Could not resolve the IP for value '{}'. This is usually caused by issue resolving the appropriate DNS record or " +
"providing the service with an invalid IP address", new Object[] {key}, ioe);
return Optional.empty();
}
final Record geoRecord;
if (getProperty(LOOKUP_CITY).asBoolean()) {
final CityResponse cityResponse;
try {
cityResponse = databaseReader.city(inetAddress);
} catch (final Exception e) {
throw new LookupFailureException("Failed to lookup City information for IP Address " + inetAddress, e);
}
geoRecord = createRecord(cityResponse);
} else {
geoRecord = null;
}
final Record ispRecord;
if (getProperty(LOOKUP_ISP).asBoolean()) {
final IspResponse ispResponse;
try {
ispResponse = databaseReader.isp(inetAddress);
} catch (final Exception e) {
throw new LookupFailureException("Failed to lookup ISP information for IP Address " + inetAddress, e);
}
ispRecord = createRecord(ispResponse);
} else {
ispRecord = null;
}
final String domainName;
if (getProperty(LOOKUP_DOMAIN).asBoolean()) {
final DomainResponse domainResponse;
try {
domainResponse = databaseReader.domain(inetAddress);
} catch (final Exception e) {
throw new LookupFailureException("Failed to lookup Domain information for IP Address " + inetAddress, e);
}
domainName = domainResponse == null ? null : domainResponse.getDomain();
} else {
domainName = null;
}
final String connectionType;
if (getProperty(LOOKUP_CONNECTION_TYPE).asBoolean()) {
final ConnectionTypeResponse connectionTypeResponse;
try {
connectionTypeResponse = databaseReader.connectionType(inetAddress);
} catch (final Exception e) {
throw new LookupFailureException("Failed to lookup Domain information for IP Address " + inetAddress, e);
}
if (connectionTypeResponse == null) {
connectionType = null;
} else {
final ConnectionType type = connectionTypeResponse.getConnectionType();
connectionType = type == null ? null : type.name();
}
} else {
connectionType = null;
}
final Record anonymousIpRecord;
if (getProperty(LOOKUP_ANONYMOUS_IP_INFO).asBoolean()) {
final AnonymousIpResponse anonymousIpResponse;
try {
anonymousIpResponse = databaseReader.anonymousIp(inetAddress);
} catch (final Exception e) {
throw new LookupFailureException("Failed to lookup Anonymous IP Information for IP Address " + inetAddress, e);
}
anonymousIpRecord = createRecord(anonymousIpResponse);
} else {
anonymousIpRecord = null;
}
return Optional.ofNullable(createContainerRecord(geoRecord, ispRecord, domainName, connectionType, anonymousIpRecord));
}
private Record createRecord(final CityResponse city) {
if (city == null) {
return null;
}
final Map<String, Object> values = new HashMap<>();
values.put(CitySchema.CITY.getFieldName(), city.getCity().getName());
final Location location = city.getLocation();
values.put(CitySchema.ACCURACY.getFieldName(), location.getAccuracyRadius());
values.put(CitySchema.METRO_CODE.getFieldName(), location.getMetroCode());
values.put(CitySchema.TIMEZONE.getFieldName(), location.getTimeZone());
values.put(CitySchema.LATITUDE.getFieldName(), location.getLatitude());
values.put(CitySchema.LONGITUDE.getFieldName(), location.getLongitude());
values.put(CitySchema.CONTINENT.getFieldName(), city.getContinent().getName());
values.put(CitySchema.POSTALCODE.getFieldName(), city.getPostal().getCode());
values.put(CitySchema.COUNTRY.getFieldName(), createRecord(city.getCountry()));
final Object[] subdivisions = new Object[city.getSubdivisions().size()];
int i = 0;
for (final Subdivision subdivision : city.getSubdivisions()) {
subdivisions[i++] = createRecord(subdivision);
}
values.put(CitySchema.SUBDIVISIONS.getFieldName(), subdivisions);
return new MapRecord(CitySchema.GEO_SCHEMA, values);
}
private Record createRecord(final Subdivision subdivision) {
if (subdivision == null) {
return null;
}
final Map<String, Object> values = new HashMap<>(2);
values.put(CitySchema.SUBDIVISION_NAME.getFieldName(), subdivision.getName());
values.put(CitySchema.SUBDIVISION_ISO.getFieldName(), subdivision.getIsoCode());
return new MapRecord(CitySchema.SUBDIVISION_SCHEMA, values);
}
private Record createRecord(final Country country) {
if (country == null) {
return null;
}
final Map<String, Object> values = new HashMap<>(2);
values.put(CitySchema.COUNTRY_NAME.getFieldName(), country.getName());
values.put(CitySchema.COUNTRY_ISO.getFieldName(), country.getIsoCode());
return new MapRecord(CitySchema.COUNTRY_SCHEMA, values);
}
private Record createRecord(final IspResponse isp) {
if (isp == null) {
return null;
}
final Map<String, Object> values = new HashMap<>(4);
values.put(IspSchema.ASN.getFieldName(), isp.getAutonomousSystemNumber());
values.put(IspSchema.ASN_ORG.getFieldName(), isp.getAutonomousSystemOrganization());
values.put(IspSchema.NAME.getFieldName(), isp.getIsp());
values.put(IspSchema.ORG.getFieldName(), isp.getOrganization());
return new MapRecord(IspSchema.ISP_SCHEMA, values);
}
private Record createRecord(final AnonymousIpResponse anonymousIp) {
if (anonymousIp == null) {
return null;
}
final Map<String, Object> values = new HashMap<>(5);
values.put(AnonymousIpSchema.ANONYMOUS.getFieldName(), anonymousIp.isAnonymous());
values.put(AnonymousIpSchema.ANONYMOUS_VPN.getFieldName(), anonymousIp.isAnonymousVpn());
values.put(AnonymousIpSchema.HOSTING_PROVIDER.getFieldName(), anonymousIp.isHostingProvider());
values.put(AnonymousIpSchema.PUBLIC_PROXY.getFieldName(), anonymousIp.isPublicProxy());
values.put(AnonymousIpSchema.TOR_EXIT_NODE.getFieldName(), anonymousIp.isTorExitNode());
return new MapRecord(AnonymousIpSchema.ANONYMOUS_IP_SCHEMA, values);
}
private Record createContainerRecord(final Record geoRecord, final Record ispRecord, final String domainName, final String connectionType, final Record anonymousIpRecord) {
final Map<String, Object> values = new HashMap<>(5);
values.put("geo", geoRecord);
values.put("isp", ispRecord);
values.put("domainName", domainName);
values.put("connectionType", connectionType);
values.put("anonymousIp", anonymousIpRecord);
return new MapRecord(ContainerSchema.CONTAINER_SCHEMA, values);
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.maxmind;
import java.util.Arrays;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
public class IspSchema {
static final RecordField NAME = new RecordField("name", RecordFieldType.STRING.getDataType());
static final RecordField ORG = new RecordField("organization", RecordFieldType.STRING.getDataType());
static final RecordField ASN = new RecordField("asn", RecordFieldType.INT.getDataType());
static final RecordField ASN_ORG = new RecordField("asnOrganization", RecordFieldType.STRING.getDataType());
static final RecordSchema ISP_SCHEMA = new SimpleRecordSchema(Arrays.asList(NAME, ORG, ASN, ASN_ORG));
}

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.lookup.maxmind.IPLookupService
org.apache.nifi.lookup.SimpleKeyValueLookupService

View File

@ -0,0 +1,102 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>IPLookupService</title>
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
</head>
<body>
<p>
The IPLookupService is powered by a MaxMind database and can return several different types of enrichment information
about a given IP address. Below is the schema of the Record that is returned by this service (in Avro Schema format).
</p>
<code>
<pre>
{
  "name": "ipEnrichment",
  "namespace": "nifi",
  "type": "record",
  "fields": [
    {
      "name": "geo",
      "type": {
        "name": "cityGeo",
        "type": "record",
        "fields": [
          { "name": "city", "type": "string" },
          { "name": "accuracy", "type": "int", "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
          { "name": "metroCode", "type": "int" },
          { "name": "timeZone", "type": "string" },
          { "name": "latitude", "type": "double" },
          { "name": "longitude", "type": "double" },
          { "name": "country", "type": {
              "type": "record",
              "name": "country",
              "fields": [
                { "name": "name", "type": "string" },
                { "name": "isoCode", "type": "string" }
              ]
          } },
          { "name": "subdivisions", "type": {
              "type": "array",
              "items": {
                "type": "record",
                "name": "subdivision",
                "fields": [
                  { "name": "name", "type": "string" },
                  { "name": "isoCode", "type": "string" }
                ]
              }
            }
          },
          { "name": "continent", "type": "string" },
          { "name": "postalCode", "type": "string" }
        ]
      }
    },
    {
      "name": "isp",
      "type": {
        "name": "ispEnrich",
        "type": "record",
        "fields": [
          { "name": "name", "type": "string" },
          { "name": "organization", "type": "string" },
          { "name": "asn", "type": "int" },
          { "name": "asnOrganization", "type": "string" }
        ]
      }
    },
    {
      "name": "domainName",
      "type": "string"
    },
    {
      "name": "connectionType",
      "type": "string",
      "doc": "One of 'Dialup', 'Cable/DSL', 'Corporate', 'Cellular'"
    },
    {
      "name": "anonymousIp",
      "type": {
        "name": "anonymousIp",
        "type": "record",
        "fields": [
          { "name": "anonymous", "type": "boolean" },
          { "name": "anonymousVpn", "type": "boolean" },
          { "name": "hostingProvider", "type": "boolean" },
          { "name": "publicProxy", "type": "boolean" },
          { "name": "torExitNode", "type": "boolean" }
        ]
      }
    }
  ]
}
</pre>
</code>
</body>
</html>
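For orientation, here is a small, hypothetical sketch of how a caller might read the nested fields of the enrichment Record described by the schema above. The lookupService variable and the exact lookup("1.2.3.4") signature are assumptions for illustration only; the field names come from the schema, and Record.getValue(String) is the standard NiFi record accessor.

// Hypothetical caller (not part of this commit); the lookup(...) signature is an assumption.
final Optional<Record> enrichment = lookupService.lookup("1.2.3.4");
if (enrichment.isPresent()) {
    final Record container = enrichment.get();
    final Record geo = (Record) container.getValue("geo");                  // may be null if geo lookup is disabled
    final String domainName = (String) container.getValue("domainName");
    final String connectionType = (String) container.getValue("connectionType");
    if (geo != null) {
        final String city = (String) geo.getValue("city");
        final Double latitude = (Double) geo.getValue("latitude");
        // enrich the outgoing record or FlowFile attributes with these values
    }
}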

View File

@ -0,0 +1,28 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services</artifactId>
<version>1.3.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-lookup-services-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-lookup-services</module>
<module>nifi-lookup-services-nar</module>
</modules>
</project>

View File

@ -19,6 +19,7 @@ package org.apache.nifi.serialization;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.flowfile.FlowFile;
@ -66,9 +67,11 @@ public interface RecordSetWriterFactory extends ControllerService {
* @param logger the logger to use when logging information. This is passed in, rather than using the logger of the Controller Service
* because it allows messages to be logged for the component that is calling this Controller Service.
* @param schema the schema that will be used for writing records
* @param flowFile the FlowFile to write to
* @param out the OutputStream to write to
*
* @return a RecordSetWriter that can write record sets to the given OutputStream
* @throws IOException if unable to write to the given OutputStream
*/
RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) throws SchemaNotFoundException, IOException;
RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException;
}
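To make the revised factory signature concrete, here is a hedged sketch of a caller. Only createWriter(...), write(RecordSet), WriteResult, and the try-with-resources pattern are taken from this diff and its updated tests; names such as writerFactory, schema, and recordSet, and the surrounding processor boilerplate, are assumptions for illustration.

// Hypothetical calling code for the new createWriter(...) signature.
final FlowFile original = flowFile;
flowFile = session.write(flowFile, out -> {
    try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, original, out)) {
        final WriteResult result = writer.write(recordSet);
        // the caller can then copy result.getAttributes() and the record count onto the FlowFile
    } catch (final SchemaNotFoundException e) {
        throw new ProcessException("Could not determine the schema to use for writing", e);
    }
});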

View File

@ -18,6 +18,7 @@
package org.apache.nifi.avro;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
@ -31,6 +32,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaField;
@ -57,7 +59,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
"The FlowFile will have the Avro schema embedded into the content, as is typical with Avro");
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema) throws IOException {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final FlowFile flowFile, final OutputStream out) throws IOException {
final String strategyValue = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue();
try {
@ -78,9 +80,9 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
}
if (AVRO_EMBEDDED.getValue().equals(strategyValue)) {
return new WriteAvroResultWithSchema(avroSchema);
return new WriteAvroResultWithSchema(avroSchema, out);
} else {
return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema));
return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out);
}
} catch (final SchemaNotFoundException e) {
throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e);

View File

@ -32,9 +32,11 @@ import java.util.Collections;
public abstract class WriteAvroResult implements RecordSetWriter {
private final Schema schema;
private final OutputStream out;
public WriteAvroResult(final Schema schema) {
public WriteAvroResult(final Schema schema, final OutputStream out) {
this.schema = schema;
this.out = out;
}
protected Schema getSchema() {
@ -42,7 +44,7 @@ public abstract class WriteAvroResult implements RecordSetWriter {
}
@Override
public WriteResult write(final Record record, final OutputStream out) throws IOException {
public WriteResult write(final Record record) throws IOException {
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
@ -58,12 +60,4 @@ public abstract class WriteAvroResult implements RecordSetWriter {
public String getMimeType() {
return "application/avro-binary";
}
public static String normalizeNameForAvro(String inputName) {
String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_");
if (Character.isDigit(normalizedName.charAt(0))) {
normalizedName = "_" + normalizedName;
}
return normalizedName;
}
}

View File

@ -20,7 +20,7 @@ package org.apache.nifi.avro;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
@ -29,66 +29,53 @@ import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
public class WriteAvroResultWithExternalSchema extends WriteAvroResult {
public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
private final SchemaAccessWriter schemaAccessWriter;
private final RecordSchema recordSchema;
private final Schema avroSchema;
private final BinaryEncoder encoder;
private final OutputStream buffered;
private final DatumWriter<GenericRecord> datumWriter;
public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccessWriter) {
super(avroSchema);
public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema,
final SchemaAccessWriter schemaAccessWriter, final OutputStream out) throws IOException {
super(out);
this.recordSchema = recordSchema;
this.schemaAccessWriter = schemaAccessWriter;
this.avroSchema = avroSchema;
this.buffered = new BufferedOutputStream(out);
datumWriter = new GenericDatumWriter<>(avroSchema);
schemaAccessWriter.writeHeader(recordSchema, buffered);
encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, null);
}
@Override
public WriteResult write(final RecordSet rs, final OutputStream outStream) throws IOException {
Record record = rs.next();
if (record == null) {
return WriteResult.of(0, Collections.emptyMap());
}
int nrOfRows = 0;
final Schema schema = getSchema();
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
final BufferedOutputStream bufferedOut = new BufferedOutputStream(outStream);
schemaAccessWriter.writeHeader(recordSchema, bufferedOut);
final BinaryEncoder encoder = EncoderFactory.get().blockingBinaryEncoder(bufferedOut, null);
do {
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
datumWriter.write(rec, encoder);
encoder.flush();
nrOfRows++;
} while ((record = rs.next()) != null);
bufferedOut.flush();
return WriteResult.of(nrOfRows, schemaAccessWriter.getAttributes(recordSchema));
protected void onBeginRecordSet() throws IOException {
schemaAccessWriter.writeHeader(recordSchema, buffered);
}
@Override
public WriteResult write(final Record record, final OutputStream out) throws IOException {
final Schema schema = getSchema();
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
final BufferedOutputStream bufferedOut = new BufferedOutputStream(out);
schemaAccessWriter.writeHeader(recordSchema, bufferedOut);
final BinaryEncoder encoder = EncoderFactory.get().blockingBinaryEncoder(bufferedOut, null);
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
datumWriter.write(rec, encoder);
protected Map<String, String> onFinishRecordSet() throws IOException {
encoder.flush();
buffered.flush();
return schemaAccessWriter.getAttributes(recordSchema);
}
bufferedOut.flush();
@Override
public WriteResult write(final Record record) throws IOException {
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, avroSchema);
datumWriter.write(rec, encoder);
return WriteResult.of(1, schemaAccessWriter.getAttributes(recordSchema));
}
@Override
public String getMimeType() {
return "application/avro-binary";
}
}
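The pattern above (constructor takes the OutputStream, per-record write(Record), onBeginRecordSet/onFinishRecordSet hooks) is driven by the new AbstractRecordSetWriter base class. The actual class is added elsewhere in this commit; the following is only a plausible sketch of its contract, inferred from the subclasses visible in this diff, and may differ from the real implementation.

import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Map;

import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;

// Sketch only: the real AbstractRecordSetWriter added by this commit may differ in detail.
public abstract class AbstractRecordSetWriter implements RecordSetWriter {

    private final OutputStream out;

    public AbstractRecordSetWriter(final OutputStream out) {
        this.out = out;
    }

    protected OutputStream getOutputStream() {
        return out;
    }

    // Called once before the first record of a record set is written.
    protected void onBeginRecordSet() throws IOException {
    }

    // Called once after the last record; returns attributes to attach to the WriteResult.
    protected Map<String, String> onFinishRecordSet() throws IOException {
        return Collections.emptyMap();
    }

    @Override
    public WriteResult write(final RecordSet recordSet) throws IOException {
        int count = 0;
        onBeginRecordSet();
        Record record;
        while ((record = recordSet.next()) != null) {
            write(record);
            count++;
        }
        return WriteResult.of(count, onFinishRecordSet());
    }

    @Override
    public void close() throws IOException {
        out.close();
    }
}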

View File

@ -25,57 +25,38 @@ import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
public class WriteAvroResultWithSchema extends WriteAvroResult {
public class WriteAvroResultWithSchema extends AbstractRecordSetWriter {
public WriteAvroResultWithSchema(final Schema schema) {
super(schema);
private final DataFileWriter<GenericRecord> dataFileWriter;
private final Schema schema;
public WriteAvroResultWithSchema(final Schema schema, final OutputStream out) throws IOException {
super(out);
this.schema = schema;
final GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
dataFileWriter = new DataFileWriter<>(datumWriter);
dataFileWriter.create(schema, out);
}
@Override
public WriteResult write(final RecordSet rs, final OutputStream outStream) throws IOException {
Record record = rs.next();
if (record == null) {
return WriteResult.of(0, Collections.emptyMap());
}
int nrOfRows = 0;
final Schema schema = getSchema();
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
dataFileWriter.create(schema, outStream);
do {
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
dataFileWriter.append(rec);
nrOfRows++;
} while ((record = rs.next()) != null);
}
return WriteResult.of(nrOfRows, Collections.emptyMap());
public void close() throws IOException {
dataFileWriter.close();
}
@Override
public WriteResult write(final Record record, final OutputStream out) throws IOException {
if (record == null) {
return WriteResult.of(0, Collections.emptyMap());
}
final Schema schema = getSchema();
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
dataFileWriter.create(schema, out);
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
dataFileWriter.append(rec);
}
public WriteResult write(final Record record) throws IOException {
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
dataFileWriter.append(rec);
return WriteResult.of(1, Collections.emptyMap());
}
@Override
public String getMimeType() {
return "application/avro-binary";
}
}

View File

@ -18,6 +18,7 @@
package org.apache.nifi.csv;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
@ -27,6 +28,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
@ -67,8 +69,8 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) throws SchemaNotFoundException, IOException {
return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema),
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) throws SchemaNotFoundException, IOException {
return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader);
}
}

View File

@ -20,38 +20,44 @@ package org.apache.nifi.csv;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.Collections;
import java.util.Map;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.NonCloseableOutputStream;
public class WriteCSVResult implements RecordSetWriter {
private final CSVFormat csvFormat;
public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSetWriter {
private final RecordSchema recordSchema;
private final SchemaAccessWriter schemaWriter;
private final String dateFormat;
private final String timeFormat;
private final String timestampFormat;
private final boolean includeHeaderLine;
private final CSVPrinter printer;
private final Object[] fieldValues;
public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter,
final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine) {
this.csvFormat = csvFormat;
public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out,
final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine) throws IOException {
super(out);
this.recordSchema = recordSchema;
this.schemaWriter = schemaWriter;
this.dateFormat = dateFormat;
this.timeFormat = timeFormat;
this.timestampFormat = timestampFormat;
this.includeHeaderLine = includeHeaderLine;
final String[] columnNames = recordSchema.getFieldNames().toArray(new String[0]);
final CSVFormat formatWithHeader = csvFormat.withHeader(columnNames).withSkipHeaderRecord(!includeHeaderLine);
final OutputStreamWriter streamWriter = new OutputStreamWriter(out);
printer = new CSVPrinter(streamWriter, formatWithHeader);
fieldValues = new Object[recordSchema.getFieldCount()];
}
private String getFormat(final RecordField field) {
@ -69,60 +75,29 @@ public class WriteCSVResult implements RecordSetWriter {
}
@Override
public WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException {
int count = 0;
final String[] columnNames = recordSchema.getFieldNames().toArray(new String[0]);
final CSVFormat formatWithHeader = csvFormat.withHeader(columnNames).withSkipHeaderRecord(!includeHeaderLine);
schemaWriter.writeHeader(recordSchema, rawOut);
try (final OutputStream nonCloseable = new NonCloseableOutputStream(rawOut);
final OutputStreamWriter streamWriter = new OutputStreamWriter(nonCloseable);
final CSVPrinter printer = new CSVPrinter(streamWriter, formatWithHeader)) {
try {
Record record;
while ((record = rs.next()) != null) {
final Object[] colVals = new Object[recordSchema.getFieldCount()];
int i = 0;
for (final RecordField recordField : recordSchema.getFields()) {
colVals[i++] = record.getAsString(recordField, getFormat(recordField));
}
printer.printRecord(colVals);
count++;
}
} catch (final Exception e) {
throw new IOException("Failed to serialize results", e);
}
}
return WriteResult.of(count, schemaWriter.getAttributes(recordSchema));
protected void onBeginRecordSet() throws IOException {
schemaWriter.writeHeader(recordSchema, getOutputStream());
}
@Override
public WriteResult write(final Record record, final OutputStream rawOut) throws IOException {
protected Map<String, String> onFinishRecordSet() throws IOException {
return schemaWriter.getAttributes(recordSchema);
}
try (final OutputStream nonCloseable = new NonCloseableOutputStream(rawOut);
final OutputStreamWriter streamWriter = new OutputStreamWriter(nonCloseable);
final CSVPrinter printer = new CSVPrinter(streamWriter, csvFormat)) {
@Override
public void close() throws IOException {
printer.close();
}
try {
final RecordSchema schema = record.getSchema();
final Object[] colVals = new Object[schema.getFieldCount()];
int i = 0;
for (final RecordField recordField : schema.getFields()) {
colVals[i++] = record.getAsString(recordField, getFormat(recordField));
}
printer.printRecord(colVals);
} catch (final Exception e) {
throw new IOException("Failed to serialize results", e);
}
@Override
public WriteResult write(final Record record) throws IOException {
int i = 0;
for (final RecordField recordField : recordSchema.getFields()) {
fieldValues[i++] = record.getAsString(recordField, getFormat(recordField));
}
return WriteResult.of(1, Collections.emptyMap());
printer.printRecord(fieldValues);
return WriteResult.of(1, schemaWriter.getAttributes(recordSchema));
}
@Override

View File

@ -18,6 +18,7 @@
package org.apache.nifi.json;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
@ -26,6 +27,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
@ -62,8 +64,8 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) throws SchemaNotFoundException, IOException {
return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), prettyPrint,
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) throws SchemaNotFoundException, IOException {
return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
}

View File

@ -20,13 +20,13 @@ package org.apache.nifi.json;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.sql.SQLException;
import java.text.DateFormat;
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
@ -34,87 +34,76 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.serialization.record.SerializedForm;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.stream.io.NonCloseableOutputStream;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
public class WriteJsonResult implements RecordSetWriter {
public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSetWriter {
private final ComponentLog logger;
private final boolean prettyPrint;
private final SchemaAccessWriter schemaAccess;
private final RecordSchema recordSchema;
private final JsonFactory factory = new JsonFactory();
private final DateFormat dateFormat;
private final DateFormat timeFormat;
private final DateFormat timestampFormat;
private final JsonGenerator generator;
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final boolean prettyPrint,
final String dateFormat, final String timeFormat, final String timestampFormat) {
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
super(out);
this.logger = logger;
this.recordSchema = recordSchema;
this.prettyPrint = prettyPrint;
this.schemaAccess = schemaAccess;
this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
this.generator = factory.createJsonGenerator(out);
if (prettyPrint) {
generator.useDefaultPrettyPrinter();
}
}
@Override
protected void onBeginRecordSet() throws IOException {
final OutputStream out = getOutputStream();
schemaAccess.writeHeader(recordSchema, out);
generator.writeStartArray();
}
@Override
public WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException {
int count = 0;
schemaAccess.writeHeader(recordSchema, rawOut);
try (final JsonGenerator generator = factory.createJsonGenerator(new NonCloseableOutputStream(rawOut))) {
if (prettyPrint) {
generator.useDefaultPrettyPrinter();
}
generator.writeStartArray();
Record record;
while ((record = rs.next()) != null) {
count++;
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
}
generator.writeEndArray();
} catch (final SQLException e) {
throw new IOException("Failed to serialize Result Set to stream", e);
}
return WriteResult.of(count, schemaAccess.getAttributes(recordSchema));
protected Map<String, String> onFinishRecordSet() throws IOException {
generator.writeEndArray();
return schemaAccess.getAttributes(recordSchema);
}
@Override
public WriteResult write(final Record record, final OutputStream rawOut) throws IOException {
schemaAccess.writeHeader(recordSchema, rawOut);
try (final JsonGenerator generator = factory.createJsonGenerator(new NonCloseableOutputStream(rawOut))) {
if (prettyPrint) {
generator.useDefaultPrettyPrinter();
}
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
} catch (final SQLException e) {
throw new IOException("Failed to write records to stream", e);
public void close() throws IOException {
if (generator != null) {
generator.close();
}
super.close();
}
@Override
public WriteResult write(final Record record) throws IOException {
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
return WriteResult.of(1, schemaAccess.getAttributes(recordSchema));
}
private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator, final GeneratorTask startTask, final GeneratorTask endTask)
throws JsonGenerationException, IOException, SQLException {
throws JsonGenerationException, IOException {
final Optional<SerializedForm> serializedForm = record.getSerializedForm();
if (serializedForm.isPresent()) {
@ -155,7 +144,7 @@ public class WriteJsonResult implements RecordSetWriter {
@SuppressWarnings("unchecked")
private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType, final boolean moreCols)
throws JsonGenerationException, IOException, SQLException {
throws JsonGenerationException, IOException {
if (value == null) {
generator.writeNull();
return;
@ -268,7 +257,7 @@ public class WriteJsonResult implements RecordSetWriter {
}
private void writeArray(final Object[] values, final String fieldName, final JsonGenerator generator, final DataType elementType)
throws JsonGenerationException, IOException, SQLException {
throws JsonGenerationException, IOException {
generator.writeStartArray();
for (int i = 0; i < values.length; i++) {
final boolean moreEntries = i < values.length - 1;

View File

@ -19,6 +19,7 @@ package org.apache.nifi.text;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
@ -81,8 +82,8 @@ public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter i
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) {
return new FreeFormTextWriter(textValue, characterSet);
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
return new FreeFormTextWriter(textValue, characterSet, out);
}
@Override

View File

@ -27,42 +27,24 @@ import java.util.List;
import java.util.Map;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
public class FreeFormTextWriter implements RecordSetWriter {
public class FreeFormTextWriter extends AbstractRecordSetWriter implements RecordSetWriter {
private static final byte NEW_LINE = (byte) '\n';
private final PropertyValue propertyValue;
private final Charset charset;
private final OutputStream out;
public FreeFormTextWriter(final PropertyValue textPropertyValue, final Charset characterSet) {
propertyValue = textPropertyValue;
charset = characterSet;
}
@Override
public WriteResult write(final RecordSet recordSet, final OutputStream out) throws IOException {
int count = 0;
try {
Record record;
while ((record = recordSet.next()) != null) {
final RecordSchema schema = record.getSchema();
final List<String> colNames = getColumnNames(schema);
count++;
write(record, out, colNames);
}
} catch (final Exception e) {
throw new ProcessException(e);
}
return WriteResult.of(count, Collections.emptyMap());
public FreeFormTextWriter(final PropertyValue textPropertyValue, final Charset characterSet, final OutputStream out) {
super(out);
this.propertyValue = textPropertyValue;
this.charset = characterSet;
this.out = out;
}
private List<String> getColumnNames(final RecordSchema schema) {
@ -78,7 +60,7 @@ public class FreeFormTextWriter implements RecordSetWriter {
}
@Override
public WriteResult write(final Record record, final OutputStream out) throws IOException {
public WriteResult write(final Record record) throws IOException {
write(record, out, getColumnNames(record.getSchema()));
return WriteResult.of(1, Collections.emptyMap());
}

View File

@ -27,6 +27,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
@ -47,6 +48,7 @@ import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Array;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
@ -60,7 +62,7 @@ import org.junit.Test;
public abstract class TestWriteAvroResult {
protected abstract WriteAvroResult createWriter(Schema schema);
protected abstract RecordSetWriter createWriter(Schema schema, OutputStream out) throws IOException;
protected abstract GenericRecord readRecord(InputStream in, Schema schema) throws IOException;
@ -80,7 +82,7 @@ public abstract class TestWriteAvroResult {
}
private void testLogicalTypes(Schema schema) throws ParseException, IOException {
final WriteAvroResult writer = createWriter(schema);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("timeMillis", RecordFieldType.TIME.getDataType()));
@ -108,12 +110,12 @@ public abstract class TestWriteAvroResult {
values.put("decimal", expectedDecimal.doubleValue());
final Record record = new MapRecord(recordSchema, values);
final byte[] data;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
writer.write(RecordSet.of(record.getSchema(), record), baos);
data = baos.toByteArray();
try (final RecordSetWriter writer = createWriter(schema, baos)) {
writer.write(RecordSet.of(record.getSchema(), record));
}
final byte[] data = baos.toByteArray();
try (final InputStream in = new ByteArrayInputStream(data)) {
final GenericRecord avroRecord = readRecord(in, schema);
final long secondsSinceMidnight = 33 + (20 * 60) + (14 * 60 * 60);
@ -138,7 +140,7 @@ public abstract class TestWriteAvroResult {
@Test
public void testDataTypes() throws IOException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/datatypes.avsc"));
final WriteAvroResult writer = createWriter(schema);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final List<RecordField> subRecordFields = Collections.singletonList(new RecordField("field1", RecordFieldType.STRING.getDataType()));
final RecordSchema subRecordSchema = new SimpleRecordSchema(subRecordFields);
@ -178,13 +180,14 @@ public abstract class TestWriteAvroResult {
final Record record = new MapRecord(recordSchema, values);
final byte[] data;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
final WriteResult writeResult = writer.write(RecordSet.of(record.getSchema(), record), baos);
verify(writeResult);
data = baos.toByteArray();
final WriteResult writeResult;
try (final RecordSetWriter writer = createWriter(schema, baos)) {
writeResult = writer.write(RecordSet.of(record.getSchema(), record));
}
verify(writeResult);
final byte[] data = baos.toByteArray();
try (final InputStream in = new ByteArrayInputStream(data)) {
final GenericRecord avroRecord = readRecord(in, schema);
assertMatch(record, avroRecord);

View File

@ -19,6 +19,7 @@ package org.apache.nifi.avro;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
@ -26,12 +27,13 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.StringType;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.serialization.RecordSetWriter;
public class TestWriteAvroResultWithSchema extends TestWriteAvroResult {
@Override
protected WriteAvroResult createWriter(final Schema schema) {
return new WriteAvroResultWithSchema(schema);
protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException {
return new WriteAvroResultWithSchema(schema, out);
}
@Override

View File

@ -19,6 +19,7 @@ package org.apache.nifi.avro;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import org.apache.avro.Schema;
@ -27,14 +28,15 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.nifi.schema.access.SchemaTextAsAttribute;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.junit.Assert;
public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult {
@Override
protected WriteAvroResult createWriter(final Schema schema) {
return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new SchemaTextAsAttribute());
protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException {
return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new SchemaTextAsAttribute(), out);
}
@Override

View File

@ -71,37 +71,38 @@ public class TestWriteCSVResult {
}
final RecordSchema schema = new SimpleRecordSchema(fields);
final WriteCSVResult result = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(),
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final long now = System.currentTimeMillis();
final Map<String, Object> valueMap = new HashMap<>();
valueMap.put("string", "string");
valueMap.put("boolean", true);
valueMap.put("byte", (byte) 1);
valueMap.put("char", 'c');
valueMap.put("short", (short) 8);
valueMap.put("int", 9);
valueMap.put("bigint", BigInteger.valueOf(8L));
valueMap.put("long", 8L);
valueMap.put("float", 8.0F);
valueMap.put("double", 8.0D);
valueMap.put("date", new Date(now));
valueMap.put("time", new Time(now));
valueMap.put("timestamp", new Timestamp(now));
valueMap.put("record", null);
valueMap.put("choice", 48L);
valueMap.put("array", null);
final Record record = new MapRecord(schema, valueMap);
final RecordSet rs = RecordSet.of(schema, record);
try (final WriteCSVResult result = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
final String output;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
result.write(rs, baos);
output = new String(baos.toByteArray(), StandardCharsets.UTF_8);
final Map<String, Object> valueMap = new HashMap<>();
valueMap.put("string", "string");
valueMap.put("boolean", true);
valueMap.put("byte", (byte) 1);
valueMap.put("char", 'c');
valueMap.put("short", (short) 8);
valueMap.put("int", 9);
valueMap.put("bigint", BigInteger.valueOf(8L));
valueMap.put("long", 8L);
valueMap.put("float", 8.0F);
valueMap.put("double", 8.0D);
valueMap.put("date", new Date(now));
valueMap.put("time", new Time(now));
valueMap.put("timestamp", new Timestamp(now));
valueMap.put("record", null);
valueMap.put("choice", 48L);
valueMap.put("array", null);
final Record record = new MapRecord(schema, valueMap);
final RecordSet rs = RecordSet.of(schema, record);
result.write(rs);
}
final String output = new String(baos.toByteArray(), StandardCharsets.UTF_8);
headerBuilder.deleteCharAt(headerBuilder.length() - 1);
final String headerLine = headerBuilder.toString();

View File

@ -72,9 +72,7 @@ public class TestWriteJsonResult {
}
final RecordSchema schema = new SimpleRecordSchema(fields);
final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), true, RecordFieldType.DATE.getDefaultFormat(),
RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
df.setTimeZone(TimeZone.getTimeZone("gmt"));
final long time = df.parse("2017/01/01 17:00:00.000").getTime();
@ -105,12 +103,14 @@ public class TestWriteJsonResult {
final Record record = new MapRecord(schema, valueMap);
final RecordSet rs = RecordSet.of(schema, record);
final String output;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
writer.write(rs, baos);
output = baos.toString();
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, true, RecordFieldType.DATE.getDefaultFormat(),
RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
writer.write(rs);
}
final String output = baos.toString();
final String expected = new String(Files.readAllBytes(Paths.get("src/test/resources/json/output/dataTypes.json")));
assertEquals(expected, output);
}
@ -139,15 +139,15 @@ public class TestWriteJsonResult {
final RecordSet rs = RecordSet.of(schema, record1, record2);
final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), true, RecordFieldType.DATE.getDefaultFormat(),
RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, true, RecordFieldType.DATE.getDefaultFormat(),
RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
final byte[] data;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
writer.write(rs, baos);
data = baos.toByteArray();
writer.write(rs);
}
final byte[] data = baos.toByteArray();
final String expected = "[ " + serialized1 + ", " + serialized2 + " ]";
final String output = new String(data, StandardCharsets.UTF_8);
@ -171,14 +171,13 @@ public class TestWriteJsonResult {
final Record record = new MapRecord(schema, values);
final RecordSet rs = RecordSet.of(schema, record);
final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), false, null, null, null);
final byte[] data;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
writer.write(rs, baos);
data = baos.toByteArray();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
writer.write(rs);
}
final byte[] data = baos.toByteArray();
final String expected = "[{\"timestamp\":37293723,\"time\":37293723,\"date\":37293723}]";
final String output = new String(data, StandardCharsets.UTF_8);

View File

@ -66,6 +66,11 @@
<artifactId>nifi-record-serialization-service-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-service-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>

View File

@ -27,6 +27,8 @@
<module>nifi-distributed-cache-services-bundle</module>
<module>nifi-load-distribution-service-api</module>
<module>nifi-http-context-map-api</module>
<module>nifi-lookup-service-api</module>
<module>nifi-lookup-services-bundle</module>
<module>nifi-ssl-context-bundle</module>
<module>nifi-ssl-context-service-api</module>
<module>nifi-http-context-map-bundle</module>

pom.xml
View File

@ -990,6 +990,22 @@
<artifactId>nifi-ssl-context-service-api</artifactId>
<version>1.3.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-service-api</artifactId>
<version>1.3.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-services</artifactId>
<version>1.3.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-services-nar</artifactId>
<version>1.3.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>