From 4d21f9b34eb254014ba1e4a997417236c914fd08 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 16 Oct 2018 14:27:29 -0400 Subject: [PATCH] NIFI-5709, NIFI-5710: Addressed issue that causes NiFi to not be able to read provenance events when the a new FlowFile is created and then auto-terminated in the same session; minor bug fixes outlined in NIFI-5710 This closes #3083 --- .../nifi/cluster/manager/StatusMerger.java | 3 +- .../server/StandardLoadBalanceProtocol.java | 5 +- .../repository/StandardRepositoryRecord.java | 26 ++++++--- .../TestStandardRepositoryRecord.java | 54 +++++++++++++++++++ .../apache/nifi/web/api/dto/DtoFactory.java | 6 +++ 5 files changed, 86 insertions(+), 8 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java index b44f8d6741..0043ca0685 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java @@ -61,6 +61,7 @@ import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -731,7 +732,7 @@ public class StatusMerger { gcDiagnosticsDto.setMemoryManagerName(memoryManagerName); final List gcDiagnosticsSnapshots = new ArrayList<>(snapshotMap.values()); - Collections.sort(gcDiagnosticsSnapshots, (a, b) -> a.getTimestamp().compareTo(b.getTimestamp())); + gcDiagnosticsSnapshots.sort(Comparator.comparing(GCDiagnosticsSnapshotDTO::getTimestamp).reversed()); gcDiagnosticsDto.setSnapshots(gcDiagnosticsSnapshots); gcDiagnosticsDtos.add(gcDiagnosticsDto); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java index d6beff3531..0f032dfefb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java @@ -68,6 +68,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.zip.CRC32; @@ -465,6 +466,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { } final Map attributes = readAttributes(metadataIn); + final String sourceSystemUuid = attributes.get(CoreAttributes.UUID.key()); logger.debug("Received Attributes {} from Peer {}", attributes, peerDescription); @@ -476,6 +478,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .id(flowFileRepository.getNextFlowFileSequence()) .addAttributes(attributes) + .addAttribute(CoreAttributes.UUID.key(), UUID.randomUUID().toString()) .contentClaim(contentClaimTriple.getContentClaim()) .contentClaimOffset(contentClaimTriple.getClaimOffset()) .size(contentClaimTriple.getContentLength()) @@ -484,7 +487,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { .build(); logger.debug("Received FlowFile {} with {} attributes and {} bytes of content", flowFileRecord, attributes.size(), contentClaimTriple.getContentLength()); - return new RemoteFlowFileRecord(attributes.get(CoreAttributes.UUID.key()), flowFileRecord); + return new RemoteFlowFileRecord(sourceSystemUuid, flowFileRecord); } private Map readAttributes(final DataInputStream in) throws IOException { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java index c960902960..4aeb473081 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java @@ -48,7 +48,7 @@ public class StandardRepositoryRecord implements RepositoryRecord { */ public StandardRepositoryRecord(final FlowFileQueue originalQueue) { this(originalQueue, null); - this.type = RepositoryRecordType.CREATE; + setType(RepositoryRecordType.CREATE); } /** @@ -59,13 +59,13 @@ public class StandardRepositoryRecord implements RepositoryRecord { */ public StandardRepositoryRecord(final FlowFileQueue originalQueue, final FlowFileRecord originalFlowFileRecord) { this(originalQueue, originalFlowFileRecord, null); - this.type = RepositoryRecordType.UPDATE; + setType(RepositoryRecordType.UPDATE); } public StandardRepositoryRecord(final FlowFileQueue originalQueue, final FlowFileRecord originalFlowFileRecord, final String swapLocation) { this.originalQueue = originalQueue; this.originalFlowFileRecord = originalFlowFileRecord; - this.type = RepositoryRecordType.SWAP_OUT; + setType(RepositoryRecordType.SWAP_OUT); this.swapLocation = swapLocation; this.originalAttributes = originalFlowFileRecord == null ? Collections.emptyMap() : originalFlowFileRecord.getAttributes(); } @@ -96,7 +96,7 @@ public class StandardRepositoryRecord implements RepositoryRecord { public void setSwapLocation(final String swapLocation) { this.swapLocation = swapLocation; if (type != RepositoryRecordType.SWAP_OUT) { - type = RepositoryRecordType.SWAP_IN; // we are swapping in a new record + setType(RepositoryRecordType.SWAP_IN); // we are swapping in a new record } } @@ -159,7 +159,7 @@ public class StandardRepositoryRecord implements RepositoryRecord { } public void markForAbort() { - type = RepositoryRecordType.CONTENTMISSING; + setType(RepositoryRecordType.CONTENTMISSING); } @Override @@ -168,7 +168,7 @@ public class StandardRepositoryRecord implements RepositoryRecord { } public void markForDelete() { - type = RepositoryRecordType.DELETE; + setType(RepositoryRecordType.DELETE); } public boolean isMarkedForDelete() { @@ -222,6 +222,20 @@ public class StandardRepositoryRecord implements RepositoryRecord { return updatedAttributes == null ? Collections.emptyMap() : updatedAttributes; } + private void setType(final RepositoryRecordType newType) { + if (newType == this.type) { + return; + } + + if (this.type == RepositoryRecordType.CREATE) { + // Because we don't copy updated attributes to `this.updatedAttributes` for CREATE records, we need to ensure + // that if a record is changed from CREATE to anything else that we do properly update the `this.updatedAttributes` field. + this.updatedAttributes = new HashMap<>(getCurrent().getAttributes()); + } + + this.type = newType; + } + @Override public String toString() { return "StandardRepositoryRecord[UpdateType=" + getType() + ",Record=" + getCurrent() + "]"; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java new file mode 100644 index 0000000000..61e23fe36b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; + +public class TestStandardRepositoryRecord { + + @Test + public void testUpdatedAttributesMaintainedWhenFlowFileRemoved() { + final StandardRepositoryRecord record = new StandardRepositoryRecord(null); + + final Map updatedAttributes = new HashMap<>(); + updatedAttributes.put("abc", "xyz"); + updatedAttributes.put("hello", "123"); + + final String uuid = UUID.randomUUID().toString(); + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", uuid) + .addAttributes(updatedAttributes) + .build(); + + record.setWorking(flowFileRecord, updatedAttributes); + + final Map updatedWithId = new HashMap<>(updatedAttributes); + updatedWithId.put("uuid", uuid); + + assertEquals(updatedWithId, record.getUpdatedAttributes()); + + record.markForDelete(); + + assertEquals(updatedWithId, record.getUpdatedAttributes()); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 9020ae0029..1393608a8c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -3292,6 +3292,10 @@ public final class DtoFactory { } final String serviceId = entry.getValue(); + if (serviceId == null) { + continue; + } + final ControllerServiceNode serviceNode = serviceProvider.getControllerServiceNode(serviceId); if (serviceNode == null) { continue; @@ -3573,6 +3577,8 @@ public final class DtoFactory { gcSnapshots.add(snapshotDto); } + gcSnapshots.sort(Comparator.comparing(GCDiagnosticsSnapshotDTO::getTimestamp).reversed()); + final GarbageCollectionDiagnosticsDTO gcDto = new GarbageCollectionDiagnosticsDTO(); gcDto.setMemoryManagerName(memoryManager); gcDto.setSnapshots(gcSnapshots);