From 98afcce0dc3672fe04e2130ac72b9a201e17ba1a Mon Sep 17 00:00:00 2001 From: gresockj Date: Tue, 27 Jan 2015 17:32:38 -0500 Subject: [PATCH] NIFI-305: Refactoring superclass BinFiles from MergeContent --- .../nifi/processors/standard/BinFiles.java | 388 +++++++++++++++++ .../processors/standard/MergeContent.java | 402 ++++-------------- 2 files changed, 474 insertions(+), 316 deletions(-) create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java new file mode 100644 index 0000000000..7846c7d629 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.Bin; +import org.apache.nifi.processors.standard.util.BinManager; +import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper; + +/** + * Base class for MergeContent. + * + */ +public abstract class BinFiles extends AbstractSessionFactoryProcessor { + + public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder() + .name("Minimum Group Size") + .description("The minimum size of for the bundle") + .required(true) + .defaultValue("0 B") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder() + .name("Maximum Group Size") + .description("The maximum size for the bundle. If not specified, there is no maximum.") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder() + .name("Minimum Number of Entries") + .description("The minimum number of files to include in a bundle") + .required(true) + .defaultValue("1") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder() + .name("Maximum Number of Entries") + .description("The maximum number of files to include in a bundle. If not specified, there is no maximum.") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder() + .name("Maximum number of Bins") + .description("Specifies the maximum number of bins that can be held in memory at any one time") + .defaultValue("100") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder() + .name("Max Bin Age") + .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is