mirror of https://github.com/apache/nifi.git
NIFI-1291: Adding BinFiles in nifi-processor-utilities, deprecating version in nifi-standard-processors. This closes #155.
Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent
c696cb09b6
commit
34bd2061f7
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processor.util;
|
||||
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
|
||||
public class FlowFileSessionWrapper {
|
||||
|
||||
private final FlowFile flowFile;
|
||||
private final ProcessSession session;
|
||||
|
||||
public FlowFileSessionWrapper(final FlowFile flowFile, final ProcessSession session) {
|
||||
this.flowFile = flowFile;
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
public FlowFile getFlowFile() {
|
||||
return flowFile;
|
||||
}
|
||||
|
||||
public ProcessSession getSession() {
|
||||
return session;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return flowFile.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,163 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processor.util.bin;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.util.FlowFileSessionWrapper;
|
||||
|
||||
public class Bin {
|
||||
|
||||
private final long creationMomentEpochNs;
|
||||
private final long minimumSizeBytes;
|
||||
private final long maximumSizeBytes;
|
||||
|
||||
private volatile int minimumEntries = 0;
|
||||
private volatile int maximumEntries = Integer.MAX_VALUE;
|
||||
private final String fileCountAttribute;
|
||||
|
||||
final List<FlowFileSessionWrapper> binContents = new ArrayList<>();
|
||||
long size;
|
||||
int successiveFailedOfferings = 0;
|
||||
|
||||
/**
|
||||
* Constructs a new bin
|
||||
*
|
||||
* @param minSizeBytes min bytes
|
||||
* @param maxSizeBytes max bytes
|
||||
* @param minEntries min entries
|
||||
* @param maxEntries max entries
|
||||
* @param fileCountAttribute num files
|
||||
* @throws IllegalArgumentException if the min is not less than or equal to the max.
|
||||
*/
|
||||
public Bin(final long minSizeBytes, final long maxSizeBytes, final int minEntries, final int maxEntries, final String fileCountAttribute) {
|
||||
this.minimumSizeBytes = minSizeBytes;
|
||||
this.maximumSizeBytes = maxSizeBytes;
|
||||
this.minimumEntries = minEntries;
|
||||
this.maximumEntries = maxEntries;
|
||||
this.fileCountAttribute = fileCountAttribute;
|
||||
|
||||
this.creationMomentEpochNs = System.nanoTime();
|
||||
if (minSizeBytes > maxSizeBytes) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates whether the bin has enough items to be considered full. This is based on whether the current size of the bin is greater than the minimum size in bytes and based on having a number of
|
||||
* successive unsuccessful attempts to add a new item (because it is so close to the max or the size of the objects being attempted do not favor tight packing)
|
||||
*
|
||||
* @return true if considered full; false otherwise
|
||||
*/
|
||||
public boolean isFull() {
|
||||
return (((size >= minimumSizeBytes) && binContents.size() >= minimumEntries) && (successiveFailedOfferings > 5))
|
||||
|| (size >= maximumSizeBytes) || (binContents.size() >= maximumEntries);
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates enough size exists to meet the minimum requirements
|
||||
*
|
||||
* @return true if full enough
|
||||
*/
|
||||
public boolean isFullEnough() {
|
||||
return isFull() || (size >= minimumSizeBytes && (binContents.size() >= minimumEntries));
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if this bin is older than the time specified.
|
||||
*
|
||||
* @param duration duration
|
||||
* @param unit unit
|
||||
* @return true if this bin is older than the length of time given; false otherwise
|
||||
*/
|
||||
public boolean isOlderThan(final int duration, final TimeUnit unit) {
|
||||
final long ageInNanos = System.nanoTime() - creationMomentEpochNs;
|
||||
return ageInNanos > TimeUnit.NANOSECONDS.convert(duration, unit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if this bin is older than the specified bin
|
||||
*
|
||||
* @param other other bin
|
||||
* @return true if this is older than given bin
|
||||
*/
|
||||
public boolean isOlderThan(final Bin other) {
|
||||
return creationMomentEpochNs < other.creationMomentEpochNs;
|
||||
}
|
||||
|
||||
/**
|
||||
* If this bin has enough room for the size of the given flow file then it is added otherwise it is not
|
||||
*
|
||||
* @param flowFile flowfile to offer
|
||||
* @param session the ProcessSession to which the FlowFile belongs
|
||||
* @return true if added; false otherwise
|
||||
*/
|
||||
public boolean offer(final FlowFile flowFile, final ProcessSession session) {
|
||||
if (((size + flowFile.getSize()) > maximumSizeBytes) || (binContents.size() >= maximumEntries)) {
|
||||
successiveFailedOfferings++;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (fileCountAttribute != null) {
|
||||
final String countValue = flowFile.getAttribute(fileCountAttribute);
|
||||
final Integer count = toInteger(countValue);
|
||||
if (count != null) {
|
||||
this.maximumEntries = Math.min(count, this.maximumEntries);
|
||||
this.minimumEntries = this.maximumEntries;
|
||||
}
|
||||
}
|
||||
|
||||
size += flowFile.getSize();
|
||||
binContents.add(new FlowFileSessionWrapper(flowFile, session));
|
||||
successiveFailedOfferings = 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
private static final Pattern intPattern = Pattern.compile("\\d+");
|
||||
|
||||
public Integer toInteger(final String value) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (!intPattern.matcher(value).matches()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return Integer.parseInt(value);
|
||||
} catch (final Exception e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the underlying list of flow files within this bin
|
||||
*/
|
||||
public List<FlowFileSessionWrapper> getContents() {
|
||||
return binContents;
|
||||
}
|
||||
|
||||
public long getBinAge() {
|
||||
final long ageInNanos = System.nanoTime() - creationMomentEpochNs;
|
||||
return TimeUnit.MILLISECONDS.convert(ageInNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,349 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processor.util.bin;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ProcessorLog;
|
||||
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessSessionFactory;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.FlowFileSessionWrapper;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
/**
|
||||
* Base class for file-binning processors.
|
||||
*
|
||||
*/
|
||||
public abstract class BinFiles extends AbstractSessionFactoryProcessor {
|
||||
|
||||
public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Minimum Group Size")
|
||||
.description("The minimum size of for the bundle")
|
||||
.required(true)
|
||||
.defaultValue("0 B")
|
||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Maximum Group Size")
|
||||
.description("The maximum size for the bundle. If not specified, there is no maximum.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder()
|
||||
.name("Minimum Number of Entries")
|
||||
.description("The minimum number of files to include in a bundle")
|
||||
.required(true)
|
||||
.defaultValue("1")
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder()
|
||||
.name("Maximum Number of Entries")
|
||||
.description("The maximum number of files to include in a bundle. If not specified, there is no maximum.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
|
||||
.name("Maximum number of Bins")
|
||||
.description("Specifies the maximum number of bins that can be held in memory at any one time")
|
||||
.defaultValue("100")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
|
||||
.name("Max Bin Age")
|
||||
.description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> "
|
||||
+ "where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
|
||||
.name("original")
|
||||
.description("The FlowFiles that were used to create the bundle")
|
||||
.build();
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.name("failure")
|
||||
.description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure")
|
||||
.build();
|
||||
|
||||
private final BinManager binManager = new BinManager();
|
||||
private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
|
||||
|
||||
@OnStopped
|
||||
public final void resetState() {
|
||||
binManager.purge();
|
||||
|
||||
Bin bin;
|
||||
while ((bin = readyBins.poll()) != null) {
|
||||
for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
|
||||
wrapper.getSession().rollback();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId().
|
||||
*
|
||||
* @param context context
|
||||
* @param session session
|
||||
* @param flowFile flowFile
|
||||
* @return The flow file, possibly altered
|
||||
*/
|
||||
protected abstract FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
|
||||
|
||||
/**
|
||||
* Returns a group ID representing a bin. This allows flow files to be binned into like groups.
|
||||
*
|
||||
* @param context context
|
||||
* @param flowFile flowFile
|
||||
* @return The appropriate group ID
|
||||
*/
|
||||
protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile);
|
||||
|
||||
/**
|
||||
* Performs any additional setup of the bin manager. Called during the OnScheduled phase.
|
||||
*
|
||||
* @param binManager The bin manager
|
||||
* @param context context
|
||||
*/
|
||||
protected abstract void setUpBinManager(BinManager binManager, ProcessContext context);
|
||||
|
||||
/**
|
||||
* Processes a single bin. Implementing class is responsible for committing each session
|
||||
*
|
||||
* @param unmodifiableBin A reference to a single bin of flow file/session wrappers
|
||||
* @param binContents A copy of the contents of the bin
|
||||
* @param context The context
|
||||
* @param session The session that created the bin
|
||||
* @return Return true if the input bin was already committed. E.g., in case of a failure, the implementation may choose to transfer all binned files to Failure and commit their sessions. If
|
||||
* false, the processBins() method will transfer the files to Original and commit the sessions
|
||||
*
|
||||
* @throws ProcessException if any problem arises while processing a bin of FlowFiles. All flow files in the bin will be transferred to failure and the ProcessSession provided by the 'session'
|
||||
* argument rolled back
|
||||
*/
|
||||
protected abstract boolean processBin(Bin unmodifiableBin, List<FlowFileSessionWrapper> binContents, ProcessContext context, ProcessSession session) throws ProcessException;
|
||||
|
||||
/**
|
||||
* Allows additional custom validation to be done. This will be called from the parent's customValidation method.
|
||||
*
|
||||
* @param context The context
|
||||
* @return Validation results indicating problems
|
||||
*/
|
||||
protected Collection<ValidationResult> additionalCustomValidation(final ValidationContext context) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
|
||||
final int flowFilesBinned = binFlowFiles(context, sessionFactory);
|
||||
getLogger().debug("Binned {} FlowFiles", new Object[]{flowFilesBinned});
|
||||
|
||||
if (!isScheduled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final int binsMigrated = migrateBins(context);
|
||||
final int binsProcessed = processBins(context, sessionFactory);
|
||||
//If we accomplished nothing then let's yield
|
||||
if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
|
||||
context.yield();
|
||||
}
|
||||
}
|
||||
|
||||
private int migrateBins(final ProcessContext context) {
|
||||
int added = 0;
|
||||
for (final Bin bin : binManager.removeReadyBins(true)) {
|
||||
this.readyBins.add(bin);
|
||||
added++;
|
||||
}
|
||||
|
||||
// if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
|
||||
// this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
|
||||
// bins. So we may as well expire it now.
|
||||
if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
|
||||
final Bin bin = binManager.removeOldestBin();
|
||||
if (bin != null) {
|
||||
added++;
|
||||
this.readyBins.add(bin);
|
||||
}
|
||||
}
|
||||
return added;
|
||||
}
|
||||
|
||||
private int processBins(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
|
||||
final Bin bin = readyBins.poll();
|
||||
if (bin == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
final List<Bin> bins = new ArrayList<>();
|
||||
bins.add(bin);
|
||||
|
||||
final ProcessorLog logger = getLogger();
|
||||
final ProcessSession session = sessionFactory.createSession();
|
||||
|
||||
final List<FlowFileSessionWrapper> binCopy = new ArrayList<>(bin.getContents());
|
||||
|
||||
boolean binAlreadyCommitted = false;
|
||||
try {
|
||||
binAlreadyCommitted = this.processBin(bin, binCopy, context, session);
|
||||
} catch (final ProcessException e) {
|
||||
logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
|
||||
|
||||
for (final FlowFileSessionWrapper wrapper : binCopy) {
|
||||
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
|
||||
wrapper.getSession().commit();
|
||||
}
|
||||
session.rollback();
|
||||
return 1;
|
||||
}
|
||||
|
||||
// we first commit the bundle's session before the originals' sessions because if we are restarted or crash
|
||||
// between commits, we favor data redundancy over data loss. Since we have no Distributed Transaction capability
|
||||
// across multiple sessions, we cannot guarantee atomicity across the sessions
|
||||
session.commit();
|
||||
// If this bin's session has been committed, move on.
|
||||
if (!binAlreadyCommitted) {
|
||||
for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
|
||||
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_ORIGINAL);
|
||||
wrapper.getSession().commit();
|
||||
}
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
|
||||
int flowFilesBinned = 0;
|
||||
while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
|
||||
if (!isScheduled()) {
|
||||
break;
|
||||
}
|
||||
|
||||
final ProcessSession session = sessionFactory.createSession();
|
||||
FlowFile flowFile = session.get();
|
||||
if (flowFile == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
flowFile = this.preprocessFlowFile(context, session, flowFile);
|
||||
|
||||
String groupId = this.getGroupId(context, flowFile);
|
||||
|
||||
final boolean binned = binManager.offer(groupId, flowFile, session);
|
||||
|
||||
// could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
|
||||
if (!binned) {
|
||||
Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
|
||||
bin.offer(flowFile, session);
|
||||
this.readyBins.add(bin);
|
||||
}
|
||||
|
||||
flowFilesBinned++;
|
||||
}
|
||||
|
||||
return flowFilesBinned;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public final void onScheduled(final ProcessContext context) throws IOException {
|
||||
binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
|
||||
|
||||
if (context.getProperty(MAX_BIN_AGE).isSet()) {
|
||||
binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
|
||||
} else {
|
||||
binManager.setMaxBinAge(Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
if (context.getProperty(MAX_SIZE).isSet()) {
|
||||
binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
|
||||
} else {
|
||||
binManager.setMaximumSize(Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
|
||||
|
||||
if (context.getProperty(MAX_ENTRIES).isSet()) {
|
||||
binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
|
||||
} else {
|
||||
binManager.setMaximumEntries(Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
this.setUpBinManager(binManager, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final Collection<ValidationResult> customValidate(final ValidationContext context) {
|
||||
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
|
||||
|
||||
final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
|
||||
final Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
|
||||
|
||||
if (maxBytes != null && maxBytes.longValue() < minBytes) {
|
||||
problems.add(
|
||||
new ValidationResult.Builder()
|
||||
.subject(MIN_SIZE.getName())
|
||||
.input(context.getProperty(MIN_SIZE).getValue())
|
||||
.valid(false)
|
||||
.explanation("Min Size must be less than or equal to Max Size")
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
final Long min = context.getProperty(MIN_ENTRIES).asLong();
|
||||
final Long max = context.getProperty(MAX_ENTRIES).asLong();
|
||||
|
||||
if (min != null && max != null) {
|
||||
if (min > max) {
|
||||
problems.add(
|
||||
new ValidationResult.Builder().subject(MIN_ENTRIES.getName())
|
||||
.input(context.getProperty(MIN_ENTRIES).getValue())
|
||||
.valid(false)
|
||||
.explanation("Min Entries must be less than or equal to Max Entries")
|
||||
.build()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context);
|
||||
if (otherProblems != null) {
|
||||
problems.addAll(otherProblems);
|
||||
}
|
||||
|
||||
return problems;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,236 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processor.util.bin;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.util.FlowFileSessionWrapper;
|
||||
|
||||
/**
|
||||
* This class is thread safe
|
||||
*
|
||||
*/
|
||||
public class BinManager {
|
||||
|
||||
private final AtomicLong minSizeBytes = new AtomicLong(0L);
|
||||
private final AtomicLong maxSizeBytes = new AtomicLong(Long.MAX_VALUE);
|
||||
private final AtomicInteger minEntries = new AtomicInteger(0);
|
||||
private final AtomicInteger maxEntries = new AtomicInteger(Integer.MAX_VALUE);
|
||||
private final AtomicReference<String> fileCountAttribute = new AtomicReference<>(null);
|
||||
|
||||
private final AtomicInteger maxBinAgeSeconds = new AtomicInteger(Integer.MAX_VALUE);
|
||||
private final Map<String, List<Bin>> groupBinMap = new HashMap<>();
|
||||
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
|
||||
private final Lock rLock = rwLock.readLock();
|
||||
private final Lock wLock = rwLock.writeLock();
|
||||
|
||||
private int binCount = 0; // guarded by read/write lock
|
||||
|
||||
public BinManager() {
|
||||
}
|
||||
|
||||
public void purge() {
|
||||
wLock.lock();
|
||||
try {
|
||||
for (final List<Bin> binList : groupBinMap.values()) {
|
||||
for (final Bin bin : binList) {
|
||||
for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
|
||||
wrapper.getSession().rollback();
|
||||
}
|
||||
}
|
||||
}
|
||||
groupBinMap.clear();
|
||||
binCount = 0;
|
||||
} finally {
|
||||
wLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void setFileCountAttribute(final String fileCountAttribute) {
|
||||
this.fileCountAttribute.set(fileCountAttribute);
|
||||
}
|
||||
|
||||
public void setMinimumEntries(final int minimumEntries) {
|
||||
this.minEntries.set(minimumEntries);
|
||||
}
|
||||
|
||||
public void setMaximumEntries(final int maximumEntries) {
|
||||
this.maxEntries.set(maximumEntries);
|
||||
}
|
||||
|
||||
public int getBinCount() {
|
||||
rLock.lock();
|
||||
try {
|
||||
return binCount;
|
||||
} finally {
|
||||
rLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void setMinimumSize(final long numBytes) {
|
||||
minSizeBytes.set(numBytes);
|
||||
}
|
||||
|
||||
public void setMaximumSize(final long numBytes) {
|
||||
maxSizeBytes.set(numBytes);
|
||||
}
|
||||
|
||||
public void setMaxBinAge(final int seconds) {
|
||||
maxBinAgeSeconds.set(seconds);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the given flowFile to the first available bin in which it fits for the given group or creates a new bin in the specified group if necessary.
|
||||
* <p/>
|
||||
* @param groupIdentifier the group to which the flow file belongs; can be null
|
||||
* @param flowFile the flow file to bin
|
||||
* @param session the ProcessSession to which the FlowFile belongs
|
||||
* @return true if added; false if no bin exists which can fit this item and no bin can be created based on current min/max criteria
|
||||
*/
|
||||
public boolean offer(final String groupIdentifier, final FlowFile flowFile, final ProcessSession session) {
|
||||
final long currentMaxSizeBytes = maxSizeBytes.get();
|
||||
if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit into any new bins (and probably none existing)
|
||||
return false;
|
||||
}
|
||||
wLock.lock();
|
||||
try {
|
||||
final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
|
||||
if (currentBins == null) { // this is a new group we need to register
|
||||
final List<Bin> bins = new ArrayList<>();
|
||||
final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), maxEntries.get(), fileCountAttribute.get());
|
||||
bins.add(bin);
|
||||
groupBinMap.put(groupIdentifier, bins);
|
||||
binCount++;
|
||||
return bin.offer(flowFile, session);
|
||||
} else {
|
||||
for (final Bin bin : currentBins) {
|
||||
final boolean accepted = bin.offer(flowFile, session);
|
||||
if (accepted) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
//if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one
|
||||
final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), maxEntries.get(), fileCountAttribute.get());
|
||||
currentBins.add(bin);
|
||||
binCount++;
|
||||
return bin.offer(flowFile, session);
|
||||
}
|
||||
} finally {
|
||||
wLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds all bins that are considered full and removes them from the manager.
|
||||
* <p/>
|
||||
* @param relaxFullnessConstraint if false will require bins to be full before considered ready; if true bins only have to meet their minimum size criteria or be 'old' and then they'll be
|
||||
* considered ready
|
||||
* @return bins that are considered full
|
||||
*/
|
||||
public Collection<Bin> removeReadyBins(boolean relaxFullnessConstraint) {
|
||||
final Map<String, List<Bin>> newGroupMap = new HashMap<>();
|
||||
final List<Bin> readyBins = new ArrayList<>();
|
||||
|
||||
wLock.lock();
|
||||
try {
|
||||
for (final Map.Entry<String, List<Bin>> group : groupBinMap.entrySet()) {
|
||||
final List<Bin> remainingBins = new ArrayList<>();
|
||||
for (final Bin bin : group.getValue()) {
|
||||
if (relaxFullnessConstraint && (bin.isFullEnough() || bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS))) { //relaxed check
|
||||
readyBins.add(bin);
|
||||
} else if (!relaxFullnessConstraint && bin.isFull()) { //strict check
|
||||
readyBins.add(bin);
|
||||
} else { //it isn't time yet...
|
||||
remainingBins.add(bin);
|
||||
}
|
||||
}
|
||||
if (!remainingBins.isEmpty()) {
|
||||
newGroupMap.put(group.getKey(), remainingBins);
|
||||
}
|
||||
}
|
||||
groupBinMap.clear();
|
||||
groupBinMap.putAll(newGroupMap);
|
||||
binCount -= readyBins.size();
|
||||
} finally {
|
||||
wLock.unlock();
|
||||
}
|
||||
return readyBins;
|
||||
}
|
||||
|
||||
public Bin removeOldestBin() {
|
||||
wLock.lock();
|
||||
try {
|
||||
Bin oldestBin = null;
|
||||
String oldestBinGroup = null;
|
||||
|
||||
for (final Map.Entry<String, List<Bin>> group : groupBinMap.entrySet()) {
|
||||
for (final Bin bin : group.getValue()) {
|
||||
if (oldestBin == null || bin.isOlderThan(oldestBin)) {
|
||||
oldestBin = bin;
|
||||
oldestBinGroup = group.getKey();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (oldestBin == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
binCount--;
|
||||
final List<Bin> bins = groupBinMap.get(oldestBinGroup);
|
||||
bins.remove(oldestBin);
|
||||
if (bins.isEmpty()) {
|
||||
groupBinMap.remove(oldestBinGroup);
|
||||
}
|
||||
return oldestBin;
|
||||
} finally {
|
||||
wLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if any current bins are older than the allowable max
|
||||
*/
|
||||
public boolean containsOldBins() {
|
||||
rLock.lock();
|
||||
try {
|
||||
for (final List<Bin> bins : groupBinMap.values()) {
|
||||
for (final Bin bin : bins) {
|
||||
if (bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
rLock.unlock();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -46,7 +46,10 @@ import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
|
|||
/**
|
||||
* Base class for file-binning processors, including MergeContent.
|
||||
*
|
||||
* @deprecated As of release 0.5.0, replaced by
|
||||
* {@link org.apache.nifi.processor.util.bin.BinFiles}
|
||||
*/
|
||||
@Deprecated
|
||||
public abstract class BinFiles extends AbstractSessionFactoryProcessor {
|
||||
|
||||
public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -73,9 +73,10 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.standard.util.Bin;
|
||||
import org.apache.nifi.processors.standard.util.BinManager;
|
||||
import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
|
||||
import org.apache.nifi.processor.util.bin.Bin;
|
||||
import org.apache.nifi.processor.util.bin.BinFiles;
|
||||
import org.apache.nifi.processor.util.bin.BinManager;
|
||||
import org.apache.nifi.processor.util.FlowFileSessionWrapper;
|
||||
import org.apache.nifi.stream.io.BufferedInputStream;
|
||||
import org.apache.nifi.stream.io.BufferedOutputStream;
|
||||
import org.apache.nifi.stream.io.NonCloseableOutputStream;
|
||||
|
|
|
@ -24,6 +24,11 @@ import java.util.regex.Pattern;
|
|||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
|
||||
/**
|
||||
* @deprecated As of release 0.5.0, replaced by
|
||||
* {@link org.apache.nifi.processor.util.bin.Bin}
|
||||
*/
|
||||
@Deprecated
|
||||
public class Bin {
|
||||
|
||||
private final long creationMomentEpochNs;
|
||||
|
|
|
@ -34,7 +34,10 @@ import org.apache.nifi.processor.ProcessSession;
|
|||
/**
|
||||
* This class is thread safe
|
||||
*
|
||||
* @deprecated As of release 0.5.0, replaced by
|
||||
* {@link org.apache.nifi.processor.util.bin.BinManager}
|
||||
*/
|
||||
@Deprecated
|
||||
public class BinManager {
|
||||
|
||||
private final AtomicLong minSizeBytes = new AtomicLong(0L);
|
||||
|
|
|
@ -19,6 +19,11 @@ package org.apache.nifi.processors.standard.util;
|
|||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
|
||||
/**
|
||||
* @deprecated As of release 0.5.0, replaced by
|
||||
* {@link org.apache.nifi.processor.util.FlowFileSessionWrapper}
|
||||
*/
|
||||
@Deprecated
|
||||
public class FlowFileSessionWrapper {
|
||||
|
||||
private final FlowFile flowFile;
|
||||
|
|
Loading…
Reference in New Issue