mirror of https://github.com/apache/nifi.git
NIFI-13265 Removed instantiation of Object arrays for log arguments
This closes #8896
Signed-off-by: David Handermann <exceptionfactory@apache.org>
commit 86d36d2327 (parent 983f209083)
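For context on the pattern being changed: SLF4J's parameterized logging methods accept varargs, so wrapping the placeholder arguments in an explicit Object[] is unnecessary. In addition, when a Throwable is passed as the final argument without a matching {} placeholder, SLF4J logs its full stack trace rather than only the message text, which is why several call sites below also drop e.getMessage() and a trailing "due to {}" placeholder. A minimal sketch of the before/after shapes (the class name, logger, and values here are illustrative only, not taken from this commit):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogArgumentsExample {

    private static final Logger LOGGER = LoggerFactory.getLogger(LogArgumentsExample.class);

    public static void main(String[] args) {
        final String key = "example-key";
        final String bucket = "example-bucket";

        // Before: arguments wrapped in an explicit Object[] array
        LOGGER.info("Stored {} in {}", new Object[]{key, bucket});

        // After: the varargs overload takes the arguments directly
        LOGGER.info("Stored {} in {}", key, bucket);

        try {
            throw new IllegalStateException("simulated failure");
        } catch (final IllegalStateException e) {
            // Before: only the message text is interpolated into the log line
            LOGGER.warn("Failed to store {} in {} due to {}", new Object[]{key, bucket, e.getMessage()});

            // After: the exception passed as the last argument (with no matching
            // placeholder) is logged together with its stack trace
            LOGGER.warn("Failed to store {} in {}", key, bucket, e);
        }
    }
}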
@@ -49,7 +49,7 @@ public class LogUtil {
.collect(Collectors.toList());
}
DockerPort dockerPort = container.port(8000);
-logger.info("Connecting to external port {} for docker internal port of {}", new Object[]{dockerPort.getExternalPort(), dockerPort.getInternalPort()});
+logger.info("Connecting to external port {} for docker internal port of {}", dockerPort.getExternalPort(), dockerPort.getInternalPort());
URL url = URI.create("http://" + dockerPort.getIp() + ":" + dockerPort.getExternalPort()).toURL();
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
try (InputStream inputStream = urlConnection.getInputStream();
@@ -84,7 +84,7 @@ public class IdentityMappingUtil {
final String identityPattern = properties.getProperty(propertyName);
if (StringUtils.isBlank(identityPattern)) {
-LOGGER.warn("{} Mapping property {} was found, but was empty", new Object[] {getSubject.get(), propertyName});
+LOGGER.warn("{} Mapping property {} was found, but was empty", getSubject.get(), propertyName);
continue;
}
@@ -92,8 +92,7 @@ public class IdentityMappingUtil {
final String identityValue = properties.getProperty(identityValueProperty);
if (StringUtils.isBlank(identityValue)) {
-LOGGER.warn("{} Mapping property {} was found, but corresponding value {} was not found",
-new Object[] {getSubject.get(), propertyName, identityValueProperty});
+LOGGER.warn("{} Mapping property {} was found, but corresponding value {} was not found", getSubject.get(), propertyName, identityValueProperty);
continue;
}
@@ -101,7 +100,7 @@ public class IdentityMappingUtil {
String rawIdentityTransform = properties.getProperty(identityTransformProperty);
if (StringUtils.isBlank(rawIdentityTransform)) {
-LOGGER.debug("{} Mapping property {} was found, but no transform was present. Using NONE.", new Object[] {getSubject.get(), propertyName});
+LOGGER.debug("{} Mapping property {} was found, but no transform was present. Using NONE.", getSubject.get(), propertyName);
rawIdentityTransform = Transform.NONE.name();
}
@@ -110,15 +109,14 @@ public class IdentityMappingUtil {
identityTransform = Transform.valueOf(rawIdentityTransform);
} catch (final IllegalArgumentException iae) {
LOGGER.warn("{} Mapping property {} was found, but corresponding transform {} was not valid. Allowed values {}",
-new Object[] {getSubject.get(), propertyName, rawIdentityTransform, StringUtils.join(Transform.values(), ", ")});
+getSubject.get(), propertyName, rawIdentityTransform, StringUtils.join(Transform.values(), ", "));
continue;
}
final IdentityMapping identityMapping = new IdentityMapping(key, Pattern.compile(identityPattern), identityValue, identityTransform);
mappings.add(identityMapping);
-LOGGER.debug("Found {} Mapping with key = {}, pattern = {}, value = {}, transform = {}",
-new Object[] {getSubject.get(), key, identityPattern, identityValue, rawIdentityTransform});
+LOGGER.debug("Found {} Mapping with key = {}, pattern = {}, value = {}, transform = {}", getSubject.get(), key, identityPattern, identityValue, rawIdentityTransform);
}
}
@@ -55,7 +55,7 @@ public class KerberosAction<T> {
if (!kerberosUser.isLoggedIn()) {
try {
kerberosUser.login();
-logger.info("Successful login for {}", new Object[]{kerberosUser.getPrincipal()});
+logger.info("Successful login for {}", kerberosUser.getPrincipal());
} catch (final KerberosLoginException e) {
throw new ProcessException("Login failed due to: " + e.getMessage(), e);
}
@@ -71,7 +71,7 @@ public final class ChannelDispatcher implements Runnable {
selectServerSocketKeys();
selectSocketChannelKeys();
} catch (final Exception ex) {
-LOGGER.warn("Key selection failed: {} Normal during shutdown.", new Object[]{ex});
+LOGGER.warn("Key selection failed: Normal during shutdown.", ex);
}
}
}
@@ -155,8 +155,7 @@ public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T
}
this.swapLocations.addAll(swapLocations);
-logger.info("{} restored {} Records and {} Swap Files from Snapshot, ending with Transaction ID {}",
-new Object[] {this, numRecords, swapLocations.size(), maxTransactionId});
+logger.info("{} restored {} Records and {} Swap Files from Snapshot, ending with Transaction ID {}", this, numRecords, swapLocations.size(), maxTransactionId);
return new StandardSnapshotRecovery<>(recordMap, swapLocations, snapshotFile, maxTransactionId);
}
@@ -89,7 +89,7 @@ public class StandardRecordModelIteratorProvider implements RecordModelIteratorP
try {
final int decode = model.decode(inputStream);
-logger.debug("Decoded {} bytes into {}", new Object[]{decode, model.getClass()});
+logger.debug("Decoded {} bytes into {}", decode, model.getClass());
} catch (IOException e) {
throw new RuntimeException("Failed to decode " + rootClass.getCanonicalName(), e);
}
@@ -210,7 +210,7 @@ public class ExtractAvroMetadata extends AbstractProcessor {
}
});
} catch (final ProcessException pe) {
-getLogger().error("Failed to extract Avro metadata for {} due to {}; transferring to failure", new Object[] {flowFile, pe});
+getLogger().error("Transferring to failure since failed to extract Avro metadata for {}", flowFile, pe);
session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -446,7 +446,7 @@ public class FetchS3Object extends AbstractS3Processor {
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
+getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", flowFile, transferMillis);
session.getProvenanceReporter().fetch(flowFile, url, transferMillis);
}
@@ -367,12 +367,10 @@ public class PutS3Object extends AbstractS3Processor {
return null;
}
if (localUploadExistsInS3(s3, bucket, currState)) {
-getLogger().info("Local state for {} loaded with uploadId {} and {} partETags",
-new Object[]{s3ObjectKey, currState.getUploadId(), currState.getPartETags().size()});
+getLogger().info("Local state for {} loaded with uploadId {} and {} partETags", s3ObjectKey, currState.getUploadId(), currState.getPartETags().size());
return currState;
} else {
-getLogger().info("Local state for {} with uploadId {} does not exist in S3, deleting local state",
-new Object[]{s3ObjectKey, currState.getUploadId()});
+getLogger().info("Local state for {} with uploadId {} does not exist in S3, deleting local state", s3ObjectKey, currState.getUploadId());
persistLocalState(s3ObjectKey, null);
return null;
}
@@ -387,8 +385,7 @@ public class PutS3Object extends AbstractS3Processor {
try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
props.load(fis);
} catch (IOException ioe) {
-getLogger().warn("Failed to recover local state for {} due to {}. Assuming no local state and " +
-"restarting upload.", new Object[]{s3ObjectKey, ioe.getMessage()});
+getLogger().warn("Assuming no local state and restarting upload since failed to recover local state for {}", s3ObjectKey, ioe);
return null;
}
if (props.containsKey(s3ObjectKey)) {
@@ -397,7 +394,7 @@ public class PutS3Object extends AbstractS3Processor {
try {
return new MultipartState(localSerialState);
} catch (final RuntimeException rte) {
-getLogger().warn("Failed to recover local state for {} due to corrupt data in state.", new Object[]{s3ObjectKey, rte.getMessage()});
+getLogger().warn("Failed to recover local state for {} due to corrupt data in state.", s3ObjectKey, rte);
return null;
}
}
@@ -431,16 +428,14 @@ public class PutS3Object extends AbstractS3Processor {
try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
props.store(fos, null);
} catch (IOException ioe) {
-getLogger().error("Could not store state {} due to {}.",
-new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+getLogger().error("Could not store state {}", persistenceFile.getAbsolutePath(), ioe);
}
} else {
if (persistenceFile.exists()) {
try {
Files.delete(persistenceFile.toPath());
} catch (IOException ioe) {
-getLogger().error("Could not remove state file {} due to {}.",
-new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+getLogger().error("Could not remove state file {}", persistenceFile.getAbsolutePath(), ioe);
}
}
}
@@ -458,8 +453,7 @@ public class PutS3Object extends AbstractS3Processor {
try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
props.load(fis);
} catch (final IOException ioe) {
-getLogger().warn("Failed to ageoff remove local state due to {}",
-new Object[]{ioe.getMessage()});
+getLogger().warn("Failed to ageoff remove local state", ioe);
return;
}
for (Entry<Object, Object> entry: props.entrySet()) {
@@ -468,13 +462,11 @@ public class PutS3Object extends AbstractS3Processor {
if (localSerialState != null) {
final MultipartState state = new MultipartState(localSerialState);
if (state.getTimestamp() < ageCutoff) {
-getLogger().warn("Removing local state for {} due to exceeding ageoff time",
-new Object[]{key});
+getLogger().warn("Removing local state for {} due to exceeding ageoff time", key);
try {
removeLocalState(key);
} catch (final IOException ioe) {
-getLogger().warn("Failed to remove local state for {} due to {}",
-new Object[]{key, ioe.getMessage()});
+getLogger().warn("Failed to remove local state for {}", key, ioe);
}
}
@@ -734,15 +726,14 @@ public class PutS3Object extends AbstractS3Processor {
}
getLogger().info("Success initiating upload flowfile={} available={} position={} " +
"length={} bucket={} key={} uploadId={}",
-new Object[]{ffFilename, in.available(), currentState.getFilePosition(),
-currentState.getContentLength(), bucket, key,
-currentState.getUploadId()});
+ffFilename, in.available(), currentState.getFilePosition(),
+currentState.getContentLength(), bucket, key,
+currentState.getUploadId());
if (initiateResult.getUploadId() != null) {
attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
}
} catch (AmazonClientException e) {
-getLogger().info("Failure initiating upload flowfile={} bucket={} key={} reason={}",
-new Object[]{ffFilename, bucket, key, e.getMessage()});
+getLogger().info("Failure initiating upload flowfile={} bucket={} key={}", ffFilename, bucket, key, e);
throw (e);
}
} else {
@@ -750,16 +741,11 @@ public class PutS3Object extends AbstractS3Processor {
try {
final long skipped = in.skip(currentState.getFilePosition());
if (skipped != currentState.getFilePosition()) {
-getLogger().info("Failure skipping to resume upload flowfile={} " +
-"bucket={} key={} position={} skipped={}",
-new Object[]{ffFilename, bucket, key,
-currentState.getFilePosition(), skipped});
+getLogger().info("Failure skipping to resume upload flowfile={} bucket={} key={} position={} skipped={}",
+ffFilename, bucket, key, currentState.getFilePosition(), skipped);
}
} catch (Exception e) {
-getLogger().info("Failure skipping to resume upload flowfile={} bucket={} " +
-"key={} position={} reason={}",
-new Object[]{ffFilename, bucket, key, currentState.getFilePosition(),
-e.getMessage()});
+getLogger().info("Failure skipping to resume upload flowfile={} bucket={} key={} position={}", ffFilename, bucket, key, currentState.getFilePosition(), e);
throw (new ProcessException(e));
}
}
@@ -805,12 +791,10 @@ public class PutS3Object extends AbstractS3Processor {
} catch (IOException e) {
// in case of the last part, the stream is already closed
}
-getLogger().info("Success uploading part flowfile={} part={} available={} " +
-"etag={} uploadId={}", new Object[]{ffFilename, part, available,
-uploadPartResult.getETag(), currentState.getUploadId()});
+getLogger().info("Success uploading part flowfile={} part={} available={} etag={} uploadId={}",
+ffFilename, part, available, uploadPartResult.getETag(), currentState.getUploadId());
} catch (AmazonClientException e) {
-getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={} " +
-"reason={}", new Object[]{ffFilename, part, bucket, key, e.getMessage()});
+getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={}", ffFilename, part, bucket, key, e);
throw (e);
}
}
@@ -825,7 +809,7 @@ public class PutS3Object extends AbstractS3Processor {
CompleteMultipartUploadResult completeResult =
s3.completeMultipartUpload(completeRequest);
getLogger().info("Success completing upload flowfile={} etag={} uploadId={}",
-new Object[]{ffFilename, completeResult.getETag(), currentState.getUploadId()});
+ffFilename, completeResult.getETag(), currentState.getUploadId());
if (completeResult.getVersionId() != null) {
attributes.put(S3_VERSION_ATTR_KEY, completeResult.getVersionId());
}
@@ -848,8 +832,8 @@ public class PutS3Object extends AbstractS3Processor {
}
attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_MULTIPARTUPLOAD);
} catch (AmazonClientException e) {
-getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
-new Object[]{ffFilename, bucket, key, e.getMessage()});
+getLogger().info("Failure completing upload flowfile={} bucket={} key={}",
+ffFilename, bucket, key, e);
throw (e);
}
}
@@ -866,12 +850,11 @@ public class PutS3Object extends AbstractS3Processor {
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, url, millis);
-getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[]{ff, millis});
+getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", ff, millis);
try {
removeLocalState(cacheKey);
} catch (IOException e) {
-getLogger().info("Error trying to delete key {} from cache: {}",
-new Object[]{cacheKey, e.getMessage()});
+getLogger().info("Error trying to delete key {} from cache:", cacheKey, e);
}
} catch (final ProcessException | AmazonClientException | IOException e) {
@@ -925,12 +908,10 @@ public class PutS3Object extends AbstractS3Processor {
getLogger().warn("AccessDenied checking S3 Multipart Upload list for {}: {} " +
"** The configured user does not have the s3:ListBucketMultipartUploads permission " +
"for this bucket, S3 ageoff cannot occur without this permission. Next ageoff check " +
-"time is being advanced by interval to prevent checking on every upload **",
-new Object[]{bucket, e.getMessage()});
+"time is being advanced by interval to prevent checking on every upload **", bucket, e.getMessage());
lastS3AgeOff.set(System.currentTimeMillis());
} else {
-getLogger().error("Error checking S3 Multipart Upload list for {}: {}",
-new Object[]{bucket, e.getMessage()});
+getLogger().error("Error checking S3 Multipart Upload list for {}", bucket, e);
}
} finally {
s3BucketLock.unlock();
@@ -162,7 +162,7 @@ public class PutSNS extends AbstractAwsSyncProcessor<SnsClient, SnsClientBuilder
}
if (flowFile.getSize() > MAX_SIZE) {
-getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[]{flowFile});
+getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -109,10 +109,10 @@ public class DeleteSQS extends AbstractAwsSyncProcessor<SqsClient, SqsClientBuil
throw new ProcessException(response.failed().get(0).toString());
}
-getLogger().info("Successfully deleted message from SQS for {}", new Object[] {flowFile});
+getLogger().info("Successfully deleted message from SQS for {}", flowFile);
session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception e) {
-getLogger().error("Failed to delete message from SQS due to {}", new Object[] {e});
+getLogger().error("Failed to delete message from SQS", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
@@ -160,7 +160,7 @@ public class GetSQS extends AbstractAwsSyncProcessor<SqsClient, SqsClientBuilder
try {
response = client.receiveMessage(request);
} catch (final Exception e) {
-getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e});
+getLogger().error("Failed to receive messages from Amazon SQS", e);
context.yield();
return;
}
@@ -196,7 +196,7 @@ public class GetSQS extends AbstractAwsSyncProcessor<SqsClient, SqsClientBuilder
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, queueUrl);
-getLogger().info("Successfully received {} from Amazon SQS", new Object[]{flowFile});
+getLogger().info("Successfully received {} from Amazon SQS", flowFile);
}
if (autoDelete) {
@@ -223,8 +223,7 @@ public class GetSQS extends AbstractAwsSyncProcessor<SqsClient, SqsClientBuilder
try {
client.deleteMessageBatch(deleteRequest);
} catch (final Exception e) {
-getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages"
-+ " may be duplicated. Reason for deletion failure: {}", new Object[]{messages.size(), e});
+getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages may be duplicated", messages.size(), e);
}
}
@@ -414,7 +414,7 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
}
processSession.getProvenanceReporter().receive(flowFile, this.displayUrl, "Received message from " + fromAddressesString, executionDuration);
-this.getLogger().info("Successfully received {} from {} in {} millis", new Object[]{flowFile, fromAddressesString, executionDuration});
+this.getLogger().info("Successfully received {} from {} in {} millis", flowFile, fromAddressesString, executionDuration);
processSession.transfer(flowFile, REL_SUCCESS);
}
@@ -70,8 +70,7 @@ public class ISPEnrichIP extends AbstractEnrichIP {
if (StringUtils.isEmpty(ipAttributeName)) {
session.transfer(flowFile, REL_NOT_FOUND);
-getLogger().warn("FlowFile '{}' attribute '{}' was empty. Routing to failure",
-new Object[]{flowFile, IP_ADDRESS_ATTRIBUTE.getDisplayName()});
+getLogger().warn("FlowFile '{}' attribute '{}' was empty. Routing to failure", flowFile, IP_ADDRESS_ATTRIBUTE.getDisplayName());
return;
}
@@ -239,8 +239,8 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
results.put(key, row);
}
} catch (IndexOutOfBoundsException e) {
-getLogger().warn("Could not find capture group {} while processing result. You may want to review your " +
-"Regular Expression to match against the content \"{}\"", new Object[]{lookupKey, rawResult});
+getLogger().warn("Could not find capture group {} while processing result. You may want to review your Regular Expression to match against the content \"{}\"",
+lookupKey, rawResult);
}
}
break;
@@ -258,7 +258,7 @@ public class QueryDNS extends AbstractEnrichProcessor {
attrs = ictx.getAttributes(queryInput, new String[]{queryType});
return attrs;
} catch ( NameNotFoundException e) {
-getLogger().debug("Resolution for domain {} failed due to {}", new Object[]{queryInput, e});
+getLogger().debug("Resolution for domain {} failed", queryInput, e);
attrs = new BasicAttributes(queryType, "NXDOMAIN", true);
return attrs;
}
@@ -147,7 +147,7 @@ public class ParseEvtx extends AbstractProcessor {
if (basename.endsWith(EVTX_EXTENSION)) {
return basename.substring(0, basename.length() - EVTX_EXTENSION.length());
} else {
-logger.warn("Trying to parse file without .evtx extension {} from flowfile {}", new Object[]{basename, flowFile});
+logger.warn("Trying to parse file without .evtx extension {} from flowfile {}", basename, flowFile);
return basename;
}
}
@@ -181,8 +181,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
getLogger().debug("Binned {} FlowFiles", binningResult.getFlowFilesBinned());
} else {
binningResult = BinningResult.EMPTY;
-getLogger().debug("Will not bin any FlowFiles because {} bins already exist;"
-+ "will wait until bins have been emptied before any more are created", new Object[] {totalBinCount});
+getLogger().debug("Will not bin any FlowFiles because {} bins already exist; will wait until bins have been emptied before any more are created", totalBinCount);
}
if (!isScheduled()) {
@@ -235,7 +234,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
try {
binProcessingResult = this.processBin(bin, context);
} catch (final ProcessException e) {
-logger.error("Failed to process bundle of {} files due to {}", new Object[] {bin.getContents().size(), e});
+logger.error("Failed to process bundle of {} files", bin.getContents().size(), e);
final ProcessSession binSession = bin.getSession();
for (final FlowFile flowFile : bin.getContents()) {
@@ -244,7 +243,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
binSession.commitAsync();
continue;
} catch (final Exception e) {
-logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {bin.getContents().size(), e});
+logger.error("Rolling back sessions since failed to process bundle of {} files", bin.getContents().size(), e);
bin.getSession().rollback();
continue;
@@ -97,14 +97,14 @@ public abstract class AbstractListenEventBatchingProcessor<E extends Event> exte
if (flowFile.getSize() == 0L || events.size() == 0) {
session.remove(flowFile);
-getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});
+getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", entry.getKey());
continue;
}
final Map<String, String> attributes = getAttributes(entry.getValue());
flowFile = session.putAllAttributes(flowFile, attributes);
-getLogger().debug("Transferring {} to success", new Object[] {flowFile});
+getLogger().debug("Transferring {} to success", flowFile);
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
@@ -130,10 +130,10 @@ public abstract class GetFileTransfer extends AbstractProcessor {
try {
transfer.close();
} catch (final IOException e1) {
-logger.warn("Unable to close connection due to {}", new Object[]{e1});
+logger.warn("Unable to close connection", e1);
}
-logger.error("Unable to fetch listing from remote server due to {}", new Object[]{e});
+logger.error("Unable to fetch listing from remote server", e);
return;
}
} finally {
@@ -149,7 +149,7 @@ public abstract class GetFileTransfer extends AbstractProcessor {
try {
transfer.close();
} catch (final IOException e1) {
-logger.warn("Unable to close connection due to {}", new Object[]{e1});
+logger.warn("Unable to close connection", e1);
}
}
return;
@@ -205,17 +205,16 @@ public abstract class GetFileTransfer extends AbstractProcessor {
session.getProvenanceReporter().receive(flowFile, transfer.getProtocolName() + "://" + hostname + "/" + file.getFullPathFileName(), millis);
session.transfer(flowFile, REL_SUCCESS);
-logger.info("Successfully retrieved {} from {} in {} milliseconds at a rate of {} and transferred to success",
-new Object[]{flowFile, hostname, millis, dataRate});
+logger.info("Successfully retrieved {} from {} in {} milliseconds at a rate of {} and transferred to success", flowFile, hostname, millis, dataRate);
flowFilesReceived.put(flowFile, file.getFullPathFileName());
} catch (final IOException e) {
context.yield();
-logger.error("Unable to retrieve file {} due to {}", new Object[]{file.getFullPathFileName(), e});
+logger.error("Unable to retrieve file {}", file.getFullPathFileName(), e);
try {
transfer.close();
} catch (IOException e1) {
-logger.warn("Unable to close connection to remote host due to {}", new Object[]{e1});
+logger.warn("Unable to close connection to remote host", e1);
}
session.rollback();
@@ -269,7 +268,7 @@ public abstract class GetFileTransfer extends AbstractProcessor {
try {
transfer.close();
} catch (final IOException e) {
-getLogger().warn("Failed to close connection to {} due to {}", new Object[]{hostname, e});
+getLogger().warn("Failed to close connection to {}", hostname, e);
}
}
@@ -138,7 +138,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
logger.info("Successfully transferred {} to {} on remote host {} in {} milliseconds at a rate of {}",
-new Object[]{flowFile, fullPathRef.get(), hostname, millis, dataRate});
+flowFile, fullPathRef.get(), hostname, millis, dataRate);
String fullPathWithSlash = fullPathRef.get();
if (!fullPathWithSlash.startsWith("/")) {
@@ -160,17 +160,17 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
&& ((flowFile = session.get()) != null));
} catch (final IOException e) {
context.yield();
-logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e});
+logger.error("Unable to transfer {} to remote host {} ", flowFile, hostname, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
} catch (final FlowFileAccessException e) {
context.yield();
-logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e.getCause()});
+logger.error("Unable to transfer {} to remote host {}", flowFile, hostname, e.getCause());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
} catch (final ProcessException e) {
context.yield();
-logger.error("Unable to transfer {} to remote host {} due to {}: {}; routing to failure", new Object[]{flowFile, hostname, e, e.getCause()});
+logger.error("Routing to failure since unable to transfer {} to remote host {}", flowFile, hostname, e.getCause());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
@@ -195,7 +195,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
if (rejectZeroByteFiles) {
final long sizeInBytes = flowFile.getSize();
if (sizeInBytes == 0) {
-logger.warn("Rejecting {} because it is zero bytes", new Object[]{flowFile});
+logger.warn("Rejecting {} because it is zero bytes", flowFile);
return new ConflictResult(REL_REJECT, false, fileName, true);
}
}
@@ -211,26 +211,25 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
}
if (remoteFileInfo.isDirectory()) {
-logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
+logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", flowFile);
return new ConflictResult(REL_REJECT, false, fileName, false);
}
-logger.info("Discovered a filename conflict on the remote server for {} so handling using configured Conflict Resolution of {}",
-new Object[]{flowFile, conflictResolutionType});
+logger.info("Discovered a filename conflict on the remote server for {} so handling using configured Conflict Resolution of {}", flowFile, conflictResolutionType);
switch (conflictResolutionType.toUpperCase()) {
case FileTransfer.CONFLICT_RESOLUTION_REJECT:
destinationRelationship = REL_REJECT;
transferFile = false;
penalizeFile = false;
-logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
+logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", flowFile);
break;
case FileTransfer.CONFLICT_RESOLUTION_REPLACE:
transfer.deleteFile(flowFile, path, fileName);
destinationRelationship = REL_SUCCESS;
transferFile = true;
penalizeFile = false;
-logger.info("Resolving filename conflict for {} with remote server by deleting remote file and replacing with flow file", new Object[]{flowFile});
+logger.info("Resolving filename conflict for {} with remote server by deleting remote file and replacing with flow file", flowFile);
break;
case FileTransfer.CONFLICT_RESOLUTION_RENAME:
boolean uniqueNameGenerated = false;
@@ -241,7 +240,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
uniqueNameGenerated = (renamedFileInfo == null);
if (uniqueNameGenerated) {
fileName = possibleFileName;
-logger.info("Attempting to resolve filename conflict for {} on the remote server by using a newly generated filename of: {}", new Object[]{flowFile, fileName});
+logger.info("Attempting to resolve filename conflict for {} on the remote server by using a newly generated filename of: {}", flowFile, fileName);
destinationRelationship = REL_SUCCESS;
transferFile = true;
penalizeFile = false;
@@ -259,13 +258,13 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
destinationRelationship = REL_SUCCESS;
transferFile = false;
penalizeFile = false;
-logger.info("Resolving conflict for {} by not transferring file and and still considering the process a success.", new Object[]{flowFile});
+logger.info("Resolving conflict for {} by not transferring file and and still considering the process a success.", flowFile);
break;
case FileTransfer.CONFLICT_RESOLUTION_FAIL:
destinationRelationship = REL_FAILURE;
transferFile = false;
penalizeFile = true;
-logger.warn("Resolved filename conflict for {} as configured by routing to FAILURE relationship.", new Object[]{flowFile});
+logger.warn("Resolved filename conflict for {} as configured by routing to FAILURE relationship.", flowFile);
default:
break;
}
@@ -364,7 +364,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen
hdfsResources.set(resources);
}
} catch (Exception ex) {
-getLogger().error("HDFS Configuration error - {}", new Object[]{ex});
+getLogger().error("HDFS Configuration failed", ex);
hdfsResources.set(EMPTY_HDFS_RESOURCES);
throw ex;
}
@@ -473,7 +473,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen
}
fs = getFileSystemAsUser(config, ugi);
}
-getLogger().debug("resetHDFSResources UGI [{}], KerberosUser [{}]", new Object[]{ugi, kerberosUser});
+getLogger().debug("resetHDFSResources UGI [{}], KerberosUser [{}]", ugi, kerberosUser);
final Path workingDir = fs.getWorkingDirectory();
getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
@@ -898,7 +898,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
if (processedNewFiles) {
-getLogger().info("Successfully created listing with {} new objects", new Object[]{entitiesListed});
+getLogger().info("Successfully created listing with {} new objects", entitiesListed);
session.commitAsync();
}
@@ -211,14 +211,14 @@ public class ListedEntityTracker<T extends ListableEntity> {
private void persistListedEntities(Map<String, ListedEntity> listedEntities) throws IOException {
final String cacheKey = getCacheKey();
-logger.debug("Persisting listed entities: {}={}", new Object[]{cacheKey, listedEntities});
+logger.debug("Persisting listed entities: {}={}", cacheKey, listedEntities);
mapCacheClient.put(cacheKey, listedEntities, stringSerializer, listedEntitiesSerializer);
}
private Map<String, ListedEntity> fetchListedEntities() throws IOException {
final String cacheKey = getCacheKey();
final Map<String, ListedEntity> listedEntities = mapCacheClient.get(cacheKey, stringSerializer, listedEntitiesDeserializer);
-logger.debug("Fetched listed entities: {}={}", new Object[]{cacheKey, listedEntities});
+logger.debug("Fetched listed entities: {}={}", cacheKey, listedEntities);
return listedEntities;
}
@@ -226,7 +226,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
alreadyListedEntities = null;
if (mapCacheClient != null) {
final String cacheKey = getCacheKey();
-logger.debug("Removing listed entities from cache storage: {}", new Object[]{cacheKey});
+logger.debug("Removing listed entities from cache storage: {}", cacheKey);
mapCacheClient.remove(cacheKey, stringSerializer);
}
}
@@ -281,27 +281,27 @@ public class ListedEntityTracker<T extends ListableEntity> {
final String identifier = entity.getIdentifier();
if (entity.getTimestamp() < minTimestampToList) {
-logger.trace("Skipped {} having older timestamp than the minTimestampToList {}.", new Object[]{identifier, entity.getTimestamp(), minTimestampToList});
+logger.trace("Skipped {} having older timestamp {} than the minTimestampToList {}.", identifier, entity.getTimestamp(), minTimestampToList);
return false;
}
final ListedEntity alreadyListedEntity = alreadyListedEntities.get(identifier);
if (alreadyListedEntity == null) {
-logger.trace("Picked {} being newly found.", new Object[]{identifier});
+logger.trace("Picked {} being newly found.", identifier);
return true;
}
if (entity.getTimestamp() > alreadyListedEntity.getTimestamp()) {
-logger.trace("Picked {} having newer timestamp {} than {}.", new Object[]{identifier, entity.getTimestamp(), alreadyListedEntity.getTimestamp()});
+logger.trace("Picked {} having newer timestamp {} than {}.", identifier, entity.getTimestamp(), alreadyListedEntity.getTimestamp());
return true;
}
if (entity.getSize() != alreadyListedEntity.getSize()) {
-logger.trace("Picked {} having different size {} than {}.", new Object[]{identifier, entity.getSize(), alreadyListedEntity.getSize()});
+logger.trace("Picked {} having different size {} than {}.", identifier, entity.getSize(), alreadyListedEntity.getSize());
return true;
}
-logger.trace("Skipped {}, not changed.", new Object[]{identifier, entity.getTimestamp(), minTimestampToList});
+logger.trace("Skipped {}, not changed.", identifier);
return false;
}).collect(Collectors.toList());
@@ -334,8 +334,8 @@ public class ListedEntityTracker<T extends ListableEntity> {
// In case persisting listed entities failure, same entities may be listed again, but better than not listing.
session.commitAsync(() -> {
try {
-logger.debug("Removed old entities count: {}, Updated entities count: {}", new Object[]{oldEntityIds.size(), updatedEntities.size()});
-logger.trace("Removed old entities: {}, Updated entities: {}", new Object[]{oldEntityIds, updatedEntities});
+logger.debug("Removed old entities count: {}, Updated entities count: {}", oldEntityIds.size(), updatedEntities.size());
+logger.trace("Removed old entities: {}, Updated entities: {}", oldEntityIds, updatedEntities);
persistListedEntities(alreadyListedEntities);
} catch (IOException e) {
@@ -120,7 +120,7 @@ public class PartialFunctions {
onTrigger.execute(session);
session.commitAsync();
} catch (final Throwable t) {
-logger.error("{} failed to process due to {}; rolling back session", new Object[]{onTrigger, t});
+logger.error("Rolling back session since {} failed to process", onTrigger, t);
rollbackSession.rollback(session, t);
throw t;
}
@@ -235,22 +235,22 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
successFlowFile = session.putAttribute(successFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
-getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[] {qualifiedPath, successFlowFile, stopWatch.getDuration()});
+getLogger().info("Successfully received content from {} for {} in {} milliseconds", qualifiedPath, successFlowFile, stopWatch.getDuration());
session.getProvenanceReporter().fetch(successFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(successFlowFile, REL_SUCCESS);
session.remove(originalFlowFile);
return null;
} catch (final FileNotFoundException | AccessControlException e) {
-getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, e});
+getLogger().error("Routing to failure since failed to retrieve content from {} for {}", filenameValue, originalFlowFile, e);
final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, e.getMessage() == null ? e.toString() : e.getMessage());
session.transfer(failureFlowFile, REL_FAILURE);
} catch (final IOException | FlowFileAccessException e) {
-getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to retry", new Object[] {filenameValue, originalFlowFile, e});
+getLogger().error("Routing to retry since failed to retrieve content from {} for {}", filenameValue, originalFlowFile, e);
session.transfer(session.penalize(originalFlowFile), REL_RETRY);
context.yield();
} catch (final Throwable t) {
-getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, t});
+getLogger().error("Routing to failure since failed to retrieve content from {} for {}", filenameValue, originalFlowFile, t);
final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, t.getMessage() == null ? t.toString() : t.getMessage());
session.transfer(failureFlowFile, REL_FAILURE);
}
@@ -298,7 +298,7 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
// if the tempFile or destFile already exist, and overwrite is set to false, then transfer to failure
if (destinationOrTempExists && !shouldOverwrite) {
session.transfer(session.penalize(putFlowFile), REL_FAILURE);
-getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
+getLogger().warn("penalizing {} and routing to failure because file with same name already exists", putFlowFile);
return null;
}
@@ -353,8 +353,7 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
// If destination file already exists, resolve that based on processor configuration
if (destinationExists && shouldOverwrite) {
if (fileSystem.delete(destFile, false)) {
-getLogger().info("deleted {} in order to replace with the contents of {}",
-new Object[]{destFile, putFlowFile});
+getLogger().info("deleted {} in order to replace with the contents of {}", destFile, putFlowFile);
}
}
@@ -362,7 +361,7 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
rename(fileSystem, tempFile, destFile);
changeOwner(fileSystem, destFile, remoteOwner, remoteGroup);
-getLogger().info("Wrote {} to {} in {} milliseconds at a rate of {}", new Object[]{putFlowFile, destFile, millis, dataRate});
+getLogger().info("Wrote {} to {} in {} milliseconds at a rate of {}", putFlowFile, destFile, millis, dataRate);
putFlowFile = postProcess(context, session, putFlowFile, destFile);
@@ -384,12 +383,12 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
} catch (IOException | FlowFileAccessException e) {
deleteQuietly(fileSystem, tempDotCopyFile);
-getLogger().error("Failed to write due to {}", new Object[]{e});
+getLogger().error("Failed to write", e);
session.transfer(session.penalize(putFlowFile), REL_RETRY);
context.yield();
} catch (Throwable t) {
deleteQuietly(fileSystem, tempDotCopyFile);
-getLogger().error("Failed to write due to {}", new Object[]{t});
+getLogger().error("Failed to write", t);
session.transfer(putFlowFile, REL_FAILURE);
}
@@ -449,7 +448,7 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
try {
fileSystem.delete(file, false);
} catch (Exception e) {
-getLogger().error("Unable to remove file {} due to {}", new Object[]{file, e});
+getLogger().error("Unable to remove file {}", file, e);
}
}
}
@@ -469,7 +468,7 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
fileSystem.setOwner(path, remoteOwner, remoteGroup);
}
} catch (Exception e) {
-getLogger().warn("Could not change owner or group of {} on due to {}", new Object[]{path, e});
+getLogger().warn("Could not change owner or group of {} on", path, e);
}
}
@@ -107,7 +107,7 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
try {
value = ctx.read(jsonPath);
} catch (final PathNotFoundException pnfe) {
-logger.debug("Evaluated JSONPath Expression {} but the path was not found; will use a null value", new Object[] {entry.getValue()});
+logger.debug("Evaluated JSONPath Expression {} but the path was not found; will use a null value", entry.getValue());
value = null;
}
@@ -87,14 +87,14 @@ public class SocketChannelRecordReaderDispatcher implements Runnable, Closeable
if (currentConnections.incrementAndGet() > maxConnections) {
currentConnections.decrementAndGet();
final String remoteAddress = remoteSocketAddress == null ? "null" : remoteSocketAddress.toString();
-logger.warn("Rejecting connection from {} because max connections has been met", new Object[]{remoteAddress});
+logger.warn("Rejecting connection from {} because max connections has been met", remoteAddress);
IOUtils.closeQuietly(socketChannel);
continue;
}
if (logger.isDebugEnabled()) {
final String remoteAddress = remoteSocketAddress == null ? "null" : remoteSocketAddress.toString();
-logger.debug("Accepted connection from {}", new Object[]{remoteAddress});
+logger.debug("Accepted connection from {}", remoteAddress);
}
// create a StandardSocketChannelRecordReader or an SSLSocketChannelRecordReader based on presence of SSLContext
@@ -181,11 +181,11 @@ public class ProvenanceEventConsumer {
if (currMaxId < (firstEventId - 1)) {
if (BEGINNING_OF_STREAM.getValue().equals(startPositionValue)) {
logger.warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
-"ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId});
+"ids. Restarting querying from the beginning.", currMaxId, firstEventId);
firstEventId = -1;
} else {
logger.warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
-"ids. Restarting querying from the latest event in the Provenance Repository.", new Object[]{currMaxId, firstEventId});
+"ids. Restarting querying from the latest event in the Provenance Repository.", currMaxId, firstEventId);
firstEventId = currMaxId;
}
}
@@ -134,7 +134,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor {
subscriber = getSubscriber(context);
} catch (IOException e) {
storedException.set(e);
-getLogger().error("Failed to create Google Cloud Subscriber due to {}", new Object[]{e});
+getLogger().error("Failed to create Google Cloud Subscriber", e);
}
}
@@ -444,7 +444,7 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
publisher.shutdown();
}
} catch (Exception e) {
-getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Publisher due to {}", new Object[]{e});
+getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Publisher", e);
}
}
@@ -132,6 +132,6 @@ public class DeleteGCSObject extends AbstractGCSProcessor {
session.transfer(flowFile, REL_SUCCESS);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-getLogger().info("Successfully deleted GCS Object for {} in {} millis; routing to success", new Object[]{flowFile, millis});
+getLogger().info("Successfully deleted GCS Object for {} in {} millis; routing to success", flowFile, millis);
}
}
@@ -303,7 +303,7 @@ public class FetchGCSObject extends AbstractGCSProcessor {
session.transfer(flowFile, REL_SUCCESS);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-getLogger().info("Successfully retrieved GCS Object for {} in {} millis; routing to success", new Object[]{flowFile, millis});
+getLogger().info("Successfully retrieved GCS Object for {} in {} millis; routing to success", flowFile, millis);
final String transitUri = getTransitUri(storage.getOptions().getHost(), bucketName, key);
session.getProvenanceReporter().fetch(flowFile, transitUri, millis);
@@ -328,7 +328,7 @@ public class FetchGCSObject extends AbstractGCSProcessor {
}
if (rangeStart > 0 && rangeStart >= blob.getSize()) {
if (getLogger().isDebugEnabled()) {
-getLogger().debug("Start position: {}, blob size: {}", new Object[] {rangeStart, blob.getSize()});
+getLogger().debug("Start position: {}, blob size: {}", rangeStart, blob.getSize());
}
throw new StorageException(416, "The range specified is not valid for the blob " + blob.getBlobId()
+ ". Range Start is beyond the end of the blob.");
@@ -534,8 +534,7 @@ public class PutGCSObject extends AbstractGCSProcessor {
final String transitUri = getTransitUri(storage.getOptions().getHost(), bucket, key);
session.getProvenanceReporter().send(flowFile, transitUri, millis);
-getLogger().info("Successfully put {} to Google Cloud Storage in {} milliseconds",
-new Object[]{ff, millis});
+getLogger().info("Successfully put {} to Google Cloud Storage in {} milliseconds", ff, millis);
} catch (final ProcessException | StorageException | IOException e) {
getLogger().error("Failed to put {} to Google Cloud Storage due to {}", flowFile, e.getMessage(), e);
@@ -239,7 +239,7 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
dynamicPropertyMap.putAll(input.getAttributes());
if (getLogger().isDebugEnabled()) {
-getLogger().debug("Dynamic Properties: {}", new Object[]{dynamicPropertyMap});
+getLogger().debug("Dynamic Properties: {}", dynamicPropertyMap);
}
List<Map<String, Object>> graphResponses = new ArrayList<>(executeQuery(recordScript, dynamicPropertyMap));
@@ -238,8 +238,7 @@ public class Neo4JCypherClientService extends AbstractControllerService implemen
getLogger().error("Error while getting connection " + e.getLocalizedMessage(), e);
throw new ProcessException("Error while getting connection" + e.getLocalizedMessage(), e);
}
-getLogger().info("Neo4JCypherExecutor connection created for url {}",
-new Object[] {connectionUrl});
+getLogger().info("Neo4JCypherExecutor connection created for url {}", connectionUrl);
}
@OnDisabled
@@ -226,11 +226,11 @@ public class HDFSExternalResourceProvider implements ExternalResourceProvider {
fs = getFileSystemAsUser(config, ugi);
}
-LOGGER.debug("resetHDFSResources UGI [{}], KerberosUser [{}]", new Object[]{ugi, kerberosUser});
+LOGGER.debug("resetHDFSResources UGI [{}], KerberosUser [{}]", ugi, kerberosUser);
final Path workingDir = fs.getWorkingDirectory();
LOGGER.debug("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
-new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
+workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config);
if (!fs.exists(sourceDirectory)) {
throw new IllegalArgumentException("Source directory is not existing");
@@ -137,9 +137,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
packagingFormat = ZIP_FORMAT;
break;
default:
-getLogger().warn(
-"Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked",
-new Object[]{flowFile, mimeType});
+getLogger().warn("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked", flowFile, mimeType);
}
}
}
@@ -180,7 +178,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, configuration, compressionType, codec);
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, RELATIONSHIP_SUCCESS);
-getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
+getLogger().info("Transferred flowfile {} to {}", flowFile, RELATIONSHIP_SUCCESS);
} catch (ProcessException e) {
getLogger().error("Failed to create Sequence File. Transferring {} to 'failure'", flowFile, e);
session.transfer(flowFile, RELATIONSHIP_FAILURE);
@@ -173,7 +173,7 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
flowFile = session.putAllAttributes(flowFile, attributes);
fileSystem.delete(path, isRecursive(context, session));
-getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
+getLogger().debug("For flowfile {} Deleted file at path {} with name {}", originalFlowFile, path.getParent(), path.getName());
final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
@ -133,7 +133,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
|
|||
try {
|
||||
path = getNormalizedPath(getPath(context, flowFile));
|
||||
} catch (IllegalArgumentException e) {
|
||||
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, flowFile, e});
|
||||
getLogger().error("Failed to retrieve content from {} for {}", filenameValue, flowFile, e);
|
||||
flowFile = session.putAttribute(flowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
|
||||
flowFile = session.penalize(flowFile);
|
||||
session.transfer(flowFile, getFailureRelationship());
|
||||
|
@ -181,7 +181,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
|
|||
session.getProvenanceReporter().fetch(outgoingFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
|
||||
session.transfer(outgoingFlowFile, getSuccessRelationship());
|
||||
} catch (final FileNotFoundException | AccessControlException e) {
|
||||
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{qualifiedPath, outgoingFlowFile, e});
|
||||
getLogger().error("Failed to retrieve content from {} for {}", qualifiedPath, outgoingFlowFile, e);
|
||||
outgoingFlowFile = session.putAttribute(outgoingFlowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
|
||||
outgoingFlowFile = session.penalize(outgoingFlowFile);
|
||||
session.transfer(outgoingFlowFile, getFailureRelationship());
|
||||
|
|
|
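The FetchHDFS hunks above also drop the trailing "due to {}" placeholder and pass the exception as the last argument instead. With SLF4J-style loggers, a trailing Throwable that has no matching placeholder is treated as an exception and logged with its stack trace, rather than being flattened into the message via toString(). A small sketch under that assumption; the path and exception are hypothetical:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ThrowableLastArgumentSketch {
        private static final Logger logger = LoggerFactory.getLogger(ThrowableLastArgumentSketch.class);

        public static void main(String[] args) {
            final String path = "/data/example.txt"; // hypothetical path
            final Exception e = new IllegalArgumentException("bad path");

            // Before: the exception fills the second placeholder, so only its toString() appears
            logger.error("Failed to retrieve content from {} due to {}; routing to failure", path, e);

            // After: no placeholder is left for it, so the trailing Throwable is logged
            // as an exception, including its stack trace
            logger.error("Failed to retrieve content from {}", path, e);
        }
    }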
@ -286,7 +286,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
|
|||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
getLogger().warn("Could not add to processing queue due to {}", new Object[]{e});
|
||||
getLogger().warn("Could not add to processing queue", e);
|
||||
} finally {
|
||||
queueLock.unlock();
|
||||
}
|
||||
|
@ -433,7 +433,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
|
|||
final boolean directoryExists = getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(directoryPath));
|
||||
if (!directoryExists) {
|
||||
context.yield();
|
||||
getLogger().warn("The directory {} does not exist.", new Object[]{directoryPath});
|
||||
getLogger().warn("The directory {} does not exist.", directoryPath);
|
||||
} else {
|
||||
// get listing
|
||||
listing = selectFiles(hdfs, directoryPath, null);
|
||||
|
|
|
@ -330,7 +330,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
|
|||
} catch (final Exception e) {
|
||||
// Catch GSSExceptions and reset the resources
|
||||
if (!handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
|
||||
getLogger().error("Failed to perform listing of HDFS due to {}", new Object[]{e});
|
||||
getLogger().error("Failed to perform listing of HDFS", e);
|
||||
ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
|
||||
session.transfer(ff, REL_FAILURE);
|
||||
}
|
||||
|
|
|
@ -128,9 +128,8 @@ public class GetHDFSSequenceFile extends GetHDFS {
|
|||
if (totalSize > 0) {
|
||||
final String dataRate = stopWatch.calculateDataRate(totalSize);
|
||||
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
|
||||
logger.info("Created {} flowFiles from SequenceFile {}. Ingested in {} milliseconds at a rate of {}", new Object[]{
|
||||
flowFiles.size(), file.toUri().toASCIIString(), millis, dataRate});
|
||||
logger.info("Transferred flowFiles {} to success", new Object[]{flowFiles});
|
||||
logger.info("Created {} flowFiles from SequenceFile {}. Ingested in {} milliseconds at a rate of {}", flowFiles.size(), file.toUri().toASCIIString(), millis, dataRate);
|
||||
logger.info("Transferred flowFiles {} to success", flowFiles);
|
||||
session.transfer(flowFiles, REL_SUCCESS);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ public class KeyValueReader implements SequenceFileReader<Set<FlowFile>> {
|
|||
final KeyValueWriterCallback callback = new KeyValueWriterCallback(reader);
|
||||
final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
|
||||
int counter = 0;
|
||||
LOG.debug("Read from SequenceFile: {} ", new Object[]{file});
|
||||
LOG.debug("Read from SequenceFile: {}", file);
|
||||
try {
|
||||
while (reader.next(key)) {
|
||||
String fileName = key.toString();
|
||||
|
@ -89,7 +89,7 @@ public class KeyValueReader implements SequenceFileReader<Set<FlowFile>> {
|
|||
flowFile = session.write(flowFile, callback);
|
||||
flowFiles.add(flowFile);
|
||||
} catch (ProcessException e) {
|
||||
LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
|
||||
LOG.error("Could not write to flowfile {}", flowFile, e);
|
||||
session.remove(flowFile);
|
||||
}
|
||||
key.clear();
|
||||
|
|
|
@ -306,7 +306,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
|
|||
}
|
||||
} catch (IOException e) {
|
||||
context.yield();
|
||||
getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e});
|
||||
getLogger().warn("Error while retrieving list of files", e);
|
||||
return;
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
|
@ -446,7 +446,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
|
|||
if (causeOptional.isPresent()) {
|
||||
throw new UncheckedIOException(new IOException(causeOptional.get()));
|
||||
}
|
||||
getLogger().error("Failed to rename on HDFS due to {}", new Object[]{t});
|
||||
getLogger().error("Failed to rename on HDFS", t);
|
||||
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
||||
context.yield();
|
||||
}
|
||||
|
|
|
@ -513,10 +513,10 @@ public class PutHDFS extends AbstractHadoopProcessor {
|
|||
try {
|
||||
hdfs.delete(tempDotCopyFile, false);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
|
||||
getLogger().error("Unable to remove temporary file {}", tempDotCopyFile, e);
|
||||
}
|
||||
}
|
||||
getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
|
||||
getLogger().error("Failed to write to HDFS", t);
|
||||
session.transfer(session.penalize(putFlowFile), getFailureRelationship());
|
||||
context.yield();
|
||||
}
|
||||
|
@ -593,7 +593,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
|
|||
hdfs.setOwner(name, owner, group);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e});
|
||||
getLogger().warn("Could not change owner or group of {} on HDFS", name, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ public class TarUnpackerSequenceFileWriter extends SequenceFileWriterImpl {
|
|||
final long fileSize = tarEntry.getSize();
|
||||
final InputStreamWritable inStreamWritable = new InputStreamWritable(tarIn, (int) fileSize);
|
||||
writer.append(new Text(key), inStreamWritable);
|
||||
logger.debug("Appending FlowFile {} to Sequence File", new Object[]{key});
|
||||
logger.debug("Appending FlowFile {} to Sequence File", key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ public class ValueReader implements SequenceFileReader<Set<FlowFile>> {
|
|||
final SequenceFile.Reader reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
|
||||
final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
|
||||
int counter = 0;
|
||||
LOG.debug("Reading from sequence file {}", new Object[]{file});
|
||||
LOG.debug("Reading from sequence file {}", file);
|
||||
final OutputStreamWritableCallback writer = new OutputStreamWritableCallback(reader);
|
||||
Text key = new Text();
|
||||
try {
|
||||
|
@ -85,7 +85,7 @@ public class ValueReader implements SequenceFileReader<Set<FlowFile>> {
|
|||
flowFile = session.write(flowFile, writer);
|
||||
flowFiles.add(flowFile);
|
||||
} catch (ProcessException e) {
|
||||
LOG.error("Could not write to flowfile {}", new Object[]{flowFile}, e);
|
||||
LOG.error("Could not write to flowfile {}", flowFile, e);
|
||||
session.remove(flowFile);
|
||||
}
|
||||
key.clear();
|
||||
|
|
|
@ -48,7 +48,7 @@ public class ZipUnpackerSequenceFileWriter extends SequenceFileWriterImpl {
|
|||
long fileSize = zipEntry.getSize();
|
||||
final InputStreamWritable inStreamWritable = new InputStreamWritable(zipIn, (int) fileSize);
|
||||
writer.append(new Text(key), inStreamWritable);
|
||||
logger.debug("Appending FlowFile {} to Sequence File", new Object[]{key});
|
||||
logger.debug("Appending FlowFile {} to Sequence File", key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -193,7 +193,7 @@ public class GetHDFSEvents extends AbstractHadoopProcessor {
|
|||
List<FlowFile> flowFiles = new ArrayList<>(eventBatch.getEvents().length);
|
||||
for (Event e : eventBatch.getEvents()) {
|
||||
if (toProcessEvent(context, e)) {
|
||||
getLogger().debug("Creating flow file for event: {}.", new Object[]{e});
|
||||
getLogger().debug("Creating flow file for event: {}.", e);
|
||||
final String path = getPath(e);
|
||||
|
||||
FlowFile flowFile = session.create();
|
||||
|
@ -214,7 +214,7 @@ public class GetHDFSEvents extends AbstractHadoopProcessor {
|
|||
for (FlowFile flowFile : flowFiles) {
|
||||
final String path = flowFile.getAttribute(EventAttributes.EVENT_PATH);
|
||||
final String transitUri = path.startsWith("/") ? "hdfs:/" + path : "hdfs://" + path;
|
||||
getLogger().debug("Transferring flow file {} and creating provenance event with URI {}.", new Object[]{flowFile, transitUri});
|
||||
getLogger().debug("Transferring flow file {} and creating provenance event with URI {}.", flowFile, transitUri);
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
session.getProvenanceReporter().receive(flowFile, transitUri);
|
||||
}
|
||||
|
@ -223,7 +223,7 @@ public class GetHDFSEvents extends AbstractHadoopProcessor {
|
|||
lastTxId = eventBatch.getTxid();
|
||||
}
|
||||
} catch (IOException | InterruptedException e) {
|
||||
getLogger().error("Unable to get notification information: {}", new Object[]{e});
|
||||
getLogger().error("Unable to get notification information", e);
|
||||
context.yield();
|
||||
return;
|
||||
} catch (MissingEventsException e) {
|
||||
|
@ -231,7 +231,7 @@ public class GetHDFSEvents extends AbstractHadoopProcessor {
|
|||
// org.apache.hadoop.hdfs.client.HdfsAdmin#getInotifyEventStream API. It suggests tuning a couple parameters if this API is used.
|
||||
lastTxId = -1L;
|
||||
getLogger().error("Unable to get notification information. Setting transaction id to -1. This may cause some events to get missed. " +
|
||||
"Please see javadoc for org.apache.hadoop.hdfs.client.HdfsAdmin#getInotifyEventStream: {}", new Object[]{e});
|
||||
"Please see javadoc for org.apache.hadoop.hdfs.client.HdfsAdmin#getInotifyEventStream", e);
|
||||
}
|
||||
|
||||
updateClusterStateForTxId(session);
|
||||
|
@ -250,7 +250,7 @@ public class GetHDFSEvents extends AbstractHadoopProcessor {
|
|||
getLogger().debug("Failed to poll for event batch. Reached max retry times.", e);
|
||||
throw e;
|
||||
} else {
|
||||
getLogger().debug("Attempt {} failed to poll for event batch. Retrying.", new Object[]{i});
|
||||
getLogger().debug("Attempt {} failed to poll for event batch. Retrying.", i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
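In the GetHDFSEvents hunk above, the long message remains split across two source lines with '+'. That concatenation only joins compile-time constants, so it is folded into a single literal, while the ": {}" placeholder is dropped and the exception moves to the trailing-argument position where it is logged with its stack trace. A sketch of the resulting call shape; the IOException is a stand-in:

    import java.io.IOException;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LongMessageSketch {
        private static final Logger logger = LoggerFactory.getLogger(LongMessageSketch.class);

        public static void main(String[] args) {
            final IOException e = new IOException("stream closed"); // hypothetical failure

            // The '+' joins constant strings at compile time; the trailing exception
            // is logged with its stack trace because no placeholder consumes it
            logger.error("Unable to get notification information. Setting transaction id to -1. This may cause some events to get missed. "
                    + "Please see javadoc for org.apache.hadoop.hdfs.client.HdfsAdmin#getInotifyEventStream", e);
        }
    }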
@ -193,14 +193,14 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
|
|||
session.transfer(flowFile, REL_FAILURE);
|
||||
} else if (!putFlowFile.isValid()) {
|
||||
if (StringUtils.isBlank(putFlowFile.getTableName())) {
|
||||
getLogger().error("Missing table name for FlowFile {}; routing to failure", new Object[]{flowFile});
|
||||
getLogger().error("Missing table name for FlowFile {}; routing to failure", flowFile);
|
||||
} else if (null == putFlowFile.getRow()) {
|
||||
getLogger().error("Missing row id for FlowFile {}; routing to failure", new Object[]{flowFile});
|
||||
getLogger().error("Missing row id for FlowFile {}; routing to failure", flowFile);
|
||||
} else if (putFlowFile.getColumns() == null || putFlowFile.getColumns().isEmpty()) {
|
||||
getLogger().error("No columns provided for FlowFile {}; routing to failure", new Object[]{flowFile});
|
||||
getLogger().error("No columns provided for FlowFile {}; routing to failure", flowFile);
|
||||
} else {
|
||||
// really shouldn't get here, but just in case
|
||||
getLogger().error("Failed to produce a put for FlowFile {}; routing to failure", new Object[]{flowFile});
|
||||
getLogger().error("Failed to produce a put for FlowFile {}; routing to failure", flowFile);
|
||||
}
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
} else {
|
||||
|
@ -213,7 +213,7 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[]{flowFiles.size(), tablePuts.size()});
|
||||
getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", flowFiles.size(), tablePuts.size());
|
||||
|
||||
final long start = System.nanoTime();
|
||||
final List<PutFlowFile> successes = new ArrayList<>();
|
||||
|
@ -228,7 +228,7 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
|
|||
} catch (final Exception e) {
|
||||
getLogger().error(e.getMessage(), e);
|
||||
for (PutFlowFile putFlowFile : entry.getValue()) {
|
||||
getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e});
|
||||
getLogger().error("Failed to send {} to HBase ", putFlowFile.getFlowFile(), e);
|
||||
final FlowFile failure = session.penalize(putFlowFile.getFlowFile());
|
||||
session.transfer(failure, REL_FAILURE);
|
||||
}
|
||||
|
@ -236,7 +236,7 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
|
|||
}
|
||||
|
||||
final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
||||
getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[]{successes.size(), sendMillis});
|
||||
getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", successes.size(), sendMillis);
|
||||
|
||||
for (PutFlowFile putFlowFile : successes) {
|
||||
session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS);
|
||||
|
|
|
@ -236,14 +236,14 @@ public class FetchHBaseRow extends AbstractProcessor implements VisibilityFetchS
|
|||
|
||||
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||
if (StringUtils.isBlank(tableName)) {
|
||||
getLogger().error("Table Name is blank or null for {}, transferring to failure", new Object[] {flowFile});
|
||||
getLogger().error("Table Name is blank or null for {}, transferring to failure", flowFile);
|
||||
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
final String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
|
||||
if (StringUtils.isBlank(rowId)) {
|
||||
getLogger().error("Row Identifier is blank or null for {}, transferring to failure", new Object[] {flowFile});
|
||||
getLogger().error("Row Identifier is blank or null for {}, transferring to failure", flowFile);
|
||||
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
@ -265,20 +265,20 @@ public class FetchHBaseRow extends AbstractProcessor implements VisibilityFetchS
|
|||
try {
|
||||
hBaseClientService.scan(tableName, rowIdBytes, rowIdBytes, columns, authorizations, handler);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Unable to fetch row {} from {} due to {}", new Object[] {rowId, tableName, e});
|
||||
getLogger().error("Unable to fetch row {} from {}", rowId, tableName, e);
|
||||
session.transfer(handler.getFlowFile(), REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
FlowFile handlerFlowFile = handler.getFlowFile();
|
||||
if (!handler.handledRow()) {
|
||||
getLogger().debug("Row {} not found in {}, transferring to not found", new Object[] {rowId, tableName});
|
||||
getLogger().debug("Row {} not found in {}, transferring to not found", rowId, tableName);
|
||||
session.transfer(handlerFlowFile, REL_NOT_FOUND);
|
||||
return;
|
||||
}
|
||||
|
||||
if (getLogger().isDebugEnabled()) {
|
||||
getLogger().debug("Fetched {} from {} with row id {}", new Object[]{handlerFlowFile, tableName, rowId});
|
||||
getLogger().debug("Fetched {} from {} with row id {}", handlerFlowFile, tableName, rowId);
|
||||
}
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
|
|
|
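The FetchHBaseRow hunk above keeps an isDebugEnabled() guard around one debug statement. With parameterized logging the guard is not required to avoid formatting cost, because the message is only rendered when DEBUG is enabled; it remains useful when computing an argument is itself expensive. A sketch showing both cases; the names and values are hypothetical:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class DebugGuardSketch {
        private static final Logger logger = LoggerFactory.getLogger(DebugGuardSketch.class);

        public static void main(String[] args) {
            final String tableName = "example-table"; // hypothetical value
            final String rowId = "row-1";             // hypothetical value

            // Parameterized logging defers formatting, so this is cheap even when DEBUG is off
            logger.debug("Fetched row {} from {}", rowId, tableName);

            // A guard is only worthwhile when building an argument is itself costly
            if (logger.isDebugEnabled()) {
                logger.debug("Row details: {}", buildExpensiveDescription(rowId));
            }
        }

        private static String buildExpensiveDescription(String rowId) {
            // stand-in for work that should be skipped when DEBUG is disabled
            return "details for " + rowId;
        }
    }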
@ -285,7 +285,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
|
|||
if (allSeen) {
|
||||
// we have already seen all of the cells for this row. We do not want to
|
||||
// include this cell in our output.
|
||||
getLogger().debug("all cells for row {} have already been seen", new Object[] {rowKeyString});
|
||||
getLogger().debug("all cells for row {} have already been seen", rowKeyString);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -324,7 +324,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
|
|||
|
||||
session.getProvenanceReporter().receive(flowFile, hBaseClientService.toTransitUri(tableName, rowKeyString));
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
getLogger().debug("Received {} from HBase with row key {}", new Object[]{flowFile, rowKeyString});
|
||||
getLogger().debug("Received {} from HBase with row key {}", flowFile, rowKeyString);
|
||||
|
||||
// we could potentially have a huge number of rows. If we get to 500, go ahead and commit the
|
||||
// session so that we can avoid buffering tons of FlowFiles without ever sending any out.
|
||||
|
|
|
@ -196,7 +196,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
|
|||
final JsonNode rootNode = rootNodeRef.get();
|
||||
|
||||
if (rootNode.isArray()) {
|
||||
getLogger().error("Root node of JSON must be a single document, found array for {}; routing to failure", new Object[]{flowFile});
|
||||
getLogger().error("Root node of JSON must be a single document, found array for {}; routing to failure", flowFile);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -211,7 +211,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
|
|||
|
||||
final JsonNode fieldNode = rootNode.get(fieldName);
|
||||
if (fieldNode.isNull()) {
|
||||
getLogger().debug("Skipping {} because value was null", new Object[]{fieldName});
|
||||
getLogger().debug("Skipping {} because value was null", fieldName);
|
||||
} else if (fieldNode.isValueNode()) {
|
||||
// for a value node we need to determine if we are storing the bytes of a string, or the bytes of actual types
|
||||
if (STRING_ENCODING_VALUE.equals(fieldEncodingStrategy)) {
|
||||
|
@ -224,10 +224,10 @@ public class PutHBaseJSON extends AbstractPutHBase {
|
|||
// for non-null, non-value nodes, determine what to do based on the handling strategy
|
||||
switch (complexFieldStrategy) {
|
||||
case FAIL_VALUE:
|
||||
getLogger().error("Complex value found for {}; routing to failure", new Object[]{fieldName});
|
||||
getLogger().error("Complex value found for {}; routing to failure", fieldName);
|
||||
return null;
|
||||
case WARN_VALUE:
|
||||
getLogger().warn("Complex value found for {}; skipping", new Object[]{fieldName});
|
||||
getLogger().warn("Complex value found for {}; skipping", fieldName);
|
||||
break;
|
||||
case TEXT_VALUE:
|
||||
// use toString() here because asText() is only guaranteed to be supported on value nodes
|
||||
|
@ -266,7 +266,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
|
|||
// log an error message so the user can see what the field names were and return null so it gets routed to failure
|
||||
if (extractRowId && rowIdHolder.get() == null) {
|
||||
final String fieldNameStr = StringUtils.join(rootNode.fieldNames(), ",");
|
||||
getLogger().error("Row ID field named '{}' not found in field names '{}'; routing to failure", new Object[] {rowFieldName, fieldNameStr});
|
||||
getLogger().error("Row ID field named '{}' not found in field names '{}'; routing to failure", rowFieldName, fieldNameStr);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -361,10 +361,10 @@ public class PutHBaseRecord extends AbstractPutHBase {
|
|||
private byte[] handleComplexField(Record record, String field, String complexFieldStrategy) throws PutCreationFailedInvokedException {
|
||||
switch (complexFieldStrategy) {
|
||||
case FAIL_VALUE:
|
||||
getLogger().error("Complex value found for {}; routing to failure", new Object[]{field});
|
||||
getLogger().error("Complex value found for {}; routing to failure", field);
|
||||
throw new PutCreationFailedInvokedException(String.format("Complex value found for %s; routing to failure", field));
|
||||
case WARN_VALUE:
|
||||
getLogger().warn("Complex value found for {}; skipping", new Object[]{field});
|
||||
getLogger().warn("Complex value found for {}; skipping", field);
|
||||
return null;
|
||||
case TEXT_VALUE:
|
||||
final String value = record.getAsString(field);
|
||||
|
|
|
@ -334,7 +334,7 @@ public class ScanHBase extends AbstractProcessor implements VisibilityFetchSuppo
|
|||
try {
|
||||
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||
if (StringUtils.isBlank(tableName)) {
|
||||
getLogger().error("Table Name is blank or null for {}, transferring to failure", new Object[] {flowFile});
|
||||
getLogger().error("Table Name is blank or null for {}, transferring to failure", flowFile);
|
||||
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
@ -369,11 +369,11 @@ public class ScanHBase extends AbstractProcessor implements VisibilityFetchSuppo
|
|||
}
|
||||
|
||||
if (timerangeMin == null && timerangeMax != null) {
|
||||
getLogger().error("Time range min value cannot be blank when max value provided for {}, transferring to failure", new Object[] {flowFile});
|
||||
getLogger().error("Time range min value cannot be blank when max value provided for {}, transferring to failure", flowFile);
|
||||
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
||||
return;
|
||||
} else if (timerangeMin != null && timerangeMax == null) {
|
||||
getLogger().error("Time range max value cannot be blank when min value provided for {}, transferring to failure", new Object[] {flowFile});
|
||||
getLogger().error("Time range max value cannot be blank when min value provided for {}, transferring to failure", flowFile);
|
||||
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
@ -408,7 +408,7 @@ public class ScanHBase extends AbstractProcessor implements VisibilityFetchSuppo
|
|||
if (handler.getFlowFile() != null) {
|
||||
session.remove(handler.getFlowFile());
|
||||
}
|
||||
getLogger().error("Unable to fetch rows from HBase table {} due to {}", new Object[] {tableName, e});
|
||||
getLogger().error("Unable to fetch rows from HBase table {}", tableName, e);
|
||||
flowFile = session.putAttribute(flowFile, "scanhbase.results.found", Boolean.toString(handler.isHandledAny()));
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
|
|
|
@ -196,9 +196,9 @@ public class ExtractHL7Attributes extends AbstractProcessor {
|
|||
final Message message = parser.parse(hl7Text);
|
||||
final Map<String, String> attributes = getAttributes(message, useSegmentNames, parseSegmentFields);
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
getLogger().debug("Added the following attributes for {}: {}", new Object[]{flowFile, attributes});
|
||||
getLogger().debug("Added the following attributes for {}: {}", flowFile, attributes);
|
||||
} catch (final HL7Exception e) {
|
||||
getLogger().error("Failed to extract attributes from {} due to {}", new Object[]{flowFile, e});
|
||||
getLogger().error("Failed to extract attributes from {}", flowFile, e);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -169,7 +169,7 @@ public class RouteHL7 extends AbstractProcessor {
|
|||
final Message hapiMessage = parser.parse(hl7Text);
|
||||
message = new HapiMessage(hapiMessage);
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to parse {} as HL7 due to {}; routing to failure", new Object[]{flowFile, e});
|
||||
getLogger().error("Routing to failure since failed to parse {} as HL7", flowFile, e);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
@ -191,7 +191,7 @@ public class RouteHL7 extends AbstractProcessor {
|
|||
}
|
||||
|
||||
session.transfer(flowFile, REL_ORIGINAL);
|
||||
getLogger().info("Routed a copy of {} to {} relationships: {}", new Object[]{flowFile, matchingRels.size(), matchingRels});
|
||||
getLogger().info("Routed a copy of {} to {} relationships: {}", flowFile, matchingRels.size(), matchingRels);
|
||||
}
|
||||
|
||||
private static class HL7QueryValidator implements Validator {
|
||||
|
|
|
@ -71,12 +71,12 @@ public class JndiJmsConnectionFactoryHandler extends CachedJMSConnectionFactoryH
|
|||
private ConnectionFactory lookupConnectionFactory() {
|
||||
try {
|
||||
final String factoryName = context.getProperty(JNDI_CONNECTION_FACTORY_NAME).evaluateAttributeExpressions().getValue().trim();
|
||||
logger.debug("Looking up Connection Factory with name [{}]", new Object[] {factoryName});
|
||||
logger.debug("Looking up Connection Factory with name [{}]", factoryName);
|
||||
|
||||
final Context initialContext = createInitialContext();
|
||||
final Object factoryObject = initialContext.lookup(factoryName);
|
||||
|
||||
logger.debug("Obtained {} from JNDI", new Object[] {factoryObject});
|
||||
logger.debug("Obtained {} from JNDI", factoryObject);
|
||||
|
||||
if (factoryObject == null) {
|
||||
throw new ProcessException("Got a null Factory Object from JNDI");
|
||||
|
@ -114,7 +114,7 @@ public class JndiJmsConnectionFactoryHandler extends CachedJMSConnectionFactoryH
|
|||
}
|
||||
});
|
||||
|
||||
logger.debug("Creating Initial Context using JNDI Environment {}", new Object[] {env});
|
||||
logger.debug("Creating Initial Context using JNDI Environment {}", env);
|
||||
|
||||
final Context initialContext = new InitialContext(env);
|
||||
return initialContext;
|
||||
|
|
|
@ -42,7 +42,7 @@ class JMSPublisher extends JMSWorker {
|
|||
|
||||
JMSPublisher(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog processLog) {
|
||||
super(connectionFactory, jmsTemplate, processLog);
|
||||
processLog.debug("Created Message Publisher for {}", new Object[] {jmsTemplate});
|
||||
processLog.debug("Created Message Publisher for {}", jmsTemplate);
|
||||
}
|
||||
|
||||
void publish(String destinationName, byte[] messageBytes) {
|
||||
|
@ -140,7 +140,7 @@ class JMSPublisher extends JMSWorker {
|
|||
}
|
||||
|
||||
private void logUnbuildableDestination(String destinationName, String headerName) {
|
||||
this.processLog.warn("Failed to determine destination type from destination name '{}'. The '{}' header will not be set.", new Object[] {destinationName, headerName});
|
||||
this.processLog.warn("Failed to determine destination type from destination name '{}'. The '{}' header will not be set.", destinationName, headerName);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ public class ConsumerPartitionsUtil {
|
|||
return null;
|
||||
}
|
||||
|
||||
logger.info("Found the following mapping of hosts to partitions: {}", new Object[] {hostnameToPartitionString});
|
||||
logger.info("Found the following mapping of hosts to partitions: {}", hostnameToPartitionString);
|
||||
|
||||
// Determine the partitions based on hostname/IP.
|
||||
int[] partitionsForThisHost = getPartitionsForThisHost(partitionsByHost);
|
||||
|
|
|
@ -263,14 +263,14 @@ public class YandexTranslate extends AbstractProcessor {
|
|||
try {
|
||||
response = invocation.invoke();
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to make request to Yandex to transate text for {} due to {}; routing to comms.failure", new Object[]{flowFile, e});
|
||||
getLogger().error("Routing to {} since failed to make request to Yandex to translate text for {}", REL_COMMS_FAILURE, flowFile, e);
|
||||
session.transfer(flowFile, REL_COMMS_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
|
||||
getLogger().error("Failed to translate text using Yandex for {}; response was {}: {}; routing to {}", new Object[]{
|
||||
flowFile, response.getStatus(), response.getStatusInfo().getReasonPhrase(), REL_TRANSLATION_FAILED.getName()});
|
||||
getLogger().error("Failed to translate text using Yandex for {}; response was {}: {}; routing to {}", flowFile,
|
||||
response.getStatus(), response.getStatusInfo().getReasonPhrase(), REL_TRANSLATION_FAILED.getName());
|
||||
flowFile = session.putAttribute(flowFile, "yandex.translate.failure.reason", response.getStatusInfo().getReasonPhrase());
|
||||
session.transfer(flowFile, REL_TRANSLATION_FAILED);
|
||||
return;
|
||||
|
|
|
@ -142,7 +142,7 @@ public class ExtractImageMetadata extends AbstractProcessor {
|
|||
|
||||
session.transfer(flowfile, SUCCESS);
|
||||
} catch (ProcessException e) {
|
||||
logger.error("Failed to extract image metadata from {} due to {}", new Object[]{flowfile, e});
|
||||
logger.error("Failed to extract image metadata from {}", flowfile, e);
|
||||
session.transfer(flowfile, FAILURE);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -201,7 +201,7 @@ public class ExtractMediaMetadata extends AbstractProcessor {
|
|||
session.transfer(flowFile, SUCCESS);
|
||||
session.getProvenanceReporter().modifyAttributes(flowFile, "media attributes extracted");
|
||||
} catch (ProcessException e) {
|
||||
logger.error("Failed to extract media metadata from {} due to {}", new Object[]{flowFile, e});
|
||||
logger.error("Failed to extract media metadata from {}", flowFile, e);
|
||||
flowFile = session.penalize(flowFile);
|
||||
session.transfer(flowFile, FAILURE);
|
||||
}
|
||||
|
|
|
@ -224,7 +224,7 @@ public class GetMongo extends AbstractMongoQueryProcessor {
|
|||
writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), input, context, session, attributes, REL_SUCCESS);
|
||||
batch = new ArrayList<>();
|
||||
} catch (Exception e) {
|
||||
logger.error("Error building batch due to {}", new Object[] {e});
|
||||
logger.error("Error building batch", e);
|
||||
}
|
||||
}
|
||||
sent++;
|
||||
|
@ -234,7 +234,7 @@ public class GetMongo extends AbstractMongoQueryProcessor {
|
|||
try {
|
||||
writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), input, context, session, attributes, REL_SUCCESS);
|
||||
} catch (Exception e) {
|
||||
logger.error("Error building batch due to {}", new Object[] {e});
|
||||
logger.error("Error building batch", e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -207,7 +207,7 @@ public class PutMongo extends AbstractMongoProcessor {
|
|||
|
||||
if (MODE_INSERT.equalsIgnoreCase(mode)) {
|
||||
collection.insertOne((Document) doc);
|
||||
logger.info("inserted {} into MongoDB", new Object[] {flowFile});
|
||||
logger.info("inserted {} into MongoDB", flowFile);
|
||||
} else {
|
||||
// update
|
||||
final boolean upsert = context.getProperty(UPSERT).asBoolean();
|
||||
|
@ -229,7 +229,7 @@ public class PutMongo extends AbstractMongoProcessor {
|
|||
update.remove(updateKey);
|
||||
collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
|
||||
}
|
||||
logger.info("updated {} into MongoDB", new Object[] {flowFile});
|
||||
logger.info("updated {} into MongoDB", flowFile);
|
||||
}
|
||||
|
||||
session.getProvenanceReporter().send(flowFile, getURI(context));
|
||||
|
|
|
@ -269,7 +269,7 @@ public class PutMongoRecord extends AbstractMongoProcessor {
|
|||
if (!error) {
|
||||
session.getProvenanceReporter().send(flowFile, clientService.getURI(), String.format("Written %d documents to MongoDB.", written));
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
getLogger().info("Written {} records into MongoDB", new Object[]{written});
|
||||
getLogger().info("Written {} records into MongoDB", written);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -125,10 +125,10 @@ public class ParseNetflowv5 extends AbstractProcessor {
|
|||
try {
|
||||
processedRecord = parser.parse(buffer);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Parsed {} records from the packet", new Object[] {processedRecord});
|
||||
logger.debug("Parsed {} records from the packet", processedRecord);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
logger.error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] {e, flowFile});
|
||||
logger.error("Routing to failure since while processing {}, parser returned unexpected Exception", flowFile, e);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
@ -151,7 +151,7 @@ public class ParseNetflowv5 extends AbstractProcessor {
|
|||
session.adjustCounter("Records Processed", processedRecord, false);
|
||||
} catch (Exception e) {
|
||||
// The flowfile has failed parsing & validation, routing to failure
|
||||
logger.error("Failed to parse {} as a netflowv5 message due to {}; routing to failure", new Object[] {flowFile, e});
|
||||
logger.error("Routing to failure since failed to parse {} as a netflowv5 message", flowFile, e);
|
||||
|
||||
// Create a provenance event recording the routing to failure
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
|
|
|
@ -180,7 +180,7 @@ public class ConvertAvroToParquet extends AbstractProcessor {
|
|||
session.getProvenanceReporter().modifyContent(putFlowFile, "Converted " + totalRecordCount.get() + " records", System.currentTimeMillis() - startTime);
|
||||
|
||||
} catch (final ProcessException pe) {
|
||||
getLogger().error("Failed to convert {} from Avro to Parquet due to {}; transferring to failure", new Object[]{flowFile, pe});
|
||||
getLogger().error("Transferring to failure since failed to convert {} from Avro to Parquet", flowFile, pe);
|
||||
session.transfer(flowFile, FAILURE);
|
||||
}
|
||||
|
||||
|
|
|
@ -220,7 +220,7 @@ public class ScriptedTransformRecord extends ScriptedRecordProcessor {
|
|||
session.transfer(flowFile, REL_SUCCESS);
|
||||
|
||||
final long transformCount = counts.getRecordCount() - counts.getDroppedCount();
|
||||
getLogger().info("Successfully transformed {} Records and dropped {} Records for {}", new Object[] {transformCount, counts.getDroppedCount(), flowFile});
|
||||
getLogger().info("Successfully transformed {} Records and dropped {} Records for {}", transformCount, counts.getDroppedCount(), flowFile);
|
||||
session.adjustCounter("Records Transformed", transformCount, true);
|
||||
session.adjustCounter("Records Dropped", counts.getDroppedCount(), true);
|
||||
|
||||
|
@ -246,7 +246,7 @@ public class ScriptedTransformRecord extends ScriptedRecordProcessor {
|
|||
|
||||
// If a null value was returned, drop the Record
|
||||
if (returnValue == null) {
|
||||
getLogger().trace("Script returned null for Record {} [{}] so will drop Record from {}", new Object[]{index, inputRecord, flowFile});
|
||||
getLogger().trace("Script returned null for Record {} [{}] so will drop Record from {}", index, inputRecord, flowFile);
|
||||
counts.incrementDroppedCount();
|
||||
return;
|
||||
}
|
||||
|
@ -254,7 +254,7 @@ public class ScriptedTransformRecord extends ScriptedRecordProcessor {
|
|||
// If a single Record was returned, write it out
|
||||
if (returnValue instanceof Record) {
|
||||
final Record transformedRecord = (Record) returnValue;
|
||||
getLogger().trace("Successfully transformed Record {} from {} to {} for {}", new Object[]{index, inputRecord, transformedRecord, flowFile});
|
||||
getLogger().trace("Successfully transformed Record {} from {} to {} for {}", index, inputRecord, transformedRecord, flowFile);
|
||||
recordWriteAction.write(transformedRecord);
|
||||
return;
|
||||
}
|
||||
|
@ -262,7 +262,7 @@ public class ScriptedTransformRecord extends ScriptedRecordProcessor {
|
|||
// If a Collection was returned, ensure that every element in the collection is a Record and write them out
|
||||
if (returnValue instanceof Collection) {
|
||||
final Collection<?> collection = (Collection<?>) returnValue;
|
||||
getLogger().trace("Successfully transformed Record {} from {} to {} for {}", new Object[]{index, inputRecord, collection, flowFile});
|
||||
getLogger().trace("Successfully transformed Record {} from {} to {} for {}", index, inputRecord, collection, flowFile);
|
||||
|
||||
for (final Object element : collection) {
|
||||
if (!(element instanceof Record)) {
|
||||
|
|
|
@ -214,7 +214,7 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT
|
|||
transaction.complete();
|
||||
|
||||
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
||||
getLogger().info("Successfully sent metrics to destination in {}ms; Transaction ID = {}", new Object[]{transferMillis, transactionId});
|
||||
getLogger().info("Successfully sent metrics to destination in {}ms; Transaction ID = {}", transferMillis, transactionId);
|
||||
} catch (final Exception e) {
|
||||
if (transaction != null) {
|
||||
transaction.error();
|
||||
|
|
|
@ -554,7 +554,7 @@ public class GetSmbFile extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Could not establish smb connection because of error {}", new Object[]{e});
|
||||
logger.error("Could not establish smb connection", e);
|
||||
context.yield();
|
||||
smbClient.getServerList().unregister(hostname);
|
||||
}
|
||||
|
|
|
@ -271,10 +271,10 @@ public class PutSmbFile extends AbstractProcessor {
|
|||
Collections.reverse(paths);
|
||||
for (String path : paths) {
|
||||
if (!share.folderExists(path)) {
|
||||
logger.debug("Creating folder {}", new Object[]{path});
|
||||
logger.debug("Creating folder {}", path);
|
||||
share.mkdir(path);
|
||||
} else {
|
||||
logger.debug("Folder already exists {}. Moving on", new Object[]{path});
|
||||
logger.debug("Folder already exists {}. Moving on", path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -287,7 +287,7 @@ public class PutSmbFile extends AbstractProcessor {
|
|||
return;
|
||||
}
|
||||
final ComponentLog logger = getLogger();
|
||||
logger.debug("Processing next {} flowfiles", new Object[]{flowFiles.size()});
|
||||
logger.debug("Processing next {} flowfiles", flowFiles.size());
|
||||
|
||||
final String hostname = context.getProperty(HOSTNAME).getValue();
|
||||
final String shareName = context.getProperty(SHARE).getValue();
|
||||
|
@ -330,9 +330,7 @@ public class PutSmbFile extends AbstractProcessor {
|
|||
final Boolean createMissingDirectories = context.getProperty(CREATE_DIRS).asBoolean();
|
||||
if (!createMissingDirectories && !share.folderExists(destinationFileParentDirectory)) {
|
||||
flowFile = session.penalize(flowFile);
|
||||
logger.warn(
|
||||
"Penalizing {} and routing to failure as configured because the destination directory ({}) doesn't exist",
|
||||
new Object[]{flowFile, destinationFileParentDirectory});
|
||||
logger.warn("Penalizing {} and routing to failure as configured because the destination directory ({}) doesn't exist", flowFile, destinationFileParentDirectory);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
continue;
|
||||
} else if (!share.folderExists(destinationFileParentDirectory)) {
|
||||
|
@ -344,11 +342,11 @@ public class PutSmbFile extends AbstractProcessor {
|
|||
if (share.fileExists(destinationFullPath)) {
|
||||
if (conflictResolution.equals(IGNORE_RESOLUTION)) {
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
logger.info("Transferring {} to success as configured because file with same name already exists", new Object[]{flowFile});
|
||||
logger.info("Transferring {} to success as configured because file with same name already exists", flowFile);
|
||||
continue;
|
||||
} else if (conflictResolution.equals(FAIL_RESOLUTION)) {
|
||||
flowFile = session.penalize(flowFile);
|
||||
logger.warn("Penalizing {} and routing to failure as configured because file with the same name already exists", new Object[]{flowFile});
|
||||
logger.warn("Penalizing {} and routing to failure as configured because file with the same name already exists", flowFile);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
continue;
|
||||
}
|
||||
|
@ -376,7 +374,7 @@ public class PutSmbFile extends AbstractProcessor {
|
|||
} catch (Exception e) {
|
||||
flowFile = session.penalize(flowFile);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
logger.error("Cannot transfer the file. Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e});
|
||||
logger.error("Cannot transfer the file. Penalizing {} and routing to 'failure'", flowFile, e);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -398,7 +396,7 @@ public class PutSmbFile extends AbstractProcessor {
|
|||
} catch (Exception e) {
|
||||
flowFile = session.penalize(flowFile);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
logger.error("Cannot rename the file. Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e});
|
||||
logger.error("Cannot rename the file. Penalizing {} and routing to 'failure'", flowFile, e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -412,7 +410,7 @@ public class PutSmbFile extends AbstractProcessor {
|
|||
}
|
||||
} catch (Exception e) {
|
||||
session.transfer(flowFiles, REL_FAILURE);
|
||||
logger.error("Could not establish smb connection because of error {}", new Object[]{e});
|
||||
logger.error("Could not establish smb connection", e);
|
||||
smbClient.getServerList().unregister(hostname);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -179,7 +179,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
|
|||
success = true;
|
||||
} else {
|
||||
flowFile = session.putAttribute(flowFile, "splunk.response.code", String.valueOf(successResponse.getCode()));
|
||||
getLogger().error("Putting data into Splunk was not successful: ({}) {}", new Object[] {successResponse.getCode(), successResponse.getText()});
|
||||
getLogger().error("Putting data into Splunk was not successful: ({}) {}", successResponse.getCode(), successResponse.getText());
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -195,7 +195,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
|
|||
|
||||
if (responseMessage != null) {
|
||||
try {
|
||||
getLogger().error("The response content is: {}", new Object[]{IOUtils.toString(responseMessage.getContent(), "UTF-8")});
|
||||
getLogger().error("The response content is: {}", IOUtils.toString(responseMessage.getContent(), "UTF-8"));
|
||||
} catch (final IOException ioException) {
|
||||
getLogger().error("An error occurred during reading response content!");
|
||||
}
|
||||
|
|
|
@ -209,7 +209,7 @@ public class QuerySplunkIndexingStatus extends SplunkAPICall {
|
|||
}
|
||||
});
|
||||
} else {
|
||||
getLogger().error("Query index status was not successful because of ({}) {}", new Object[] {responseMessage.getStatus(), responseMessage.getContent()});
|
||||
getLogger().error("Query index status was not successful because of ({}) {}", responseMessage.getStatus(), responseMessage.getContent());
|
||||
context.yield();
|
||||
session.transfer(undetermined.values(), RELATIONSHIP_UNDETERMINED);
|
||||
}
|
||||
|
|
|
@ -218,7 +218,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
|
|||
return splunkService.send(endpoint, request);
|
||||
//Catch Stale connection exception, reinitialize, and retry
|
||||
} catch (final HttpException e) {
|
||||
getLogger().error("Splunk request status code: {}. Retrying the request.", new Object[] {e.getStatus()});
|
||||
getLogger().error("Splunk request status code: {}. Retrying the request.", e.getStatus());
|
||||
splunkService.logout();
|
||||
splunkService = getSplunkService(splunkServiceArguments);
|
||||
return splunkService.send(endpoint, request);
|
||||
|
|
|
@ -377,7 +377,7 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
|
|||
|
||||
st.setQueryTimeout(queryTimeout); // timeout in seconds
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Executing query {}", new Object[] {selectQuery});
|
||||
logger.debug("Executing query {}", selectQuery);
|
||||
}
|
||||
|
||||
final boolean originalAutoCommit = con.getAutoCommit();
|
||||
|
@ -520,7 +520,7 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
|
|||
// Update the state
|
||||
session.setState(statePropertyMap, Scope.CLUSTER);
|
||||
} catch (IOException ioe) {
|
||||
getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe});
|
||||
getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", this, ioe);
|
||||
}
|
||||
|
||||
session.commitAsync();
|
||||
|
|
|
@ -263,15 +263,14 @@ public class CompressContent extends AbstractProcessor {
|
|||
if (compressionFormatValue.equals(COMPRESSION_FORMAT_ATTRIBUTE)) {
|
||||
final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
|
||||
if (mimeType == null) {
|
||||
logger.error("No {} attribute exists for {}; routing to failure", new Object[]{CoreAttributes.MIME_TYPE.key(), flowFile});
|
||||
logger.error("No {} attribute exists for {}; routing to failure", CoreAttributes.MIME_TYPE.key(), flowFile);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
compressionFormatValue = compressionFormatMimeTypeMap.get(mimeType);
|
||||
if (compressionFormatValue == null) {
|
||||
logger.info("Mime Type of {} is '{}', which does not indicate a supported Compression Format; routing to success without decompressing",
|
||||
new Object[]{flowFile, mimeType});
|
||||
logger.info("Mime Type of {} is '{}', which does not indicate a supported Compression Format; routing to success without decompressing", flowFile, mimeType);
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -120,7 +120,7 @@ public class CryptographicHashContent extends AbstractProcessor {
|
|||
|
||||
// Determine the algorithm to use
|
||||
final String algorithmName = context.getProperty(HASH_ALGORITHM).getValue();
|
||||
logger.debug("Using algorithm {}", new Object[]{algorithmName});
|
||||
logger.debug("Using algorithm {}", algorithmName);
|
||||
HashAlgorithm algorithm = HashAlgorithm.fromName(algorithmName);
|
||||
|
||||
if (flowFile.getSize() == 0) {
|
||||
|
@ -129,13 +129,13 @@ public class CryptographicHashContent extends AbstractProcessor {
|
|||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
} else {
|
||||
logger.debug("Flowfile content is empty; hashing with {} anyway", new Object[]{algorithmName});
|
||||
logger.debug("Flowfile content is empty; hashing with {} anyway", algorithmName);
|
||||
}
|
||||
}
|
||||
|
||||
// Generate a hash with the configured algorithm for the content
|
||||
// and create a new attribute with the configured name
|
||||
logger.debug("Generating {} hash of content", new Object[]{algorithmName});
|
||||
logger.debug("Generating {} hash of content", algorithmName);
|
||||
final AtomicReference<String> hashValueHolder = new AtomicReference<>(null);
|
||||
|
||||
try {
|
||||
|
@ -144,17 +144,17 @@ public class CryptographicHashContent extends AbstractProcessor {
|
|||
|
||||
// Determine the destination attribute name
|
||||
final String attributeName = "content_" + algorithmName;
|
||||
logger.debug("Writing {} hash to attribute '{}'", new Object[]{algorithmName, attributeName});
|
||||
logger.debug("Writing {} hash to attribute '{}'", algorithmName, attributeName);
|
||||
|
||||
// Write the attribute
|
||||
flowFile = session.putAttribute(flowFile, attributeName, hashValueHolder.get());
|
||||
logger.info("Successfully added attribute '{}' to {} with a value of {}; routing to success", new Object[]{attributeName, flowFile, hashValueHolder.get()});
|
||||
logger.info("Successfully added attribute '{}' to {} with a value of {}; routing to success", attributeName, flowFile, hashValueHolder.get());
|
||||
|
||||
// Update provenance and route to success
|
||||
session.getProvenanceReporter().modifyAttributes(flowFile);
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
} catch (ProcessException e) {
|
||||
logger.error("Failed to process {} due to {}; routing to failure", new Object[]{flowFile, e});
|
||||
logger.error("Routing to failure since failed to process {}", flowFile, e);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -163,7 +163,7 @@ public class DetectDuplicate extends AbstractProcessor {
|
|||
final ComponentLog logger = getLogger();
|
||||
final String cacheKey = context.getProperty(CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
|
||||
if (StringUtils.isBlank(cacheKey)) {
|
||||
logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
|
||||
logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", flowFile);
|
||||
flowFile = session.penalize(flowFile);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
|
@ -187,7 +187,7 @@ public class DetectDuplicate extends AbstractProcessor {
|
|||
boolean duplicate = originalCacheValue != null;
|
||||
if (duplicate && durationMS != null && (now >= originalCacheValue.getEntryTimeMS() + durationMS)) {
|
||||
boolean status = cache.remove(cacheKey, keySerializer);
|
||||
logger.debug("Removal of expired cached entry with key {} returned {}", new Object[]{cacheKey, status});
|
||||
logger.debug("Removal of expired cached entry with key {} returned {}", cacheKey, status);
|
||||
|
||||
// both should typically result in duplicate being false...but, better safe than sorry
|
||||
if (shouldCacheIdentifier) {
|
||||
|
@ -202,18 +202,18 @@ public class DetectDuplicate extends AbstractProcessor {
|
|||
String originalFlowFileDescription = originalCacheValue.getDescription();
|
||||
flowFile = session.putAttribute(flowFile, ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME, originalFlowFileDescription);
|
||||
session.transfer(flowFile, REL_DUPLICATE);
|
||||
logger.info("Found {} to be a duplicate of FlowFile with description {}", new Object[]{flowFile, originalFlowFileDescription});
|
||||
logger.info("Found {} to be a duplicate of FlowFile with description {}", flowFile, originalFlowFileDescription);
|
||||
session.adjustCounter("Duplicates Detected", 1L, false);
|
||||
} else {
|
||||
session.getProvenanceReporter().route(flowFile, REL_NON_DUPLICATE);
|
||||
session.transfer(flowFile, REL_NON_DUPLICATE);
|
||||
logger.info("Could not find a duplicate entry in cache for {}; routing to non-duplicate", new Object[]{flowFile});
|
||||
logger.info("Could not find a duplicate entry in cache for {}; routing to non-duplicate", flowFile);
|
||||
session.adjustCounter("Non-Duplicate Files Processed", 1L, false);
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
flowFile = session.penalize(flowFile);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e});
|
||||
logger.error("Unable to communicate with cache when processing {}", flowFile, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -481,7 +481,7 @@ public class EnforceOrder extends AbstractProcessor {
|
|||
|
||||
} else {
|
||||
final String msg = String.format("Skipped, FlowFile order was %d but current target is %d", order, targetOrder.get());
|
||||
logger.warn(msg + ". {}", new Object[]{f});
|
||||
logger.warn("{}. {}", msg, f);
|
||||
transferResult(f, REL_SKIPPED, msg, targetOrder.get());
|
||||
}
|
||||
|
||||
|
@ -521,9 +521,9 @@ public class EnforceOrder extends AbstractProcessor {
|
|||
|
||||
private void transferToFailure(final FlowFile flowFile, final String message, final Throwable cause) {
|
||||
if (cause != null) {
|
||||
getLogger().warn(message + " {}", flowFile, cause);
|
||||
getLogger().warn("{} {}", message, flowFile, cause);
|
||||
} else {
|
||||
getLogger().warn(message + " {}", new Object[]{flowFile});
|
||||
getLogger().warn("{} {}", message, flowFile);
|
||||
}
|
||||
transferResult(flowFile, REL_FAILURE, message, null);
|
||||
}
|
||||
|
|
|
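The EnforceOrder hunk above replaces formats built by string concatenation, such as msg + ". {}", with fully parameterized ones like "{}. {}". The concatenation runs eagerly even when the level is disabled, and a dynamic message that happens to contain "{}" could be misread as a placeholder; passing the message as an argument avoids both issues. A minimal sketch with hypothetical values:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ParameterizedFormatSketch {
        private static final Logger logger = LoggerFactory.getLogger(ParameterizedFormatSketch.class);

        public static void main(String[] args) {
            final String msg = "Skipped, FlowFile order was 3 but current target is 1"; // hypothetical
            final String flowFile = "example-flowfile";                                 // hypothetical

            // Before: the dynamic message is concatenated into the format string eagerly
            logger.warn(msg + ". {}", flowFile);

            // After: both values are passed as arguments, so nothing is concatenated
            // unless WARN is actually enabled
            logger.warn("{}. {}", msg, flowFile);
        }
    }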
@ -244,7 +244,7 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
try {
|
||||
longRunningProcess = launchProcess(context, commandStrings, batchNanos, proxyOut);
|
||||
} catch (final IOException ioe) {
|
||||
getLogger().error("Failed to create process due to {}", new Object[] {ioe});
|
||||
getLogger().error("Failed to create process", ioe);
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
@ -277,7 +277,7 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
} catch (final InterruptedException ie) {
|
||||
// Ignore
|
||||
} catch (final ExecutionException ee) {
|
||||
getLogger().error("Process execution failed due to {}", new Object[] {ee.getCause()});
|
||||
getLogger().error("Process execution failed", ee.getCause());
|
||||
}
|
||||
} else {
|
||||
// wait the allotted amount of time.
|
||||
|
@ -350,7 +350,7 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
builder.environment().putAll(environment);
|
||||
}
|
||||
|
||||
getLogger().info("Start creating new Process > {} ", new Object[] {commandStrings});
|
||||
getLogger().info("Start creating new Process > {} ", commandStrings);
|
||||
this.externalProcess = builder.redirectErrorStream(redirectErrorStream).start();
|
||||
|
||||
// Submit task to read error stream from process
|
||||
|
@ -423,7 +423,7 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
// In the future consider exposing it via configuration.
|
||||
boolean terminated = externalProcess.waitFor(1000, TimeUnit.MILLISECONDS);
|
||||
int exitCode = terminated ? externalProcess.exitValue() : -9999;
|
||||
getLogger().info("Process finished with exit code {} ", new Object[] {exitCode});
|
||||
getLogger().info("Process finished with exit code {} ", exitCode);
|
||||
} catch (InterruptedException e1) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
@ -454,7 +454,7 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
public void setDelegate(final OutputStream delegate) {
|
||||
lock.lock();
|
||||
try {
|
||||
logger.trace("Switching delegate from {} to {}", new Object[]{this.delegate, delegate});
|
||||
logger.trace("Switching delegate from {} to {}", this.delegate, delegate);
|
||||
this.delegate = delegate;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
|
@ -475,7 +475,7 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
try {
|
||||
while (true) {
|
||||
if (delegate != null) {
|
||||
logger.trace("Writing to {}", new Object[]{delegate});
|
||||
logger.trace("Writing to {}", delegate);
|
||||
|
||||
delegate.write(b);
|
||||
return;
|
||||
|
@ -496,7 +496,7 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
try {
|
||||
while (true) {
|
||||
if (delegate != null) {
|
||||
logger.trace("Writing to {}", new Object[]{delegate});
|
||||
logger.trace("Writing to {}", delegate);
|
||||
delegate.write(b, off, len);
|
||||
return;
|
||||
} else {
|
||||
|
|
|
@ -472,10 +472,10 @@ public class ExtractText extends AbstractProcessor {
|
|||
flowFile = session.putAllAttributes(flowFile, regexResults);
|
||||
session.getProvenanceReporter().modifyAttributes(flowFile);
|
||||
session.transfer(flowFile, REL_MATCH);
|
||||
logger.info("Matched {} Regular Expressions and added attributes to FlowFile {}", new Object[]{regexResults.size(), flowFile});
|
||||
logger.info("Matched {} Regular Expressions and added attributes to FlowFile {}", regexResults.size(), flowFile);
|
||||
} else {
|
||||
session.transfer(flowFile, REL_NO_MATCH);
|
||||
logger.info("Did not match any Regular Expressions for FlowFile {}", new Object[]{flowFile});
|
||||
logger.info("Did not match any Regular Expressions for FlowFile {}", flowFile);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@@ -202,7 +202,7 @@ public class FetchDistributedMapCache extends AbstractProcessor {
final String cacheKey = context.getProperty(PROP_CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
// This block retains the previous behavior when only one Cache Entry Identifier was allowed, so as not to change the expected error message
if (StringUtils.isBlank(cacheKey)) {
logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", flowFile);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;

@@ -211,7 +211,7 @@ public class FetchDistributedMapCache extends AbstractProcessor {
for (int i = 0; i < cacheKeys.size(); i++) {
if (StringUtils.isBlank(cacheKeys.get(i))) {
// Log first missing identifier, route to failure, and return
logger.error("FlowFile {} has no attribute for Cache Entry Identifier in position {}", new Object[]{flowFile, i});
logger.error("FlowFile {} has no attribute for Cache Entry Identifier in position {}", flowFile, i);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;

@@ -234,7 +234,7 @@ public class FetchDistributedMapCache extends AbstractProcessor {
final byte[] cacheValue = cacheValueEntry.getValue();

if (cacheValue == null) {
logger.info("Could not find an entry in cache for {}; routing to not-found", new Object[]{flowFile});
logger.info("Could not find an entry in cache for {}; routing to not-found", flowFile);
notFound = true;
break;
} else {

@@ -262,9 +262,9 @@ public class FetchDistributedMapCache extends AbstractProcessor {
}

if (putInAttribute) {
logger.info("Found a cache key of {} and added an attribute to {} with it's value.", new Object[]{cacheKey, flowFile});
logger.info("Found a cache key of {} and added an attribute to {} with it's value.", cacheKey, flowFile);
} else {
logger.info("Found a cache key of {} and replaced the contents of {} with it's value.", new Object[]{cacheKey, flowFile});
logger.info("Found a cache key of {} and replaced the contents of {} with it's value.", cacheKey, flowFile);
}
}
}

@@ -277,7 +277,7 @@ public class FetchDistributedMapCache extends AbstractProcessor {
} catch (final IOException e) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e});
logger.error("Unable to communicate with cache when processing {}", flowFile, e);
}
}
@@ -252,7 +252,7 @@ public class FetchFile extends AbstractProcessor {
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
} else if (!Files.exists(filePath)) {
getLogger().log(levelFileNotFound, "Could not fetch file {} from file system for {} because the file does not exist; routing to not.found", new Object[] {file, flowFile});
getLogger().log(levelFileNotFound, "Could not fetch file {} from file system for {} because the file does not exist; routing to not.found", file, flowFile);
session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
return;
@@ -380,7 +380,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds

logger.debug("Executing {}", new Object[]{selectQuery});
logger.debug("Executing {}", selectQuery);
ResultSet resultSet;

resultSet = st.executeQuery(selectQuery);

@@ -536,12 +536,12 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
}
} catch (SQLException e) {
if (fileToProcess != null) {
logger.error("Unable to execute SQL select query {} due to {}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess});
logger.error("Routing {} to failure since unable to execute SQL select query {}", fileToProcess, selectQuery, e);
fileToProcess = session.putAttribute(fileToProcess, "generatetablefetch.sql.error", e.getMessage());
session.transfer(fileToProcess, REL_FAILURE);

} else {
logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
logger.error("Unable to execute SQL select query {}", selectQuery, e);
throw new ProcessException(e);
}
}
@@ -142,7 +142,7 @@ public class HandleHttpResponse extends AbstractProcessor {

final String statusCodeValue = context.getProperty(STATUS_CODE).evaluateAttributeExpressions(flowFile).getValue();
if (!isNumber(statusCodeValue)) {
getLogger().error("Failed to respond to HTTP request for {} because status code was '{}', which is not a valid number", new Object[]{flowFile, statusCodeValue});
getLogger().error("Failed to respond to HTTP request for {} because status code was '{}', which is not a valid number", flowFile, statusCodeValue);
session.transfer(flowFile, REL_FAILURE);
return;
}

@@ -190,16 +190,16 @@ public class HandleHttpResponse extends AbstractProcessor {
session.exportTo(flowFile, response.getOutputStream());
response.flushBuffer();
} catch (final ProcessException e) {
getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e});
getLogger().error("Failed to respond to HTTP request for {}", flowFile, e);
try {
contextMap.complete(contextIdentifier);
} catch (final RuntimeException ce) {
getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ce});
getLogger().error("Failed to complete HTTP Transaction for {}", flowFile, ce);
}
session.transfer(flowFile, REL_FAILURE);
return;
} catch (final Exception e) {
getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e});
getLogger().error("Failed to respond to HTTP request for {}", flowFile, e);
session.transfer(flowFile, REL_FAILURE);
return;
}

@@ -207,13 +207,13 @@ public class HandleHttpResponse extends AbstractProcessor {
try {
contextMap.complete(contextIdentifier);
} catch (final RuntimeException ce) {
getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ce});
getLogger().error("Failed to complete HTTP Transaction for {}", flowFile, ce);
session.transfer(flowFile, REL_FAILURE);
return;
}

session.getProvenanceReporter().send(flowFile, HTTPUtils.getURI(flowFile.getAttributes()), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
getLogger().info("Successfully responded to HTTP Request for {} with status code {}", new Object[]{flowFile, statusCode});
getLogger().info("Successfully responded to HTTP Request for {} with status code {}", flowFile, statusCode);
session.transfer(flowFile, REL_SUCCESS);
}
@@ -509,7 +509,7 @@ public class JoinEnrichment extends BinFiles {
try {
binProcessingResult = this.processBin(bin, context);
} catch (final ProcessException e) {
logger.error("Failed to process bundle of {} files due to {}", new Object[] {bin.getContents().size(), e});
logger.error("Failed to process bundle of {} files", bin.getContents().size(), e);

final ProcessSession binSession = bin.getSession();
for (final FlowFile flowFile : bin.getContents()) {

@@ -518,7 +518,7 @@ public class JoinEnrichment extends BinFiles {
binSession.commitAsync();
continue;
} catch (final Exception e) {
logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {bin.getContents().size(), e});
logger.error("Rolling back sessions since failed to process bundle of {} files", bin.getContents().size(), e);

bin.getSession().rollback();
continue;
@@ -341,21 +341,21 @@ public class ListDatabaseTables extends AbstractProcessor {
}

if (refreshTable) {
logger.info("Found {}: {}", new Object[] {tableType, fqn});
logger.info("Found {}: {}", tableType, fqn);
final Map<String, String> tableInformation = new HashMap<>();

if (includeCount) {
try (Statement st = con.createStatement()) {
final String countQuery = "SELECT COUNT(1) FROM " + fqn;

logger.debug("Executing query: {}", new Object[] {countQuery});
logger.debug("Executing query: {}", countQuery);
try (ResultSet countResult = st.executeQuery(countQuery)) {
if (countResult.next()) {
tableInformation.put(DB_TABLE_COUNT, Long.toString(countResult.getLong(1)));
}
}
} catch (final SQLException se) {
logger.error("Couldn't get row count for {}", new Object[] {fqn});
logger.error("Couldn't get row count for {}", fqn);
continue;
}
}
@@ -475,7 +475,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
});
} catch (IOException ioe) {
// well then this FlowFile gets none of these attributes
getLogger().warn("Error collecting attributes for file {}, message is {}", new Object[] {absPathString, ioe.getMessage()});
getLogger().warn("Error collecting attributes for file {}", absPathString, ioe);
}
}

@@ -588,7 +588,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
if (Files.isReadable(dir)) {
return FileVisitResult.CONTINUE;
} else {
getLogger().debug("The following directory is not readable: {}", new Object[]{dir.toString()});
getLogger().debug("The following directory is not readable: {}", dir);
return FileVisitResult.SKIP_SUBTREE;
}
}

@@ -615,10 +615,10 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
@Override
public FileVisitResult visitFileFailed(final Path path, final IOException e) {
if (e instanceof AccessDeniedException) {
getLogger().debug("The following file is not readable: {}", new Object[]{path.toString()});
getLogger().debug("The following file is not readable: {}", path);
return FileVisitResult.SKIP_SUBTREE;
} else {
getLogger().error("Error during visiting file {}: {}", path.toString(), e.getMessage(), e);
getLogger().error("Error during visiting file {}", path, e);
return FileVisitResult.TERMINATE;
}
}

@@ -635,7 +635,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {

final long millis = System.currentTimeMillis() - start;

getLogger().debug("Took {} milliseconds to perform listing and gather {} entries", new Object[] {millis, result.size()});
getLogger().debug("Took {} milliseconds to perform listing and gather {} entries", millis, result.size());
return result;
} catch (final ProcessorStoppedException pse) {
getLogger().info("Processor was stopped so will not complete listing of Files");

@@ -924,7 +924,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {

@Override
public synchronized void purgeTimingInfo(final long cutoff) {
logger.debug("Purging any entries from Performance Tracker that is older than {}", new Object[] {new Date(cutoff)});
logger.debug("Purging any entries from Performance Tracker that is older than {}", new Date(cutoff));
final Iterator<Map.Entry<Tuple<String, String>, TimingInfo>> itr = directoryToTimingInfo.entrySet().iterator();

int purgedCount = 0;

@@ -945,7 +945,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
}

this.earliestTimestamp = earliestTimestamp;
logger.debug("Purged {} entries from Performance Tracker; now holding {} entries", new Object[] {purgedCount, directoryToTimingInfo.size()});
logger.debug("Purged {} entries from Performance Tracker; now holding {} entries", purgedCount, directoryToTimingInfo.size());
}

@Override

@@ -1065,7 +1065,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
}

if (logger.isTraceEnabled()) {
logger.trace("Performing operation {} on {} took {} milliseconds", new Object[] {operation, getFullPath(), duration});
logger.trace("Performing operation {} on {} took {} milliseconds", operation, getFullPath(), duration);
}
}
@@ -382,7 +382,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
toShutdown.destroy();
clearInit();
} catch (final Exception ex) {
getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[] {ex});
getLogger().warn("unable to cleanly shutdown embedded server", ex);
this.server = null;
}
}
@@ -343,7 +343,7 @@ public class ListenTCPRecord extends AbstractProcessor {
}

if (socketRecordReader.isClosed()) {
getLogger().warn("Unable to read records from {}, socket already closed", new Object[] {getRemoteAddress(socketRecordReader)});
getLogger().warn("Unable to read records from {}, socket already closed", getRemoteAddress(socketRecordReader));
IOUtils.closeQuietly(socketRecordReader); // still need to call close so the overall count is decremented
return;
}

@@ -390,7 +390,7 @@ public class ListenTCPRecord extends AbstractProcessor {
}

if (record == null) {
getLogger().debug("No records available from {}, closing connection", new Object[]{getRemoteAddress(socketRecordReader)});
getLogger().debug("No records available from {}, closing connection", getRemoteAddress(socketRecordReader));
IOUtils.closeQuietly(socketRecordReader);
session.remove(flowFile);
return;
@@ -225,7 +225,7 @@ public class LookupAttribute extends AbstractProcessor {
matched = putAttribute(attributeName, attributeValue, attributes, includeEmptyValues, logger) || matched;

if (!matched && logger.isDebugEnabled()) {
logger.debug("No such value for key: {}", new Object[]{lookupKey});
logger.debug("No such value for key: {}", lookupKey);
}
}
@@ -435,7 +435,7 @@ public class LookupRecord extends AbstractProcessor {
try {
writer.close();
} catch (final IOException ioe) {
getLogger().warn("Failed to close Writer for {}", new Object[] {childFlowFile});
getLogger().warn("Failed to close Writer for {}", childFlowFile);
}

final Map<String, String> attributes = new HashMap<>();

@@ -452,7 +452,7 @@ public class LookupRecord extends AbstractProcessor {
}

} catch (final Exception e) {
getLogger().error("Failed to process {}", new Object[]{flowFile, e});
getLogger().error("Failed to process {}", flowFile, e);

for (final Relationship relationship : lookupContext.getRelationshipsUsed()) {
final RecordSetWriter writer = lookupContext.getExistingRecordWriterForRelationship(relationship);
@@ -539,7 +539,7 @@ public class MergeContent extends BinFiles {
// Fail the flow files and commit them
if (error != null) {
final String binDescription = contents.size() <= 10 ? contents.toString() : contents.size() + " FlowFiles";
getLogger().error(error + "; routing {} to failure", new Object[]{binDescription});
getLogger().error("{}; routing {} to failure", error, binDescription);
binSession.transfer(contents, REL_FAILURE);
binSession.commitAsync();

@@ -568,7 +568,7 @@ public class MergeContent extends BinFiles {

final String inputDescription = contents.size() < 10 ? contents.toString() : contents.size() + " FlowFiles";

getLogger().info("Merged {} into {}. Reason for merging: {}", new Object[] {inputDescription, bundle, bin.getEvictionReason()});
getLogger().info("Merged {} into {}. Reason for merging: {}", inputDescription, bundle, bin.getEvictionReason());

binSession.transfer(bundle, REL_MERGED);
binProcessingResult.getAttributes().put(MERGE_UUID_ATTRIBUTE, bundle.getAttribute(CoreAttributes.UUID.key()));
@@ -143,7 +143,7 @@ public class ModifyBytes extends AbstractProcessor {
});
}

logger.info("Transferred {} to 'success'", new Object[]{ff});
logger.info("Transferred {} to 'success'", ff);
session.getProvenanceReporter().modifyContent(ff, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(ff, REL_SUCCESS);
}
@@ -213,7 +213,7 @@ public class Notify extends AbstractProcessor {

// if the computed value is null, or empty, we transfer the flow file to failure relationship
if (StringUtils.isBlank(signalId)) {
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", flowFile);
// set 'notified' attribute
session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(false)), REL_FAILURE);
continue;

@@ -251,7 +251,7 @@ public class Notify extends AbstractProcessor {
signalBuffer.flowFiles.add(flowFile);

if (logger.isDebugEnabled()) {
logger.debug("Cached release signal identifier {} counterName {} from FlowFile {}", new Object[] {signalId, counterName, flowFile});
logger.debug("Cached release signal identifier {} counterName {} from FlowFile {}", signalId, counterName, flowFile);
}

}
@@ -274,7 +274,7 @@ public class ParseCEF extends AbstractProcessor {
// ParCEFone returns null every time it cannot parse an
// event, so we test
if (event == null) {
getLogger().error("Failed to parse {} as a CEF message: it does not conform to the CEF standard; routing to failure", new Object[] {flowFile});
getLogger().error("Failed to parse {} as a CEF message: it does not conform to the CEF standard; routing to failure", flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -132,13 +132,13 @@ public class ParseSyslog extends AbstractProcessor {
try {
event = parser.parseEvent(buffer, null);
} catch (final ProcessException pe) {
getLogger().error("Failed to parse {} as a Syslog message due to {}; routing to failure", new Object[] {flowFile, pe});
getLogger().error("Routing to failure since failed to parse {} as a Syslog message", flowFile, pe);
session.transfer(flowFile, REL_FAILURE);
return;
}

if (event == null || !event.isValid()) {
getLogger().error("Failed to parse {} as a Syslog message: it does not conform to any of the RFC formats supported; routing to failure", new Object[] {flowFile});
getLogger().error("Failed to parse {} as a Syslog message: it does not conform to any of the RFC formats supported; routing to failure", flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
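Every hunk in this commit applies the same change: drop the explicitly constructed Object[] and pass the log arguments directly, keeping any exception as the last argument. A minimal sketch of that pattern, written against plain SLF4J for illustration only (the NiFi processors above use their own logger instances, and the class and method names below are hypothetical, not taken from this diff):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingExample {

    private static final Logger logger = LoggerFactory.getLogger(LoggingExample.class);

    void process(final String flowFileId) {
        try {
            throw new IllegalStateException("simulated failure");
        } catch (final Exception e) {
            // Old style, as removed in the diff: the caller allocates an Object[]
            // and the message reserves a "{}" placeholder for the exception.
            logger.error("Failed to process {} due to {}", new Object[]{flowFileId, e});

            // New style, as added in the diff: arguments are passed directly, no
            // explicit array is created at the call site, and the trailing Throwable
            // (which has no matching placeholder) is logged as the exception.
            logger.error("Failed to process {}", flowFileId, e);
        }
    }
}

Hunks that also reword the message (GenerateTableFetch, JoinEnrichment, ParseSyslog) do so to keep the Throwable as the final argument once the "due to {}" placeholder is removed.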