This commit is contained in:
joewitt 2015-04-27 13:25:44 -04:00
parent 666de3d410
commit 6a706458d0
50 changed files with 372 additions and 455 deletions

View File

@ -253,7 +253,7 @@
<module name="Checker">
<property name="charset" value="UTF-8" />
<property name="severity" value="warning" />
<!-- Checks for whitespace -->
<!-- Checks for whitespace -->
<!-- See http://checkstyle.sf.net/config_whitespace.html -->
<module name="FileTabCharacter">
<property name="eachLine" value="true" />
@ -267,10 +267,6 @@
<property name="format" value="[@]see\s+[{][@]link" />
<property name="message" value="Javadoc @see does not need @link: pick one or the other." />
</module>
<module name="RegexpSinglelineJava">
<property name="format" value="jline[.]internal[.]Preconditions" />
<property name="message" value="Please use Guava Preconditions not JLine" />
</module>
<module name="OuterTypeFilename" />
<module name="LineLength">
<!-- needs extra, because Eclipse formatter ignores the ending left brace -->

View File

@ -225,8 +225,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
try {
binAlreadyCommitted = this.processBin(bin, binCopy, context, session);
} catch (final ProcessException e) {
logger.
error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
for (final FlowFileSessionWrapper wrapper : binCopy) {
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
@ -294,8 +293,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
}
if (context.getProperty(MAX_SIZE).isSet()) {
binManager.setMaximumSize(context.getProperty(MAX_SIZE).
asDataSize(DataUnit.B).longValue());
binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
} else {
binManager.setMaximumSize(Long.MAX_VALUE);
}
@ -313,8 +311,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
@Override
protected final Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> problems = new ArrayList<>(super.
customValidate(context));
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
final Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
@ -330,10 +327,8 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
);
}
final Long min = context.getProperty(MIN_ENTRIES).
asLong();
final Long max = context.getProperty(MAX_ENTRIES).
asLong();
final Long min = context.getProperty(MIN_ENTRIES).asLong();
final Long max = context.getProperty(MAX_ENTRIES).asLong();
if (min != null && max != null) {
if (min > max) {

View File

@ -146,8 +146,7 @@ public class CompressContent extends AbstractProcessor {
mimeTypeMap.put("application/gzip", COMPRESSION_FORMAT_GZIP);
mimeTypeMap.put("application/bzip2", COMPRESSION_FORMAT_BZIP2);
mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA);
this.compressionFormatMimeTypeMap = Collections.
unmodifiableMap(mimeTypeMap);
this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap);
}
@Override
@ -286,8 +285,7 @@ public class CompressContent extends AbstractProcessor {
final long sizeAfterCompression = flowFile.getSize();
if (MODE_DECOMPRESS.equalsIgnoreCase(compressionMode)) {
flowFile = session.
removeAttribute(flowFile, CoreAttributes.MIME_TYPE.key());
flowFile = session.removeAttribute(flowFile, CoreAttributes.MIME_TYPE.key());
if (context.getProperty(UPDATE_FILENAME).asBoolean()) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
@ -296,8 +294,7 @@ public class CompressContent extends AbstractProcessor {
}
}
} else {
flowFile = session.
putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
if (context.getProperty(UPDATE_FILENAME).asBoolean()) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());

View File

@ -156,10 +156,10 @@ public class ControlRate extends AbstractProcessor {
rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
if (rateAttr == null) {
validationResults.add(new ValidationResult.Builder().
subject(RATE_CONTROL_ATTRIBUTE_NAME.getName()).
explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'").
build());
validationResults.add(new ValidationResult.Builder()
.subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
.explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
.build());
}
break;
case FLOWFILE_RATE:
@ -281,8 +281,7 @@ public class ControlRate extends AbstractProcessor {
throttle.lock();
try {
if (throttle.tryAdd(rateValue)) {
logger.
info("transferring {} to 'success'", new Object[]{flowFile});
logger.info("transferring {} to 'success'", new Object[]{flowFile});
session.transfer(flowFile, REL_SUCCESS);
} else {
flowFile = session.penalize(flowFile);

View File

@ -16,6 +16,13 @@
*/
package org.apache.nifi.processors.standard;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessorInitializationContext;
@ -34,13 +41,16 @@ import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import java.io.*;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@ -117,12 +127,8 @@ public class ConvertCharacterSet extends AbstractProcessor {
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final ProcessorLog logger = getLogger();
final Charset inputCharset = Charset.forName(context.
getProperty(INPUT_CHARSET).
getValue());
final Charset outputCharset = Charset.forName(context.
getProperty(OUTPUT_CHARSET).
getValue());
final Charset inputCharset = Charset.forName(context.getProperty(INPUT_CHARSET).getValue());
final Charset outputCharset = Charset.forName(context.getProperty(OUTPUT_CHARSET).getValue());
final CharBuffer charBuffer = CharBuffer.allocate(MAX_BUFFER_SIZE);
final CharsetDecoder decoder = inputCharset.newDecoder();

View File

@ -117,11 +117,11 @@ public class DistributeLoad extends AbstractProcessor {
}
}).build();
public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder()
.name("Load Distribution Service ID").
description("The identifier of the Load Distribution Service").
required(true).
identifiesControllerService(LoadDistributionService.class).
build();
.name("Load Distribution Service ID")
.description("The identifier of the Load Distribution Service")
.required(true)
.identifiesControllerService(LoadDistributionService.class)
.build();
private List<PropertyDescriptor> properties;
private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>();
@ -327,8 +327,7 @@ public class DistributeLoad extends AbstractProcessor {
final List<Relationship> relationshipList = new ArrayList<>();
for (final Map.Entry<Integer, Integer> entry : weightings.entrySet()) {
final String relationshipName = String.valueOf(entry.getKey());
final Relationship relationship = new Relationship.Builder().
name(relationshipName).build();
final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
for (int i = 0; i < entry.getValue(); i++) {
relationshipList.add(relationship);
}
@ -386,8 +385,8 @@ public class DistributeLoad extends AbstractProcessor {
private static interface DistributionStrategy {
/**
* @param session session
* @param flowFiles flowFile
* @param context context
* @param flowFile flowFile
* @return a mapping of FlowFile to Relationship or <code>null</code> if the needed relationships are not available to accept files
*/
Relationship mapToRelationship(ProcessContext context, FlowFile flowFile);

View File

@ -151,8 +151,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> results = new ArrayList<>(super.
customValidate(context));
final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
final String destination = context.getProperty(DESTINATION).getValue();
if (DESTINATION_CONTENT.equals(destination)) {
@ -165,8 +164,8 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
}
if (jsonPathCount != 1) {
results.add(new ValidationResult.Builder().subject("JsonPaths").valid(false).
explanation("Exactly one JsonPath must be set if using destination of " + DESTINATION_CONTENT).build());
results.add(new ValidationResult.Builder().subject("JsonPaths").valid(false)
.explanation("Exactly one JsonPath must be set if using destination of " + DESTINATION_CONTENT).build());
}
}
@ -185,18 +184,17 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder().name(propertyDescriptorName).expressionLanguageSupported(false).
addValidator(new JsonPathValidator() {
@Override
public void cacheComputedValue(String subject, String input, JsonPath computedJsonPath) {
cachedJsonPathMap.put(input, computedJsonPath);
}
return new PropertyDescriptor.Builder().name(propertyDescriptorName).expressionLanguageSupported(false).addValidator(new JsonPathValidator() {
@Override
public void cacheComputedValue(String subject, String input, JsonPath computedJsonPath) {
cachedJsonPathMap.put(input, computedJsonPath);
}
@Override
public boolean isStale(String subject, String input) {
return cachedJsonPathMap.get(input) == null;
}
}).required(false).dynamic(true).build();
@Override
public boolean isStale(String subject, String input) {
return cachedJsonPathMap.get(input) == null;
}
}).required(false).dynamic(true).build();
}
@Override
@ -235,8 +233,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
final ProcessorLog logger = getLogger();
String representationOption = processContext.
getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue();
String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue();
final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption);
/* Build the JsonPath expressions from attributes */
@ -309,8 +306,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
@Override
public void process(final OutputStream out) throws IOException {
try (OutputStream outputStream = new BufferedOutputStream(out)) {
outputStream.write(resultRepresentation.
getBytes(StandardCharsets.UTF_8));
outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8));
}
}
});

