Merge branch 'NIFI-305' of https://github.com/gresockj/incubator-nifi into develop

This commit is contained in:
Mark Payne 2015-02-02 13:23:36 -05:00
commit c01dff5922
2 changed files with 491 additions and 316 deletions

View File

@@ -0,0 +1,405 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.Bin;
import org.apache.nifi.processors.standard.util.BinManager;
import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
/**
* Base class for file-binning processors, including MergeContent.
*/
public abstract class BinFiles extends AbstractSessionFactoryProcessor {
public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
.name("Minimum Group Size")
.description("The minimum size of for the bundle")
.required(true)
.defaultValue("0 B")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Group Size")
.description("The maximum size for the bundle. If not specified, there is no maximum.")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder()
.name("Minimum Number of Entries")
.description("The minimum number of files to include in a bundle")
.required(true)
.defaultValue("1")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder()
.name("Maximum Number of Entries")
.description("The maximum number of files to include in a bundle. If not specified, there is no maximum.")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
.name("Maximum number of Bins")
.description("Specifies the maximum number of bins that can be held in memory at any one time")
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
.name("Max Bin Age")
.description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The FlowFiles that were used to create the bundle").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure").build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
private final BinManager binManager = new BinManager();
private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
@Override
protected final void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_ORIGINAL);
relationships.add(REL_FAILURE);
Set<Relationship> additionalRelationships = defineAdditionalRelationships();
if (additionalRelationships != null) {
relationships.addAll(additionalRelationships);
}
this.relationships = Collections.unmodifiableSet(relationships);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(MIN_ENTRIES);
descriptors.add(MAX_ENTRIES);
descriptors.add(MIN_SIZE);
descriptors.add(MAX_SIZE);
descriptors.add(MAX_BIN_AGE);
descriptors.add(MAX_BIN_COUNT);
List<PropertyDescriptor> additionalPropertyDescriptors = this.defineAdditionalPropertyDescriptors();
if (additionalPropertyDescriptors != null) {
descriptors.addAll(additionalPropertyDescriptors);
}
this.descriptors = Collections.unmodifiableList(descriptors);
}
@OnStopped
public final void resetState() {
binManager.purge();
Bin bin;
while ((bin = readyBins.poll()) != null) {
for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
wrapper.getSession().rollback();
}
}
}
@Override
public final Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
/**
* Allows any additional relationships to be defined.
* @return Relationships to be added in the init() method
*/
protected abstract Set<Relationship> defineAdditionalRelationships();
/**
* Allows any additional property descriptors to be defined.
* @return Properties to be added in the init() method
*/
protected abstract List<PropertyDescriptor> defineAdditionalPropertyDescriptors();
/**
* Allows general pre-processing of a flow file before it is offered to a
* bin. This is called before getGroupId().
*
* @param context the process context
* @param session the process session to which the flow file belongs
* @param flowFile the flow file to preprocess
* @return The flow file, possibly altered
*/
protected abstract FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
/**
* Returns a group ID representing a bin. This allows flow files to be
* binned into like groups.
*
* @param context the process context
* @param flowFile the flow file to assign to a group
* @return The appropriate group ID
*/
protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile);
/**
* Performs any additional setup of the bin manager. Called during the
* OnScheduled phase.
*
* @param binManager The bin manager to set up
* @param context the process context
*/
protected abstract void setUpBinManager(BinManager binManager, ProcessContext context);
/**
* Processes a single bin. If the implementation routes and commits the
* wrapped sessions itself, it must return true; otherwise the base class
* transfers the files to Original and commits the sessions.
*
* @param unmodifiableBin
* A reference to a single bin of flow file/session wrappers
* @param binContents
* A copy of the contents of the bin
* @param context
* The process context
* @param session
* The session created for processing this bin
* @param logger
* The processor's logger
* @return true if the input bin was already committed. E.g., in case of a
* failure, the implementation may choose to transfer all binned files
* to Failure and commit their sessions. If false, the
* processBins() method will transfer the files to Original and commit
* the sessions
* @throws Exception
* Any thrown Exception is caught by the caller: all flow files in
* the bin are transferred to failure, their sessions are committed,
* and the session passed to this method is rolled back
*/
protected abstract boolean processBin(Bin unmodifiableBin,
List<FlowFileSessionWrapper> binContents, ProcessContext context,
ProcessSession session, ProcessorLog logger) throws Exception;
/**
* Allows additional custom validation to be done. This will be called from
* the parent's customValidation method.
*
* @param context
* The context
* @return Validation results indicating problems
*/
protected Collection<ValidationResult> additionalCustomValidation(final ValidationContext context) {
return new ArrayList<ValidationResult>();
}
@Override
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
int binsAdded = binFlowFiles(context, sessionFactory);
getLogger().debug("Binned {} FlowFiles", new Object[] {binsAdded});
if (!isScheduled()) {
return;
}
binsAdded += migrateBins(context);
final int binsProcessed = processBins(context, sessionFactory);
if (binsProcessed == 0 && binsAdded == 0) {
context.yield();
}
}
private int migrateBins(final ProcessContext context) {
int added = 0;
for (final Bin bin : binManager.removeReadyBins(true)) {
this.readyBins.add(bin);
added++;
}
// if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
// this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
// bins. So we may as well expire it now.
if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
final Bin bin = binManager.removeOldestBin();
if (bin != null) {
added++;
this.readyBins.add(bin);
}
}
return added;
}
private int processBins(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
final Bin bin = readyBins.poll();
if (bin == null) {
return 0;
}
final List<Bin> bins = new ArrayList<>();
bins.add(bin);
final ProcessorLog logger = getLogger();
final ProcessSession session = sessionFactory.createSession();
final List<FlowFileSessionWrapper> binCopy = new ArrayList<>(bin.getContents());
boolean binAlreadyCommitted = false;
try {
binAlreadyCommitted = this.processBin(bin, binCopy, context, session, logger);
} catch (final Exception e) {
logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
for (final FlowFileSessionWrapper wrapper : binCopy) {
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
wrapper.getSession().commit();
}
session.rollback();
return 1;
}
// we first commit the bundle's session before the originals' sessions because if we are restarted or crash
// between commits, we favor data redundancy over data loss. Since we have no Distributed Transaction capability
// across multiple sessions, we cannot guarantee atomicity across the sessions
session.commit();
// If this bin's session has been committed, move on.
if ( !binAlreadyCommitted ) {
for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_ORIGINAL);
wrapper.getSession().commit();
}
}
return 1;
}
private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
int binsAdded = 0;
while (binManager.getBinCount() < context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
if (!isScheduled()) {
return binsAdded;
}
final ProcessSession session = sessionFactory.createSession();
FlowFile flowFile = session.get();
if (flowFile == null) {
return binsAdded;
}
flowFile = this.preprocessFlowFile(context, session, flowFile);
String groupId = this.getGroupId(context, flowFile);
final boolean binned = binManager.offer(groupId, flowFile, session);
// could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
if (!binned) {
Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
bin.offer(flowFile, session);
this.readyBins.add(bin);
}
binsAdded++;
}
return binsAdded;
}
@OnScheduled
public final void onScheduled(final ProcessContext context) throws IOException {
binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
if (context.getProperty(MAX_BIN_AGE).isSet() ) {
binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
} else {
binManager.setMaxBinAge(Integer.MAX_VALUE);
}
if ( context.getProperty(MAX_SIZE).isSet() ) {
binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
} else {
binManager.setMaximumSize(Long.MAX_VALUE);
}
binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
if ( context.getProperty(MAX_ENTRIES).isSet() ) {
binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
} else {
binManager.setMaximumEntries(Integer.MAX_VALUE);
}
this.setUpBinManager(binManager, context);
}
@Override
protected final Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
final Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
if (maxBytes != null && maxBytes.longValue() < minBytes) {
problems.add(new ValidationResult.Builder().subject(MIN_SIZE.getName()).input(
context.getProperty(MIN_SIZE).getValue()).valid(false).explanation("Min Size must be less than or equal to Max Size").build());
}
final Long min = context.getProperty(MIN_ENTRIES).asLong();
final Long max = context.getProperty(MAX_ENTRIES).asLong();
if (min != null && max != null) {
if (min > max) {
problems.add(new ValidationResult.Builder().subject(MIN_ENTRIES.getName()).input(context.getProperty(MIN_ENTRIES).getValue()).valid(false).explanation("Min Entries must be less than or equal to Max Entries").build());
}
}
Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context);
if (otherProblems != null) {
problems.addAll(otherProblems);
}
return problems;
}
}
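
The template methods above are easiest to see with a concrete subclass. The sketch below is illustrative only and not part of this commit: SimpleBinner and its bin.key grouping attribute are made-up names. It performs no extra setup and returns false from processBin() so that the base class routes each original FlowFile and commits each wrapped session.

// Hypothetical example -- not part of this commit. Shows the minimal
// surface a BinFiles subclass must implement.
package org.apache.nifi.processors.standard;
import java.util.List;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.util.Bin;
import org.apache.nifi.processors.standard.util.BinManager;
import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
public class SimpleBinner extends BinFiles {
@Override
protected Set<Relationship> defineAdditionalRelationships() {
return null; // nothing beyond REL_ORIGINAL and REL_FAILURE; init() tolerates null
}
@Override
protected List<PropertyDescriptor> defineAdditionalPropertyDescriptors() {
return null; // only the shared binning properties are exposed
}
@Override
protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
return flowFile; // no attribute normalization needed before binning
}
@Override
protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
return flowFile.getAttribute("bin.key"); // "bin.key" is a made-up grouping attribute
}
@Override
protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
// no BinManager configuration beyond what onScheduled() already applies
}
@Override
protected boolean processBin(final Bin unmodifiableBin, final List<FlowFileSessionWrapper> binContents,
final ProcessContext context, final ProcessSession session, final ProcessorLog logger) {
logger.info("Completed a bin of {} FlowFiles", new Object[] {binContents.size()});
// Returning false leaves routing to the base class: processBins() will
// transfer each FlowFile to REL_ORIGINAL and commit its wrapped session.
return false;
}
}

Returning true instead would signal that processBin() had already routed and committed the wrapped sessions itself, which is exactly what MergeContent does below when defragment validation fails.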

View File

@@ -23,7 +23,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -31,59 +30,47 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.NonCloseableOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
-import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.Bin;
import org.apache.nifi.processors.standard.util.BinManager;
import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FlowFilePackager;
import org.apache.nifi.util.FlowFilePackagerV1;
import org.apache.nifi.util.FlowFilePackagerV2;
import org.apache.nifi.util.FlowFilePackagerV3;
import org.apache.nifi.util.ObjectHolder;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
@SideEffectFree
@TriggerWhenEmpty
@Tags({"merge", "content", "correlation", "tar", "zip", "stream", "concatenation", "archive", "flowfile-stream", "flowfile-stream-v3"})
@CapabilityDescription("Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. It is recommended that the Processor be configured with only a single incoming connection, as Group of FlowFiles will not be created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate.")
-public class MergeContent extends AbstractSessionFactoryProcessor {
+public class MergeContent extends BinFiles {
// preferred attributes
public static final String FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
@@ -207,108 +194,31 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
.defaultValue("false")
.build();
-public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
-.name("Minimum Group Size")
-.description("The minimum size of for the bundle")
-.required(true)
-.defaultValue("0 B")
-.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-.build();
-public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
-.name("Maximum Group Size")
-.description("The maximum size for the bundle. If not specified, there is no maximum.")
-.required(false)
-.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-.build();
-public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder()
-.name("Minimum Number of Entries")
-.description("The minimum number of files to include in a bundle")
-.required(true)
-.defaultValue("1")
-.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-.build();
-public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder()
-.name("Maximum Number of Entries")
-.description("The maximum number of files to include in a bundle. If not specified, there is no maximum.")
-.required(false)
-.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-.build();
-public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
-.name("Maximum number of Bins")
-.description("Specifies the maximum number of bins that can be held in memory at any one time")
-.defaultValue("100")
-.required(true)
-.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-.build();
-public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
-.name("Max Bin Age")
-.description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
-.required(false)
-.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
-.build();
-public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The FlowFiles that were used to create the bundle").build();
public static final Relationship REL_MERGED = new Relationship.Builder().name("merged").description("The FlowFile containing the merged content").build();
-public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure").build();
public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
-private Set<Relationship> relationships;
-private List<PropertyDescriptor> descriptors;
-private final BinManager binManager = new BinManager();
-private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
@Override
-protected void init(final ProcessorInitializationContext context) {
-final Set<Relationship> relationships = new HashSet<>();
-relationships.add(REL_ORIGINAL);
-relationships.add(REL_MERGED);
-relationships.add(REL_FAILURE);
-this.relationships = Collections.unmodifiableSet(relationships);
+protected Set<Relationship> defineAdditionalRelationships() {
+final Set<Relationship> relationships = new HashSet<>();
+relationships.add(REL_MERGED);
+return relationships;
+}
+@Override
+protected List<PropertyDescriptor> defineAdditionalPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(MERGE_STRATEGY);
descriptors.add(MERGE_FORMAT);
descriptors.add(ATTRIBUTE_STRATEGY);
descriptors.add(CORRELATION_ATTRIBUTE_NAME);
-descriptors.add(MIN_ENTRIES);
-descriptors.add(MAX_ENTRIES);
-descriptors.add(MIN_SIZE);
-descriptors.add(MAX_SIZE);
-descriptors.add(MAX_BIN_AGE);
-descriptors.add(MAX_BIN_COUNT);
descriptors.add(HEADER);
descriptors.add(FOOTER);
descriptors.add(DEMARCATOR);
descriptors.add(COMPRESSION_LEVEL);
descriptors.add(KEEP_PATH);
-this.descriptors = Collections.unmodifiableList(descriptors);
-}
-@OnStopped
-public void resetState() {
-binManager.purge();
-Bin bin;
-while ((bin = readyBins.poll()) != null) {
-for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
-wrapper.getSession().rollback();
-}
-}
-}
-@Override
-public Set<Relationship> getRelationships() {
-return relationships;
-}
-@Override
-protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@@ -318,49 +228,48 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
@Override
-public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-int binsAdded = binFlowFiles(context, sessionFactory);
-getLogger().debug("Binned {} FlowFiles", new Object[] {binsAdded});
-if (!isScheduled()) {
-return;
-}
-binsAdded += migrateBins(context);
-final int binsProcessed = processBins(context, sessionFactory);
-if (binsProcessed == 0 && binsAdded == 0) {
-context.yield();
-}
-}
+protected FlowFile preprocessFlowFile(ProcessContext context,
+ProcessSession session, FlowFile flowFile) {
+// handle backward compatibility with old segment attributes
+if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
+flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
+}
+if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
+flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
+}
+if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
+flowFile = session.putAttribute(flowFile, FRAGMENT_ID_ATTRIBUTE, flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
+}
+return flowFile;
+}
+@Override
+protected String getGroupId(ProcessContext context, FlowFile flowFile) {
+final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
+String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName);
+// when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier
+if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
+groupId = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
+}
+return groupId;
+}
+@Override
+protected void setUpBinManager(BinManager binManager, ProcessContext context) {
+if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
+binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
+}
+}
-private int migrateBins(final ProcessContext context) {
-int added = 0;
-for (final Bin bin : binManager.removeReadyBins(true)) {
-this.readyBins.add(bin);
-added++;
-}
-// if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
-// this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
-// bins. So we may as well expire it now.
-if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
-final Bin bin = binManager.removeOldestBin();
-if (bin != null) {
-added++;
-this.readyBins.add(bin);
-}
-}
-return added;
-}
-private int processBins(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
-final Bin bin = readyBins.poll();
-if (bin == null) {
-return 0;
-}
+@Override
+protected boolean processBin(Bin unmodifiableBin,
+List<FlowFileSessionWrapper> binCopy, ProcessContext context,
+ProcessSession session, ProcessorLog logger) throws Exception {
final String mergeFormat = context.getProperty(MERGE_FORMAT).getValue();
MergeBin merger;
@@ -398,37 +307,25 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
break;
}
-final List<Bin> bins = new ArrayList<>();
-bins.add(bin);
-final ProcessorLog logger = getLogger();
-final ProcessSession session = sessionFactory.createSession();
-final Set<Bin> committedBins = new HashSet<>();
-for (final Bin unmodifiableBin : bins) {
-final List<FlowFileSessionWrapper> binCopy = new ArrayList<>(unmodifiableBin.getContents());
if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
final String error = getDefragmentValidationError(binCopy);
+// Fail the flow files and commit them
if (error != null) {
final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
logger.error(error + "; routing {} to failure", new Object[]{binDescription});
for ( final FlowFileSessionWrapper wrapper : binCopy ) {
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
wrapper.getSession().commit();
-committedBins.add(unmodifiableBin);
}
-continue;
+return true;
}
Collections.sort(binCopy, new FragmentComparator());
}
-FlowFile bundle = null;
-try {
-bundle = merger.merge(context, session, binCopy);
+FlowFile bundle = merger.merge(context, session, binCopy);
// keep the filename, as it is added to the bundle.
final String filename = bundle.getAttribute(CoreAttributes.FILENAME.key());
@@ -439,89 +336,16 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
// restore the filename of the bundle
bundleAttributes.put(CoreAttributes.FILENAME.key(), filename);
bundleAttributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(binCopy.size()));
-bundleAttributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(bin.getBinAge()));
+bundleAttributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(unmodifiableBin.getBinAge()));
bundle = session.putAllAttributes(bundle, bundleAttributes);
final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles";
logger.info("Merged {} into {}", new Object[]{inputDescription, bundle});
-} catch (final Exception e) {
-logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
-for (final FlowFileSessionWrapper wrapper : binCopy) {
-wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
-wrapper.getSession().commit();
-}
-session.rollback();
-return 1;
-}
session.transfer(bundle, REL_MERGED);
-}
-// we first commit the bundle's session before the originals' sessions because if we are restarted or crash
-// between commits, we favor data redundancy over data loss. Since we have no Distributed Transaction capability
-// across multiple sessions, we cannot guarantee atomicity across the sessions
-session.commit();
-for (final Bin unmodifiableBin : bins) {
-// If this bin's session has been committed, move on.
-if ( committedBins.contains(unmodifiableBin) ) {
-continue;
-}
-for (final FlowFileSessionWrapper wrapper : unmodifiableBin.getContents()) {
-wrapper.getSession().transfer(wrapper.getFlowFile(), REL_ORIGINAL);
-wrapper.getSession().commit();
-}
-}
-return 1;
-}
-private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
-int binsAdded = 0;
-while (binManager.getBinCount() < context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
-if (!isScheduled()) {
-return binsAdded;
-}
-final ProcessSession session = sessionFactory.createSession();
-FlowFile flowFile = session.get();
-if (flowFile == null) {
-return binsAdded;
-}
-// handle backward compatibility with old segment attributes
-if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
-flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
-}
-if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
-flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
-}
-if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
-flowFile = session.putAttribute(flowFile, FRAGMENT_ID_ATTRIBUTE, flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
-}
-final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
-String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName);
-// when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier
-if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
-groupId = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
-}
-final boolean binned = binManager.offer(groupId, flowFile, session);
-// could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
-if (!binned) {
-Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
-bin.offer(flowFile, session);
-this.readyBins.add(bin);
-}
-binsAdded++;
-}
-return binsAdded;
+// We haven't committed anything, parent will take care of it
+return false;
}
private String getDefragmentValidationError(final List<FlowFileSessionWrapper> bin) {
private String getDefragmentValidationError(final List<FlowFileSessionWrapper> bin) { private String getDefragmentValidationError(final List<FlowFileSessionWrapper> bin) {
@@ -578,60 +402,6 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
return NUMBER_PATTERN.matcher(value).matches();
}
-@OnScheduled
-public void onScheduled(final ProcessContext context) throws IOException {
-binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
-if (context.getProperty(MAX_BIN_AGE).isSet() ) {
-binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
-} else {
-binManager.setMaxBinAge(Integer.MAX_VALUE);
-}
-if ( context.getProperty(MAX_SIZE).isSet() ) {
-binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
-} else {
-binManager.setMaximumSize(Long.MAX_VALUE);
-}
-if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
-binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
-} else {
-binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
-if ( context.getProperty(MAX_ENTRIES).isSet() ) {
-binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
-} else {
-binManager.setMaximumEntries(Integer.MAX_VALUE);
-}
-}
-}
-@Override
-protected Collection<ValidationResult> customValidate(final ValidationContext context) {
-final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
-final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
-final Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
-if (maxBytes != null && maxBytes.longValue() < minBytes) {
-problems.add(new ValidationResult.Builder().subject(MIN_SIZE.getName()).input(
-context.getProperty(MIN_SIZE).getValue()).valid(false).explanation("Min Size must be less than or equal to Max Size").build());
-}
-final Long min = context.getProperty(MIN_ENTRIES).asLong();
-final Long max = context.getProperty(MAX_ENTRIES).asLong();
-if (min != null && max != null) {
-if (min > max) {
-problems.add(new ValidationResult.Builder().subject(MIN_ENTRIES.getName()).input(context.getProperty(MIN_ENTRIES).getValue()).valid(false).explanation("Min Entries must be less than or equal to Max Entries").build());
-}
-}
-return problems;
-}
private class BinaryConcatenationMerge implements MergeBin {
private String mimeType = "application/octet-stream";