NIFI-3088: Ensure that on recovery of FlowFile Repository, if we find a FlowFile that maps to an unknown queue that we log a warning that the queue is missing and drop the FlowFile, rather than throwing an NPE

This closes #1266

Signed-off-by: jpercivall <JPercivall@apache.org>
This commit is contained in:
Mark Payne 2016-11-23 10:33:06 -05:00 committed by jpercivall
parent 22143e386d
commit 91ff810dba
4 changed files with 29 additions and 4 deletions

View File

@ -38,10 +38,13 @@ import org.apache.nifi.repository.schema.Repetition;
import org.apache.nifi.repository.schema.SchemaRecordReader;
import org.apache.nifi.repository.schema.SchemaRecordWriter;
import org.apache.nifi.repository.schema.SimpleRecordField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.SerDe;
import org.wali.UpdateType;
public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
private static final Logger logger = LoggerFactory.getLogger(SchemaRepositoryRecordSerde.class);
private static final int MAX_ENCODING_VERSION = 1;
private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1;
@ -154,7 +157,19 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER);
final FlowFileQueue queue = getFlowFileQueue(queueId);
return new StandardRepositoryRecord(queue, flowFileRecord);
final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(queue, flowFileRecord);
requireFlowFileQueue(repoRecord, queueId);
return repoRecord;
}
private void requireFlowFileQueue(final StandardRepositoryRecord repoRecord, final String queueId) {
if (queueId == null || queueId.trim().isEmpty()) {
logger.warn("{} does not have a Queue associated with it; this record will be discarded", repoRecord.getCurrent());
repoRecord.markForAbort();
} else if (repoRecord.getOriginalQueue() == null) {
logger.warn("{} maps to unknown Queue {}; this record will be discarded", repoRecord.getCurrent(), queueId);
repoRecord.markForAbort();
}
}
private void populateContentClaim(final StandardFlowFileRecord.Builder ffBuilder, final Record record) {
@ -189,6 +204,9 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
final StandardRepositoryRecord repoRecord = createRecord(record);
final String swapLocation = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE));
repoRecord.setSwapLocation(swapLocation);
final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER);
requireFlowFileQueue(repoRecord, queueId);
return repoRecord;
}

View File

@ -354,6 +354,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
// Determine the next sequence number for FlowFiles
int numFlowFilesMissingQueue = 0;
long maxId = minimumSequenceNumber;
for (final RepositoryRecord record : recordList) {
final long recordId = serdeFactory.getRecordIdentifier(record);
@ -363,7 +364,9 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
final FlowFileRecord flowFile = record.getCurrent();
final FlowFileQueue queue = record.getOriginalQueue();
if (queue != null) {
if (queue == null) {
numFlowFilesMissingQueue++;
} else {
queue.put(flowFile);
}
}
@ -371,7 +374,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
// Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will
// return the appropriate number.
flowFileSequenceGenerator.set(maxId + 1);
logger.info("Successfully restored {} FlowFiles", recordList.size());
logger.info("Successfully restored {} FlowFiles", recordList.size() - numFlowFilesMissingQueue);
if (numFlowFilesMissingQueue > 0) {
logger.warn("On recovery, found {} FlowFiles whose queue no longer exists. These FlowFiles will be dropped.", numFlowFilesMissingQueue);
}
final Runnable checkpointRunnable = new Runnable() {
@Override

View File

@ -65,7 +65,7 @@ public class RepositoryRecordFieldMap implements Record {
return contentClaimFieldMap;
case RepositoryRecordSchema.QUEUE_IDENTIFIER:
final FlowFileQueue queue = record.getDestination() == null ? record.getOriginalQueue() : record.getDestination();
return queue.getIdentifier();
return queue == null ? null : queue.getIdentifier();
default:
return null;
}

View File

@ -59,6 +59,7 @@ public class RepositoryRecordSchema {
final List<RecordField> createOrUpdateFields = new ArrayList<>();
createOrUpdateFields.add(ACTION_TYPE_FIELD);
createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields());
createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.ZERO_OR_ONE));
final ComplexRecordField createOrUpdate = new ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, createOrUpdateFields);