View File

@ -165,11 +165,9 @@ public class EvaluateXPath extends AbstractProcessor {
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> results = new ArrayList<>(super.
customValidate(context));
final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
final String destination = context.getProperty(DESTINATION).
getValue();
final String destination = context.getProperty(DESTINATION).getValue();
if (DESTINATION_CONTENT.equals(destination)) {
int xpathCount = 0;
@ -356,8 +354,7 @@ public class EvaluateXPath extends AbstractProcessor {
@Override
public void process(final OutputStream rawOut) throws IOException {
try (final OutputStream out = new BufferedOutputStream(rawOut)) {
out.write(resultString.
getBytes("UTF-8"));
out.write(resultString.getBytes("UTF-8"));
}
}
});

View File

@ -184,8 +184,7 @@ public class EvaluateXQuery extends AbstractProcessor {
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> results = new ArrayList<>(super.
customValidate(context));
final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
final String destination = context.getProperty(DESTINATION).getValue();
if (DESTINATION_CONTENT.equals(destination)) {
@ -311,8 +310,7 @@ public class EvaluateXQuery extends AbstractProcessor {
}
} else { // if (DESTINATION_CONTENT.equals(destination)){
if (result.size() == 0) {
logger.
info("Routing {} to 'unmatched'", new Object[]{flowFile});
logger.info("Routing {} to 'unmatched'", new Object[]{flowFile});
session.transfer(flowFile, REL_NO_MATCH);
continue flowFileLoop;
} else if (result.size() == 1) {

View File

@ -163,8 +163,7 @@ public class ExecuteProcess extends AbstractProcessor {
if (inQuotes) {
sb.append(c);
} else {
final String arg = sb.toString().
trim();
final String arg = sb.toString().trim();
if (!arg.isEmpty()) {
args.add(arg);
}
@ -377,13 +376,11 @@ public class ExecuteProcess extends AbstractProcessor {
}
final int exitCode;
final long millis = TimeUnit.NANOSECONDS.
toMillis(System.nanoTime() - startNanos);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
try {
exitCode = process.waitFor();
} catch (final InterruptedException ie) {
getLogger().
warn("Process was interrupted before finishing");
getLogger().warn("Process was interrupted before finishing");
return;
}

View File

@ -121,10 +121,10 @@ import org.apache.nifi.stream.io.StreamUtils;
@WritesAttribute(attribute = "execution.error", description = "Any error messages returned from executing the command")})
public class ExecuteStreamCommand extends AbstractProcessor {
public static final Relationship ORIGINAL_RELATIONSHIP = new Relationship.Builder().
name("original").
description("FlowFiles that were successfully processed").
build();
public static final Relationship ORIGINAL_RELATIONSHIP = new Relationship.Builder()
.name("original")
.description("FlowFiles that were successfully processed")
.build();
public static final Relationship OUTPUT_STREAM_RELATIONSHIP = new Relationship.Builder()
.name("output stream")
.description("The destination path for the flow file created from the command's output")
@ -139,8 +139,8 @@ public class ExecuteStreamCommand extends AbstractProcessor {
}
private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true);
static final PropertyDescriptor EXECUTION_COMMAND = new PropertyDescriptor.Builder().
name("Command Path")
static final PropertyDescriptor EXECUTION_COMMAND = new PropertyDescriptor.Builder()
.name("Command Path")
.description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
.expressionLanguageSupported(true)
.addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
@ -158,8 +158,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.subject(subject).valid(true).input(input).build();
String[] args = input.split(";");
for (String arg : args) {
ValidationResult valResult = ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.
validate(subject, arg, context);
ValidationResult valResult = ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context);
if (!valResult.isValid()) {
result = valResult;
break;
@ -255,8 +254,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
session.read(flowFile, callback);
outputStreamFlowFile = callback.outputStreamFlowFile;
exitCode = callback.exitCode;
logger.
debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode});
logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode});
Map<String, String> attributes = new HashMap<>();
@ -328,8 +326,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
try {
StreamUtils.copy(incomingFlowFileIS, stdInWritable);
} catch (IOException e) {
logger.
error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e);
logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e);
}
// MUST close the output stream to the stdIn so that whatever is reading knows
// there is no more data

View File

