NIFI-1157 searched for and resolved all remaining references to deprecated items that were clearly addressable.

This commit is contained in:
joewitt 2016-07-14 00:51:04 -04:00 committed by Mark Payne
parent 961be21a38
commit f987b21609
87 changed files with 129 additions and 1844 deletions

View File

@ -20,7 +20,6 @@ package org.apache.nifi.provenance;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* A Provenance Event that is used to replace another Provenance Event when authorizations

View File

@ -104,7 +104,6 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes);
sourceQueueIdentifier = builder.sourceQueueIdentifier;
}
public String getStorageFilename() {

View File

@ -21,14 +21,12 @@ import java.net.URI;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
@ -710,54 +708,4 @@ public class StandardValidators {
}
}
/**
* Creates a validator based on existence of a {@link ControllerService}.
*
* @param serviceClass the controller service API your
* {@link ConfigurableComponent} depends on
* @return a Validator
* @deprecated As of release 0.1.0-incubating, replaced by
* {@link org.apache.nifi.components.PropertyDescriptor.Builder#identifiesControllerService(Class)}
*/
@Deprecated
public static Validator createControllerServiceExistsValidator(final Class<? extends ControllerService> serviceClass) {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
final ControllerService svc = context.getControllerServiceLookup().getControllerService(input);
if (svc == null) {
return new ValidationResult.Builder().valid(false).input(input).subject(subject).explanation("No Controller Service exists with this ID").build();
}
if (!serviceClass.isAssignableFrom(svc.getClass())) {
return new ValidationResult.Builder()
.valid(false)
.input(input)
.subject(subject)
.explanation("Controller Service with this ID is of type " + svc.getClass().getName() + " but is expected to be of type " + serviceClass.getName())
.build();
}
final ValidationContext serviceValidationContext = context.getControllerServiceValidationContext(svc);
final Collection<ValidationResult> serviceValidationResults = svc.validate(serviceValidationContext);
for (final ValidationResult result : serviceValidationResults) {
if (!result.isValid()) {
return new ValidationResult.Builder()
.valid(false)
.input(input)
.subject(subject)
.explanation("Controller Service " + input + " is not valid: " + result.getExplanation())
.build();
}
}
return new ValidationResult.Builder().input(input).subject(subject).valid(true).build();
}
};
}
}

View File

@ -1,37 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @deprecated As of release 1.0.1. Please use {@link AtomicBoolean}
*
* Wraps an Boolean value so that it can be declared <code>final</code> and still be accessed from inner classes;
* the functionality is similar to that of an AtomicBoolean, but operations on this class
* are not atomic. This results in greater performance when the atomicity is not needed.
*
*/
@Deprecated
public class BooleanHolder extends ObjectHolder<Boolean> {
public BooleanHolder(final boolean initialValue) {
super(initialValue);
}
}

View File

@ -1,63 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @deprecated As of release 1.0.1. Please use {@link AtomicInteger}
*
* Wraps an Integer value so that it can be declared <code>final</code> and still be accessed from inner classes;
* the functionality is similar to that of an AtomicInteger, but operations on this class
* are not atomic. This results in greater performance when the atomicity is not needed.
*
*/
@Deprecated
public class IntegerHolder extends ObjectHolder<Integer> {
public IntegerHolder(final int initialValue) {
super(initialValue);
}
public int addAndGet(final int delta) {
final int curValue = get();
final int newValue = curValue + delta;
set(newValue);
return newValue;
}
public int getAndAdd(final int delta) {
final int curValue = get();
final int newValue = curValue + delta;
set(newValue);
return curValue;
}
public int incrementAndGet() {
return addAndGet(1);
}
public int getAndIncrement() {
return getAndAdd(1);
}
public int decrementAndGet() {
return addAndGet(-1);
}
}

View File

@ -1,65 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import java.util.concurrent.atomic.AtomicLong;
/**
* @deprecated As of release 1.0.1. Please use {@link AtomicLong}
*
* Wraps a Long value so that it can be declared <code>final</code> and still be accessed from inner classes;
* the functionality is similar to that of an AtomicLong, but operations on this class
* are not atomic. This results in greater performance when the atomicity is not needed.
*/
@Deprecated
public class LongHolder extends ObjectHolder<Long> {
public LongHolder(final long initialValue) {
super(initialValue);
}
public long addAndGet(final long delta) {
final long curValue = get();
final long newValue = curValue + delta;
set(newValue);
return newValue;
}
public long getAndAdd(final long delta) {
final long curValue = get();
final long newValue = curValue + delta;
set(newValue);
return curValue;
}
public long incrementAndGet() {
return addAndGet(1);
}
public long getAndIncrement() {
return getAndAdd(1);
}
public long decrementAndGet() {
return addAndGet(-1L);
}
public long getAndDecrement() {
return getAndAdd(-1L);
}
}

View File

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import java.util.concurrent.atomic.AtomicReference;
/**
* @deprecated As of release 0.7.0. Please use {@link AtomicReference}
*
* A bean that holds a single value of type T.
*
*/
@Deprecated
public class ObjectHolder<T> {
private T value;
public ObjectHolder(final T initialValue) {
this.value = initialValue;
}
public T get() {
return value;
}
public void set(T value) {
this.value = value;
}
}

View File

@ -28,7 +28,6 @@ import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

View File

