diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileSessionWrapper.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileSessionWrapper.java new file mode 100644 index 0000000000..e2f17b3f6b --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileSessionWrapper.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; + +public class FlowFileSessionWrapper { + + private final FlowFile flowFile; + private final ProcessSession session; + + public FlowFileSessionWrapper(final FlowFile flowFile, final ProcessSession session) { + this.flowFile = flowFile; + this.session = session; + } + + public FlowFile getFlowFile() { + return flowFile; + } + + public ProcessSession getSession() { + return session; + } + + @Override + public String toString() { + return flowFile.toString(); + } +} diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java new file mode 100644 index 0000000000..af8a8cd895 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.bin; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.FlowFileSessionWrapper; + +public class Bin { + + private final long creationMomentEpochNs; + private final long minimumSizeBytes; + private final long maximumSizeBytes; + + private volatile int minimumEntries = 0; + private volatile int maximumEntries = Integer.MAX_VALUE; + private final String fileCountAttribute; + + final List binContents = new ArrayList<>(); + long size; + int successiveFailedOfferings = 0; + + /** + * Constructs a new bin + * + * @param minSizeBytes min bytes + * @param maxSizeBytes max bytes + * @param minEntries min entries + * @param maxEntries max entries + * @param fileCountAttribute num files + * @throws IllegalArgumentException if the min is not less than or equal to the max. + */ + public Bin(final long minSizeBytes, final long maxSizeBytes, final int minEntries, final int maxEntries, final String fileCountAttribute) { + this.minimumSizeBytes = minSizeBytes; + this.maximumSizeBytes = maxSizeBytes; + this.minimumEntries = minEntries; + this.maximumEntries = maxEntries; + this.fileCountAttribute = fileCountAttribute; + + this.creationMomentEpochNs = System.nanoTime(); + if (minSizeBytes > maxSizeBytes) { + throw new IllegalArgumentException(); + } + } + + /** + * Indicates whether the bin has enough items to be considered full. This is based on whether the current size of the bin is greater than the minimum size in bytes and based on having a number of + * successive unsuccessful attempts to add a new item (because it is so close to the max or the size of the objects being attempted do not favor tight packing) + * + * @return true if considered full; false otherwise + */ + public boolean isFull() { + return (((size >= minimumSizeBytes) && binContents.size() >= minimumEntries) && (successiveFailedOfferings > 5)) + || (size >= maximumSizeBytes) || (binContents.size() >= maximumEntries); + } + + /** + * Indicates enough size exists to meet the minimum requirements + * + * @return true if full enough + */ + public boolean isFullEnough() { + return isFull() || (size >= minimumSizeBytes && (binContents.size() >= minimumEntries)); + } + + /** + * Determines if this bin is older than the time specified. + * + * @param duration duration + * @param unit unit + * @return true if this bin is older than the length of time given; false otherwise + */ + public boolean isOlderThan(final int duration, final TimeUnit unit) { + final long ageInNanos = System.nanoTime() - creationMomentEpochNs; + return ageInNanos > TimeUnit.NANOSECONDS.convert(duration, unit); + } + + /** + * Determines if this bin is older than the specified bin + * + * @param other other bin + * @return true if this is older than given bin + */ + public boolean isOlderThan(final Bin other) { + return creationMomentEpochNs < other.creationMomentEpochNs; + } + + /** + * If this bin has enough room for the size of the given flow file then it is added otherwise it is not + * + * @param flowFile flowfile to offer + * @param session the ProcessSession to which the FlowFile belongs + * @return true if added; false otherwise + */ + public boolean offer(final FlowFile flowFile, final ProcessSession session) { + if (((size + flowFile.getSize()) > maximumSizeBytes) || (binContents.size() >= maximumEntries)) { + successiveFailedOfferings++; + return false; + } + + if (fileCountAttribute != null) { + final String countValue = flowFile.getAttribute(fileCountAttribute); + final Integer count = toInteger(countValue); + if (count != null) { + this.maximumEntries = Math.min(count, this.maximumEntries); + this.minimumEntries = this.maximumEntries; + } + } + + size += flowFile.getSize(); + binContents.add(new FlowFileSessionWrapper(flowFile, session)); + successiveFailedOfferings = 0; + return true; + } + + private static final Pattern intPattern = Pattern.compile("\\d+"); + + public Integer toInteger(final String value) { + if (value == null) { + return null; + } + if (!intPattern.matcher(value).matches()) { + return null; + } + + try { + return Integer.parseInt(value); + } catch (final Exception e) { + return null; + } + } + + /** + * @return the underlying list of flow files within this bin + */ + public List getContents() { + return binContents; + } + + public long getBinAge() { + final long ageInNanos = System.nanoTime() - creationMomentEpochNs; + return TimeUnit.MILLISECONDS.convert(ageInNanos, TimeUnit.NANOSECONDS); + } +} diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java new file mode 100644 index 0000000000..62e5655a23 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.bin; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.FlowFileSessionWrapper; +import org.apache.nifi.processor.util.StandardValidators; + +/** + * Base class for file-binning processors. + * + */ +public abstract class BinFiles extends AbstractSessionFactoryProcessor { + + public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder() + .name("Minimum Group Size") + .description("The minimum size of for the bundle") + .required(true) + .defaultValue("0 B") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder() + .name("Maximum Group Size") + .description("The maximum size for the bundle. If not specified, there is no maximum.") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder() + .name("Minimum Number of Entries") + .description("The minimum number of files to include in a bundle") + .required(true) + .defaultValue("1") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder() + .name("Maximum Number of Entries") + .description("The maximum number of files to include in a bundle. If not specified, there is no maximum.") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder() + .name("Maximum number of Bins") + .description("Specifies the maximum number of bins that can be held in memory at any one time") + .defaultValue("100") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder() + .name("Max Bin Age") + .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is