diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java index 916fd76116..c0c9d180f1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java @@ -38,10 +38,13 @@ import org.apache.nifi.repository.schema.Repetition; import org.apache.nifi.repository.schema.SchemaRecordReader; import org.apache.nifi.repository.schema.SchemaRecordWriter; import org.apache.nifi.repository.schema.SimpleRecordField; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.wali.SerDe; import org.wali.UpdateType; public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe { + private static final Logger logger = LoggerFactory.getLogger(SchemaRepositoryRecordSerde.class); private static final int MAX_ENCODING_VERSION = 1; private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1; @@ -154,7 +157,19 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER); final FlowFileQueue queue = getFlowFileQueue(queueId); - return new StandardRepositoryRecord(queue, flowFileRecord); + final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(queue, flowFileRecord); + requireFlowFileQueue(repoRecord, queueId); + return repoRecord; + } + + private void requireFlowFileQueue(final StandardRepositoryRecord repoRecord, final String queueId) { + if (queueId == null || queueId.trim().isEmpty()) { + logger.warn("{} does not have a Queue associated with it; this record will be discarded", repoRecord.getCurrent()); + repoRecord.markForAbort(); + } else if (repoRecord.getOriginalQueue() == null) { + logger.warn("{} maps to unknown Queue {}; this record will be discarded", repoRecord.getCurrent(), queueId); + repoRecord.markForAbort(); + } } private void populateContentClaim(final StandardFlowFileRecord.Builder ffBuilder, final Record record) { @@ -189,6 +204,9 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement final StandardRepositoryRecord repoRecord = createRecord(record); final String swapLocation = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); repoRecord.setSwapLocation(swapLocation); + + final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER); + requireFlowFileQueue(repoRecord, queueId); return repoRecord; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 2a323de6d0..071527ca1f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -354,6 +354,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis } // Determine the next sequence number for FlowFiles + int numFlowFilesMissingQueue = 0; long maxId = minimumSequenceNumber; for (final RepositoryRecord record : recordList) { final long recordId = serdeFactory.getRecordIdentifier(record); @@ -363,7 +364,9 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis final FlowFileRecord flowFile = record.getCurrent(); final FlowFileQueue queue = record.getOriginalQueue(); - if (queue != null) { + if (queue == null) { + numFlowFilesMissingQueue++; + } else { queue.put(flowFile); } } @@ -371,7 +374,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will // return the appropriate number. flowFileSequenceGenerator.set(maxId + 1); - logger.info("Successfully restored {} FlowFiles", recordList.size()); + logger.info("Successfully restored {} FlowFiles", recordList.size() - numFlowFilesMissingQueue); + if (numFlowFilesMissingQueue > 0) { + logger.warn("On recovery, found {} FlowFiles whose queue no longer exists. These FlowFiles will be dropped.", numFlowFilesMissingQueue); + } final Runnable checkpointRunnable = new Runnable() { @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java index 9804decc88..5fe48899f2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java @@ -65,7 +65,7 @@ public class RepositoryRecordFieldMap implements Record { return contentClaimFieldMap; case RepositoryRecordSchema.QUEUE_IDENTIFIER: final FlowFileQueue queue = record.getDestination() == null ? record.getOriginalQueue() : record.getDestination(); - return queue.getIdentifier(); + return queue == null ? null : queue.getIdentifier(); default: return null; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java index 5887c8af05..f99b5d9957 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java @@ -59,6 +59,7 @@ public class RepositoryRecordSchema { final List createOrUpdateFields = new ArrayList<>(); createOrUpdateFields.add(ACTION_TYPE_FIELD); createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields()); + createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.ZERO_OR_ONE)); final ComplexRecordField createOrUpdate = new ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, createOrUpdateFields);