NIFI-4262 - MergeContent - option to add merged uuid in original flow files

This closes #2056

Signed-off-by: zenfenan <zenfenan@apache.org>
This commit is contained in:
Pierre Villard 2017-08-03 23:40:48 +02:00 committed by zenfenan
parent 6e067734d5
commit 05d7b6c6e7
4 changed files with 89 additions and 10 deletions

View File

@ -157,7 +157,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
* will be transferred to failure and the ProcessSession provided by the 'session'
* argument rolled back
*/
protected abstract boolean processBin(Bin unmodifiableBin, ProcessContext context) throws ProcessException;
protected abstract BinProcessingResult processBin(Bin unmodifiableBin, ProcessContext context) throws ProcessException;
/**
* Allows additional custom validation to be done. This will be called from the parent's customValidation method.
@ -221,9 +221,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
int processedBins = 0;
Bin bin;
while ((bin = readyBins.poll()) != null) {
boolean binAlreadyCommitted;
BinProcessingResult binProcessingResult;
try {
binAlreadyCommitted = this.processBin(bin, context);
binProcessingResult = this.processBin(bin, context);
} catch (final ProcessException e) {
logger.error("Failed to process bundle of {} files due to {}", new Object[] {bin.getContents().size(), e});
@ -241,8 +241,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
}
// If this bin's session has been committed, move on.
if (!binAlreadyCommitted) {
if (!binProcessingResult.isCommitted()) {
final ProcessSession binSession = bin.getSession();
bin.getContents().stream().forEach(ff -> binSession.putAllAttributes(ff, binProcessingResult.getAttributes()));
binSession.transfer(bin.getContents(), REL_ORIGINAL);
binSession.commit();
}
@ -361,4 +362,4 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
return problems;
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.bin;
import java.util.HashMap;
import java.util.Map;
/**
* Convenience class used to add attributes in the origin flow files once the bin was committed
*/
public class BinProcessingResult {
/**
* <code>true</code> if the processed bin was already committed. E.g., in case of a failure, the implementation
* may choose to transfer all binned files to Failure and commit their sessions. If
* false, the processBins() method will transfer the files to Original and commit the sessions
*/
private boolean isCommitted;
/**
* Map of attributes to add to original flow files
*/
private Map<String, String> attributes;
public BinProcessingResult(boolean isCommitted) {
this.setCommitted(isCommitted);
this.setAttributes(new HashMap<String, String>());
}
public BinProcessingResult(boolean isCommitted, Map<String, String> attributes) {
this.setCommitted(isCommitted);
this.setAttributes(attributes);
}
public boolean isCommitted() {
return isCommitted;
}
public void setCommitted(boolean isCommitted) {
this.isCommitted = isCommitted;
}
public Map<String, String> getAttributes() {
return attributes;
}
public void setAttributes(Map<String, String> attributes) {
this.attributes = attributes;
}
}

View File

@ -86,6 +86,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.bin.Bin;
import org.apache.nifi.processor.util.bin.BinFiles;
import org.apache.nifi.processor.util.bin.BinManager;
import org.apache.nifi.processor.util.bin.BinProcessingResult;
import org.apache.nifi.processors.standard.merge.AttributeStrategy;
import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
import org.apache.nifi.stream.io.NonCloseableOutputStream;
@ -132,7 +133,8 @@ import org.apache.nifi.util.FlowFilePackagerV3;
+ "if Merge Format is FlowFileStream, then the filename will be appended with .pkg"),
@WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
@WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively "
+ "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output") })
+ "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output"),
@WritesAttribute(attribute = "merge.uuid", description = "UUID of the merged flow file that will be added to the original flow files attributes.")})
@SeeAlso({SegmentContent.class, MergeRecord.class})
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "While content is not stored in memory, the FlowFiles' attributes are. " +
"The configuration of MergeContent (maximum bin size, maximum group size, maximum bin age, max number of entries) will influence how much " +
@ -242,6 +244,7 @@ public class MergeContent extends BinFiles {
public static final String TAR_PERMISSIONS_ATTRIBUTE = "tar.permissions";
public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
public static final String MERGE_UUID_ATTRIBUTE = "merge.uuid";
public static final PropertyDescriptor MERGE_STRATEGY = new PropertyDescriptor.Builder()
.name("Merge Strategy")
@ -438,8 +441,8 @@ public class MergeContent extends BinFiles {
}
@Override
protected boolean processBin(final Bin bin, final ProcessContext context) throws ProcessException {
protected BinProcessingResult processBin(final Bin bin, final ProcessContext context) throws ProcessException {
final BinProcessingResult binProcessingResult = new BinProcessingResult(true);
final String mergeFormat = context.getProperty(MERGE_FORMAT).getValue();
MergeBin merger;
switch (mergeFormat) {
@ -483,7 +486,7 @@ public class MergeContent extends BinFiles {
binSession.transfer(contents, REL_FAILURE);
binSession.commit();
return true;
return binProcessingResult;
}
Collections.sort(contents, new FragmentComparator());
@ -507,6 +510,7 @@ public class MergeContent extends BinFiles {
final String inputDescription = contents.size() < 10 ? contents.toString() : contents.size() + " FlowFiles";
getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle});
binSession.transfer(bundle, REL_MERGED);
binProcessingResult.getAttributes().put(MERGE_UUID_ATTRIBUTE, bundle.getAttribute(CoreAttributes.UUID.key()));
for (final FlowFile unmerged : merger.getUnmergedFlowFiles()) {
final FlowFile unmergedCopy = binSession.clone(unmerged);
@ -514,7 +518,8 @@ public class MergeContent extends BinFiles {
}
// We haven't committed anything, parent will take care of it
return false;
binProcessingResult.setCommitted(false);
return binProcessingResult;
}
private String getDefragmentValidationError(final List<FlowFile> binContents) {

View File

@ -86,6 +86,8 @@ public class TestMergeContent {
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 1);
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
assertEquals(runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0).getAttribute(CoreAttributes.UUID.key()),
runner.getFlowFilesForRelationship(MergeContent.REL_ORIGINAL).get(0).getAttribute(MergeContent.MERGE_UUID_ATTRIBUTE));
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
assertEquals(1024 * 6, bundle.getSize());
@ -492,6 +494,9 @@ public class TestMergeContent {
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
bundle.assertContentEquals("Hello, World!".getBytes("UTF-8"));
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
runner.getFlowFilesForRelationship(MergeContent.REL_ORIGINAL).stream().forEach(
ff -> assertEquals(bundle.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeContent.MERGE_UUID_ATTRIBUTE)));
}
@Test
@ -982,6 +987,9 @@ public class TestMergeContent {
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
runner.getFlowFilesForRelationship(MergeContent.REL_ORIGINAL).stream().forEach(
ff -> assertEquals(assembled.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeContent.MERGE_UUID_ATTRIBUTE)));
}
@Test