NIFI-5709, NIFI-5710: Addressed issue that causes NiFi to not be able to read provenance events when the a new FlowFile is created and then auto-terminated in the same session; minor bug fixes outlined in NIFI-5710

This closes #3083
This commit is contained in:
Mark Payne 2018-10-16 14:27:29 -04:00 committed by Matt Gilman
parent 11fd67cd1e
commit 4d21f9b34e
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
5 changed files with 86 additions and 8 deletions

View File

@ -61,6 +61,7 @@ import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@ -731,7 +732,7 @@ public class StatusMerger {
gcDiagnosticsDto.setMemoryManagerName(memoryManagerName);
final List<GCDiagnosticsSnapshotDTO> gcDiagnosticsSnapshots = new ArrayList<>(snapshotMap.values());
Collections.sort(gcDiagnosticsSnapshots, (a, b) -> a.getTimestamp().compareTo(b.getTimestamp()));
gcDiagnosticsSnapshots.sort(Comparator.comparing(GCDiagnosticsSnapshotDTO::getTimestamp).reversed());
gcDiagnosticsDto.setSnapshots(gcDiagnosticsSnapshots);
gcDiagnosticsDtos.add(gcDiagnosticsDto);

View File

@ -68,6 +68,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
@ -465,6 +466,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
}
final Map<String, String> attributes = readAttributes(metadataIn);
final String sourceSystemUuid = attributes.get(CoreAttributes.UUID.key());
logger.debug("Received Attributes {} from Peer {}", attributes, peerDescription);
@ -476,6 +478,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(flowFileRepository.getNextFlowFileSequence())
.addAttributes(attributes)
.addAttribute(CoreAttributes.UUID.key(), UUID.randomUUID().toString())
.contentClaim(contentClaimTriple.getContentClaim())
.contentClaimOffset(contentClaimTriple.getClaimOffset())
.size(contentClaimTriple.getContentLength())
@ -484,7 +487,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
.build();
logger.debug("Received FlowFile {} with {} attributes and {} bytes of content", flowFileRecord, attributes.size(), contentClaimTriple.getContentLength());
return new RemoteFlowFileRecord(attributes.get(CoreAttributes.UUID.key()), flowFileRecord);
return new RemoteFlowFileRecord(sourceSystemUuid, flowFileRecord);
}
private Map<String, String> readAttributes(final DataInputStream in) throws IOException {

View File

@ -48,7 +48,7 @@ public class StandardRepositoryRecord implements RepositoryRecord {
*/
public StandardRepositoryRecord(final FlowFileQueue originalQueue) {
this(originalQueue, null);
this.type = RepositoryRecordType.CREATE;
setType(RepositoryRecordType.CREATE);
}
/**
@ -59,13 +59,13 @@ public class StandardRepositoryRecord implements RepositoryRecord {
*/
public StandardRepositoryRecord(final FlowFileQueue originalQueue, final FlowFileRecord originalFlowFileRecord) {
this(originalQueue, originalFlowFileRecord, null);
this.type = RepositoryRecordType.UPDATE;
setType(RepositoryRecordType.UPDATE);
}
public StandardRepositoryRecord(final FlowFileQueue originalQueue, final FlowFileRecord originalFlowFileRecord, final String swapLocation) {
this.originalQueue = originalQueue;
this.originalFlowFileRecord = originalFlowFileRecord;
this.type = RepositoryRecordType.SWAP_OUT;
setType(RepositoryRecordType.SWAP_OUT);
this.swapLocation = swapLocation;
this.originalAttributes = originalFlowFileRecord == null ? Collections.emptyMap() : originalFlowFileRecord.getAttributes();
}
@ -96,7 +96,7 @@ public class StandardRepositoryRecord implements RepositoryRecord {
public void setSwapLocation(final String swapLocation) {
this.swapLocation = swapLocation;
if (type != RepositoryRecordType.SWAP_OUT) {
type = RepositoryRecordType.SWAP_IN; // we are swapping in a new record
setType(RepositoryRecordType.SWAP_IN); // we are swapping in a new record
}
}
@ -159,7 +159,7 @@ public class StandardRepositoryRecord implements RepositoryRecord {
}
public void markForAbort() {
type = RepositoryRecordType.CONTENTMISSING;
setType(RepositoryRecordType.CONTENTMISSING);
}
@Override
@ -168,7 +168,7 @@ public class StandardRepositoryRecord implements RepositoryRecord {
}
public void markForDelete() {
type = RepositoryRecordType.DELETE;
setType(RepositoryRecordType.DELETE);
}
public boolean isMarkedForDelete() {
@ -222,6 +222,20 @@ public class StandardRepositoryRecord implements RepositoryRecord {
return updatedAttributes == null ? Collections.emptyMap() : updatedAttributes;
}
private void setType(final RepositoryRecordType newType) {
if (newType == this.type) {
return;
}
if (this.type == RepositoryRecordType.CREATE) {
// Because we don't copy updated attributes to `this.updatedAttributes` for CREATE records, we need to ensure
// that if a record is changed from CREATE to anything else that we do properly update the `this.updatedAttributes` field.
this.updatedAttributes = new HashMap<>(getCurrent().getAttributes());
}
this.type = newType;
}
@Override
public String toString() {
return "StandardRepositoryRecord[UpdateType=" + getType() + ",Record=" + getCurrent() + "]";

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
public class TestStandardRepositoryRecord {
@Test
public void testUpdatedAttributesMaintainedWhenFlowFileRemoved() {
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
final Map<String, String> updatedAttributes = new HashMap<>();
updatedAttributes.put("abc", "xyz");
updatedAttributes.put("hello", "123");
final String uuid = UUID.randomUUID().toString();
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", uuid)
.addAttributes(updatedAttributes)
.build();
record.setWorking(flowFileRecord, updatedAttributes);
final Map<String, String> updatedWithId = new HashMap<>(updatedAttributes);
updatedWithId.put("uuid", uuid);
assertEquals(updatedWithId, record.getUpdatedAttributes());
record.markForDelete();
assertEquals(updatedWithId, record.getUpdatedAttributes());
}
}

View File

@ -3292,6 +3292,10 @@ public final class DtoFactory {
}
final String serviceId = entry.getValue();
if (serviceId == null) {
continue;
}
final ControllerServiceNode serviceNode = serviceProvider.getControllerServiceNode(serviceId);
if (serviceNode == null) {
continue;
@ -3573,6 +3577,8 @@ public final class DtoFactory {
gcSnapshots.add(snapshotDto);
}
gcSnapshots.sort(Comparator.comparing(GCDiagnosticsSnapshotDTO::getTimestamp).reversed());
final GarbageCollectionDiagnosticsDTO gcDto = new GarbageCollectionDiagnosticsDTO();
gcDto.setMemoryManagerName(memoryManager);
gcDto.setSnapshots(gcSnapshots);