NIFI-6033, NIFI-6034, NIFI-6035, NIFI-6036, NIFI-6037: Fixed bugs that were found during 1.9.0-RC1 validation. If multiple FlowFiles were written to the same Content Claim and a Processor attempted to read two of them within a single session, it would seek to the wrong part of the content or else throw a ContentNotFoundException. Updated logic so that a processor that is invalid upon NiFi restart but scheduled to run is still considered 'running' / having 'active threads'. Fixed NPE in FreeFormTextWriter. If MergeRecord reaches its minimum number of records, flush the writer after writing content out so that the bin's minimum size can be accurately checked.

This closes #3309.

Signed-off-by: Bryan Bende <bbende@apache.org>
Mark Payne 2019-02-14 15:11:59 -05:00 committed by Bryan Bende
parent f61947627e
commit b508d6bfbc
11 changed files with 101 additions and 106 deletions

MockRecordWriter.java

@@ -17,11 +17,6 @@
package org.apache.nifi.serialization.record;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -30,27 +25,43 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Map;
public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
private final String header;
private final int failAfterN;
private final boolean quoteValues;
private final boolean bufferOutput;
public MockRecordWriter() {
this(null);
}
public MockRecordWriter(final String header) {
this(header, true, -1);
this(header, true, -1, false);
}
public MockRecordWriter(final String header, final boolean quoteValues) {
this(header, quoteValues, -1);
this(header, quoteValues, false);
}
public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) {
this(header, quoteValues, failAfterN, false);
}
public MockRecordWriter(final String header, final boolean quoteValues, final boolean bufferOutput) {
this(header, quoteValues, -1, bufferOutput);
}
public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN, final boolean bufferOutput) {
this.header = header;
this.quoteValues = quoteValues;
this.failAfterN = failAfterN;
this.bufferOutput = bufferOutput;
}
@Override
@@ -59,7 +70,9 @@ public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream rawOut) {
final OutputStream out = bufferOutput ? new BufferedOutputStream(rawOut) : rawOut;
return new RecordSetWriter() {
private int recordCount = 0;
private boolean headerWritten = false;
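A note on the new bufferOutput flag: wrapping the raw stream in a BufferedOutputStream lets a test reproduce the MergeRecord sizing bug, because records that have been written but not yet flushed never reach the stream whose size the bin measures. A minimal usage sketch, assuming the TestRunner setup shown in TestMergeRecord.java below (the "writer" service id is illustrative):

final MockRecordWriter writerService = new MockRecordWriter("header", false, true); // quoteValues=false, bufferOutput=true
runner.addControllerService("writer", writerService);
runner.enableControllerService(writerService);
// Until the processor flushes the RecordSetWriter, written bytes sit inside
// the BufferedOutputStream, so a size check against the raw stream reads low.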

AbstractComponentNode.java

@@ -204,8 +204,12 @@ public abstract class AbstractComponentNode implements ComponentNode {
}
}
logger.debug("Resetting Validation State of {} due to setting properties", this);
resetValidationState();
if (isTriggerValidation()) {
logger.debug("Resetting Validation State of {} due to setting properties", this);
resetValidationState();
} else {
logger.debug("Properties set for {} but not resettingn validation state because validation is paused", this);
}
} finally {
lock.unlock();
}
@@ -642,13 +646,8 @@ public abstract class AbstractComponentNode implements ComponentNode {
public void resumeValidationTrigger() {
triggerValidation = true;
final ValidationStatus validationStatus = getValidationStatus();
if (validationStatus == ValidationStatus.VALIDATING) {
logger.debug("Resuming Triggering of Validation State for {}; status is VALIDATING so will trigger async validation now", this);
validationTrigger.triggerAsync(this);
} else {
logger.debug("Resuming Triggering of Validation State for {}; status is {} so will not trigger async validation now", this, validationStatus);
}
logger.debug("Resuming Triggering of Validation State for {}; Resetting validation state", this);
resetValidationState();
}
private boolean isTriggerValidation() {
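Taken with pauseValidationTrigger(), the two hunks above close a gap: properties set while validation was paused used to be re-validated only if the status happened to be VALIDATING when the trigger resumed. A hedged sketch of the intended calling pattern (the methods are those shown above; the surrounding code is illustrative):

componentNode.pauseValidationTrigger();   // batch several mutations without re-validating each one
try {
    componentNode.setProperties(updatedProperties); // logs that validation is paused; state untouched
} finally {
    componentNode.resumeValidationTrigger();        // now unconditionally resets validation state,
                                                    // so the batched changes are re-validated
}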

StandardProcessorNode.java

@@ -1367,7 +1367,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
if (starting) { // will ensure that the Processor represented by this node can only be started once
hasActiveThreads = true;
initiateStart(taskScheduler, administrativeYieldMillis, timeoutMillis, processContext, schedulingAgentCallback);
} else {
final String procName = processorRef.get().toString();
@@ -1395,7 +1394,7 @@
final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();
final Map<Long, ThreadInfo> threadInfoMap = Stream.of(infos)
.collect(Collectors.toMap(info -> info.getThreadId(), Function.identity(), (a, b) -> a));
.collect(Collectors.toMap(ThreadInfo::getThreadId, Function.identity(), (a, b) -> a));
final List<ActiveThreadInfo> threadList = new ArrayList<>(activeThreads.size());
for (final Map.Entry<Thread, ActiveTask> entry : activeThreads.entrySet()) {
@@ -1509,6 +1508,8 @@
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
try {
hasActiveThreads = true;
activateThread();
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext);
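The ThreadInfo::getThreadId method reference is a drop-in replacement for the lambda; the third argument still matters because Collectors.toMap without a merge function throws IllegalStateException on duplicate keys. A self-contained illustration with example data (not NiFi code):

import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToMapDemo {
    public static void main(String[] args) {
        final Map<Integer, String> byLength = Stream.of("nifi", "flow", "record")
                .collect(Collectors.toMap(String::length, Function.identity(), (a, b) -> a));
        // "nifi" and "flow" collide on key 4; the merge function keeps the first.
        System.out.println(byLength); // {4=nifi, 6=record}
    }
}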

StandardProcessSession.java

@@ -2173,8 +2173,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
// callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1.
if (allowCachingOfStream && readRecursionSet.isEmpty() && writeRecursionSet.isEmpty()) {
if (currentReadClaim == claim) {
if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= offset) {
final long bytesToSkip = offset - currentReadClaimStream.getBytesConsumed();
if (currentReadClaimStream != null && currentReadClaimStream.getCurrentOffset() <= offset) {
final long bytesToSkip = offset - currentReadClaimStream.getCurrentOffset();
if (bytesToSkip > 0) {
StreamUtils.skip(currentReadClaimStream, bytesToSkip);
}
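This one-line change is the NIFI-6033 fix described in the commit message. getBytesConsumed() counts bytes read since the cached stream was opened, while the offset being compared is absolute within the Content Claim; when several FlowFiles share one claim the two diverge, so the computed skip landed in another FlowFile's content or ran past the end of the claim (ContentNotFoundException). A worked sketch with illustrative numbers (stream stands in for the cached claim stream):

// Claim layout: FlowFile A at claim offset 0 (100 bytes), FlowFile B at offset 100.
// Suppose the cached stream was opened directly at B and nothing read yet:
//   stream.getBytesConsumed() == 0    (bytes read since the stream was opened)
//   stream.getCurrentOffset() == 100  (absolute position within the claim)
final long offset = 100;                                   // B's offset in the claim
final long wrongSkip = offset - stream.getBytesConsumed(); // 100: would skip into the wrong content
final long rightSkip = offset - stream.getCurrentOffset(); // 0: already positioned at B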

LoadBalancedQueueIT.java

@@ -307,7 +307,7 @@ public class LoadBalancedQueueIT {
}
}
@Test(timeout = 60_000)
@Test(timeout = 90_000)
public void testFailover() throws IOException, InterruptedException {
localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
nodeIdentifiers.add(localNodeId);
@@ -371,7 +371,7 @@
final int expectedFlowFileReceiveCount = flowFilesPerNode + flowFilesPerNode / 2;
// Wait up to 60 seconds for the server's FlowFile Repository to be updated
final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30L);
final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60L);
while (serverRepoRecords.size() < expectedFlowFileReceiveCount && System.currentTimeMillis() < endTime) {
Thread.sleep(10L);
}
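Both timeout changes leave the poll-until-deadline idiom intact. The same idiom, generalized into a small helper (an illustrative sketch, not an existing NiFi utility):

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public final class Awaits {
    public static boolean await(final BooleanSupplier condition, final long timeout, final TimeUnit unit)
            throws InterruptedException {
        final long endTime = System.currentTimeMillis() + unit.toMillis(timeout);
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= endTime) {
                return false; // timed out; caller decides whether to fail the test
            }
            Thread.sleep(10L);
        }
        return true;
    }
}

// Usage mirroring the loop above:
// assertTrue(Awaits.await(() -> serverRepoRecords.size() >= expectedFlowFileReceiveCount, 60, TimeUnit.SECONDS));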

ProcessorLifecycleIT.java

@@ -485,21 +485,6 @@ public class ProcessorLifecycleIT {
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
}
/**
* Validate that the processor will not be started on failing
* PropertyDescriptor validation.
*/
@Test(expected = IllegalStateException.class)
public void validateStartFailsOnInvalidProcessorWithMissingProperty() throws Exception {
final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
flowManager = fcsb.getFlowManager();
ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
processScheduler.startProcessor(testProcNode, true);
fail();
}
/**
* Validate that the processor will not be started on failing
@@ -527,41 +512,6 @@
fail();
}
/**
* Validate a successful processor start with a ControllerService dependency.
*/
@Test
public void validateStartSucceedsOnProcessorWithEnabledService() throws Exception {
final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
flowManager = fcsb.getFlowManager();
ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
ControllerServiceNode testServiceNode = flowManager.createControllerService(TestService.class.getName(), "foo",
fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true, true);
testGroup.addControllerService(testServiceNode);
ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testGroup.addProcessor(testProcNode);
properties.put("S", testServiceNode.getIdentifier());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
testProcessor.withService = true;
this.noop(testProcessor);
testServiceNode.performValidation();
processScheduler.enableControllerService(testServiceNode);
testProcNode.performValidation();
processScheduler.startProcessor(testProcNode, true);
Thread.sleep(500);
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
}
/**
* Scenario where onTrigger() is executed with random delay limited to

MergeRecord.java

@@ -314,7 +314,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
final boolean block;
if (MERGE_STRATEGY_DEFRAGMENT.equals(mergeStrategy)) {
if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
block = true;
} else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
block = true;
@@ -378,12 +378,12 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final RecordSchema schema, final ProcessSession session) {
final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
if (MERGE_STRATEGY_DEFRAGMENT.equals(mergeStrategy)) {
if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
return flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
}
final Optional<String> optionalText = schema.getSchemaText();
final String schemaText = optionalText.isPresent() ? optionalText.get() : AvroTypeUtil.extractAvroSchema(schema).toString();
final String schemaText = optionalText.orElseGet(() -> AvroTypeUtil.extractAvroSchema(schema).toString());
final String groupId;
final String correlationshipAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
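Both hunks normalize the comparison to getValue().equals(...), which compares String to String instead of relying on how AllowableValue.equals treats a String argument. A hedged sketch of why this direction is the robust one (constructor arguments are illustrative):

final AllowableValue defragment = new AllowableValue("Defragment", "Defragment", "Combine fragments by fragment id");
final String configured = "Defragment"; // what context.getProperty(MERGE_STRATEGY).getValue() returns

configured.equals(defragment);            // always false: String.equals never matches an AllowableValue
defragment.getValue().equals(configured); // true: plain String equality, no special casing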

RecordBin.java

@@ -46,8 +46,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class RecordBin {
public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
private final ComponentLog logger;
private final ProcessSession session;
@@ -70,6 +68,8 @@
private static final AtomicLong idGenerator = new AtomicLong(0L);
private final long id = idGenerator.getAndIncrement();
private volatile int requiredRecordCount = -1;
public RecordBin(final ProcessContext context, final ProcessSession session, final ComponentLog logger, final RecordBinThresholds thresholds) {
this.session = session;
@@ -95,7 +95,6 @@
}
public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block) throws IOException {
if (isComplete()) {
logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
return false;
@@ -148,6 +147,12 @@
flowFileMigrated = true;
this.flowFiles.add(flowFile);
if (recordCount >= getMinimumRecordCount()) {
// If we have met our minimum record count, we need to flush so that when we reach the desired number of bytes
// the bin is considered 'full enough'.
recordWriter.flush();
}
if (isFull()) {
logger.debug(this + " is now full. Completing bin.");
complete("Bin is full");
@@ -232,6 +237,29 @@
}
}
private int getMinimumRecordCount() {
final int currentCount = requiredRecordCount;
if (currentCount > -1) {
return currentCount;
}
int requiredCount;
final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
if (recordCountAttribute.isPresent()) {
final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get());
try {
requiredCount = Integer.parseInt(recordCountValue);
} catch (final NumberFormatException e) {
requiredCount = 1;
}
} else {
requiredCount = thresholds.getMinRecords();
}
this.requiredRecordCount = requiredCount;
return requiredCount;
}
public boolean isFullEnough() {
readLock.lock();
try {
@@ -239,19 +267,7 @@
return false;
}
int requiredRecordCount;
final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
if (recordCountAttribute.isPresent()) {
final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get());
try {
requiredRecordCount = Integer.parseInt(recordCountValue);
} catch (final NumberFormatException e) {
requiredRecordCount = 1;
}
} else {
requiredRecordCount = thresholds.getMinRecords();
}
final int requiredRecordCount = getMinimumRecordCount();
return (recordCount >= requiredRecordCount && out.getBytesWritten() >= thresholds.getMinBytes());
} finally {
readLock.unlock();
@@ -386,11 +402,11 @@
attributes.putAll(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType());
attributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size()));
attributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge()));
attributes.put(MergeRecord.MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size()));
attributes.put(MergeRecord.MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge()));
merged = session.putAllAttributes(merged, attributes);
flowFiles.stream().forEach(ff -> session.putAttribute(ff, "merge.uuid", merged.getAttribute(CoreAttributes.UUID.key())));
flowFiles.forEach(ff -> session.putAttribute(ff, MergeRecord.MERGE_UUID_ATTRIBUTE, merged.getAttribute(CoreAttributes.UUID.key())));
session.getProvenanceReporter().join(flowFiles, merged, "Records Merged due to: " + completionReason);
session.transfer(merged, MergeRecord.REL_MERGED);
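The new getMinimumRecordCount() computes the threshold once and caches it in the volatile requiredRecordCount field, so offer() (for the new flush) and isFullEnough() agree on the same number without re-parsing the FlowFile attribute each time. The caching idiom in isolation (the helper name is hypothetical; the race is benign because two threads would store the same value):

private volatile int cachedThreshold = -1;

private int getThreshold() {
    final int current = cachedThreshold;
    if (current > -1) {
        return current; // fast path: already computed
    }
    final int computed = computeThresholdFromAttributeOrConfig(); // hypothetical helper
    cachedThreshold = computed;
    return computed;
}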

RecordBinManager.java

@@ -26,8 +26,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.standard.MergeContent;
import org.apache.nifi.processors.standard.MergeRecord;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import java.io.IOException;
@@ -109,12 +107,9 @@ public class RecordBinManager {
* @param block if another thread is already writing to the desired bin, passing <code>true</code> for this parameter will block until the other thread(s) have finished so
* that the records can still be added to the desired bin. Passing <code>false</code> will result in moving on to another bin.
*
* @throws SchemaNotFoundException if unable to find the schema for the record writer
* @throws MalformedRecordException if unable to read a record
* @throws IOException if there is an IO problem reading from the stream or writing to the stream
*/
public void add(final String groupIdentifier, final FlowFile flowFile, final RecordReader reader, final ProcessSession session, final boolean block)
throws IOException, MalformedRecordException, SchemaNotFoundException {
public void add(final String groupIdentifier, final FlowFile flowFile, final RecordReader reader, final ProcessSession session, final boolean block) throws IOException {
final List<RecordBin> currentBins;
lock.lock();
@@ -244,7 +239,7 @@
}
public int completeFullEnoughBins() throws IOException {
return handleCompletedBins(bin -> bin.isFullEnough());
return handleCompletedBins(RecordBin::isFullEnough);
}
private int handleCompletedBins(final Predicate<RecordBin> completionTest) throws IOException {
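The javadoc above is the contract MergeRecord relies on when it picks block in the first hunk of MergeRecord.java: Defragment and correlation-attribute merges must land in one specific bin, so they pass true and wait for it; size-based merges pass false and fall through to another bin. A hedged usage sketch (argument values illustrative):

binManager.add(groupId, flowFile, reader, session, true);  // wait until the desired bin is free
binManager.add(groupId, flowFile, reader, session, false); // don't wait; move on to another bin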

TestMergeRecord.java

@@ -44,7 +44,7 @@ public class TestMergeRecord {
runner = TestRunners.newTestRunner(new MergeRecord());
readerService = new CommaSeparatedRecordReader();
writerService = new MockRecordWriter("header", false);
writerService = new MockRecordWriter("header", false, true);
runner.addControllerService("reader", readerService);
@@ -57,6 +57,25 @@
runner.setProperty(MergeRecord.RECORD_WRITER, "writer");
}
@Test
public void testSmallOutputIsFlushed() {
runner.setProperty(MergeRecord.MIN_RECORDS, "1");
runner.setProperty(MergeRecord.MAX_RECORDS, "1");
runner.enqueue("Name, Age\nJohn, 35\nJane, 34");
runner.run(1);
runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 1);
final MockFlowFile mff = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
mff.assertAttributeEquals("record.count", "2");
mff.assertContentEquals("header\nJohn,35\nJane,34\n");
runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(
ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
}
@Test
public void testMergeSimple() {
runner.setProperty(MergeRecord.MIN_RECORDS, "2");
@@ -73,7 +92,7 @@
mff.assertAttributeEquals("record.count", "2");
mff.assertContentEquals("header\nJohn,35\nJane,34\n");
runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).stream().forEach(
runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(
ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
}

SchemaRegistryRecordSetWriter.java

@@ -135,8 +135,10 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryService {
this.configurationContext = context;
final String strategy = context.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService);
if (strategy != null) {
final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService);
}
}
@Override
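This null guard is most likely the "Fixed NPE in FreeFormTextWriter" item from the commit message: a writer with no schema write strategy configured gets null back from getValue(), and the old code passed that straight into createSchemaWriteStrategy. A simplified sketch of the failure path (assuming the strategy argument is dereferenced inside):

// Before the guard (simplified):
final String strategy = context.getProperty(getSchemaWriteStrategyDescriptor()).getValue(); // null when unset
this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService);    // NPE downstream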