NIFI-1181: Ensure that a FlowFile's uuid cannot be modified by processors

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
Mark Payne 2015-11-18 09:04:22 -05:00 committed by Aldrin Piri
parent 2b9b5e008f
commit 93be753301
3 changed files with 66 additions and 13 deletions

View File

@ -247,6 +247,10 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
public Builder removeAttributes(final String... keys) {
if (keys != null) {
for (final String key : keys) {
if (CoreAttributes.UUID.key().equals(key)) {
continue;
}
bAttributes.remove(key);
}
}
@ -256,6 +260,10 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
public Builder removeAttributes(final Set<String> keys) {
if (keys != null) {
for (final String key : keys) {
if (CoreAttributes.UUID.key().equals(key)) {
continue;
}
bAttributes.remove(key);
}
}
@ -267,6 +275,11 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
final Iterator<String> iterator = bAttributes.keySet().iterator();
while (iterator.hasNext()) {
final String key = iterator.next();
if (CoreAttributes.UUID.key().equals(key)) {
continue;
}
if (keyPattern.matcher(key).matches()) {
iterator.remove();
}

View File

@ -1410,14 +1410,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
final String originalUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
final Map<String, String> updatedAttributes;
if (attributes.containsKey(CoreAttributes.UUID.key())) {
updatedAttributes = new HashMap<>(attributes);
updatedAttributes.remove(CoreAttributes.UUID.key());
} else {
updatedAttributes = attributes;
}
final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttributes(attributes);
// Ignore the uuid attribute, if passed in
ffBuilder.addAttribute(CoreAttributes.UUID.key(), originalUuid);
final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttributes(updatedAttributes);
final FlowFileRecord newFile = ffBuilder.build();
record.setWorking(newFile, attributes);
record.setWorking(newFile, updatedAttributes);
return newFile;
}
@ -1443,19 +1447,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return flowFile;
}
final Set<String> keysToRemove;
if (keys.contains(CoreAttributes.UUID.key())) {
keysToRemove = new HashSet<>(keys);
keysToRemove.remove(CoreAttributes.UUID.key());
} else {
keysToRemove = keys;
}
final StandardRepositoryRecord record = records.get(flowFile);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keys).build();
final Map<String, String> updatedAttrs = new HashMap<>();
for (final String key : keys) {
if (CoreAttributes.UUID.key().equals(key)) {
continue;
}
updatedAttrs.put(key, null);
}

View File

@ -37,13 +37,16 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
@ -58,6 +61,7 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
@ -469,6 +473,42 @@ public class TestStandardProcessSession {
assertEquals(1, provenanceRepo.getEvents(0L, 100000).size());
}
@Test
public void testUuidAttributeCannotBeUpdated() {
String originalUuid = "11111111-1111-1111-1111-111111111111";
final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("uuid", originalUuid)
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord1);
FlowFile flowFile = session.get();
assertNotNull(flowFile);
final String uuid = CoreAttributes.UUID.key();
final String newUuid = "22222222-2222-2222-2222-222222222222";
flowFile = session.putAttribute(flowFile, uuid, newUuid);
assertEquals(originalUuid, flowFile.getAttribute(uuid));
final Map<String, String> uuidMap = new HashMap<>(1);
uuidMap.put(uuid, newUuid);
flowFile = session.putAllAttributes(flowFile, uuidMap);
assertEquals(originalUuid, flowFile.getAttribute(uuid));
flowFile = session.removeAllAttributes(flowFile, Pattern.compile("uuid"));
assertEquals(originalUuid, flowFile.getAttribute(uuid));
flowFile = session.removeAllAttributes(flowFile, Collections.singleton(uuid));
assertEquals(originalUuid, flowFile.getAttribute(uuid));
flowFile = session.removeAttribute(flowFile, uuid);
assertEquals(originalUuid, flowFile.getAttribute(uuid));
}
@Test
public void testUpdateAttributesThenJoin() throws IOException {
final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()