diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml index 55bf1ce74d..7bf02b2528 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml @@ -59,5 +59,10 @@ nifi-mock test + + org.apache.nifi + nifi-mock-record-utils + test + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/LoggingRecordSink.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/LoggingRecordSink.java new file mode 100644 index 0000000000..1abea61228 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/LoggingRecordSink.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.record.sink; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSet; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +@Tags({"record", "sink", "log"}) +@CapabilityDescription("Provides a RecordSinkService that can be used to log records to the application log (nifi-app.log, e.g.) using the specified writer for formatting.") +public class LoggingRecordSink extends AbstractControllerService implements RecordSinkService { + + private List properties; + private volatile RecordSetWriterFactory writerFactory; + private volatile LogLevel logLevel; + + public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder() + .name("logsink-log-level") + .displayName("Log Level") + .required(true) + .description("The Log Level at which to log records (INFO, DEBUG, e.g.)") + .allowableValues(LogLevel.values()) + .defaultValue(LogLevel.INFO.name()) + .build(); + + @Override + protected void init(final ControllerServiceInitializationContext context) throws InitializationException { + final List properties = new ArrayList<>(); + properties.add(RecordSinkService.RECORD_WRITER_FACTORY); + properties.add(LOG_LEVEL); + + this.properties = Collections.unmodifiableList(properties); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + writerFactory = context.getProperty(RecordSinkService.RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class); + logLevel = LogLevel.valueOf(context.getProperty(LOG_LEVEL).getValue()); + } + + @Override + public WriteResult sendData(RecordSet recordSet, Map attributes, boolean sendZeroResults) throws IOException { + WriteResult writeResult; + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + ComponentLog log = getLogger(); + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), baos, attributes)) { + writer.beginRecordSet(); + Record r; + while ((r = recordSet.next()) != null) { + baos.reset(); + writer.write(r); + writer.flush(); + log.log(logLevel, baos.toString()); + } + writeResult = writer.finishRecordSet(); + writer.flush(); + } + + } catch (SchemaNotFoundException e) { + throw new IOException(e); + } + return writeResult; + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 240c2e6df2..ec4250c37e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -13,3 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.record.sink.lookup.RecordSinkServiceLookup +org.apache.nifi.record.sink.LoggingRecordSink diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestLoggingRecordSink.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestLoggingRecordSink.java new file mode 100644 index 0000000000..841be12d11 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestLoggingRecordSink.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.record.sink; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + + +public class TestLoggingRecordSink { + + private LoggingRecordSink recordSink; + private RecordSet recordSet; + + @Before + public void setup() throws InitializationException { + TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + recordSink = new LoggingRecordSink(); + runner.addControllerService("log", recordSink); + + MockRecordWriter writerFactory = new MockRecordWriter(); + runner.addControllerService("writer", writerFactory); + runner.setProperty(recordSink, LoggingRecordSink.RECORD_WRITER_FACTORY, "writer"); + + runner.enableControllerService(writerFactory); + runner.enableControllerService(recordSink); + + final List fields = new ArrayList<>(); + fields.add(new RecordField("a", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("b", RecordFieldType.BOOLEAN.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map valueMap1 = new HashMap<>(); + valueMap1.put("a", "Hello"); + valueMap1.put("b", true); + final Record record1 = new MapRecord(schema, valueMap1); + + final Map valueMap2 = new HashMap<>(); + valueMap2.put("a", "World"); + valueMap2.put("b", false); + final Record record2 = new MapRecord(schema, valueMap2); + + recordSet = RecordSet.of(schema, record1, record2); + } + + @Test + public void testLogging() { + try { + final WriteResult writeResult = recordSink.sendData(recordSet, Collections.emptyMap(), false); + assertNotNull(writeResult); + assertEquals(2, writeResult.getRecordCount()); + } catch (IOException ioe) { + fail("Should have completed successfully"); + } + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/lookup/TestProcessor.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestProcessor.java similarity index 97% rename from nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/lookup/TestProcessor.java rename to nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestProcessor.java index d7bd69f8b0..615e50f174 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/lookup/TestProcessor.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/TestProcessor.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.record.sink.lookup; +package org.apache.nifi.record.sink; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.AbstractProcessor; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/lookup/TestRecordSinkServiceLookup.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/lookup/TestRecordSinkServiceLookup.java index ed6dfc594e..fcfab20491 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/lookup/TestRecordSinkServiceLookup.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/lookup/TestRecordSinkServiceLookup.java @@ -17,6 +17,7 @@ package org.apache.nifi.record.sink.lookup; +import org.apache.nifi.record.sink.TestProcessor; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.WriteResult;