@ -65,7 +65,6 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
import org.junit.Assert;
@ -459,15 +458,6 @@ public class StandardProcessorTestRunner implements TestRunner {
return flowFiles;
}
/**
* @deprecated The ProvenanceReporter should not be accessed through the test runner, as it does not expose the events that were emitted.
*/
@Override
@Deprecated
public ProvenanceReporter getProvenanceReporter() {
return sharedState.getProvenanceReporter();
}
@Override
public QueueSize getQueueSize() {
return flowFileQueue.size();
@ -584,13 +574,6 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public void addControllerService(final String identifier, final ControllerService service, final Map<String, String> properties) throws InitializationException {
// hold off on failing due to deprecated annotation for now... will introduce later.
// for ( final Method method : service.getClass().getMethods() ) {
// if ( method.isAnnotationPresent(org.apache.nifi.controller.annotation.OnConfigured.class) ) {
// Assert.fail("Controller Service " + service + " is using deprecated Annotation " + org.apache.nifi.controller.annotation.OnConfigured.class + " for method " + method);
// }
// }
final MockComponentLog logger = new MockComponentLog(identifier, service);
controllerServiceLoggers.put(identifier, logger);
final MockStateManager serviceStateManager = new MockStateManager(service);

View File

@ -33,7 +33,6 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
@ -485,12 +484,6 @@ public interface TestRunner {
*/
List<MockFlowFile> getPenalizedFlowFiles();
/**
* @return the {@link ProvenanceReporter} that will be used by the
* configured {@link Processor} for reporting Provenance Events
*/
ProvenanceReporter getProvenanceReporter();
/**
* @return the current size of the Processor's Input Queue
*/

View File

@ -110,26 +110,6 @@ public class TestStandardProcessorTestRunner {
runner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.KEY);
}
@org.apache.nifi.annotation.documentation.Tags({"deprecated"})
private static class NewAnnotation extends AbstractProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}
private static class NewMethodAnnotation extends AbstractProcessor {
@org.apache.nifi.annotation.lifecycle.OnScheduled
public void dummy() {
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}
private static class ProcessorWithOnStop extends AbstractProcessor {
private int callsWithContext = 0;

View File

@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
@ -57,7 +58,6 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.ObjectHolder;
@SideEffectFree
@SupportsBatching
@ -231,7 +231,7 @@ public class SplitAvro extends AbstractProcessor {
@Override
public List<FlowFile> split(final ProcessSession session, final FlowFile originalFlowFile, final SplitWriter splitWriter) {
final List<FlowFile> childFlowFiles = new ArrayList<>();
final ObjectHolder<GenericRecord> recordHolder = new ObjectHolder<>(null);
final AtomicReference<GenericRecord> recordHolder = new AtomicReference<>(null);
session.read(originalFlowFile, new InputStreamCallback() {
@Override
@ -239,7 +239,7 @@ public class SplitAvro extends AbstractProcessor {
try (final InputStream in = new BufferedInputStream(rawIn);
final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
final ObjectHolder<String> codec = new ObjectHolder<>(reader.getMetaString(DataFileConstants.CODEC));
final AtomicReference<String> codec = new AtomicReference<>(reader.getMetaString(DataFileConstants.CODEC));
if (codec.get() == null) {
codec.set(DataFileConstants.NULL_CODEC);
}

View File

@ -52,7 +52,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.StopWatch;
import java.io.IOException;
@ -71,6 +70,7 @@ import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
@Tags({"cassandra", "cql", "select"})
@EventDriven
@ -228,7 +228,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
// and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
final Session connectionSession = cassandraSession.get();
final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
final LongHolder nrOfRows = new LongHolder(0L);
final AtomicLong nrOfRows = new AtomicLong(0L);
fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
@Override
@ -259,7 +259,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
});
// set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, nrOfRows.get().toString());
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});

View File

@ -284,8 +284,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
}
@SuppressWarnings("deprecation")
public static int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
if (toSwap == null || toSwap.isEmpty()) {
return 0;

View File

@ -986,7 +986,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @throws NullPointerException if either arg is null
* @throws ProcessorInstantiationException if the processor cannot be instantiated for any reason
*/
@SuppressWarnings("deprecation")
public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException {
id = id.intern();
@ -3605,7 +3604,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return replayFlowFile(record, user);
}
@SuppressWarnings("deprecation")
public ProvenanceEventRecord replayFlowFile(final ProvenanceEventRecord event, final NiFiUser user) throws IOException {
if (event == null) {
throw new NullPointerException();

View File

@ -733,7 +733,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
@Override
@SuppressWarnings("deprecation")
public int compare(final FlowFileRecord f1, final FlowFileRecord f2) {
int returnVal = 0;
final boolean f1Penalized = f1.isPenalized();

View File

@ -139,7 +139,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
processor.getClass().getSimpleName(), processor.getClass().getCanonicalName());
}
@SuppressWarnings("deprecation")
public StandardProcessorNode(final Processor processor, final String uuid,
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider controllerServiceProvider,

View File

@ -70,7 +70,6 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.file.FileUtils;
@ -297,7 +296,7 @@ public class FileSystemRepository implements ContentRepository {
final Callable<Long> scanContainer = new Callable<Long>() {
@Override
public Long call() throws IOException {
final LongHolder oldestDateHolder = new LongHolder(0L);
final AtomicLong oldestDateHolder = new AtomicLong(0L);
// the path already exists, so scan the path to find any files and update maxIndex to the max of
// all filenames seen.

View File

@ -54,7 +54,6 @@ import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.repository.io.LongHolder;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
@ -120,8 +119,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private int removedCount = 0; // number of flowfiles removed in this session
private long removedBytes = 0L; // size of all flowfiles removed in this session
private final LongHolder bytesRead = new LongHolder(0L);
private final LongHolder bytesWritten = new LongHolder(0L);
private final AtomicLong bytesRead = new AtomicLong(0L);
private final AtomicLong bytesWritten = new AtomicLong(0L);
private int flowFilesIn = 0, flowFilesOut = 0;
private long contentSizeIn = 0L, contentSizeOut = 0L;
@ -966,8 +965,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Connectable connectable = context.getConnectable();
final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
flowFileEvent.setBytesRead(bytesRead.getValue());
flowFileEvent.setBytesWritten(bytesWritten.getValue());
flowFileEvent.setBytesRead(bytesRead.get());
flowFileEvent.setBytesWritten(bytesWritten.get());
// update event repository
try {
@ -1055,8 +1054,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
flowFilesOut = 0;
removedCount = 0;
removedBytes = 0L;
bytesRead.setValue(0L);
bytesWritten.setValue(0L);
bytesRead.set(0L);
bytesWritten.set(0L);
connectionCounts.clear();
createdFlowFiles.clear();
removedFlowFiles.clear();
@ -1822,7 +1821,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
currentReadClaim = claim;
currentReadClaimStream = new ByteCountingInputStream(rawInStream, new LongHolder(0L));
currentReadClaimStream = new ByteCountingInputStream(rawInStream, new AtomicLong(0L));
StreamUtils.skip(currentReadClaimStream, offset);
// Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
@ -2069,8 +2068,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
writtenCount += footer.length;
}
} finally {
bytesWritten.increment(writtenCount);
bytesRead.increment(readCount);
bytesWritten.getAndAdd(writtenCount);
bytesRead.getAndAdd(readCount);
}
} catch (final ContentNotFoundException nfe) {
destroyContent(newClaim);
@ -2111,7 +2110,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final StandardRepositoryRecord record = records.get(source);
ContentClaim newClaim = null;
final LongHolder writtenHolder = new LongHolder(0L);
final AtomicLong writtenHolder = new AtomicLong(0L);
try {
newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
@ -2142,7 +2141,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
destroyContent(newClaim);
throw t;
} finally {
bytesWritten.increment(writtenHolder.getValue());
bytesWritten.getAndAdd(writtenHolder.get());
}
removeTemporaryClaim(record);
@ -2150,7 +2149,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
.fromFlowFile(record.getCurrent())
.contentClaim(newClaim)
.contentClaimOffset(0)
.size(writtenHolder.getValue())
.size(writtenHolder.get())
.build();
record.setWorking(newFile);
@ -2178,7 +2177,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final OutputStream rawOutStream = context.getContentRepository().write(newClaim);
final OutputStream bufferedOutStream = new BufferedOutputStream(rawOutStream);
outStream = new ByteCountingOutputStream(bufferedOutStream, new LongHolder(0L));
outStream = new ByteCountingOutputStream(bufferedOutStream, new AtomicLong(0L));
originalByteWrittenCount = 0;
appendableStreams.put(newClaim, outStream);
@ -2224,7 +2223,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} finally {
if (outStream != null) {
final long bytesWrittenThisIteration = outStream.getBytesWritten() - originalByteWrittenCount;
bytesWritten.increment(bytesWrittenThisIteration);
bytesWritten.getAndAdd(bytesWrittenThisIteration);
}
}
@ -2313,7 +2312,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ContentClaim currClaim = record.getCurrentClaim();
ContentClaim newClaim = null;
final LongHolder writtenHolder = new LongHolder(0L);
final AtomicLong writtenHolder = new AtomicLong(0L);
try {
newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
@ -2365,7 +2364,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
destroyContent(newClaim);
throw t;
} finally {
bytesWritten.increment(writtenHolder.getValue());
bytesWritten.getAndAdd(writtenHolder.get());
}
removeTemporaryClaim(record);
@ -2373,7 +2372,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
.fromFlowFile(record.getCurrent())
.contentClaim(newClaim)
.contentClaimOffset(0L)
.size(writtenHolder.getValue())
.size(writtenHolder.get())
.build();
record.setWorking(newFile);
@ -2405,8 +2404,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
long newSize = 0L;
try {
newSize = context.getContentRepository().importFrom(source, newClaim);
bytesWritten.increment(newSize);
bytesRead.increment(newSize);
bytesWritten.getAndAdd(newSize);
bytesRead.getAndAdd(newSize);
} catch (final Throwable t) {
destroyContent(newClaim);
throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
@ -2439,7 +2438,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
newSize = context.getContentRepository().importFrom(source, newClaim);
bytesWritten.increment(newSize);
bytesWritten.getAndAdd(newSize);
} catch (final IOException e) {
throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
}
@ -2465,8 +2464,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
ensureNotAppending(record.getCurrentClaim());
final long copyCount = context.getContentRepository().exportTo(record.getCurrentClaim(), destination, append, record.getCurrentClaimOffset(), source.getSize());
bytesRead.increment(copyCount);
bytesWritten.increment(copyCount);
bytesRead.getAndAdd(copyCount);
bytesWritten.getAndAdd(copyCount);
} catch (final ContentNotFoundException nfe) {
handleContentNotFound(nfe, record);
} catch (final Throwable t) {
@ -2640,7 +2639,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// When creating a new FlowFile from multiple parents, we need to add all of the Lineage Identifiers
// and use the earliest lineage start date
long lineageStartDate = 0L;
final Set<String> lineageIdentifiers = new HashSet<>();
for (final FlowFile parent : parents) {
final long parentLineageStartDate = parent.getLineageStartDate();
@ -2785,8 +2783,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
this.removedCount += session.removedCount;
this.removedBytes += session.removedBytes;
this.bytesRead += session.bytesRead.getValue();
this.bytesWritten += session.bytesWritten.getValue();
this.bytesRead += session.bytesRead.get();
this.bytesWritten += session.bytesWritten.get();
this.flowFilesIn += session.flowFilesIn;
this.flowFilesOut += session.flowFilesOut;
this.contentSizeIn += session.contentSizeIn;

View File

@ -18,14 +18,15 @@ package org.apache.nifi.controller.repository.io;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;
public class ByteCountingInputStream extends InputStream {
private final LongHolder bytesReadHolder;
private final AtomicLong bytesReadHolder;
private final InputStream in;
private long bytesSkipped = 0L;
public ByteCountingInputStream(final InputStream in, final LongHolder longHolder) {
public ByteCountingInputStream(final InputStream in, final AtomicLong longHolder) {
this.in = in;
this.bytesReadHolder = longHolder;
}
@ -34,7 +35,7 @@ public class ByteCountingInputStream extends InputStream {
public int read() throws IOException {
final int fromSuper = in.read();
if (fromSuper >= 0) {
bytesReadHolder.increment(1);
bytesReadHolder.getAndIncrement();
}
return fromSuper;
}
@ -43,7 +44,7 @@ public class ByteCountingInputStream extends InputStream {
public int read(byte[] b, int off, int len) throws IOException {
final int fromSuper = in.read(b, off, len);
if (fromSuper >= 0) {
bytesReadHolder.increment(fromSuper);
bytesReadHolder.getAndAdd(fromSuper);
}
return fromSuper;
@ -87,7 +88,7 @@ public class ByteCountingInputStream extends InputStream {
}
public long getBytesRead() {
return bytesReadHolder.getValue();
return bytesReadHolder.get();
}
public long getBytesSkipped() {

View File

@ -18,13 +18,14 @@ package org.apache.nifi.controller.repository.io;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;
public class ByteCountingOutputStream extends OutputStream {
private final LongHolder bytesWrittenHolder;
private final AtomicLong bytesWrittenHolder;
private final OutputStream out;
public ByteCountingOutputStream(final OutputStream out, final LongHolder longHolder) {
public ByteCountingOutputStream(final OutputStream out, final AtomicLong longHolder) {
this.out = out;
this.bytesWrittenHolder = longHolder;
}
@ -32,7 +33,7 @@ public class ByteCountingOutputStream extends OutputStream {
@Override
public void write(int b) throws IOException {
out.write(b);
bytesWrittenHolder.increment(1);
bytesWrittenHolder.getAndIncrement();
}
@Override
@ -43,11 +44,11 @@ public class ByteCountingOutputStream extends OutputStream {
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
bytesWrittenHolder.increment(len);
bytesWrittenHolder.getAndAdd(len);
}
public long getBytesWritten() {
return bytesWrittenHolder.getValue();
return bytesWrittenHolder.get();
}
@Override

View File

@ -1,46 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.io;
/**
* Class to hold a long value that can be incremented and decremented. This allows the abstraction of passing a long value by reference, rather than by value, without the overhead of synchronization
* required by the use of an AtomicLong.
*/
public class LongHolder {
private long value;
public LongHolder() {
value = 0L;
}
public LongHolder(final long initialValue) {
value = initialValue;
}
public void increment(long value) {
this.value += value;
}
public long getValue() {
return value;
}
public void setValue(final long value) {
this.value = value;
}
}

View File

@ -270,7 +270,6 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
}
}
@SuppressWarnings("deprecation")
private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) {
final int newThreadCount = scheduleState.incrementActiveThreadCount();
if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {

View File

@ -181,7 +181,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
scheduleState.setScheduled(true);
final Runnable startReportingTaskRunnable = new Runnable() {
@SuppressWarnings("deprecation")
@Override
public void run() {
final long lastStopTime = scheduleState.getLastStopTime();
@ -243,7 +242,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
taskNode.setScheduledState(ScheduledState.STOPPED);
final Runnable unscheduleReportingTaskRunnable = new Runnable() {
@SuppressWarnings("deprecation")
@Override
public void run() {
final ConfigurationContext configurationContext = taskNode.getConfigurationContext();

View File

@ -30,6 +30,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnAdded;
@ -56,7 +57,6 @@ import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -144,7 +144,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
final ControllerService originalService = controllerServiceClass.newInstance();
final ObjectHolder<ControllerServiceNode> serviceNodeHolder = new ObjectHolder<>(null);
final AtomicReference<ControllerServiceNode> serviceNodeHolder = new AtomicReference<>(null);
final InvocationHandler invocationHandler = new InvocationHandler() {
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {

View File

@ -54,7 +54,6 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
}
@Override
@SuppressWarnings("deprecation")
public Boolean call() {
if (!scheduleState.isScheduled()) {
return false;

View File

@ -34,7 +34,6 @@ public class ReportingTaskWrapper implements Runnable {
this.scheduleState = scheduleState;
}
@SuppressWarnings("deprecation")
@Override
public synchronized void run() {
scheduleState.incrementActiveThreadCount();

View File

@ -333,7 +333,6 @@ public final class StandardProcessGroup implements ProcessGroup {
return flowController.getStateManagerProvider().getStateManager(componentId);
}
@SuppressWarnings("deprecation")
private void shutdown(final ProcessGroup procGroup) {
for (final ProcessorNode node : procGroup.getProcessors()) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
@ -688,7 +687,6 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
@SuppressWarnings("deprecation")
public void removeProcessor(final ProcessorNode processor) {
final String id = requireNonNull(processor).getIdentifier();
writeLock.lock();

View File

@ -69,7 +69,6 @@ public class TestFileSystemSwapManager {
}
@Test
@SuppressWarnings("deprecation")
public void testRoundTripSerializeDeserialize() throws IOException {
final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
final Map<String, String> attrs = new HashMap<>();

View File

@ -564,7 +564,6 @@ public class TestStandardFlowFileQueue {
}
@Override
@SuppressWarnings("deprecation")
public SwapSummary getSwapSummary(String swapLocation) throws IOException {
final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
if (flowFiles == null) {
@ -668,7 +667,6 @@ public class TestStandardFlowFileQueue {
}
@Override
@SuppressWarnings("deprecation")
public int compareTo(final FlowFile o) {
return Long.compare(id, o.getId());
}

View File

@ -157,7 +157,6 @@ public class TestWriteAheadFlowFileRepository {
}
@Test
@SuppressWarnings("deprecation")
public void testRestartWithOneRecord() throws IOException {
final Path path = Paths.get("target/test-repo");
if (Files.exists(path)) {
@ -305,7 +304,6 @@ public class TestWriteAheadFlowFileRepository {
}
@Override
@SuppressWarnings("deprecation")
public SwapSummary getSwapSummary(String swapLocation) throws IOException {
List<FlowFileRecord> records = null;
for (final Map<String, List<FlowFileRecord>> swapMap : swappedRecords.values()) {

View File

@ -95,77 +95,6 @@ public class FileUtils {
return isGone;
}
/**
* Deletes all files (not directories..) in the given directory (non
* recursive) that match the given filename filter. If any file cannot be
* deleted then this is printed at warn to the given logger.
*
* @param directory to delete contents of
* @param filter if null then no filter is used
* @param logger to notify
* @deprecated As of release 0.6.0, replaced by
* {@link #deleteFilesInDirectory(File, FilenameFilter, Logger)}
*/
@Deprecated
public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger) {
FileUtils.deleteFilesInDir(directory, filter, logger, false);
}
/**
* Deletes all files (not directories) in the given directory (recursive)
* that match the given filename filter. If any file cannot be deleted then
* this is printed at warn to the given logger.
*
* @param directory to delete contents of
* @param filter if null then no filter is used
* @param logger to notify
* @param recurse true if should recurse
* @deprecated As of release 0.6.0, replaced by
* {@link #deleteFilesInDirectory(File, FilenameFilter, Logger, boolean)}
*/
@Deprecated
public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse) {
FileUtils.deleteFilesInDir(directory, filter, logger, recurse, false);
}
/**
* Deletes all files (not directories) in the given directory (recursive)
* that match the given filename filter. If any file cannot be deleted then
* this is printed at warn to the given logger.
*
* @param directory to delete contents of
* @param filter if null then no filter is used
* @param logger to notify
* @param recurse will look for contents of sub directories.
* @param deleteEmptyDirectories default is false; if true will delete
* directories found that are empty
* @deprecated As of release 0.6.0, replaced by
* {@link #deleteFilesInDirectory(File, FilenameFilter, Logger, boolean, boolean)}
*/
@Deprecated
public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse, final boolean deleteEmptyDirectories) {
// ensure the specified directory is actually a directory and that it exists
if (null != directory && directory.isDirectory()) {
final File ingestFiles[] = directory.listFiles();
if (ingestFiles == null) {
// null if abstract pathname does not denote a directory, or if an I/O error occurs
logger.error("Unable to list directory content in: " + directory.getAbsolutePath());
}
for (File ingestFile : ingestFiles) {
boolean process = (filter == null) ? true : filter.accept(directory, ingestFile.getName());
if (ingestFile.isFile() && process) {
FileUtils.deleteFile(ingestFile, logger, 3);
}
if (ingestFile.isDirectory() && recurse) {
FileUtils.deleteFilesInDir(ingestFile, filter, logger, recurse, deleteEmptyDirectories);
if (deleteEmptyDirectories && ingestFile.list().length == 0) {
FileUtils.deleteFile(ingestFile, logger, 3);
}
}
}
}
}
/**
* Deletes all files (not directories..) in the given directory (non
* recursive) that match the given filename filter. If any file cannot be

View File

@ -1927,23 +1927,14 @@ public final class DtoFactory {
/**
* Gets the capability description from the specified class.
*/
@SuppressWarnings("deprecation")
private String getCapabilityDescription(final Class<?> cls) {
final CapabilityDescription capabilityDesc = cls.getAnnotation(CapabilityDescription.class);
if (capabilityDesc != null) {
return capabilityDesc.value();
}
final org.apache.nifi.processor.annotation.CapabilityDescription deprecatedCapabilityDesc
= cls.getAnnotation(org.apache.nifi.processor.annotation.CapabilityDescription.class);
return (deprecatedCapabilityDesc == null) ? null : deprecatedCapabilityDesc.value();
return capabilityDesc == null ? null : capabilityDesc.value();
}
/**
* Gets the tags from the specified class.
*/
@SuppressWarnings("deprecation")
private Set<String> getTags(final Class<?> cls) {
final Set<String> tags = new HashSet<>();
final Tags tagsAnnotation = cls.getAnnotation(Tags.class);
@ -1951,13 +1942,6 @@ public final class DtoFactory {
for (final String tag : tagsAnnotation.value()) {
tags.add(tag);
}
} else {
final org.apache.nifi.processor.annotation.Tags deprecatedTagsAnnotation = cls.getAnnotation(org.apache.nifi.processor.annotation.Tags.class);
if (deprecatedTagsAnnotation != null) {
for (final String tag : deprecatedTagsAnnotation.value()) {
tags.add(tag);
}
}
}
return tags;
@ -2129,7 +2113,6 @@ public final class DtoFactory {
* @param node node
* @return dto
*/
@SuppressWarnings("deprecation")
public ProvenanceNodeDTO createProvenanceEventNodeDTO(final ProvenanceEventLineageNode node) {
final ProvenanceNodeDTO dto = new ProvenanceNodeDTO();
dto.setId(node.getIdentifier());
@ -2140,7 +2123,6 @@ public final class DtoFactory {
dto.setFlowFileUuid(node.getFlowFileUuid());
dto.setParentUuids(node.getParentUuids());
dto.setChildUuids(node.getChildUuids());
dto.setClusterNodeIdentifier(node.getClusterNodeIdentifier());
return dto;
}
@ -2150,7 +2132,6 @@ public final class DtoFactory {
* @param node node
* @return dto
*/
@SuppressWarnings("deprecation")
public ProvenanceNodeDTO createFlowFileNodeDTO(final LineageNode node) {
final ProvenanceNodeDTO dto = new ProvenanceNodeDTO();
dto.setId(node.getIdentifier());
@ -2158,7 +2139,6 @@ public final class DtoFactory {
dto.setTimestamp(new Date(node.getTimestamp()));
dto.setMillis(node.getTimestamp());
dto.setFlowFileUuid(node.getFlowFileUuid());
dto.setClusterNodeIdentifier(node.getClusterNodeIdentifier());
return dto;
}

View File

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web;
import org.apache.nifi.web.revision.RevisionManager;
/**
* A manager for optimistic locking based on revisions. A revision is composed
* of a client ID and a version number. Two revisions are considered equal if
* either their version numbers match or their client IDs match.
*
* @deprecated This class has been deprecated in favor of {@link RevisionManager}
*/
@Deprecated
public interface OptimisticLockingManager {
/**
* Attempts to execute the specified configuration request using the
* specified revision within a lock.
*
* @param <T> type of snapshot
* @param revision revision
* @param configurationRequest request
* @return snapshot
*/
<T> ConfigurationSnapshot<T> configureFlow(Revision revision, ConfigurationRequest<T> configurationRequest);
/**
* Updates the revision using the specified revision within a lock.
*
* @param updateRevision new revision
*/
void setRevision(UpdateRevision updateRevision);
/**
* Returns the last flow modification. This is a combination of the revision
* and the user who performed the modification.
*
* @return the last modification
*/
FlowModification getLastModification();
}

View File

@ -31,10 +31,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.flowfile.FlowFile;
@ -46,8 +44,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.HDFSListing;
import org.apache.nifi.processors.hadoop.util.HDFSListing.StateKeys;
import org.apache.nifi.processors.hadoop.util.StringSerDe;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
@ -124,7 +120,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
private volatile long latestTimestampListed = -1L;
private volatile long latestTimestampEmitted = -1L;
private volatile boolean electedPrimaryNodeSinceLastIteration = false;
private volatile long lastRunTimestamp = -1L;
static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
@ -175,80 +170,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
return mapper.readValue(jsonNode, HDFSListing.class);
}
/**
* Transitions state from the Distributed cache service to the state manager. This will be
* removed in NiFi 1.x
*
* @param context the ProcessContext
* @throws IOException if unable to communicate with state manager or controller service
*/
@Deprecated
@OnScheduled
public void moveStateToStateManager(final ProcessContext context) throws IOException {
final StateManager stateManager = context.getStateManager();
final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
// Check if we have already stored state in the cluster state manager.
if (stateMap.getVersion() == -1L) {
final HDFSListing serviceListing = getListingFromService(context);
if (serviceListing != null) {
context.getStateManager().setState(serviceListing.toMap(), Scope.CLUSTER);
}
}
}
@Deprecated
private HDFSListing getListingFromService(final ProcessContext context) throws IOException {
final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
if (client == null) {
return null;
}
final String directory = context.getProperty(DIRECTORY).getValue();
final String remoteValue = client.get(getKey(directory), new StringSerDe(), new StringSerDe());
if (remoteValue == null) {
return null;
}
try {
return deserialize(remoteValue);
} catch (final Exception e) {
getLogger().error("Failed to retrieve state from Distributed Map Cache because the content that was retrieved could not be understood", e);
return null;
}
}
/**
* Restores state information from the 'old' style of storing state. This is deprecated and will no longer be supported
* in the 1.x NiFi baseline
*
* @param directory the directory that the listing was performed against
* @param remoteListing the remote listing
* @return the minimum timestamp that should be used for new entries
*/
@Deprecated
private Long restoreTimestampFromOldStateFormat(final String directory, final HDFSListing remoteListing) {
// No cluster-wide state has been recovered. Just use whatever values we already have.
if (remoteListing == null) {
return latestTimestampListed;
}
// If our local timestamp is already later than the remote listing's timestamp, use our local info.
Long minTimestamp = latestTimestampListed;
if (minTimestamp != null && minTimestamp > remoteListing.getLatestTimestamp().getTime()) {
return minTimestamp;
}
// Use the remote listing's information.
if (minTimestamp == null || electedPrimaryNodeSinceLastIteration) {
this.latestTimestampListed = remoteListing.getLatestTimestamp().getTime();
this.latestTimestampEmitted = this.latestTimestampListed;
}
return minTimestamp;
}
/**
* Determines which of the given FileStatus's describes a File that should be listed.
*
@ -339,13 +260,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
} else {
// Determine if state is stored in the 'new' format or the 'old' format
final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
if (emittedString == null && stateMap.get(StateKeys.TIMESTAMP) != null) {
// state is stored in the old format with XML
final Map<String, String> stateValues = stateMap.toMap();
final HDFSListing stateListing = HDFSListing.fromMap(stateValues);
getLogger().debug("Found old-style state stored");
restoreTimestampFromOldStateFormat(directory, stateListing);
} else if (emittedString == null) {
if (emittedString == null) {
latestTimestampEmitted = -1L;
latestTimestampListed = -1L;
getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");

View File

@ -34,6 +34,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -73,7 +74,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
@TriggerWhenEmpty
@TriggerSerially
@ -274,8 +274,8 @@ public class GetHBase extends AbstractProcessor {
final Map<String, Set<String>> cellsMatchingTimestamp = new HashMap<>();
final ObjectHolder<Long> rowsPulledHolder = new ObjectHolder<>(0L);
final ObjectHolder<Long> latestTimestampHolder = new ObjectHolder<>(minTime);
final AtomicReference<Long> rowsPulledHolder = new AtomicReference<>(0L);
final AtomicReference<Long> latestTimestampHolder = new AtomicReference<>(minTime);
hBaseClientService.scan(tableName, columns, filterExpression, minTime, new ResultHandler() {

View File

@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
@ -46,7 +47,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
@ -166,7 +166,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
// Parse the JSON document
final ObjectMapper mapper = new ObjectMapper();
final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
final AtomicReference<JsonNode> rootNodeRef = new AtomicReference<>(null);
try {
session.read(flowFile, new InputStreamCallback() {
@Override
@ -189,13 +189,13 @@ public class PutHBaseJSON extends AbstractPutHBase {
}
final Collection<PutColumn> columns = new ArrayList<>();
final ObjectHolder<String> rowIdHolder = new ObjectHolder<>(null);
final AtomicReference<String> rowIdHolder = new AtomicReference<>(null);
// convert each field/value to a column for the put, skip over nulls and arrays
final Iterator<String> fieldNames = rootNode.getFieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final ObjectHolder<byte[]> fieldValueHolder = new ObjectHolder<>(null);
final AtomicReference<byte[]> fieldValueHolder = new AtomicReference<>(null);
final JsonNode fieldNode = rootNode.get(fieldName);
if (fieldNode.isNull()) {

View File

@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -47,7 +48,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.hive.HiveJdbcCommon;
@ -157,7 +157,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
final LongHolder nrOfRows = new LongHolder(0L);
final AtomicLong nrOfRows = new AtomicLong(0L);
if (fileToProcess == null) {
fileToProcess = session.create();
}
@ -182,7 +182,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
});
// set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, nrOfRows.get().toString());
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
// Set MIME type on output document and add extension
if (AVRO.equals(outputFormat)) {

View File

@ -53,7 +53,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.kite.AvroRecordConverter.AvroConversionException;
import org.apache.nifi.util.LongHolder;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.SchemaNotFoundException;
@ -63,6 +62,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.util.concurrent.atomic.AtomicLong;
@Tags({ "avro", "convert", "kite" })
@CapabilityDescription("Convert records from one Avro schema to another, including support for flattening and simple type conversions")
@ -291,7 +291,7 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
failureWriter.setCodec(CodecFactory.snappyCodec());
try {
final LongHolder written = new LongHolder(0L);
final AtomicLong written = new AtomicLong(0L);
final FailureTracker failures = new FailureTracker();
final List<Record> badRecords = Lists.newLinkedList();

View File

@ -47,7 +47,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.LongHolder;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetRecordException;
@ -59,6 +58,7 @@ import org.kitesdk.data.spi.filesystem.CSVProperties;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.concurrent.atomic.AtomicLong;
@Tags({"kite", "csv", "avro"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@ -224,7 +224,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
writer.setCodec(CodecFactory.snappyCodec());
try {
final LongHolder written = new LongHolder(0L);
final AtomicLong written = new AtomicLong(0L);
final FailureTracker failures = new FailureTracker();
FlowFile badRecords = session.clone(incomingCSV);

View File

@ -39,7 +39,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.util.LongHolder;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetRecordException;
@ -50,6 +49,7 @@ import org.kitesdk.data.spi.filesystem.JSONFileReader;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.concurrent.atomic.AtomicLong;
@Tags({"kite", "json", "avro"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@ -132,7 +132,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
writer.setCodec(CodecFactory.snappyCodec());
try {
final LongHolder written = new LongHolder(0L);
final AtomicLong written = new AtomicLong(0L);
final FailureTracker failures = new FailureTracker();
FlowFile badRecords = session.clone(incomingJSON);

View File

@ -44,13 +44,13 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import com.drew.imaging.ImageMetadataReader;
import com.drew.imaging.ImageProcessingException;
import com.drew.metadata.Directory;
import com.drew.metadata.Metadata;
import com.drew.metadata.Tag;
import java.util.concurrent.atomic.AtomicReference;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Exif", "Exchangeable", "image", "file", "format", "JPG", "GIF", "PNG", "BMP", "metadata","IPTC", "XMP"})
@ -116,7 +116,7 @@ public class ExtractImageMetadata extends AbstractProcessor {
}
final ComponentLog logger = this.getLogger();
final ObjectHolder<Metadata> value = new ObjectHolder<>(null);
final AtomicReference<Metadata> value = new AtomicReference<>(null);
final Integer max = context.getProperty(MAX_NUMBER_OF_ATTRIBUTES).asInteger();
try {

View File

@ -113,7 +113,6 @@ import java.util.stream.Collectors;
public class PersistentProvenanceRepository implements ProvenanceEventRepository {
public static final String DEPRECATED_CLASS_NAME = "nifi.controller.repository.provenance.PersistentProvenanceRepository";
public static final String EVENT_CATEGORY = "Provenance Repository";
private static final String FILE_EXTENSION = ".prov";
private static final String TEMP_FILE_SUFFIX = ".prov.part";

View File

@ -35,7 +35,6 @@ import org.apache.nifi.provenance.search.QueryResult;
import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.search.SearchTerm;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.util.IntegerHolder;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.RingBuffer;
import org.apache.nifi.util.RingBuffer.Filter;
@ -611,7 +610,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
@Override
public void run() {
// Retrieve the most recent results and count the total number of matches
final IntegerHolder matchingCount = new IntegerHolder(0);
final AtomicInteger matchingCount = new AtomicInteger(0);
final List<ProvenanceEventRecord> matchingRecords = new ArrayList<>(maxRecords);
ringBuffer.forEach(new ForEachEvaluator<ProvenanceEventRecord>() {
@Override

View File

@ -49,10 +49,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

View File

@ -32,6 +32,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -47,7 +48,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
@ -166,8 +166,8 @@ public class PutSolrContentStream extends SolrProcessor {
return;
}
final ObjectHolder<Exception> error = new ObjectHolder<>(null);
final ObjectHolder<Exception> connectionError = new ObjectHolder<>(null);
final AtomicReference<Exception> error = new AtomicReference<>(null);
final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue());
final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();

View File

@ -39,7 +39,6 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
import org.apache.nifi.util.LongHolder;
import javax.net.ssl.SSLContext;
import java.io.IOException;
@ -50,6 +49,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"splunk", "logs", "tcp", "udp"})
@ -219,7 +219,7 @@ public class PutSplunk extends AbstractPutEventProcessor {
// some pattern. We can use this to search for the delimiter as we read through the stream of bytes in the FlowFile
final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
final LongHolder messagesSent = new LongHolder(0L);
final AtomicLong messagesSent = new AtomicLong(0L);
final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile);
activeBatches.add(messageBatch);

View File

@ -31,7 +31,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processors.standard.util.JsonPathExpressionValidator;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.ObjectHolder;
import java.io.IOException;
import java.io.InputStream;
@ -39,6 +38,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
/**
* Provides common functionality used for processors interacting and manipulating JSON data via JsonPath.
@ -73,7 +73,7 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) {
// Parse the document once into an associated context to support multiple path evaluations if specified
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
final AtomicReference<DocumentContext> contextHolder = new AtomicReference<>(null);
processSession.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {

View File

@ -1,354 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.Bin;
import org.apache.nifi.processors.standard.util.BinManager;
import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
/**
* Base class for file-binning processors, including MergeContent.
*
* @deprecated As of release 0.5.0, replaced by
* {@link org.apache.nifi.processor.util.bin.BinFiles}
*/
@Deprecated
public abstract class BinFiles extends AbstractSessionFactoryProcessor {
public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
.name("Minimum Group Size")
.description("The minimum size of for the bundle")
.required(true)
.defaultValue("0 B")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Group Size")
.description("The maximum size for the bundle. If not specified, there is no maximum.")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder()
.name("Minimum Number of Entries")
.description("The minimum number of files to include in a bundle")
.required(true)
.defaultValue("1")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder()
.name("Maximum Number of Entries")
.description("The maximum number of files to include in a bundle. If not specified, there is no maximum.")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
.name("Maximum number of Bins")
.description("Specifies the maximum number of bins that can be held in memory at any one time")
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
.name("Max Bin Age")
.description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> "
+ "where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The FlowFiles that were used to create the bundle")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure")
.build();
private final BinManager binManager = new BinManager();
private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
@OnStopped
public final void resetState() {
binManager.purge();
Bin bin;
while ((bin = readyBins.poll()) != null) {
for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
wrapper.getSession().rollback();
}
}
}
/**
* Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId().
*
* @param context context
* @param session session
* @param flowFile flowFile
* @return The flow file, possibly altered
*/
protected abstract FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
/**
* Returns a group ID representing a bin. This allows flow files to be binned into like groups.
*
* @param context context
* @param flowFile flowFile
* @return The appropriate group ID
*/
protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile);
/**
* Performs any additional setup of the bin manager. Called during the OnScheduled phase.
*
* @param binManager The bin manager
* @param context context
*/
protected abstract void setUpBinManager(BinManager binManager, ProcessContext context);
/**
* Processes a single bin. Implementing class is responsible for committing each session
*
* @param unmodifiableBin A reference to a single bin of flow file/session wrappers
* @param binContents A copy of the contents of the bin
* @param context The context
* @param session The session that created the bin
* @return Return true if the input bin was already committed. E.g., in case of a failure, the implementation may choose to transfer all binned files to Failure and commit their sessions. If
* false, the processBins() method will transfer the files to Original and commit the sessions
*
* @throws ProcessException if any problem arises while processing a bin of FlowFiles. All flow files in the bin will be transferred to failure and the ProcessSession provided by the 'session'
* argument rolled back
*/
protected abstract boolean processBin(Bin unmodifiableBin, List<FlowFileSessionWrapper> binContents, ProcessContext context, ProcessSession session) throws ProcessException;
/**
* Allows additional custom validation to be done. This will be called from the parent's customValidation method.
*
* @param context The context
* @return Validation results indicating problems
*/
protected Collection<ValidationResult> additionalCustomValidation(final ValidationContext context) {
return new ArrayList<>();
}
@Override
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
final int flowFilesBinned = binFlowFiles(context, sessionFactory);
getLogger().debug("Binned {} FlowFiles", new Object[]{flowFilesBinned});
if (!isScheduled()) {
return;
}
final int binsMigrated = migrateBins(context);
final int binsProcessed = processBins(context, sessionFactory);
//If we accomplished nothing then let's yield
if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
context.yield();
}
}
private int migrateBins(final ProcessContext context) {
int added = 0;
for (final Bin bin : binManager.removeReadyBins(true)) {
this.readyBins.add(bin);
added++;
}
// if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
// this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
// bins. So we may as well expire it now.
if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
final Bin bin = binManager.removeOldestBin();
if (bin != null) {
added++;
this.readyBins.add(bin);
}
}
return added;
}
private int processBins(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
final Bin bin = readyBins.poll();
if (bin == null) {
return 0;
}
final List<Bin> bins = new ArrayList<>();
bins.add(bin);
final ComponentLog logger = getLogger();
final ProcessSession session = sessionFactory.createSession();
final List<FlowFileSessionWrapper> binCopy = new ArrayList<>(bin.getContents());
boolean binAlreadyCommitted = false;
try {
binAlreadyCommitted = this.processBin(bin, binCopy, context, session);
} catch (final ProcessException e) {
logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
for (final FlowFileSessionWrapper wrapper : binCopy) {
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
wrapper.getSession().commit();
}
session.rollback();
return 1;
}
// we first commit the bundle's session before the originals' sessions because if we are restarted or crash
// between commits, we favor data redundancy over data loss. Since we have no Distributed Transaction capability
// across multiple sessions, we cannot guarantee atomicity across the sessions
session.commit();
// If this bin's session has been committed, move on.
if (!binAlreadyCommitted) {
for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_ORIGINAL);
wrapper.getSession().commit();
}
}
return 1;
}
private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
int flowFilesBinned = 0;
while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
if (!isScheduled()) {
break;
}
final ProcessSession session = sessionFactory.createSession();
FlowFile flowFile = session.get();
if (flowFile == null) {
break;
}
flowFile = this.preprocessFlowFile(context, session, flowFile);
String groupId = this.getGroupId(context, flowFile);
final boolean binned = binManager.offer(groupId, flowFile, session);
// could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
if (!binned) {
Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
bin.offer(flowFile, session);
this.readyBins.add(bin);
}
flowFilesBinned++;
}
return flowFilesBinned;
}
@OnScheduled
public final void onScheduled(final ProcessContext context) throws IOException {
binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
if (context.getProperty(MAX_BIN_AGE).isSet()) {
binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
} else {
binManager.setMaxBinAge(Integer.MAX_VALUE);
}
if (context.getProperty(MAX_SIZE).isSet()) {
binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
} else {
binManager.setMaximumSize(Long.MAX_VALUE);
}
binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
if (context.getProperty(MAX_ENTRIES).isSet()) {
binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
} else {
binManager.setMaximumEntries(Integer.MAX_VALUE);
}
this.setUpBinManager(binManager, context);
}
@Override
protected final Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
final Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
if (maxBytes != null && maxBytes.longValue() < minBytes) {
problems.add(
new ValidationResult.Builder()
.subject(MIN_SIZE.getName())
.input(context.getProperty(MIN_SIZE).getValue())
.valid(false)
.explanation("Min Size must be less than or equal to Max Size")
.build()
);
}
final Long min = context.getProperty(MIN_ENTRIES).asLong();
final Long max = context.getProperty(MAX_ENTRIES).asLong();
if (min != null && max != null) {
if (min > max) {
problems.add(
new ValidationResult.Builder().subject(MIN_ENTRIES.getName())
.input(context.getProperty(MIN_ENTRIES).getValue())
.valid(false)
.explanation("Min Entries must be less than or equal to Max Entries")
.build()
);
}
}
Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context);
if (otherProblems != null) {
problems.addAll(otherProblems);
}
return problems;
}
}

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
@ -55,7 +56,6 @@ import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
import org.tukaani.xz.LZMA2Options;
import org.tukaani.xz.XZInputStream;
@ -203,7 +203,7 @@ public class CompressContent extends AbstractProcessor {
}
final String compressionFormat = compressionFormatValue;
final ObjectHolder<String> mimeTypeRef = new ObjectHolder<>(null);
final AtomicReference<String> mimeTypeRef = new AtomicReference<>(null);
final StopWatch stopWatch = new StopWatch(true);
final String fileExtension;

View File

@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -57,7 +58,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
@ -276,7 +276,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
// Parse the JSON document
final ObjectMapper mapper = new ObjectMapper();
final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
final AtomicReference<JsonNode> rootNodeRef = new AtomicReference<>(null);
try {
session.read(flowFile, new InputStreamCallback() {
@Override

View File

@ -52,12 +52,12 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.ObjectHolder;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import java.util.concurrent.atomic.AtomicReference;
@EventDriven
@SideEffectFree
@ -277,7 +277,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
final JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
final String pathNotFound = processContext.getProperty(PATH_NOT_FOUND).getValue();
final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
final AtomicReference<Object> resultHolder = new AtomicReference<>(null);
try {
final Object result = documentContext.read(jsonPathExp);
if (returnType.equals(RETURN_TYPE_SCALAR) && !isJsonScalar(result)) {

View File

@ -75,7 +75,6 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.ObjectHolder;
import org.xml.sax.InputSource;
import net.sf.saxon.lib.NamespaceConstant;
@ -271,8 +270,8 @@ public class EvaluateXPath extends AbstractProcessor {
flowFileLoop:
for (FlowFile flowFile : flowFiles) {
final ObjectHolder<Throwable> error = new ObjectHolder<>(null);
final ObjectHolder<Source> sourceRef = new ObjectHolder<>(null);
final AtomicReference<Throwable> error = new AtomicReference<>(null);
final AtomicReference<Source> sourceRef = new AtomicReference<>(null);
session.read(flowFile, new InputStreamCallback() {
@Override
@ -402,7 +401,7 @@ public class EvaluateXPath extends AbstractProcessor {
final ComponentLog logger = getLogger();
final ObjectHolder<TransformerException> error = new ObjectHolder<>(null);
final AtomicReference<TransformerException> error = new AtomicReference<>(null);
transformer.setErrorListener(new ErrorListener() {
@Override
public void warning(final TransformerException exception) throws TransformerException {

View File

@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
@ -66,7 +67,6 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.ObjectHolder;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;
@ -264,8 +264,8 @@ public class EvaluateXQuery extends AbstractProcessor {
return;
}
final ObjectHolder<Throwable> error = new ObjectHolder<>(null);
final ObjectHolder<XdmNode> sourceRef = new ObjectHolder<>(null);
final AtomicReference<Throwable> error = new AtomicReference<>(null);
final AtomicReference<XdmNode> sourceRef = new AtomicReference<>(null);
session.read(flowFile, new InputStreamCallback() {
@Override

View File

@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
@ -51,7 +52,6 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.StopWatch;
@EventDriven
@ -180,7 +180,7 @@ public class ExecuteSQL extends AbstractProcessor {
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
st.setQueryTimeout(queryTimeout); // timeout in seconds
final LongHolder nrOfRows = new LongHolder(0L);
final AtomicLong nrOfRows = new AtomicLong(0L);
if (fileToProcess == null) {
fileToProcess = session.create();
}
@ -198,7 +198,7 @@ public class ExecuteSQL extends AbstractProcessor {
});
// set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, nrOfRows.get().toString());
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});

View File

@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -48,7 +49,6 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.ObjectHolder;
@EventDriven
@SupportsBatching
@ -129,7 +129,7 @@ public class HashContent extends AbstractProcessor {
return;
}
final ObjectHolder<String> hashValueHolder = new ObjectHolder<>(null);
final AtomicReference<String> hashValueHolder = new AtomicReference<>(null);
try {
session.read(flowFile, new InputStreamCallback() {

View File

@ -22,6 +22,7 @@ import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -40,7 +41,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.util.ObjectHolder;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.detect.Detector;
import org.apache.tika.io.TikaInputStream;
@ -117,7 +117,7 @@ public class IdentifyMimeType extends AbstractProcessor {
}
final ComponentLog logger = getLogger();
final ObjectHolder<String> mimeTypeRef = new ObjectHolder<>(null);
final AtomicReference<String> mimeTypeRef = new AtomicReference<>(null);
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
session.read(flowFile, new InputStreamCallback() {

View File

@ -41,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.MapMessage;
@ -60,7 +61,6 @@ import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.IntegerHolder;
import org.apache.nifi.util.StopWatch;
public abstract class JmsConsumer extends AbstractProcessor {
@ -181,7 +181,7 @@ public abstract class JmsConsumer extends AbstractProcessor {
throws Exception {
// Currently not very useful, because always one Message == one FlowFile
final IntegerHolder msgsThisFlowFile = new IntegerHolder(1);
final AtomicInteger msgsThisFlowFile = new AtomicInteger(1);
FlowFile flowFile = session.create();
try {

View File

@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@ -85,7 +86,6 @@ import org.apache.nifi.util.FlowFilePackager;
import org.apache.nifi.util.FlowFilePackagerV1;
import org.apache.nifi.util.FlowFilePackagerV2;
import org.apache.nifi.util.FlowFilePackagerV3;
import org.apache.nifi.util.ObjectHolder;
@SideEffectFree
@TriggerWhenEmpty
@ -557,7 +557,7 @@ public class MergeContent extends BinFiles {
}
FlowFile bundle = session.create(parentFlowFiles);
final ObjectHolder<String> bundleMimeTypeRef = new ObjectHolder<>(null);
final AtomicReference<String> bundleMimeTypeRef = new AtomicReference<>(null);
bundle = session.write(bundle, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
@ -884,8 +884,8 @@ public class MergeContent extends BinFiles {
public FlowFile merge(ProcessContext context, final ProcessSession session, final List<FlowFileSessionWrapper> wrappers) {
final Map<String, byte[]> metadata = new TreeMap<>();
final ObjectHolder<Schema> schema = new ObjectHolder<>(null);
final ObjectHolder<String> inputCodec = new ObjectHolder<>(null);
final AtomicReference<Schema> schema = new AtomicReference<>(null);
final AtomicReference<String> inputCodec = new AtomicReference<>(null);
final DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
// we don't pass the parents to the #create method because the parents belong to different sessions

View File

@ -117,7 +117,6 @@ import org.apache.nifi.util.FlowFilePackagerV1;
import org.apache.nifi.util.FlowFilePackagerV2;
import org.apache.nifi.util.FlowFilePackagerV3;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import com.sun.jersey.api.client.ClientResponse.Status;
@ -466,7 +465,7 @@ public class PostHTTP extends AbstractProcessor {
CloseableHttpClient client = null;
final String transactionId = UUID.randomUUID().toString();
final ObjectHolder<String> dnHolder = new ObjectHolder<>("none");
final AtomicReference<String> dnHolder = new AtomicReference<>("none");
while (true) {
FlowFile flowFile = session.get();
if (flowFile == null) {

View File

@ -30,7 +30,6 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
import java.io.File;
@ -40,6 +39,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Base class for PutFTP & PutSFTP
@ -123,7 +123,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
beforePut(flowFile, context, transfer);
final FlowFile flowFileToTransfer = flowFile;
final ObjectHolder<String> fullPathRef = new ObjectHolder<>(null);
final AtomicReference<String> fullPathRef = new AtomicReference<>(null);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {

View File

@ -40,7 +40,6 @@ import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
import org.apache.nifi.processors.standard.syslog.SyslogParser;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
import javax.net.ssl.SSLContext;
@ -55,6 +54,7 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -321,7 +321,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
final String port = context.getProperty(PORT).getValue();
final String host = context.getProperty(HOSTNAME).getValue();
final String transitUri = new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
final ObjectHolder<IOException> exceptionHolder = new ObjectHolder<>(null);
final AtomicReference<IOException> exceptionHolder = new AtomicReference<>(null);
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
try {

View File

@ -41,7 +41,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.StopWatch;
import java.io.IOException;
@ -93,6 +92,7 @@ import static java.sql.Types.TIMESTAMP;
import static java.sql.Types.TINYINT;
import static java.sql.Types.VARBINARY;
import static java.sql.Types.VARCHAR;
import java.util.concurrent.atomic.AtomicLong;
@EventDriven
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@ -296,7 +296,7 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
final LongHolder nrOfRows = new LongHolder(0L);
final AtomicLong nrOfRows = new AtomicLong(0L);
fileToProcess = session.create();
fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
@ -317,7 +317,7 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
if (nrOfRows.get() > 0) {
// set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, nrOfRows.get().toString());
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});

View File

@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@ -51,7 +52,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.IntegerHolder;
@EventDriven
@SideEffectFree
@ -182,7 +182,7 @@ public class RouteOnContent extends AbstractProcessor {
final Set<Relationship> destinations = new HashSet<>();
flowFileDestinationMap.put(flowFile, destinations);
final IntegerHolder bufferedByteCount = new IntegerHolder(0);
final AtomicInteger bufferedByteCount = new AtomicInteger(0);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {

View File

@ -54,7 +54,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
import org.apache.nifi.util.search.Search;
@ -219,7 +218,7 @@ public class ScanContent extends AbstractProcessor {
}
final Search<byte[]> finalSearch = search;
final ObjectHolder<SearchTerm<byte[]>> termRef = new ObjectHolder<>(null);
final AtomicReference<SearchTerm<byte[]>> termRef = new AtomicReference<>(null);
termRef.set(null);
session.read(flowFile, new InputStreamCallback() {

View File

@ -32,6 +32,9 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -63,9 +66,6 @@ import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.util.IntegerHolder;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.ObjectHolder;
@EventDriven
@SideEffectFree
@ -412,7 +412,7 @@ public class SplitText extends AbstractProcessor {
final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
final ObjectHolder<String> errorMessage = new ObjectHolder<>(null);
final AtomicReference<String> errorMessage = new AtomicReference<>(null);
final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
final long startNanos = System.nanoTime();
@ -479,8 +479,8 @@ public class SplitText extends AbstractProcessor {
if (headerInfoLineCount > 0) {
// if we have header lines, create a new FlowFile, copy the header lines to that file,
// and then start copying lines
final IntegerHolder linesCopied = new IntegerHolder(0);
final LongHolder bytesCopied = new LongHolder(0L);
final AtomicInteger linesCopied = new AtomicInteger(0);
final AtomicLong bytesCopied = new AtomicLong(0L);
FlowFile splitFile = session.create(flowFile);
try {
splitFile = session.write(splitFile, new OutputStreamCallback() {

View File

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParser;
@ -53,7 +54,6 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.XmlElementNotifier;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.BooleanHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.Attributes;
@ -160,7 +160,7 @@ public class SplitXml extends AbstractProcessor {
}
}, depth);
final BooleanHolder failed = new BooleanHolder(false);
final AtomicBoolean failed = new AtomicBoolean(false);
session.read(original, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {

View File

@ -39,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
@ -68,7 +69,6 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.LongHolder;
// note: it is important that this Processor is not marked as @SupportsBatching because the session commits must complete before persisting state locally; otherwise, data loss may occur
@TriggerSerially
@ -463,7 +463,7 @@ public class TailFile extends AbstractProcessor {
FlowFile flowFile = session.create();
final FileChannel fileReader = reader;
final LongHolder positionHolder = new LongHolder(position);
final AtomicLong positionHolder = new AtomicLong(position);
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream rawOut) throws IOException {

View File

@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.compress.archivers.ArchiveEntry;
@ -69,7 +70,6 @@ import org.apache.nifi.util.FlowFileUnpackager;
import org.apache.nifi.util.FlowFileUnpackagerV1;
import org.apache.nifi.util.FlowFileUnpackagerV2;
import org.apache.nifi.util.FlowFileUnpackagerV3;
import org.apache.nifi.util.ObjectHolder;
@EventDriven
@SideEffectFree
@ -415,7 +415,7 @@ public class UnpackContent extends AbstractProcessor {
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
while (unpackager.hasMoreData()) {
final ObjectHolder<Map<String, String>> attributesRef = new ObjectHolder<>(null);
final AtomicReference<Map<String, String>> attributesRef = new AtomicReference<>(null);
FlowFile unpackedFile = session.create(source);
try {
unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {

View File

@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.xml.transform.stream.StreamSource;
@ -49,7 +50,6 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.BooleanHolder;
import org.xml.sax.SAXException;
@EventDriven
@ -128,7 +128,7 @@ public class ValidateXml extends AbstractProcessor {
final ComponentLog logger = getLogger();
for (final FlowFile flowFile : flowFiles) {
final BooleanHolder valid = new BooleanHolder(true);
final AtomicBoolean valid = new AtomicBoolean(true);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {

View File

@ -230,9 +230,6 @@ public class ListenHTTPServlet extends HttpServlet {
}
}
// remove deprecated FlowFile attribute that was used in older versions of NiFi
attributes.remove("parent.uuid");
hasMoreData.set(unpackager.hasMoreData());
}
}

View File

@ -1,170 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
/**
* Note: {@code Bin} objects are NOT thread safe. If multiple threads access a {@code Bin}, the caller must synchronize
* access.
* @deprecated As of release 0.5.0, replaced by
* {@link org.apache.nifi.processor.util.bin.Bin}
*/
@Deprecated
public class Bin {
private final long creationMomentEpochNs;
private final long minimumSizeBytes;
private final long maximumSizeBytes;
private volatile int minimumEntries = 0;
private volatile int maximumEntries = Integer.MAX_VALUE;
private final String fileCountAttribute;
final List<FlowFileSessionWrapper> binContents = new ArrayList<>();
long size;
int successiveFailedOfferings = 0;
/**
* Constructs a new bin
*
* @param minSizeBytes min bytes
* @param maxSizeBytes max bytes
* @param minEntries min entries
* @param maxEntries max entries
* @param fileCountAttribute num files
* @throws IllegalArgumentException if the min is not less than or equal to the max.
*/
public Bin(final long minSizeBytes, final long maxSizeBytes, final int minEntries, final int maxEntries, final String fileCountAttribute) {
this.minimumSizeBytes = minSizeBytes;
this.maximumSizeBytes = maxSizeBytes;
this.minimumEntries = minEntries;
this.maximumEntries = maxEntries;
this.fileCountAttribute = fileCountAttribute;
this.creationMomentEpochNs = System.nanoTime();
if (minSizeBytes > maxSizeBytes) {
throw new IllegalArgumentException();
}
}
/**
* Indicates whether the bin has enough items to be considered full. This is based on whether the current size of the bin is greater than the minimum size in bytes and based on having a number of
* successive unsuccessful attempts to add a new item (because it is so close to the max or the size of the objects being attempted do not favor tight packing)
*
* @return true if considered full; false otherwise
*/
public boolean isFull() {
return (((size >= minimumSizeBytes) && binContents.size() >= minimumEntries) && (successiveFailedOfferings > 5))
|| (size >= maximumSizeBytes) || (binContents.size() >= maximumEntries);
}
/**
* Indicates enough size exists to meet the minimum requirements
*
* @return true if full enough
*/
public boolean isFullEnough() {
return isFull() || (size >= minimumSizeBytes && (binContents.size() >= minimumEntries));
}
/**
* Determines if this bin is older than the time specified.
*
* @param duration duration
* @param unit unit
* @return true if this bin is older than the length of time given; false otherwise
*/
public boolean isOlderThan(final int duration, final TimeUnit unit) {
final long ageInNanos = System.nanoTime() - creationMomentEpochNs;
return ageInNanos > TimeUnit.NANOSECONDS.convert(duration, unit);
}
/**
* Determines if this bin is older than the specified bin
*
* @param other other bin
* @return true if this is older than given bin
*/
public boolean isOlderThan(final Bin other) {
return creationMomentEpochNs < other.creationMomentEpochNs;
}
/**
* If this bin has enough room for the size of the given flow file then it is added otherwise it is not
*
* @param flowFile flowfile to offer
* @param session the ProcessSession to which the FlowFile belongs
* @return true if added; false otherwise
*/
public boolean offer(final FlowFile flowFile, final ProcessSession session) {
if (((size + flowFile.getSize()) > maximumSizeBytes) || (binContents.size() >= maximumEntries)) {
successiveFailedOfferings++;
return false;
}
if (fileCountAttribute != null) {
final String countValue = flowFile.getAttribute(fileCountAttribute);
final Integer count = toInteger(countValue);
if (count != null) {
final int currentMaximumEntries = this.maximumEntries;
this.maximumEntries = Math.min(count, currentMaximumEntries);
this.minimumEntries = currentMaximumEntries;
}
}
size += flowFile.getSize();
binContents.add(new FlowFileSessionWrapper(flowFile, session));
successiveFailedOfferings = 0;
return true;
}
private static final Pattern intPattern = Pattern.compile("\\d+");
public Integer toInteger(final String value) {
if (value == null) {
return null;
}
if (!intPattern.matcher(value).matches()) {
return null;
}
try {
return Integer.parseInt(value);
} catch (final Exception e) {
return null;
}
}
/**
* @return the underlying list of flow files within this bin
*/
public List<FlowFileSessionWrapper> getContents() {
return binContents;
}
public long getBinAge() {
final long ageInNanos = System.nanoTime() - creationMomentEpochNs;
return TimeUnit.MILLISECONDS.convert(ageInNanos, TimeUnit.NANOSECONDS);
}
}

View File

@ -1,238 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
/**
* This class is thread safe
*
* @deprecated As of release 0.5.0, replaced by
* {@link org.apache.nifi.processor.util.bin.BinManager}
*/
@Deprecated
public class BinManager {
private final AtomicLong minSizeBytes = new AtomicLong(0L);
private final AtomicLong maxSizeBytes = new AtomicLong(Long.MAX_VALUE);
private final AtomicInteger minEntries = new AtomicInteger(0);
private final AtomicInteger maxEntries = new AtomicInteger(Integer.MAX_VALUE);
private final AtomicReference<String> fileCountAttribute = new AtomicReference<>(null);
private final AtomicInteger maxBinAgeSeconds = new AtomicInteger(Integer.MAX_VALUE);
private final Map<String, List<Bin>> groupBinMap = new HashMap<>();
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock rLock = rwLock.readLock();
private final Lock wLock = rwLock.writeLock();
private int binCount = 0; // guarded by read/write lock
public BinManager() {
}
public void purge() {
wLock.lock();
try {
for (final List<Bin> binList : groupBinMap.values()) {
for (final Bin bin : binList) {
for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
wrapper.getSession().rollback();
}
}
}
groupBinMap.clear();
binCount = 0;
} finally {
wLock.unlock();
}
}
public void setFileCountAttribute(final String fileCountAttribute) {
this.fileCountAttribute.set(fileCountAttribute);
}
public void setMinimumEntries(final int minimumEntries) {
this.minEntries.set(minimumEntries);
}
public void setMaximumEntries(final int maximumEntries) {
this.maxEntries.set(maximumEntries);
}
public int getBinCount() {
rLock.lock();
try {
return binCount;
} finally {
rLock.unlock();
}
}
public void setMinimumSize(final long numBytes) {
minSizeBytes.set(numBytes);
}
public void setMaximumSize(final long numBytes) {
maxSizeBytes.set(numBytes);
}
public void setMaxBinAge(final int seconds) {
maxBinAgeSeconds.set(seconds);
}
/**
* Adds the given flowFile to the first available bin in which it fits for the given group or creates a new bin in the specified group if necessary.
* <p/>
* @param groupIdentifier the group to which the flow file belongs; can be null
* @param flowFile the flow file to bin
* @param session the ProcessSession to which the FlowFile belongs
* @return true if added; false if no bin exists which can fit this item and no bin can be created based on current min/max criteria
*/
public boolean offer(final String groupIdentifier, final FlowFile flowFile, final ProcessSession session) {
final long currentMaxSizeBytes = maxSizeBytes.get();
if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit into any new bins (and probably none existing)
return false;
}
wLock.lock();
try {
final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
if (currentBins == null) { // this is a new group we need to register
final List<Bin> bins = new ArrayList<>();
final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), maxEntries.get(), fileCountAttribute.get());
bins.add(bin);
groupBinMap.put(groupIdentifier, bins);
binCount++;
return bin.offer(flowFile, session);
} else {
for (final Bin bin : currentBins) {
final boolean accepted = bin.offer(flowFile, session);
if (accepted) {
return true;
}
}
//if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one
final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), maxEntries.get(), fileCountAttribute.get());
currentBins.add(bin);
binCount++;
return bin.offer(flowFile, session);
}
} finally {
wLock.unlock();
}
}
/**
* Finds all bins that are considered full and removes them from the manager.
* <p/>
* @param relaxFullnessConstraint if false will require bins to be full before considered ready; if true bins only have to meet their minimum size criteria or be 'old' and then they'll be
* considered ready
* @return bins that are considered full
*/
public Collection<Bin> removeReadyBins(boolean relaxFullnessConstraint) {
final Map<String, List<Bin>> newGroupMap = new HashMap<>();
final List<Bin> readyBins = new ArrayList<>();
wLock.lock();
try {
for (final Map.Entry<String, List<Bin>> group : groupBinMap.entrySet()) {
final List<Bin> remainingBins = new ArrayList<>();
for (final Bin bin : group.getValue()) {
if (relaxFullnessConstraint && (bin.isFullEnough() || bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS))) { //relaxed check
readyBins.add(bin);
} else if (!relaxFullnessConstraint && bin.isFull()) { //strict check
readyBins.add(bin);
} else { //it isn't time yet...
remainingBins.add(bin);
}
}
if (!remainingBins.isEmpty()) {
newGroupMap.put(group.getKey(), remainingBins);
}
}
groupBinMap.clear();
groupBinMap.putAll(newGroupMap);
binCount -= readyBins.size();
} finally {
wLock.unlock();
}
return readyBins;
}
public Bin removeOldestBin() {
wLock.lock();
try {
Bin oldestBin = null;
String oldestBinGroup = null;
for (final Map.Entry<String, List<Bin>> group : groupBinMap.entrySet()) {
for (final Bin bin : group.getValue()) {
if (oldestBin == null || bin.isOlderThan(oldestBin)) {
oldestBin = bin;
oldestBinGroup = group.getKey();
}
}
}
if (oldestBin == null) {
return null;
}
binCount--;
final List<Bin> bins = groupBinMap.get(oldestBinGroup);
bins.remove(oldestBin);
if (bins.isEmpty()) {
groupBinMap.remove(oldestBinGroup);
}
return oldestBin;
} finally {
wLock.unlock();
}
}
/**
* @return true if any current bins are older than the allowable max
*/
public boolean containsOldBins() {
rLock.lock();
try {
for (final List<Bin> bins : groupBinMap.values()) {
for (final Bin bin : bins) {
if (bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS)) {
return true;
}
}
}
} finally {
rLock.unlock();
}
return false;
}
}

View File

@ -1,49 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
/**
* @deprecated As of release 0.5.0, replaced by
* {@link org.apache.nifi.processor.util.FlowFileSessionWrapper}
*/
@Deprecated
public class FlowFileSessionWrapper {
private final FlowFile flowFile;
private final ProcessSession session;
public FlowFileSessionWrapper(final FlowFile flowFile, final ProcessSession session) {
this.flowFile = flowFile;
this.session = session;
}
public FlowFile getFlowFile() {
return flowFile;
}
public ProcessSession getSession() {
return session;
}
@Override
public String toString() {
return flowFile.toString();
}
}

View File

@ -93,27 +93,6 @@ public class BcryptCipherProvider extends RandomIVPBECipherProvider {
return logger;
}
/**
* Returns an initialized cipher for the specified algorithm. The key (and IV if necessary) are derived by the KDF of the implementation.
* <p>
* This method is deprecated because while Bcrypt could generate a random salt to use, it would not be returned to the caller of this method and future derivations would fail. Provide a valid
* salt generated by {@link BcryptCipherProvider#generateSalt()}.
* </p>
*
* @param encryptionMethod the {@link EncryptionMethod}
* @param password the secret input
* @param keyLength the desired key length in bits
* @param encryptMode true for encrypt, false for decrypt
* @return the initialized cipher
* @throws Exception if there is a problem initializing the cipher
* @deprecated Provide a salt parameter using {@link BcryptCipherProvider#getCipher(EncryptionMethod, String, byte[], int, boolean)}
*/
@Deprecated
@Override
public Cipher getCipher(EncryptionMethod encryptionMethod, String password, int keyLength, boolean encryptMode) throws Exception {
throw new UnsupportedOperationException("The cipher cannot be initialized without a valid salt. Use BcryptCipherProvider#generateSalt() to generate a valid salt");
}
/**
* Returns an initialized cipher for the specified algorithm. The key (and IV if necessary) are derived by the KDF of the implementation.
*

View File

@ -44,22 +44,6 @@ public class NiFiLegacyCipherProvider extends OpenSSLPKCS5CipherProvider impleme
// Legacy magic number value
private static final int ITERATION_COUNT = 1000;
/**
* Returns an initialized cipher for the specified algorithm. The key (and IV if necessary) are derived using the NiFi legacy code, based on @see org.apache.nifi.processors.standard.util.crypto
* .OpenSSLPKCS5CipherProvider#getCipher(java.lang.String, java.lang.String, java.lang.String, boolean) [essentially {@code MD5(password || salt) * 1000 }].
*
* @param encryptionMethod the {@link EncryptionMethod}
* @param password the secret input
* @param keyLength the desired key length in bits (ignored because OpenSSL ciphers provide key length in algorithm name)
* @param encryptMode true for encrypt, false for decrypt
* @return the initialized cipher
* @throws Exception if there is a problem initializing the cipher
*/
@Override
public Cipher getCipher(EncryptionMethod encryptionMethod, String password, int keyLength, boolean encryptMode) throws Exception {
return getCipher(encryptionMethod, password, new byte[0], keyLength, encryptMode);
}
/**
* Returns an initialized cipher for the specified algorithm. The key (and IV if necessary) are derived using the NiFi legacy code, based on @see org.apache.nifi.processors.standard.util.crypto
* .OpenSSLPKCS5CipherProvider#getCipher(java.lang.String, java.lang.String, java.lang.String, byte[], boolean) [essentially {@code MD5(password || salt) * 1000 }].
@ -145,6 +129,7 @@ public class NiFiLegacyCipherProvider extends OpenSSLPKCS5CipherProvider impleme
out.write(salt);
}
@Override
protected int getIterationCount() {
return ITERATION_COUNT;
}

View File

@ -52,22 +52,6 @@ public class OpenSSLPKCS5CipherProvider implements PBECipherProvider {
private static final String OPENSSL_EVP_HEADER_MARKER = "Salted__";
private static final int OPENSSL_EVP_HEADER_SIZE = 8;
/**
* Returns an initialized cipher for the specified algorithm. The key (and IV if necessary) are derived using the
* <a href="https://www.openssl.org/docs/manmaster/crypto/EVP_BytesToKey.html">OpenSSL EVP_BytesToKey proprietary KDF</a> [essentially {@code MD5(password || salt) }].
*
* @param encryptionMethod the {@link EncryptionMethod}
* @param password the secret input
* @param keyLength the desired key length in bits (ignored because OpenSSL ciphers provide key length in algorithm name)
* @param encryptMode true for encrypt, false for decrypt
* @return the initialized cipher
* @throws Exception if there is a problem initializing the cipher
*/
@Override
public Cipher getCipher(EncryptionMethod encryptionMethod, String password, int keyLength, boolean encryptMode) throws Exception {
return getCipher(encryptionMethod, password, new byte[0], keyLength, encryptMode);
}
/**
* Returns an initialized cipher for the specified algorithm. The key (and IV if necessary) are derived using the
* <a href="https://www.openssl.org/docs/manmaster/crypto/EVP_BytesToKey.html">OpenSSL EVP_BytesToKey proprietary KDF</a> [essentially {@code MD5(password || salt) }].

View File

@ -24,17 +24,6 @@ import java.io.InputStream;
import java.io.OutputStream;
public interface PBECipherProvider extends CipherProvider {
/**
* Returns an initialized cipher for the specified algorithm. The key (and IV if necessary) are derived by the KDF of the implementation.
*
* @param encryptionMethod the {@link EncryptionMethod}
* @param password the secret input
* @param keyLength the desired key length in bits
* @param encryptMode true for encrypt, false for decrypt
* @return the initialized cipher
* @throws Exception if there is a problem initializing the cipher
*/
Cipher getCipher(EncryptionMethod encryptionMethod, String password, int keyLength, boolean encryptMode) throws Exception;
/**
* Returns an initialized cipher for the specified algorithm. The key (and IV if necessary) are derived by the KDF of the implementation.

View File

@ -101,23 +101,6 @@ public class PBKDF2CipherProvider extends RandomIVPBECipherProvider {
return logger;
}
/**
* Returns an initialized cipher for the specified algorithm. The key (and IV if necessary) are derived by the KDF of the implementation.
*
* The IV can be retrieved by the calling method using {@link Cipher#getIV()}.
*
* @param encryptionMethod the {@link EncryptionMethod}
* @param password the secret input
* @param keyLength the desired key length in bits
* @param encryptMode true for encrypt, false for decrypt
* @return the initialized cipher
* @throws Exception if there is a problem initializing the cipher
*/
@Override
public Cipher getCipher(EncryptionMethod encryptionMethod, String password, int keyLength, boolean encryptMode) throws Exception {
return getCipher(encryptionMethod, password, new byte[0], new byte[0], keyLength, encryptMode);
}
/**
* Returns an initialized cipher for the specified algorithm. The key (and IV if necessary) are derived by the KDF of the implementation.
*

View File

@ -111,26 +111,6 @@ public class ScryptCipherProvider extends RandomIVPBECipherProvider {
return logger;
}
/**
* Returns an initialized cipher for the specified algorithm. The key (and IV if necessary) are derived by the KDF of the implementation.
* <p/>
* This method is deprecated because while Scrypt could generate a random salt to use, it would not be returned to the caller of this method and future derivations would fail. Provide a valid
* salt generated by {@link ScryptCipherProvider#generateSalt()}.
*
* @param encryptionMethod the {@link EncryptionMethod}
* @param password the secret input
* @param keyLength the desired key length in bits
* @param encryptMode true for encrypt, false for decrypt
* @return the initialized cipher
* @throws Exception if there is a problem initializing the cipher
* @deprecated Provide a salt parameter using {@link ScryptCipherProvider#getCipher(EncryptionMethod, String, byte[], int, boolean)}
*/
@Deprecated
@Override
public Cipher getCipher(EncryptionMethod encryptionMethod, String password, int keyLength, boolean encryptMode) throws Exception {
throw new UnsupportedOperationException("The cipher cannot be initialized without a valid salt. Use ScryptCipherProvider#generateSalt() to generate a valid salt");
}
/**
* Returns an initialized cipher for the specified algorithm. The key (and IV if necessary) are derived by the KDF of the implementation.
*

View File

@ -31,7 +31,6 @@ import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
//import org.mindrot.jbcrypt.BCrypt
import org.slf4j.LoggerFactory
import javax.crypto.Cipher
@ -345,16 +344,7 @@ public class BcryptCipherProviderGroovyTest {
// Two different errors -- one explaining the no-salt method is not supported, and the other for an empty byte[] passed
// Act
def msg = shouldFail(UnsupportedOperationException) {
Cipher cipher = cipherProvider.getCipher(encryptionMethod, PASSWORD, DEFAULT_KEY_LENGTH, true);
}
logger.expected(msg)
// Assert
assert msg =~ "The cipher cannot be initialized without a valid salt\\. Use BcryptCipherProvider#generateSalt\\(\\) to generate a valid salt"
// Act
msg = shouldFail(IllegalArgumentException) {
def msg = shouldFail(IllegalArgumentException) {
Cipher cipher = cipherProvider.getCipher(encryptionMethod, PASSWORD, new byte[0], DEFAULT_KEY_LENGTH, true);
}
logger.expected(msg)

View File

@ -349,19 +349,8 @@ public class ScryptCipherProviderGroovyTest {
EncryptionMethod encryptionMethod = EncryptionMethod.AES_CBC
logger.info("Using algorithm: ${encryptionMethod.getAlgorithm()}");
// Two different errors -- one explaining the no-salt method is not supported, and the other for an empty byte[] passed
// Act
def msg = shouldFail(UnsupportedOperationException) {
Cipher cipher = cipherProvider.getCipher(encryptionMethod, PASSWORD, DEFAULT_KEY_LENGTH, true);
}
logger.expected(msg)
// Assert
assert msg =~ "The cipher cannot be initialized without a valid salt\\. Use ScryptCipherProvider#generateSalt\\(\\) to generate a valid salt"
// Act
msg = shouldFail(IllegalArgumentException) {
def msg = shouldFail(IllegalArgumentException) {
Cipher cipher = cipherProvider.getCipher(encryptionMethod, PASSWORD, new byte[0], DEFAULT_KEY_LENGTH, true);
}
logger.expected(msg)

View File

@ -29,7 +29,6 @@ import org.apache.nifi.processors.standard.syslog.SyslogEvent;
import org.apache.nifi.processors.standard.syslog.SyslogParser;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.IntegerHolder;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -48,6 +47,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
@ -399,7 +399,7 @@ public class TestListenSyslog {
// Add message that will throw a FlowFileAccessException the first time that we attempt to read
// the contents but will succeed the second time.
final IntegerHolder getMessageAttempts = new IntegerHolder(0);
final AtomicInteger getMessageAttempts = new AtomicInteger(0);
msgs.add(new ListenSyslog.RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01") {
@Override
public byte[] getData() {

View File

@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import java.text.MessageFormat;
import org.apache.nifi.components.AllowableValue;
/**
* An immutable object for holding information about a database system.
*
*/
@Deprecated
public class DatabaseSystemDescriptor extends AllowableValue {
public final String driverClassName;
public final Integer defaultPort;
public final String urlTemplate;
public final boolean internalDriverJar;
public DatabaseSystemDescriptor(String value, String description, String driverClassName, Integer defaultPort, String urlTemplate, boolean internalDriverJar) {
super(value, value, description);
if (defaultPort==null)
throw new IllegalArgumentException("defaultPort cannot be null");
this.driverClassName = driverClassName;
this.defaultPort = defaultPort;
this.urlTemplate = urlTemplate;
this.internalDriverJar = internalDriverJar;
}
public String buildUrl(String host, Integer port, String dbname) {
return MessageFormat.format(urlTemplate, host, port.toString(), dbname);
}
}

View File

@ -1,83 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
@Deprecated
public class DatabaseSystems {
/**
* Currently contain only few known Database systems.
* Please help to expand this list.
*
* Please be ensure that all JDBC drivers are license-compatible with Apache.
* http://www.apache.org/legal/resolved.html
* If not include them in "JDBC driver jar must be loaded from external location" section
* and do not include actual driver in NiFi distribution (don't include driver in pom.xml file)
*
* {0} host name/ip
* {1} port number
* {2} database name
*
* for example url template
* "jdbc:postgresql://{0}:{1}/{2}"
* will be after building
* "jdbc:postgresql://bighost:5432/Trove"
*/
public static DatabaseSystemDescriptor[] knownDatabaseSystems = {
// ================= JDBC driver jar should be included in nar (in pom.xml dependencies) =======================
new DatabaseSystemDescriptor("Postgres", "PostgreSQL open soure object-relational database.",
"org.postgresql.Driver", 5432, "jdbc:postgresql://{0}:{1}/{2}", true),
new DatabaseSystemDescriptor("JavaDB", "Java DB is Oracle's supported distribution of the Apache Derby open source database. Included in JDK.",
"org.apache.derby.jdbc.EmbeddedDriver", 1, "jdbc:derby:{2};create=true", true),
new DatabaseSystemDescriptor("Derby", "Apache Derby is an open source relational database.",
"org.apache.derby.jdbc.EmbeddedDriver", 1, "jdbc:derby:{2};create=true", true),
// ================= JDBC driver jar must be loaded from external location =======================
// Such drivers cannot be included in NiFi distribution because are not license-compatible with Apache.
new DatabaseSystemDescriptor("MariaDB",
"MariaDB is a community-developed fork of the MySQL relational database management system intended to remain free under the GNU GPL.",
"org.mariadb.jdbc.Driver", 3306, "jdbc:mariadb://{0}:{1}/{2}", false),
new DatabaseSystemDescriptor("Oracle",
"Oracle Database is an object-relational database management system.",
"oracle.jdbc.OracleDriver", 1521, "jdbc:oracle:thin:@//{0}:{1}/{2}", false),
new DatabaseSystemDescriptor("Sybase",
"Sybase is an relational database management system.",
"com.sybase.jdbc3.jdbc.SybDriver", 5000, "jdbc:sybase:Tds:{0}:{1}/{2}", false),
// ================= Unknown JDBC driver, user must provide connection details =====================
new DatabaseSystemDescriptor("Other DB", "Other JDBC compliant JDBC driver",
null, 1, null, false),
};
public static DatabaseSystemDescriptor getDescriptor(String name) {
for ( DatabaseSystemDescriptor descr : DatabaseSystems.knownDatabaseSystems) {
if (descr.getValue().equalsIgnoreCase(name))
return descr;
}
throw new IllegalArgumentException("Can't find DatabaseSystemDescriptor by name " + name);
}
}

View File

@ -1,31 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import static org.apache.nifi.dbcp.DatabaseSystems.getDescriptor;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
public class TestDatabaseSystems {
@Test
public void testKnownDatabaseSystems() {
assertEquals("jdbc:postgresql://bighost:5432/Trove", getDescriptor("Postgres").buildUrl("bighost",5432,"Trove"));
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.ssl;
import org.apache.nifi.ssl.SSLContextService;
import java.util.ArrayList;
import java.util.List;
@ -25,7 +24,6 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
public class TestProcessor extends AbstractProcessor {
@ -39,7 +37,7 @@ public class TestProcessor extends AbstractProcessor {
propDescs.add(new PropertyDescriptor.Builder()
.name("SSL Context Svc ID")
.description("ID of SSL Context Svc")
.addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
.identifiesControllerService(SSLContextService.class)
.required(true)
.build());
return propDescs;