diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java index 4d6c11d946..d59a81d818 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java @@ -53,7 +53,7 @@ public class SimpleIndexManager implements IndexManager { public SimpleIndexManager(final RepositoryConfiguration repoConfig) { this.repoConfig = repoConfig; - this.searchExecutor = Executors.newFixedThreadPool(repoConfig.getQueryThreadPoolSize(), new NamedThreadFactory("Search Lucene Index")); + this.searchExecutor = Executors.newFixedThreadPool(repoConfig.getQueryThreadPoolSize(), new NamedThreadFactory("Search Lucene Index", true)); } @Override diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java index eccff2aefb..2fe16761d9 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java @@ -245,11 +245,14 @@ public class LookupTableEventRecord implements Record { final Map previousAttributes = truncateAttributes((Map) record.getFieldValue(EventFieldNames.PREVIOUS_ATTRIBUTES), maxAttributeLength); final Map updatedAttributes = truncateAttributes((Map) record.getFieldValue(EventFieldNames.UPDATED_ATTRIBUTES), maxAttributeLength); + final List childUuids = (List) record.getFieldValue(EventFieldNames.CHILD_UUIDS); + final List parentUuids = (List) record.getFieldValue(EventFieldNames.PARENT_UUIDS); + final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder(); builder.setAlternateIdentifierUri((String) record.getFieldValue(EventFieldNames.ALTERNATE_IDENTIFIER)); - builder.setChildUuids((List) record.getFieldValue(EventFieldNames.CHILD_UUIDS)); + builder.setChildUuids(childUuids); builder.setDetails((String) record.getFieldValue(EventFieldNames.EVENT_DETAILS)); - builder.setParentUuids((List) record.getFieldValue(EventFieldNames.PARENT_UUIDS)); + builder.setParentUuids(parentUuids); builder.setPreviousAttributes(previousAttributes); builder.setRelationship((String) record.getFieldValue(EventFieldNames.RELATIONSHIP)); builder.setSourceSystemFlowFileIdentifier((String) record.getFieldValue(EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER)); @@ -263,20 +266,42 @@ public class LookupTableEventRecord implements Record { // Determine the event type final Integer eventTypeOrdinal = (Integer) record.getFieldValue(EventFieldNames.EVENT_TYPE); + ProvenanceEventType eventType; if (eventTypeOrdinal == null || eventTypeOrdinal > eventTypes.size() || eventTypeOrdinal < 0) { - builder.setEventType(ProvenanceEventType.UNKNOWN); + eventType = ProvenanceEventType.UNKNOWN; } else { try { - builder.setEventType(ProvenanceEventType.valueOf(eventTypes.get(eventTypeOrdinal))); + eventType = ProvenanceEventType.valueOf(eventTypes.get(eventTypeOrdinal)); } catch (final Exception e) { - builder.setEventType(ProvenanceEventType.UNKNOWN); + eventType = ProvenanceEventType.UNKNOWN; + } + } + builder.setEventType(eventType); + + // Determine appropriate UUID for the event + String uuid = null; + switch (eventType) { + case CLONE: + case FORK: + case REPLAY: + if (parentUuids != null && !parentUuids.isEmpty()) { + uuid = parentUuids.get(0); + } + break; + case JOIN: + if (childUuids != null && !childUuids.isEmpty()) { + uuid = childUuids.get(0); + } + break; + } + + if (uuid == null) { + uuid = updatedAttributes == null ? null : updatedAttributes.get(CoreAttributes.UUID.key()); + if (uuid == null) { + uuid = previousAttributes == null ? null : previousAttributes.get(CoreAttributes.UUID.key()); } } - String uuid = updatedAttributes == null ? null : updatedAttributes.get(CoreAttributes.UUID.key()); - if (uuid == null) { - uuid = previousAttributes == null ? null : previousAttributes.get(CoreAttributes.UUID.key()); - } builder.setFlowFileUUID(uuid); builder.setEventDuration((Integer) record.getFieldValue(EventFieldNames.EVENT_DURATION)); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java index 6c5cc8d1ad..22d2a5f07a 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java @@ -632,6 +632,8 @@ public class WriteAheadStorePartition implements EventStorePartition { logger.error("Failed to re-index Provenance Events for partition " + partitionName, e); } + executor.shutdown(); + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); final long seconds = millis / 1000L; final long millisRemainder = millis % 1000L;