@ -272,8 +272,7 @@ public class ExtractText extends AbstractProcessor {
final Map<String, Pattern> patternMap = compiledPattersMapRef.get();
for (final Map.Entry<String, Pattern> entry : patternMap.entrySet()) {
final Matcher matcher = entry.getValue().
matcher(contentString);
final Matcher matcher = entry.getValue().matcher(contentString);
if (matcher.find()) {
final String baseKey = entry.getKey();

View File

@ -149,8 +149,7 @@ public class GenerateFlowFile extends AbstractProcessor {
data = this.data.get();
}
for (int i = 0; i < context.getProperty(BATCH_SIZE).
asInteger(); i++) {
for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) {
FlowFile flowFile = session.create();
if (data.length > 0) {
flowFile = session.write(flowFile, new OutputStreamCallback() {

View File

@ -344,8 +344,7 @@ public class GetFile extends AbstractProcessor {
if (store.supportsFileAttributeView("posix")) {
try {
PosixFileAttributeView view = Files.getFileAttributeView(file, PosixFileAttributeView.class);
attributes.
put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
} catch (Exception ignore) {
} // allow other attributes if these fail
@ -425,8 +424,7 @@ public class GetFile extends AbstractProcessor {
flowFile = session.create();
final long importStart = System.nanoTime();
flowFile = session.
importFrom(filePath, keepingSourceFile, flowFile);
flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
final long importNanos = System.nanoTime() - importStart;
final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);

View File

@ -268,8 +268,7 @@ public class GetJMSTopic extends JmsConsumer {
final String serverUrl = props.getProperty(URL.getName());
final String username = props.getProperty(USERNAME.getName());
final String encryptedPassword = props.getProperty(PASSWORD.getName());
final String subscriptionName = props.
getProperty(SUBSCRIPTION_NAME_PROPERTY);
final String subscriptionName = props.getProperty(SUBSCRIPTION_NAME_PROPERTY);
final String jmsProvider = props.getProperty(JMS_PROVIDER.getName());
final String password = encryptedPassword == null ? null : context.decrypt(encryptedPassword);

View File

@ -104,8 +104,7 @@ import com.sun.jersey.api.client.ClientResponse.Status;
public class HandleHttpRequest extends AbstractProcessor {
public static final String HTTP_CONTEXT_ID = "http.context.identifier";
private static final Pattern URL_QUERY_PARAM_DELIMITER = Pattern.
compile("&");
private static final Pattern URL_QUERY_PARAM_DELIMITER = Pattern.compile("&");
// Allowable values for client auth
public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication",
@ -174,13 +173,13 @@ public class HandleHttpRequest extends AbstractProcessor {
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor ALLOW_PUT = new PropertyDescriptor.Builder().
name("Allow PUT").
description("Allow HTTP PUT Method").
required(true).
allowableValues("true", "false").
defaultValue("true").
build();
public static final PropertyDescriptor ALLOW_PUT = new PropertyDescriptor.Builder()
.name("Allow PUT")
.description("Allow HTTP PUT Method")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor ALLOW_DELETE = new PropertyDescriptor.Builder()
.name("Allow DELETE")
.description("Allow HTTP DELETE Method")

View File

@ -154,8 +154,7 @@ public class HandleHttpResponse extends AbstractProcessor {
response.flushBuffer();
} catch (final IOException ioe) {
session.transfer(flowFile, REL_FAILURE);
getLogger().
error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, ioe});
getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, ioe});
return;
}

View File

@ -129,8 +129,7 @@ public class HashAttribute extends AbstractProcessor {
private Set<Relationship> relationships;
private List<PropertyDescriptor> properties;
private final AtomicReference<Map<String, Pattern>> regexMapRef = new AtomicReference<>(Collections.
<String, Pattern>emptyMap());
private final AtomicReference<Map<String, Pattern>> regexMapRef = new AtomicReference<>(Collections.<String, Pattern>emptyMap());
@Override
protected void init(final ProcessorInitializationContext context) {
@ -157,12 +156,7 @@ public class HashAttribute extends AbstractProcessor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName).
addValidator(StandardValidators.
createRegexValidator(0, 1, false)).
required(false).
dynamic(true).
build();
.name(propertyDescriptorName).addValidator(StandardValidators.createRegexValidator(0, 1, false)).required(false).dynamic(true).build();
}
@Override

View File

@ -244,17 +244,16 @@ public final class InvokeHTTP extends AbstractProcessor {
.identifiesControllerService(SSLContextService.class)
.build();
public static final List<PropertyDescriptor> PROPERTIES = Collections.
unmodifiableList(Arrays.asList(
PROP_METHOD,
PROP_URL,
PROP_SSL_CONTEXT_SERVICE,
PROP_CONNECT_TIMEOUT,
PROP_READ_TIMEOUT,
PROP_DATE_HEADER,
PROP_FOLLOW_REDIRECTS,
PROP_ATTRIBUTES_TO_SEND
));
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
PROP_METHOD,
PROP_URL,
PROP_SSL_CONTEXT_SERVICE,
PROP_CONNECT_TIMEOUT,
PROP_READ_TIMEOUT,
PROP_DATE_HEADER,
PROP_FOLLOW_REDIRECTS,
PROP_ATTRIBUTES_TO_SEND
));
// property to allow the hostname verifier to be overridden
// this is a "hidden" property - it's configured using a dynamic user property
@ -559,8 +558,7 @@ public final class InvokeHTTP extends AbstractProcessor {
private Map<String, String> convertAttributesFromHeaders() throws IOException {
// create a new hashmap to store the values from the connection
Map<String, String> map = new HashMap<>();
for (Map.Entry<String, List<String>> entry : conn.getHeaderFields().
entrySet()) {
for (Map.Entry<String, List<String>> entry : conn.getHeaderFields().entrySet()) {
String key = entry.getKey();
if (key == null) {
continue;

View File

@ -175,7 +175,8 @@ public abstract class JmsConsumer extends AbstractProcessor {
}
}
public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ProcessorLog logger) throws Exception {
public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ProcessorLog logger)
throws Exception {
// Currently not very useful, because always one Message == one FlowFile
final IntegerHolder msgsThisFlowFile = new IntegerHolder(1);
@ -186,8 +187,7 @@ public abstract class JmsConsumer extends AbstractProcessor {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage));
} // all other message types, write Message body to FlowFile content
else {
} else { // all other message types, write Message body to FlowFile content
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream rawOut) throws IOException {

View File

@ -226,11 +226,11 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
} catch (SocketException e) {
}
}
public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder().
name("Local Network Interface").
description("The name of a local network interface to be used to restrict listening for UDP Datagrams to a specific LAN."
+ "May be a system property or an environment variable.").
addValidator(new Validator() {
public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder()
.name("Local Network Interface")
.description("The name of a local network interface to be used to restrict listening for UDP Datagrams to a specific LAN."
+ "May be a system property or an environment variable.")
.addValidator(new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
ValidationResult result = new ValidationResult.Builder()
@ -257,7 +257,8 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
return result;
}
}).expressionLanguageSupported(true).build();
})
.expressionLanguageSupported(true).build();
static {
List<PropertyDescriptor> props = new ArrayList<>();
@ -303,102 +304,100 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
/**
* Create the ChannelListener and a thread that causes the Consumer to create flow files.
*
* @param context
* @throws IOException
* @param context context
* @throws IOException ex
*/
@OnScheduled
public void initializeChannelListenerAndConsumerProcessing(final ProcessContext context) throws IOException {
getChannelListener(context);
stopping.set(false);
Future<Tuple<ProcessSession, List<FlowFile>>> consumerFuture = consumerExecutorService.
submit(new Callable<Tuple<ProcessSession, List<FlowFile>>>() {
Future<Tuple<ProcessSession, List<FlowFile>>> consumerFuture = consumerExecutorService.submit(new Callable<Tuple<ProcessSession, List<FlowFile>>>() {
@Override
public Tuple<ProcessSession, List<FlowFile>> call() {
final int maxFlowFilesPerSession = context.getProperty(FLOW_FILES_PER_SESSION).asInteger();
final long channelReaderIntervalMSecs = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
// number of waits in 5 secs, or 1
final int maxWaits = (int) (channelReaderIntervalMSecs <= 1000 ? 5000 / channelReaderIntervalMSecs : 1);
final ProcessorLog logger = getLogger();
int flowFileCount = maxFlowFilesPerSession;
ProcessSession session = null;
int numWaits = 0;
while (!stopping.get()) {
UDPStreamConsumer consumer = consumerRef.get();
if (consumer == null || sessionFactoryRef.get() == null) {
try {
Thread.sleep(100L);
} catch (InterruptedException swallow) {
@Override
public Tuple<ProcessSession, List<FlowFile>> call() {
final int maxFlowFilesPerSession = context.getProperty(FLOW_FILES_PER_SESSION).asInteger();
final long channelReaderIntervalMSecs = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
// number of waits in 5 secs, or 1
final int maxWaits = (int) (channelReaderIntervalMSecs <= 1000 ? 5000 / channelReaderIntervalMSecs : 1);
final ProcessorLog logger = getLogger();
int flowFileCount = maxFlowFilesPerSession;
ProcessSession session = null;
int numWaits = 0;
while (!stopping.get()) {
UDPStreamConsumer consumer = consumerRef.get();
if (consumer == null || sessionFactoryRef.get() == null) {
try {
Thread.sleep(100L);
} catch (InterruptedException swallow) {
}
} else {
try {
// first time through, flowFileCount is maxFlowFilesPerSession so that a session
// is created and the consumer is updated with it.
if (flowFileCount == maxFlowFilesPerSession || numWaits == maxWaits) {
logger.debug("Have waited {} times", new Object[]{numWaits});
numWaits = 0;
if (session != null) {
Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(session, new ArrayList<>(newFlowFiles));
newFlowFiles.clear();
flowFilesPerSessionQueue.add(flowFilesPerSession);
}
session = sessionFactoryRef.get().createSession();
consumer.setSession(session);
flowFileCount = 0;
}
// this will throttle the processing of the received datagrams. If there are no more
// buffers to read into because none have been returned to the pool via consumer.process(),
// then the desired back pressure on the channel is created.
if (context.getAvailableRelationships().size() > 0) {
consumer.process();
if (flowFileCount == newFlowFiles.size()) {
// no new datagrams received, need to throttle this thread back so it does
// not consume all cpu...but don't want to cause back pressure on the channel
// so the sleep time is same as the reader interval
// If have done this for approx. 5 secs, assume datagram sender is down. So, push
// out the remaining flow files (see numWaits == maxWaits above)
Thread.sleep(channelReaderIntervalMSecs);
if (flowFileCount > 0) {
numWaits++;
}
} else {
flowFileCount = newFlowFiles.size();
}
} else {
try {
// first time through, flowFileCount is maxFlowFilesPerSession so that a session
// is created and the consumer is updated with it.
if (flowFileCount == maxFlowFilesPerSession || numWaits == maxWaits) {
logger.debug("Have waited {} times", new Object[]{numWaits});
numWaits = 0;
if (session != null) {
Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(session, new ArrayList<>(newFlowFiles));
newFlowFiles.clear();
flowFilesPerSessionQueue.
add(flowFilesPerSession);
}
session = sessionFactoryRef.get().createSession();
consumer.setSession(session);
flowFileCount = 0;
}
// this will throttle the processing of the received datagrams. If there are no more
// buffers to read into because none have been returned to the pool via consumer.process(),
// then the desired back pressure on the channel is created.
if (context.getAvailableRelationships().size() > 0) {
consumer.process();
if (flowFileCount == newFlowFiles.size()) {
// no new datagrams received, need to throttle this thread back so it does
// not consume all cpu...but don't want to cause back pressure on the channel
// so the sleep time is same as the reader interval
// If have done this for approx. 5 secs, assume datagram sender is down. So, push
// out the remaining flow files (see numWaits == maxWaits above)
Thread.sleep(channelReaderIntervalMSecs);
if (flowFileCount > 0) {
numWaits++;
}
} else {
flowFileCount = newFlowFiles.size();
}
} else {
logger.debug("Creating back pressure...no available destinations");
Thread.sleep(1000L);
}
} catch (final IOException ioe) {
logger.error("Unable to fully process consumer {}", new Object[]{consumer}, ioe);
} catch (InterruptedException e) {
// don't care
} finally {
if (consumer.isConsumerFinished()) {
logger.info("Consumer {} was closed and is finished", new Object[]{consumer});
consumerRef.set(null);
disconnect();
if (!stopping.get()) {
resetChannelListener.set(true);
}
}
logger.debug("Creating back pressure...no available destinations");
Thread.sleep(1000L);
}
} catch (final IOException ioe) {
logger.error("Unable to fully process consumer {}", new Object[]{consumer}, ioe);
} catch (InterruptedException e) {
// don't care
} finally {
if (consumer.isConsumerFinished()) {
logger.info("Consumer {} was closed and is finished", new Object[]{consumer});
consumerRef.set(null);
disconnect();
if (!stopping.get()) {
resetChannelListener.set(true);
}
}
}
// when shutting down, need consumer to drain rest of cached buffers and clean up.
// prior to getting here, the channelListener was shutdown
UDPStreamConsumer consumer;
while ((consumer = consumerRef.get()) != null && !consumer.isConsumerFinished()) {
try {
consumer.process();
} catch (IOException swallow) {
// if this is blown...consumer.isConsumerFinished will be true
}
}
Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(session, new ArrayList<>(newFlowFiles));
return flowFilesPerSession;
}
});
}
// when shutting down, need consumer to drain rest of cached buffers and clean up.
// prior to getting here, the channelListener was shutdown
UDPStreamConsumer consumer;
while ((consumer = consumerRef.get()) != null && !consumer.isConsumerFinished()) {
try {
consumer.process();
} catch (IOException swallow) {
// if this is blown...consumer.isConsumerFinished will be true
}
}
Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(session, new ArrayList<>(newFlowFiles));
return flowFilesPerSession;
}
});
consumerFutureRef.set(consumerFuture);
}
@ -434,8 +433,7 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
@Override
public StreamConsumer newInstance(final String streamId) {
final UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.
intValue(), getLogger());
final UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), getLogger());
consumerRef.set(consumer);
return consumer;
}

View File

@ -327,8 +327,7 @@ public class MergeContent extends BinFiles {
protected boolean processBin(final Bin unmodifiableBin, final List<FlowFileSessionWrapper> binCopy, final ProcessContext context,
final ProcessSession session) throws ProcessException {
final String mergeFormat = context.getProperty(MERGE_FORMAT).
getValue();
final String mergeFormat = context.getProperty(MERGE_FORMAT).getValue();
MergeBin merger;
switch (mergeFormat) {
case MERGE_FORMAT_TAR_VALUE:
@ -458,8 +457,7 @@ public class MergeContent extends BinFiles {
return false;
}
return NUMBER_PATTERN.matcher(value).
matches();
return NUMBER_PATTERN.matcher(value).matches();
}
private class BinaryConcatenationMerge implements MergeBin {

View File

@ -418,9 +418,8 @@ public class PostHTTP extends AbstractProcessor {
try {
new java.net.URL(url);
} catch (final MalformedURLException e) {
logger.
error("After substituting attribute values for {}, URL is {}; this is not a valid URL, so routing to failure",
new Object[]{flowFile, url});
logger.error("After substituting attribute values for {}, URL is {}; this is not a valid URL, so routing to failure",
new Object[]{flowFile, url});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
continue;
@ -442,29 +441,28 @@ public class PostHTTP extends AbstractProcessor {
final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
clientBuilder.setConnectionManager(conMan);
clientBuilder.setUserAgent(userAgent);
clientBuilder.
addInterceptorFirst(new HttpResponseInterceptor() {
@Override
public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext);
ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class);
if (!conn.isOpen()) {
return;
}
clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
@Override
public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext);
ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class);
if (!conn.isOpen()) {
return;
}
SSLSession sslSession = conn.getSSLSession();
SSLSession sslSession = conn.getSSLSession();
if (sslSession != null) {
final X509Certificate[] certChain = sslSession.getPeerCertificateChain();
if (certChain == null || certChain.length == 0) {
throw new SSLPeerUnverifiedException("No certificates found");
}
final X509Certificate cert = certChain[0];
dnHolder.set(cert.getSubjectDN().getName().trim());
}
if (sslSession != null) {
final X509Certificate[] certChain = sslSession.getPeerCertificateChain();
if (certChain == null || certChain.length == 0) {
throw new SSLPeerUnverifiedException("No certificates found");
}
});
final X509Certificate cert = certChain[0];
dnHolder.set(cert.getSubjectDN().getName().trim());
}
}
});
clientBuilder.disableAutomaticRetries();
clientBuilder.disableContentCompression();
@ -783,8 +781,7 @@ public class PostHTTP extends AbstractProcessor {
if (!isScheduled()) {
context.yield();
logger.
warn("Failed to delete Hold that destination placed on {}; Processor has been stopped so routing FlowFile(s) to failure", new Object[]{flowFileDescription});
logger.warn("Failed to delete Hold that destination placed on {}; Processor has been stopped so routing FlowFile(s) to failure", new Object[]{flowFileDescription});
for (FlowFile flowFile : toSend) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);

View File

@ -274,8 +274,7 @@ public class PutEmail extends AbstractProcessor {
final String bcc = context.getProperty(BCC).getValue();
if (to == null && cc == null && bcc == null) {
errors.add(new ValidationResult.Builder().subject("To, CC, BCC").
valid(false).explanation("Must specify at least one To/CC/BCC address").build());
errors.add(new ValidationResult.Builder().subject("To, CC, BCC").valid(false).explanation("Must specify at least one To/CC/BCC address").build());
}
return errors;
@ -390,8 +389,7 @@ public class PutEmail extends AbstractProcessor {
final ProcessorLog logger = this.getLogger();
for (Entry<String, PropertyDescriptor> entry : propertyToContext.
entrySet()) {
for (Entry<String, PropertyDescriptor> entry : propertyToContext.entrySet()) {
// Evaluate the property descriptor against the flow file
String flowFileValue = context.getProperty(entry.getValue()).evaluateAttributeExpressions(flowFile).getValue();

View File

@ -57,10 +57,8 @@ import org.apache.nifi.processors.standard.util.FTPTransfer;
+ " you leave off the .")})
public class PutFTP extends PutFileTransfer<FTPTransfer> {
private static final Pattern PRE_SEND_CMD_PATTERN = Pattern.
compile("^pre\\.cmd\\.(\\d+)$");
private static final Pattern POST_SEND_CMD_PATTERN = Pattern.
compile("^post\\.cmd\\.(\\d+)$");
private static final Pattern PRE_SEND_CMD_PATTERN = Pattern.compile("^pre\\.cmd\\.(\\d+)$");
private static final Pattern POST_SEND_CMD_PATTERN = Pattern.compile("^post\\.cmd\\.(\\d+)$");
private final AtomicReference<List<PropertyDescriptor>> preSendDescriptorRef = new AtomicReference<>();
private final AtomicReference<List<PropertyDescriptor>> postSendDescriptorRef = new AtomicReference<>();
@ -109,8 +107,7 @@ public class PutFTP extends PutFileTransfer<FTPTransfer> {
@Override
protected void afterPut(final FlowFile flowFile, final ProcessContext context, final FTPTransfer transfer) throws IOException {
transfer.
sendCommands(getCommands(postSendDescriptorRef.get(), context, flowFile), flowFile);
transfer.sendCommands(getCommands(postSendDescriptorRef.get(), context, flowFile), flowFile);
}
@Override

View File

@ -44,7 +44,7 @@ import java.util.concurrent.TimeUnit;
/**
* Base class for PutFTP & PutSFTP
*
* @param <T>
* @param <T> type of transfer
*/
public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractProcessor {
@ -181,7 +181,14 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
}
//Attempts to identify naming or content issues with files before they are transferred.
private ConflictResult identifyAndResolveConflictFile(final String conflictResolutionType, final T transfer, final String path, final FlowFile flowFile, final boolean rejectZeroByteFiles, final ProcessorLog logger) throws IOException {
private ConflictResult identifyAndResolveConflictFile(
final String conflictResolutionType,
final T transfer,
final String path,
final FlowFile flowFile,
final boolean rejectZeroByteFiles,
final ProcessorLog logger)
throws IOException {
Relationship destinationRelationship = REL_SUCCESS;
String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
boolean transferFile = true;

View File

@ -336,8 +336,7 @@ public class PutJMS extends AbstractProcessor {
final String key = entry.getKey();
final String value = entry.getValue();
if (key.toLowerCase().
startsWith(ATTRIBUTE_PREFIX.toLowerCase()) && !key.toLowerCase().endsWith(ATTRIBUTE_TYPE_SUFFIX.toLowerCase())) {
if (key.toLowerCase().startsWith(ATTRIBUTE_PREFIX.toLowerCase()) && !key.toLowerCase().endsWith(ATTRIBUTE_TYPE_SUFFIX.toLowerCase())) {
final String jmsPropName = key.substring(ATTRIBUTE_PREFIX.length());
final String type = attributes.get(key + ATTRIBUTE_TYPE_SUFFIX);

View File

@ -76,12 +76,10 @@ public class PutSFTP extends PutFileTransfer<SFTPTransfer> {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
if (SFTPTransfer.DISABLE_DIRECTORY_LISTING.getName().
equalsIgnoreCase(propertyDescriptorName)) {
if (SFTPTransfer.DISABLE_DIRECTORY_LISTING.getName().equalsIgnoreCase(propertyDescriptorName)) {
return SFTPTransfer.DISABLE_DIRECTORY_LISTING;
}
return super.
getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
return super.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
}
@Override

View File

@ -49,7 +49,11 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -58,7 +62,8 @@ import java.util.regex.Pattern;
@SideEffectFree
@SupportsBatching
@Tags({"Text", "Regular Expression", "Update", "Change", "Replace", "Modify", "Regex"})
@CapabilityDescription("Updates the content of a FlowFile by evaluating a Regular Expression against it and replacing the section of the content that matches the Regular Expression with some alternate value.")
@CapabilityDescription("Updates the content of a FlowFile by evaluating a Regular Expression against it and replacing the section of "
+ "the content that matches the Regular Expression with some alternate value.")
public class ReplaceText extends AbstractProcessor {
//Constants
@ -77,7 +82,8 @@ public class ReplaceText extends AbstractProcessor {
.build();
public static final PropertyDescriptor REPLACEMENT_VALUE = new PropertyDescriptor.Builder()
.name("Replacement Value")
.description("The value to replace the regular expression with. Back-references to Regular Expression capturing groups are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value.")
.description("The value to replace the regular expression with. Back-references to Regular Expression capturing groups are supported, but "
+ "back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value.")
.required(true)
.defaultValue("$1")
.addValidator(Validator.VALID)
@ -92,15 +98,20 @@ public class ReplaceText extends AbstractProcessor {
.build();
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Buffer Size")
.description("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to apply the regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. "
+ "In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. This value is ignored and the buffer is not used if 'Regular Expression' is set to '.*'")
.description("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to "
+ "apply the regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, "
+ "the FlowFile will be routed to 'failure'. "
+ "In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value "
+ "of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. "
+ "This value is ignored and the buffer is not used if 'Regular Expression' is set to '.*'")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.build();
public static final PropertyDescriptor EVALUATION_MODE = new PropertyDescriptor.Builder()
.name("Evaluation Mode")
.description("Evaluate the 'Regular Expression' against each line (Line-by-Line) or buffer the entire file into memory (Entire Text) and then evaluate the 'Regular Expression'.")
.description("Evaluate the 'Regular Expression' against each line (Line-by-Line) or buffer the entire file into memory (Entire Text) and "
+ "then evaluate the 'Regular Expression'.")
.allowableValues(LINE_BY_LINE, ENTIRE_TEXT)
.defaultValue(ENTIRE_TEXT)
.required(true)
@ -108,7 +119,8 @@ public class ReplaceText extends AbstractProcessor {
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that have been successfully updated are routed to this relationship, as well as FlowFiles whose content does not match the given Regular Expression")
.description("FlowFiles that have been successfully updated are routed to this relationship, as well as FlowFiles whose content does not "
+ "match the given Regular Expression")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
@ -205,7 +217,7 @@ public class ReplaceText extends AbstractProcessor {
final int originalBackRefIndex = Integer.parseInt(backRefNum);
int backRefIndex = originalBackRefIndex;
// if we have a replacement value like $123, and we have less than 123 capturing groups, then
// if we have a replacement value like $123, and we have less than 123 capturing groups, then
// we want to truncate the 3 and use capturing group 12; if we have less than 12 capturing groups,
// then we want to truncate the 2 and use capturing group 1; if we don't have a capturing group then
// we want to truncate the 1 and get 0.

View File

@ -68,7 +68,8 @@ import org.apache.commons.lang3.StringUtils;
@SideEffectFree
@SupportsBatching
@Tags({"Text", "Regular Expression", "Update", "Change", "Replace", "Modify", "Regex", "Mapping"})
@CapabilityDescription("Updates the content of a FlowFile by evaluating a Regular Expression against it and replacing the section of the content that matches the Regular Expression with some alternate value provided in a mapping file.")
@CapabilityDescription("Updates the content of a FlowFile by evaluating a Regular Expression against it and replacing the section of the content that "
+ "matches the Regular Expression with some alternate value provided in a mapping file.")
public class ReplaceTextWithMapping extends AbstractProcessor {
public static final PropertyDescriptor REGEX = new PropertyDescriptor.Builder()
@ -109,7 +110,8 @@ public class ReplaceTextWithMapping extends AbstractProcessor {
.build();
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Buffer Size")
.description("Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. If a FlowFile is larger than this value, the FlowFile will be routed to 'failure'")
.description("Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. If a FlowFile is larger "
+ "than this value, the FlowFile will be routed to 'failure'")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
@ -270,13 +272,6 @@ public class ReplaceTextWithMapping extends AbstractProcessor {
}
}
/**
* Loads a file containing mappings.
*
* @param is
* @return
* @throws IOException
*/
protected Map<String, String> loadMappingFile(InputStream is) throws IOException {
Map<String, String> mapping = new HashMap<>();
BufferedReader reader = new BufferedReader(new InputStreamReader(is));

View File

@ -77,7 +77,8 @@ public class RouteOnAttribute extends AbstractProcessor {
"A copy of the FlowFile will be routed to each relationship whose corresponding expression evaluates to 'true'");
public static final AllowableValue ROUTE_ALL_MATCH = new AllowableValue(routeAllMatchValue, "Route to 'matched' if all match",
"Requires that all user-defined expressions evaluate to 'true' for the FlowFile to be considered a match");
public static final AllowableValue ROUTE_ANY_MATCHES = new AllowableValue(routeAnyMatches, // keep the word 'match' instead of 'matched' to maintain backward compatibility (there was a typo originally)
// keep the word 'match' instead of 'matched' to maintain backward compatibility (there was a typo originally)
public static final AllowableValue ROUTE_ANY_MATCHES = new AllowableValue(routeAnyMatches,
"Route to 'matched' if any matches",
"Requires that at least one user-defined expression evaluate to 'true' for hte FlowFile to be considered a match");
@ -243,8 +244,7 @@ public class RouteOnAttribute extends AbstractProcessor {
}
//now transfer the original flow file
logger.
info("Routing {} to {}", new Object[]{flowFile, firstRelationship});
logger.info("Routing {} to {}", new Object[]{flowFile, firstRelationship});
session.getProvenanceReporter().route(flowFile, firstRelationship);
flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, firstRelationship.getName());
session.transfer(flowFile, firstRelationship);

View File

@ -86,7 +86,10 @@ public class ScanAttribute extends AbstractProcessor {
.build();
public static final PropertyDescriptor DICTIONARY_FILTER = new PropertyDescriptor.Builder()
.name("Dictionary Filter Pattern")
.description("A Regular Expression that will be applied to each line in the dictionary file. If the regular expression does not match the line, the line will not be included in the list of terms to search for. If a Matching Group is specified, only the portion of the term that matches that Matching Group will be used instead of the entire term. If not specified, all terms in the dictionary will be used and each term will consist of the text of the entire line in the file")
.description("A Regular Expression that will be applied to each line in the dictionary file. If the regular expression does not "
+ "match the line, the line will not be included in the list of terms to search for. If a Matching Group is specified, only the "
+ "portion of the term that matches that Matching Group will be used instead of the entire term. If not specified, all terms in "
+ "the dictionary will be used and each term will consist of the text of the entire line in the file")
.required(false)
.addValidator(StandardValidators.createRegexValidator(0, 1, false))
.defaultValue(null)

View File

@ -225,8 +225,7 @@ public class SplitContent extends AbstractProcessor {
}
bytesRead++;
boolean matched = buffer.
addAndCompare((byte) (nextByte & 0xFF));
boolean matched = buffer.addAndCompare((byte) (nextByte & 0xFF));
if (matched) {
long splitLength;
@ -255,8 +254,7 @@ public class SplitContent extends AbstractProcessor {
FlowFile clone = session.clone(flowFile);
session.transfer(flowFile, REL_ORIGINAL);
session.transfer(clone, REL_SPLITS);
logger.
info("Found no match for {}; transferring original 'original' and transferring clone {} to 'splits'", new Object[]{flowFile, clone});
logger.info("Found no match for {}; transferring original 'original' and transferring clone {} to 'splits'", new Object[]{flowFile, clone});
return;
}
@ -303,8 +301,7 @@ public class SplitContent extends AbstractProcessor {
* @param splits splits
*/
private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
final String originalFilename = source.
getAttribute(CoreAttributes.FILENAME.key());
final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
final String fragmentId = UUID.randomUUID().toString();
final ArrayList<FlowFile> newList = new ArrayList<>(splits);

View File

@ -140,15 +140,6 @@ public class SplitText extends AbstractProcessor {
return properties;
}
/**
* Reads up to the given maximum number of lines, copying them to out
*
* @param in
* @param maxNumLines
* @param out
* @return the number of lines actually copied
* @throws IOException
*/
private int readLines(final InputStream in, final int maxNumLines, final OutputStream out, final boolean keepAllNewLines) throws IOException {
int numLines = 0;
for (int i = 0; i < maxNumLines; i++) {
@ -279,7 +270,7 @@ public class SplitText extends AbstractProcessor {
if (linesCopied.get() > 0) {
splits.add(splitFile);
} else {
// if the number of content lines is a multiple of the SPLIT_LINE_COUNT,
// if the number of content lines is a multiple of the SPLIT_LINE_COUNT,
// the last flow file will contain just a header; don't forward that one
session.remove(splitFile);
}
@ -341,13 +332,6 @@ public class SplitText extends AbstractProcessor {
session.transfer(splits, REL_SPLITS);
}
/**
* Apply split index, count and other attributes.
*
* @param session
* @param source
* @param unpacked
*/
private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());

View File

@ -68,7 +68,8 @@ public class SplitXml extends AbstractProcessor {
public static final PropertyDescriptor SPLIT_DEPTH = new PropertyDescriptor.Builder()
.name("Split Depth")
.description("Indicates the XML-nesting depth to start splitting XML fragments. A depth of 1 means split the root's children, whereas a depth of 2 means split the root's children's children and so forth.")
.description("Indicates the XML-nesting depth to start splitting XML fragments. A depth of 1 means split the root's children, whereas a depth of"
+ " 2 means split the root's children's children and so forth.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")

View File

@ -132,35 +132,33 @@ public class TransformXml extends AbstractProcessor {
final StopWatch stopWatch = new StopWatch(true);
try {
FlowFile transformed = session.
write(original, new StreamCallback() {
@Override
public void process(final InputStream rawIn, final OutputStream out) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
FlowFile transformed = session.write(original, new StreamCallback() {
@Override
public void process(final InputStream rawIn, final OutputStream out) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
File stylesheet = new File(context.getProperty(XSLT_FILE_NAME).getValue());
StreamSource styleSource = new StreamSource(stylesheet);
TransformerFactory tfactory = new net.sf.saxon.TransformerFactoryImpl();
Transformer transformer = tfactory.newTransformer(styleSource);
File stylesheet = new File(context.getProperty(XSLT_FILE_NAME).getValue());
StreamSource styleSource = new StreamSource(stylesheet);
TransformerFactory tfactory = new net.sf.saxon.TransformerFactoryImpl();
Transformer transformer = tfactory.newTransformer(styleSource);
// pass all dynamic properties to the transformer
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().
entrySet()) {
if (entry.getKey().isDynamic()) {
String value = context.newPropertyValue(entry.getValue()).evaluateAttributeExpressions(original).getValue();
transformer.setParameter(entry.getKey().getName(), value);
}
}
// use a StreamSource with Saxon
StreamSource source = new StreamSource(in);
StreamResult result = new StreamResult(out);
transformer.transform(source, result);
} catch (final Exception e) {
throw new IOException(e);
// pass all dynamic properties to the transformer
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (entry.getKey().isDynamic()) {
String value = context.newPropertyValue(entry.getValue()).evaluateAttributeExpressions(original).getValue();
transformer.setParameter(entry.getKey().getName(), value);
}
}
});
// use a StreamSource with Saxon
StreamSource source = new StreamSource(in);
StreamResult result = new StreamResult(out);
transformer.transform(source, result);
} catch (final Exception e) {
throw new IOException(e);
}
}
});
session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.info("Transformed {}", new Object[]{original});

View File

@ -68,14 +68,24 @@ import org.apache.nifi.util.ObjectHolder;
@SideEffectFree
@SupportsBatching
@Tags({"Unpack", "un-merge", "tar", "zip", "archive", "flowfile-stream", "flowfile-stream-v3"})
@CapabilityDescription("Unpacks the content of FlowFiles that have been packaged with one of several different Packaging Formats, emitting one to many FlowFiles for each input FlowFile")
@ReadsAttribute(attribute = "mime.type", description = "If the <Packaging Format> property is set to use mime.type attribute, this attribute is used to determine the FlowFile's MIME Type. In this case, if the attribute is set to application/tar, the TAR Packaging Format will be used. If the attribute is set to application/zip, the ZIP Packaging Format will be used. If the attribute is set to application/flowfile-v3 or application/flowfile-v2 or application/flowfile-v1, the appropriate FlowFile Packaging Format will be used. If this attribute is missing, the FlowFile will be routed to 'failure'. Otherwise, if the attribute's value is not one of those mentioned above, the FlowFile will be routed to 'success' without being unpacked")
@CapabilityDescription("Unpacks the content of FlowFiles that have been packaged with one of several different Packaging Formats, emitting one to many "
+ "FlowFiles for each input FlowFile")
@ReadsAttribute(attribute = "mime.type", description = "If the <Packaging Format> property is set to use mime.type attribute, this attribute is used "
+ "to determine the FlowFile's MIME Type. In this case, if the attribute is set to application/tar, the TAR Packaging Format will be used. If "
+ "the attribute is set to application/zip, the ZIP Packaging Format will be used. If the attribute is set to application/flowfile-v3 or "
+ "application/flowfile-v2 or application/flowfile-v1, the appropriate FlowFile Packaging Format will be used. If this attribute is missing, "
+ "the FlowFile will be routed to 'failure'. Otherwise, if the attribute's value is not one of those mentioned above, the FlowFile will be "
+ "routed to 'success' without being unpacked")
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "If the FlowFile is successfully unpacked, its MIME Type is no longer known, so the mime.type attribute is set to application/octet-stream."),
@WritesAttribute(attribute = "fragment.identifier", description = "All unpacked FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
@WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the unpacked FlowFiles that were created from a single parent FlowFile"),
@WritesAttribute(attribute = "mime.type", description = "If the FlowFile is successfully unpacked, its MIME Type is no longer known, so the mime.type "
+ "attribute is set to application/octet-stream."),
@WritesAttribute(attribute = "fragment.identifier", description = "All unpacked FlowFiles produced from the same parent FlowFile will have the same randomly generated "
+ "UUID added for this attribute"),
@WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the unpacked FlowFiles that were created from a single "
+ "parent FlowFile"),
@WritesAttribute(attribute = "fragment.count", description = "The number of unpacked FlowFiles generated from the parent FlowFile"),
@WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile. Extensions of .tar, .zip or .pkg are removed because the MergeContent processor automatically adds those extensions if it is used to rebuild the original FlowFile")})
@WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile. Extensions of .tar, .zip or .pkg are removed because "
+ "the MergeContent processor automatically adds those extensions if it is used to rebuild the original FlowFile")})
@SeeAlso(MergeContent.class)
public class UnpackContent extends AbstractProcessor {
@ -380,8 +390,7 @@ public class UnpackContent extends AbstractProcessor {
mapAttributes(attributes, "content-encoding", CoreAttributes.MIME_TYPE.key());
mapAttributes(attributes, "content-type", CoreAttributes.MIME_TYPE.key());
if (!attributes.
containsKey(CoreAttributes.MIME_TYPE.key())) {
if (!attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
}
@ -396,26 +405,12 @@ public class UnpackContent extends AbstractProcessor {
}
}
/**
* Maps attributes from legacy nifi to the new naming scheme
*
* @param attributes
* @param oldKey
* @param newKey
*/
private static void mapAttributes(final Map<String, String> attributes, final String oldKey, final String newKey) {
if (!attributes.containsKey(newKey) && attributes.containsKey(oldKey)) {
attributes.put(newKey, attributes.get(oldKey));
}
}
/**
* If the unpacked flowfiles contain fragment index attributes, then we need to apply fragment count and other attributes for completeness.
*
* @param session
* @param source
* @param unpacked
*/
private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
// first pass verifies all FlowFiles have the FRAGMENT_INDEX attribute and gets the total number of fragments
int fragmentCount = 0;

View File

@ -58,21 +58,21 @@ import org.xml.sax.SAXException;
@CapabilityDescription("Validates the contents of FlowFiles against a user-specified XML Schema file")
public class ValidateXml extends AbstractProcessor {
public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder().
name("Schema File").
description("The path to the Schema file that is to be used for validation").
required(true).
addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).
build();
public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
.name("Schema File")
.description("The path to the Schema file that is to be used for validation")
.required(true)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.build();
public static final Relationship REL_VALID = new Relationship.Builder().
name("valid").
description("FlowFiles that are successfully validated against the schema are routed to this relationship").
build();
public static final Relationship REL_INVALID = new Relationship.Builder().
name("invalid").
description("FlowFiles that are not valid according to the specified schema are routed to this relationship").
build();
public static final Relationship REL_VALID = new Relationship.Builder()
.name("valid")
.description("FlowFiles that are successfully validated against the schema are routed to this relationship")
.build();
public static final Relationship REL_INVALID = new Relationship.Builder()
.name("invalid")
.description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
.build();
private static final String SCHEMA_LANGUAGE = "http://www.w3.org/2001/XMLSchema";
@ -105,10 +105,8 @@ public class ValidateXml extends AbstractProcessor {
@OnScheduled
public void parseSchema(final ProcessContext context) throws IOException, SAXException {
try {
final File file = new File(context.getProperty(SCHEMA_FILE).
getValue());
final SchemaFactory schemaFactory = SchemaFactory.
newInstance(SCHEMA_LANGUAGE);
final File file = new File(context.getProperty(SCHEMA_FILE).getValue());
final SchemaFactory schemaFactory = SchemaFactory.newInstance(SCHEMA_LANGUAGE);
final Schema schema = schemaFactory.newSchema(file);
this.schemaRef.set(schema);
} catch (final SAXException e) {
@ -136,23 +134,18 @@ public class ValidateXml extends AbstractProcessor {
validator.validate(new StreamSource(in));
} catch (final IllegalArgumentException | SAXException e) {
valid.set(false);
logger.
debug("Failed to validate {} against schema due to {}", new Object[]{flowFile, e});
logger.debug("Failed to validate {} against schema due to {}", new Object[]{flowFile, e});
}
}
});
if (valid.get()) {
logger.
info("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
session.getProvenanceReporter().
route(flowFile, REL_VALID);
logger.info("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
session.getProvenanceReporter().route(flowFile, REL_VALID);
session.transfer(flowFile, REL_VALID);
} else {
logger.
info("Failed to validate {} against schema; routing to 'invalid'", new Object[]{flowFile});
session.getProvenanceReporter().
route(flowFile, REL_INVALID);
logger.info("Failed to validate {} against schema; routing to 'invalid'", new Object[]{flowFile});
session.getProvenanceReporter().route(flowFile, REL_INVALID);
session.transfer(flowFile, REL_INVALID);
}
}

View File

@ -50,11 +50,6 @@ public class ContentAcknowledgmentServlet extends HttpServlet {
private ProcessorLog logger;
private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
/**
*
* @param config
* @throws ServletException
*/
@SuppressWarnings("unchecked")
@Override
public void init(final ServletConfig config) throws ServletException {

View File

@ -94,11 +94,6 @@ public class ListenHTTPServlet extends HttpServlet {
private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
private StreamThrottler streamThrottler;
/**
*
* @param config
* @throws ServletException
*/
@SuppressWarnings("unchecked")
@Override
public void init(final ServletConfig config) throws ServletException {

View File

@ -41,11 +41,11 @@ public class Bin {
/**
* Constructs a new bin
*
* @param minSizeBytes
* @param maxSizeBytes
* @param minEntries
* @param maxEntries
* @param fileCountAttribute
* @param minSizeBytes min bytes
* @param maxSizeBytes max bytes
* @param minEntries min entries
* @param maxEntries max entries
* @param fileCountAttribute num files
* @throws IllegalArgumentException if the min is not less than or equal to the max.
*/
public Bin(final long minSizeBytes, final long maxSizeBytes, final int minEntries, final int maxEntries, final String fileCountAttribute) {
@ -75,7 +75,7 @@ public class Bin {
/**
* Indicates enough size exists to meet the minimum requirements
*
* @return
* @return true if full enough
*/
public boolean isFullEnough() {
return isFull() || (size >= minimumSizeBytes && (binContents.size() >= minimumEntries));
@ -84,8 +84,8 @@ public class Bin {
/**
* Determines if this bin is older than the time specified.
*
* @param duration
* @param unit
* @param duration duration
* @param unit unit
* @return true if this bin is older than the length of time given; false otherwise
*/
public boolean isOlderThan(final int duration, final TimeUnit unit) {
@ -96,8 +96,8 @@ public class Bin {
/**
* Determines if this bin is older than the specified bin
*
* @param other
* @return
* @param other other bin
* @return true if this is older than given bin
*/
public boolean isOlderThan(final Bin other) {
return creationMomentEpochNs < other.creationMomentEpochNs;
@ -106,7 +106,7 @@ public class Bin {
/**
* If this bin has enough room for the size of the given flow file then it is added otherwise it is not
*
* @param flowFile
* @param flowFile flowfile to offer
* @param session the ProcessSession to which the FlowFile belongs
* @return true if added; false otherwise
*/

View File

@ -152,7 +152,7 @@ public class BinManager {
* <p/>
* @param relaxFullnessConstraint if false will require bins to be full before considered ready; if true bins only have to meet their minimum size criteria or be 'old' and then they'll be
* considered ready
* @return
* @return bins that are considered full
*/
public Collection<Bin> removeReadyBins(boolean relaxFullnessConstraint) {
final Map<String, List<Bin>> newGroupMap = new HashMap<>();

View File

@ -42,12 +42,6 @@ public class DocumentReaderCallback implements InputStreamCallback {
this.isNamespaceAware = isNamespaceAware;
}
/**
* Loads the Document from the specified stream.
*
* @param stream
* @throws IOException
*/
@Override
public void process(final InputStream stream) throws IOException {
try {
@ -63,9 +57,7 @@ public class DocumentReaderCallback implements InputStreamCallback {
}
/**
* Returns the document.
*
* @return
* @return the document
*/
public Document getDocument() {
return document;

View File

@ -305,8 +305,7 @@ public class FTPTransfer implements FileTransfer {
final FTPFile[] files = client.listFiles(path);
FTPFile matchingFile = null;
for (final FTPFile file : files) {
if (file.getName().
equalsIgnoreCase(remoteFileName)) {
if (file.getName().equalsIgnoreCase(remoteFileName)) {
matchingFile = file;
break;
}

View File

@ -75,7 +75,7 @@ public class FTPUtils {
* value of zero means do not timeout. Users should probably set a value here unless using very reliable communications links or else risk indefinite hangs that require a restart.</li>
* </ul>
*
* @param conf
* @param conf conf
* @param monitor if provided will be used to monitor FTP commands processed but may be null
* @return FTPClient connected to FTP server as configured
* @throws NullPointerException if either argument is null

View File

@ -136,7 +136,10 @@ public interface FileTransfer extends Closeable {
.build();
public static final PropertyDescriptor REMOTE_POLL_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Remote Poll Batch Size")
.description("The value specifies how many file paths to find in a given directory on the remote system when doing a file listing. This value in general should not need to be modified but when polling against a remote system with a tremendous number of files this value can be critical. Setting this value too high can result very poor performance and setting it too low can cause the flow to be slower than normal.")
.description("The value specifies how many file paths to find in a given directory on the remote system when doing a file listing. This value "
+ "in general should not need to be modified but when polling against a remote system with a tremendous number of files this value can "
+ "be critical. Setting this value too high can result very poor performance and setting it too low can cause the flow to be slower "
+ "than normal.")
.defaultValue("5000")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.required(true)
@ -194,41 +197,53 @@ public interface FileTransfer extends Closeable {
.build();
public static final PropertyDescriptor DOT_RENAME = new PropertyDescriptor.Builder()
.name("Dot Rename")
.description("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the original once the file is completely sent. Otherwise, there is no rename. This property is ignored if the Temporary Filename property is set.")
.description("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the "
+ "original once the file is completely sent. Otherwise, there is no rename. This property is ignored if the "
+ "Temporary Filename property is set.")
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor TEMP_FILENAME = new PropertyDescriptor.Builder()
.name("Temporary Filename")
.description("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful completion will be renamed to the original filename. If this value is set, the Dot Rename property is ignored.")
.description("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful "
+ "completion will be renamed to the original filename. If this value is set, the Dot Rename property is ignored.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.required(false)
.build();
public static final PropertyDescriptor LAST_MODIFIED_TIME = new PropertyDescriptor.Builder()
.name("Last Modified Time")
.description("The lastModifiedTime to assign to the file after transferring it. If not set, the lastModifiedTime will not be changed. Format must be yyyy-MM-dd'T'HH:mm:ssZ. You may also use expression language such as ${file.lastModifiedTime}. If the value is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
.description("The lastModifiedTime to assign to the file after transferring it. If not set, the lastModifiedTime will not be changed. "
+ "Format must be yyyy-MM-dd'T'HH:mm:ssZ. You may also use expression language such as ${file.lastModifiedTime}. If the value "
+ "is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder()
.name("Permissions")
.description("The permissions to assign to the file after transferring it. Format must be either UNIX rwxrwxrwx with a - in place of denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). If not set, the permissions will not be changed. You may also use expression language such as ${file.permissions}. If the value is invalid, the processor will not be invalid but will fail to change permissions of the file.")
.description("The permissions to assign to the file after transferring it. Format must be either UNIX rwxrwxrwx with a - in place of "
+ "denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). If not set, the permissions will not be changed. You may "
+ "also use expression language such as ${file.permissions}. If the value is invalid, the processor will not be invalid but will "
+ "fail to change permissions of the file.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
.name("Remote Owner")
.description("Integer value representing the User ID to set on the file after transferring it. If not set, the owner will not be set. You may also use expression language such as ${file.owner}. If the value is invalid, the processor will not be invalid but will fail to change the owner of the file.")
.description("Integer value representing the User ID to set on the file after transferring it. If not set, the owner will not be set. "
+ "You may also use expression language such as ${file.owner}. If the value is invalid, the processor will not be invalid but "
+ "will fail to change the owner of the file.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
.name("Remote Group")
.description("Integer value representing the Group ID to set on the file after transferring it. If not set, the group will not be set. You may also use expression language such as ${file.group}. If the value is invalid, the processor will not be invalid but will fail to change the group of the file.")
.description("Integer value representing the Group ID to set on the file after transferring it. If not set, the group will not be set. "
+ "You may also use expression language such as ${file.group}. If the value is invalid, the processor will not be invalid but "
+ "will fail to change the group of the file.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)

View File

@ -149,8 +149,7 @@ public class SFTPTransfer implements FileTransfer {
return;
}
final boolean ignoreDottedFiles = ctx.
getProperty(FileTransfer.IGNORE_DOTTED_FILES).asBoolean();
final boolean ignoreDottedFiles = ctx.getProperty(FileTransfer.IGNORE_DOTTED_FILES).asBoolean();
final boolean recurse = ctx.getProperty(FileTransfer.RECURSIVE_SEARCH).asBoolean();
final String fileFilterRegex = ctx.getProperty(FileTransfer.FILE_FILTER_REGEX).getValue();
final Pattern pattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
@ -234,8 +233,7 @@ public class SFTPTransfer implements FileTransfer {
try {
getListing(newFullForwardPath, depth + 1, maxResults, listing);
} catch (final IOException e) {
logger.
error("Unable to get listing from " + newFullForwardPath + "; skipping this subdirectory");
logger.error("Unable to get listing from " + newFullForwardPath + "; skipping this subdirectory");
}
}
}
@ -310,8 +308,7 @@ public class SFTPTransfer implements FileTransfer {
channel.mkdir(remoteDirectory);
} catch (SftpException e) {
if (e.id != ChannelSftp.SSH_FX_FAILURE) {
throw new IOException("Could not blindly create remote directory due to " + e.
getMessage(), e);
throw new IOException("Could not blindly create remote directory due to " + e.getMessage(), e);
}
}
return;

View File

@ -71,9 +71,7 @@ public class XmlSplitterSaxParser extends DefaultHandler {
@Override
public void endElement(final String uri, final String localName, final String qName) throws SAXException {
// Add the element end tag.
sb.append("</").
append(qName).
append(">");
sb.append("</").append(qName).append(">");
// We have finished processing this element. Decrement the depth.
int newDepth = depth.decrementAndGet();
@ -104,12 +102,7 @@ public class XmlSplitterSaxParser extends DefaultHandler {
for (int i = 0; i < attCount; i++) {
String attName = atts.getQName(i);
String attValue = atts.getValue(i);
sb.append(" ").
append(attName).
append("=").
append("\"").
append(attValue).
append("\"");
sb.append(" ").append(attName).append("=").append("\"").append(attValue).append("\"");
}
sb.append(">");

View File

@ -29,8 +29,7 @@ public class TestDistributeLoad {
public static void before() {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.
setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DistributeLoad", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DistributeLoad", "debug");
}
@Test

View File

@ -60,8 +60,8 @@ public class TestHandleHttpRequest {
public void run() {
try {
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:" + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").
openConnection();
final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:"
+ port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
connection.setDoOutput(false);
connection.setRequestMethod("GET");
connection.setRequestProperty("header1", "value1");