From b508d6bfbc2b6a68fe2ab152de96611f06a8724a Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 14 Feb 2019 15:11:59 -0500 Subject: [PATCH] NIFI-6033, NIFI-6034, NIFI-6035, NIFI-6036, NIFI-6037: Fixed bugs that were found during 1.9.0-RC1 validation. If multiple FlowFiles were written to same Content Claim, and a Processor attempted to read two of them wi within a single session, it would seek to the wrong part of the content or else throw a ContentNotFoundException. Updated logic for considering a processor to be 'running' / having 'active threads' if the processor is invalid upon NiFi restart but scheduled to run. Fixed NPE in FreeFormTextWriter. If MergeRecord reaches minimum number of records, flush writer after writing content out so that its minimum size can accurately be checked. This closes #3309. Signed-off-by: Bryan Bende --- .../record/MockRecordWriter.java | 29 +++++++--- .../controller/AbstractComponentNode.java | 17 +++--- .../controller/StandardProcessorNode.java | 5 +- .../repository/StandardProcessSession.java | 4 +- .../queue/clustered/LoadBalancedQueueIT.java | 4 +- .../scheduling/ProcessorLifecycleIT.java | 50 ----------------- .../nifi/processors/standard/MergeRecord.java | 6 +-- .../processors/standard/merge/RecordBin.java | 54 ++++++++++++------- .../standard/merge/RecordBinManager.java | 9 +--- .../processors/standard/TestMergeRecord.java | 23 +++++++- .../SchemaRegistryRecordSetWriter.java | 6 ++- 11 files changed, 101 insertions(+), 106 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java index d7579e82a5..9d6b0878f1 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java @@ -17,11 +17,6 @@ package org.apache.nifi.serialization.record; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Collections; -import java.util.Map; - import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -30,27 +25,43 @@ import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.WriteResult; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Map; + public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory { private final String header; private final int failAfterN; private final boolean quoteValues; + private final boolean bufferOutput; public MockRecordWriter() { this(null); } public MockRecordWriter(final String header) { - this(header, true, -1); + this(header, true, -1, false); } public MockRecordWriter(final String header, final boolean quoteValues) { - this(header, quoteValues, -1); + this(header, quoteValues, false); } public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) { + this(header, quoteValues, failAfterN, false); + } + + public MockRecordWriter(final String header, final boolean quoteValues, final boolean bufferOutput) { + this(header, quoteValues, -1, bufferOutput); + } + + public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN, final boolean bufferOutput) { this.header = header; this.quoteValues = quoteValues; this.failAfterN = failAfterN; + this.bufferOutput = bufferOutput; } @Override @@ -59,7 +70,9 @@ public class MockRecordWriter extends AbstractControllerService implements Recor } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) { + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream rawOut) { + final OutputStream out = bufferOutput ? new BufferedOutputStream(rawOut) : rawOut; + return new RecordSetWriter() { private int recordCount = 0; private boolean headerWritten = false; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java index 1f039230a3..f78c9d20d9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java @@ -204,8 +204,12 @@ public abstract class AbstractComponentNode implements ComponentNode { } } - logger.debug("Resetting Validation State of {} due to setting properties", this); - resetValidationState(); + if (isTriggerValidation()) { + logger.debug("Resetting Validation State of {} due to setting properties", this); + resetValidationState(); + } else { + logger.debug("Properties set for {} but not resettingn validation state because validation is paused", this); + } } finally { lock.unlock(); } @@ -642,13 +646,8 @@ public abstract class AbstractComponentNode implements ComponentNode { public void resumeValidationTrigger() { triggerValidation = true; - final ValidationStatus validationStatus = getValidationStatus(); - if (validationStatus == ValidationStatus.VALIDATING) { - logger.debug("Resuming Triggering of Validation State for {}; status is VALIDATING so will trigger async validation now", this); - validationTrigger.triggerAsync(this); - } else { - logger.debug("Resuming Triggering of Validation State for {}; status is {} so will not trigger async validation now", this, validationStatus); - } + logger.debug("Resuming Triggering of Validation State for {}; Resetting validation state", this); + resetValidationState(); } private boolean isTriggerValidation() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index e59e7fdacd..3e407f5a22 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1367,7 +1367,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } if (starting) { // will ensure that the Processor represented by this node can only be started once - hasActiveThreads = true; initiateStart(taskScheduler, administrativeYieldMillis, timeoutMillis, processContext, schedulingAgentCallback); } else { final String procName = processorRef.get().toString(); @@ -1395,7 +1394,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads(); final Map threadInfoMap = Stream.of(infos) - .collect(Collectors.toMap(info -> info.getThreadId(), Function.identity(), (a, b) -> a)); + .collect(Collectors.toMap(ThreadInfo::getThreadId, Function.identity(), (a, b) -> a)); final List threadList = new ArrayList<>(activeThreads.size()); for (final Map.Entry entry : activeThreads.entrySet()) { @@ -1509,6 +1508,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) { try { + hasActiveThreads = true; + activateThread(); try { ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 071a052e89..604eb7e8de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -2173,8 +2173,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1. if (allowCachingOfStream && readRecursionSet.isEmpty() && writeRecursionSet.isEmpty()) { if (currentReadClaim == claim) { - if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= offset) { - final long bytesToSkip = offset - currentReadClaimStream.getBytesConsumed(); + if (currentReadClaimStream != null && currentReadClaimStream.getCurrentOffset() <= offset) { + final long bytesToSkip = offset - currentReadClaimStream.getCurrentOffset(); if (bytesToSkip > 0) { StreamUtils.skip(currentReadClaimStream, bytesToSkip); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java index 615ae007f2..e28e178034 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java @@ -307,7 +307,7 @@ public class LoadBalancedQueueIT { } } - @Test(timeout = 60_000) + @Test(timeout = 90_000) public void testFailover() throws IOException, InterruptedException { localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null); nodeIdentifiers.add(localNodeId); @@ -371,7 +371,7 @@ public class LoadBalancedQueueIT { final int expectedFlowFileReceiveCount = flowFilesPerNode + flowFilesPerNode / 2; // Wait up to 10 seconds for the server's FlowFile Repository to be updated - final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30L); + final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60L); while (serverRepoRecords.size() < expectedFlowFileReceiveCount && System.currentTimeMillis() < endTime) { Thread.sleep(10L); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java index 55b015de35..890d799790 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java @@ -485,21 +485,6 @@ public class ProcessorLifecycleIT { assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE); } - /** - * Validate that processor will not be validated on failing - * PropertyDescriptor validation. - */ - @Test(expected = IllegalStateException.class) - public void validateStartFailsOnInvalidProcessorWithMissingProperty() throws Exception { - final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(); - flowManager = fcsb.getFlowManager(); - - ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); - ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), - fcsb.getSystemBundle().getBundleDetails().getCoordinate()); - processScheduler.startProcessor(testProcNode, true); - fail(); - } /** * Validate that processor will not be validated on failing @@ -527,41 +512,6 @@ public class ProcessorLifecycleIT { fail(); } - /** - * The successful processor start with ControllerService dependency. - */ - @Test - public void validateStartSucceedsOnProcessorWithEnabledService() throws Exception { - final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(); - flowManager = fcsb.getFlowManager(); - - ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); - - ControllerServiceNode testServiceNode = flowManager.createControllerService(TestService.class.getName(), "foo", - fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true, true); - testGroup.addControllerService(testServiceNode); - - ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), - fcsb.getSystemBundle().getBundleDetails().getCoordinate()); - testGroup.addProcessor(testProcNode); - - properties.put("S", testServiceNode.getIdentifier()); - testProcNode.setProperties(properties); - - TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); - testProcessor.withService = true; - this.noop(testProcessor); - - testServiceNode.performValidation(); - processScheduler.enableControllerService(testServiceNode); - - testProcNode.performValidation(); - processScheduler.startProcessor(testProcNode, true); - - Thread.sleep(500); - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); - } - /** * Scenario where onTrigger() is executed with random delay limited to diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java index ecebb8e091..130d6b6515 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java @@ -314,7 +314,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor { final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue(); final boolean block; - if (MERGE_STRATEGY_DEFRAGMENT.equals(mergeStrategy)) { + if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) { block = true; } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) { block = true; @@ -378,12 +378,12 @@ public class MergeRecord extends AbstractSessionFactoryProcessor { protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final RecordSchema schema, final ProcessSession session) { final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue(); - if (MERGE_STRATEGY_DEFRAGMENT.equals(mergeStrategy)) { + if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) { return flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE); } final Optional optionalText = schema.getSchemaText(); - final String schemaText = optionalText.isPresent() ? optionalText.get() : AvroTypeUtil.extractAvroSchema(schema).toString(); + final String schemaText = optionalText.orElseGet(() -> AvroTypeUtil.extractAvroSchema(schema).toString()); final String groupId; final String correlationshipAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java index 23f5edffe4..d15ba0fa07 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java @@ -46,8 +46,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; public class RecordBin { - public static final String MERGE_COUNT_ATTRIBUTE = "merge.count"; - public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age"; private final ComponentLog logger; private final ProcessSession session; @@ -70,6 +68,8 @@ public class RecordBin { private static final AtomicLong idGenerator = new AtomicLong(0L); private final long id = idGenerator.getAndIncrement(); + private volatile int requiredRecordCount = -1; + public RecordBin(final ProcessContext context, final ProcessSession session, final ComponentLog logger, final RecordBinThresholds thresholds) { this.session = session; @@ -95,7 +95,6 @@ public class RecordBin { } public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block) throws IOException { - if (isComplete()) { logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this}); return false; @@ -148,6 +147,12 @@ public class RecordBin { flowFileMigrated = true; this.flowFiles.add(flowFile); + if (recordCount >= getMinimumRecordCount()) { + // If we have met our minimum record count, we need to flush so that when we reach the desired number of bytes + // the bin is considered 'full enough'. + recordWriter.flush(); + } + if (isFull()) { logger.debug(this + " is now full. Completing bin."); complete("Bin is full"); @@ -232,6 +237,29 @@ public class RecordBin { } } + private int getMinimumRecordCount() { + final int currentCount = requiredRecordCount; + if (currentCount > -1) { + return currentCount; + } + + int requiredCount; + final Optional recordCountAttribute = thresholds.getRecordCountAttribute(); + if (recordCountAttribute.isPresent()) { + final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get()); + try { + requiredCount = Integer.parseInt(recordCountValue); + } catch (final NumberFormatException e) { + requiredCount = 1; + } + } else { + requiredCount = thresholds.getMinRecords(); + } + + this.requiredRecordCount = requiredCount; + return requiredCount; + } + public boolean isFullEnough() { readLock.lock(); try { @@ -239,19 +267,7 @@ public class RecordBin { return false; } - int requiredRecordCount; - final Optional recordCountAttribute = thresholds.getRecordCountAttribute(); - if (recordCountAttribute.isPresent()) { - final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get()); - try { - requiredRecordCount = Integer.parseInt(recordCountValue); - } catch (final NumberFormatException e) { - requiredRecordCount = 1; - } - } else { - requiredRecordCount = thresholds.getMinRecords(); - } - + final int requiredRecordCount = getMinimumRecordCount(); return (recordCount >= requiredRecordCount && out.getBytesWritten() >= thresholds.getMinBytes()); } finally { readLock.unlock(); @@ -386,11 +402,11 @@ public class RecordBin { attributes.putAll(writeResult.getAttributes()); attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType()); - attributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size())); - attributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge())); + attributes.put(MergeRecord.MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size())); + attributes.put(MergeRecord.MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge())); merged = session.putAllAttributes(merged, attributes); - flowFiles.stream().forEach(ff -> session.putAttribute(ff, "merge.uuid", merged.getAttribute(CoreAttributes.UUID.key()))); + flowFiles.forEach(ff -> session.putAttribute(ff, MergeRecord.MERGE_UUID_ATTRIBUTE, merged.getAttribute(CoreAttributes.UUID.key()))); session.getProvenanceReporter().join(flowFiles, merged, "Records Merged due to: " + completionReason); session.transfer(merged, MergeRecord.REL_MERGED); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java index 312832e0ae..d1dde2a360 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java @@ -26,8 +26,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processors.standard.MergeContent; import org.apache.nifi.processors.standard.MergeRecord; -import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import java.io.IOException; @@ -109,12 +107,9 @@ public class RecordBinManager { * @param block if another thread is already writing to the desired bin, passing true for this parameter will block until the other thread(s) have finished so * that the records can still be added to the desired bin. Passing false will result in moving on to another bin. * - * @throws SchemaNotFoundException if unable to find the schema for the record writer - * @throws MalformedRecordException if unable to read a record * @throws IOException if there is an IO problem reading from the stream or writing to the stream */ - public void add(final String groupIdentifier, final FlowFile flowFile, final RecordReader reader, final ProcessSession session, final boolean block) - throws IOException, MalformedRecordException, SchemaNotFoundException { + public void add(final String groupIdentifier, final FlowFile flowFile, final RecordReader reader, final ProcessSession session, final boolean block) throws IOException { final List currentBins; lock.lock(); @@ -244,7 +239,7 @@ public class RecordBinManager { } public int completeFullEnoughBins() throws IOException { - return handleCompletedBins(bin -> bin.isFullEnough()); + return handleCompletedBins(RecordBin::isFullEnough); } private int handleCompletedBins(final Predicate completionTest) throws IOException { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java index 4ba57afa11..c54bf2ad70 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java @@ -44,7 +44,7 @@ public class TestMergeRecord { runner = TestRunners.newTestRunner(new MergeRecord()); readerService = new CommaSeparatedRecordReader(); - writerService = new MockRecordWriter("header", false); + writerService = new MockRecordWriter("header", false, true); runner.addControllerService("reader", readerService); @@ -57,6 +57,25 @@ public class TestMergeRecord { runner.setProperty(MergeRecord.RECORD_WRITER, "writer"); } + @Test + public void testSmallOutputIsFlushed() { + runner.setProperty(MergeRecord.MIN_RECORDS, "1"); + runner.setProperty(MergeRecord.MAX_RECORDS, "1"); + + runner.enqueue("Name, Age\nJohn, 35\nJane, 34"); + + runner.run(1); + runner.assertTransferCount(MergeRecord.REL_MERGED, 1); + runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 1); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0); + mff.assertAttributeEquals("record.count", "2"); + mff.assertContentEquals("header\nJohn,35\nJane,34\n"); + + runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach( + ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE))); + } + @Test public void testMergeSimple() { runner.setProperty(MergeRecord.MIN_RECORDS, "2"); @@ -73,7 +92,7 @@ public class TestMergeRecord { mff.assertAttributeEquals("record.count", "2"); mff.assertContentEquals("header\nJohn,35\nJane,34\n"); - runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).stream().forEach( + runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach( ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE))); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java index 734168ba41..57ec05a64a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java @@ -135,8 +135,10 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic this.configurationContext = context; final String strategy = context.getProperty(getSchemaWriteStrategyDescriptor()).getValue(); - final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class); - this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService); + if (strategy != null) { + final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class); + this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService); + } } @Override