mirror of https://github.com/apache/nifi.git
NIFI-4752: Addressed issue with some event types having potentially the wrong FlowFile UUID listed (could have child UUID when it's supposed to have parent flowfile UUID). In testing fix, also found an issue with Search threads not being daemon and Re-Index threads not propertly being shutdown so addressed those as well.
This closes #2390. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
parent
10e3b14433
commit
6153fb6465
|
@ -53,7 +53,7 @@ public class SimpleIndexManager implements IndexManager {
|
||||||
|
|
||||||
public SimpleIndexManager(final RepositoryConfiguration repoConfig) {
|
public SimpleIndexManager(final RepositoryConfiguration repoConfig) {
|
||||||
this.repoConfig = repoConfig;
|
this.repoConfig = repoConfig;
|
||||||
this.searchExecutor = Executors.newFixedThreadPool(repoConfig.getQueryThreadPoolSize(), new NamedThreadFactory("Search Lucene Index"));
|
this.searchExecutor = Executors.newFixedThreadPool(repoConfig.getQueryThreadPoolSize(), new NamedThreadFactory("Search Lucene Index", true));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -245,11 +245,14 @@ public class LookupTableEventRecord implements Record {
|
||||||
final Map<String, String> previousAttributes = truncateAttributes((Map<String, String>) record.getFieldValue(EventFieldNames.PREVIOUS_ATTRIBUTES), maxAttributeLength);
|
final Map<String, String> previousAttributes = truncateAttributes((Map<String, String>) record.getFieldValue(EventFieldNames.PREVIOUS_ATTRIBUTES), maxAttributeLength);
|
||||||
final Map<String, String> updatedAttributes = truncateAttributes((Map<String, String>) record.getFieldValue(EventFieldNames.UPDATED_ATTRIBUTES), maxAttributeLength);
|
final Map<String, String> updatedAttributes = truncateAttributes((Map<String, String>) record.getFieldValue(EventFieldNames.UPDATED_ATTRIBUTES), maxAttributeLength);
|
||||||
|
|
||||||
|
final List<String> childUuids = (List<String>) record.getFieldValue(EventFieldNames.CHILD_UUIDS);
|
||||||
|
final List<String> parentUuids = (List<String>) record.getFieldValue(EventFieldNames.PARENT_UUIDS);
|
||||||
|
|
||||||
final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
|
final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
|
||||||
builder.setAlternateIdentifierUri((String) record.getFieldValue(EventFieldNames.ALTERNATE_IDENTIFIER));
|
builder.setAlternateIdentifierUri((String) record.getFieldValue(EventFieldNames.ALTERNATE_IDENTIFIER));
|
||||||
builder.setChildUuids((List<String>) record.getFieldValue(EventFieldNames.CHILD_UUIDS));
|
builder.setChildUuids(childUuids);
|
||||||
builder.setDetails((String) record.getFieldValue(EventFieldNames.EVENT_DETAILS));
|
builder.setDetails((String) record.getFieldValue(EventFieldNames.EVENT_DETAILS));
|
||||||
builder.setParentUuids((List<String>) record.getFieldValue(EventFieldNames.PARENT_UUIDS));
|
builder.setParentUuids(parentUuids);
|
||||||
builder.setPreviousAttributes(previousAttributes);
|
builder.setPreviousAttributes(previousAttributes);
|
||||||
builder.setRelationship((String) record.getFieldValue(EventFieldNames.RELATIONSHIP));
|
builder.setRelationship((String) record.getFieldValue(EventFieldNames.RELATIONSHIP));
|
||||||
builder.setSourceSystemFlowFileIdentifier((String) record.getFieldValue(EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER));
|
builder.setSourceSystemFlowFileIdentifier((String) record.getFieldValue(EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER));
|
||||||
|
@ -263,20 +266,42 @@ public class LookupTableEventRecord implements Record {
|
||||||
|
|
||||||
// Determine the event type
|
// Determine the event type
|
||||||
final Integer eventTypeOrdinal = (Integer) record.getFieldValue(EventFieldNames.EVENT_TYPE);
|
final Integer eventTypeOrdinal = (Integer) record.getFieldValue(EventFieldNames.EVENT_TYPE);
|
||||||
|
ProvenanceEventType eventType;
|
||||||
if (eventTypeOrdinal == null || eventTypeOrdinal > eventTypes.size() || eventTypeOrdinal < 0) {
|
if (eventTypeOrdinal == null || eventTypeOrdinal > eventTypes.size() || eventTypeOrdinal < 0) {
|
||||||
builder.setEventType(ProvenanceEventType.UNKNOWN);
|
eventType = ProvenanceEventType.UNKNOWN;
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
builder.setEventType(ProvenanceEventType.valueOf(eventTypes.get(eventTypeOrdinal)));
|
eventType = ProvenanceEventType.valueOf(eventTypes.get(eventTypeOrdinal));
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
builder.setEventType(ProvenanceEventType.UNKNOWN);
|
eventType = ProvenanceEventType.UNKNOWN;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
builder.setEventType(eventType);
|
||||||
|
|
||||||
|
// Determine appropriate UUID for the event
|
||||||
|
String uuid = null;
|
||||||
|
switch (eventType) {
|
||||||
|
case CLONE:
|
||||||
|
case FORK:
|
||||||
|
case REPLAY:
|
||||||
|
if (parentUuids != null && !parentUuids.isEmpty()) {
|
||||||
|
uuid = parentUuids.get(0);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case JOIN:
|
||||||
|
if (childUuids != null && !childUuids.isEmpty()) {
|
||||||
|
uuid = childUuids.get(0);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (uuid == null) {
|
||||||
|
uuid = updatedAttributes == null ? null : updatedAttributes.get(CoreAttributes.UUID.key());
|
||||||
|
if (uuid == null) {
|
||||||
|
uuid = previousAttributes == null ? null : previousAttributes.get(CoreAttributes.UUID.key());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
String uuid = updatedAttributes == null ? null : updatedAttributes.get(CoreAttributes.UUID.key());
|
|
||||||
if (uuid == null) {
|
|
||||||
uuid = previousAttributes == null ? null : previousAttributes.get(CoreAttributes.UUID.key());
|
|
||||||
}
|
|
||||||
builder.setFlowFileUUID(uuid);
|
builder.setFlowFileUUID(uuid);
|
||||||
|
|
||||||
builder.setEventDuration((Integer) record.getFieldValue(EventFieldNames.EVENT_DURATION));
|
builder.setEventDuration((Integer) record.getFieldValue(EventFieldNames.EVENT_DURATION));
|
||||||
|
|
|
@ -632,6 +632,8 @@ public class WriteAheadStorePartition implements EventStorePartition {
|
||||||
logger.error("Failed to re-index Provenance Events for partition " + partitionName, e);
|
logger.error("Failed to re-index Provenance Events for partition " + partitionName, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
executor.shutdown();
|
||||||
|
|
||||||
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
||||||
final long seconds = millis / 1000L;
|
final long seconds = millis / 1000L;
|
||||||
final long millisRemainder = millis % 1000L;
|
final long millisRemainder = millis % 1000L;
|
||||||
|
|
Loading…
Reference in New Issue