mirror of https://github.com/apache/nifi.git
NIFI-12075 Deprecated ComponentLog methods with Object[] and Throwable
- Updated component references to remove use of Object[] wrapping for log methods This closes #7748 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
66524ab267
commit
ebd0d60e49
|
@ -27,7 +27,7 @@ public class NopComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void warn(final String msg, final Object[] os) {
|
public void warn(final String msg, final Object... os) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ public class NopComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void trace(final String msg, final Object[] os) {
|
public void trace(final String msg, final Object... os) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,7 +102,7 @@ public class NopComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void info(final String msg, final Object[] os) {
|
public void info(final String msg, final Object... os) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -132,7 +132,7 @@ public class NopComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void error(final String msg, final Object[] os) {
|
public void error(final String msg, final Object... os) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,7 +157,7 @@ public class NopComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void debug(final String msg, final Object[] os) {
|
public void debug(final String msg, final Object... os) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,7 +182,7 @@ public class NopComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void log(final LogLevel level, final String msg, final Object[] os) {
|
public void log(final LogLevel level, final String msg, final Object... os) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,7 @@ package org.apache.nifi.logging;
|
||||||
* within the same NiFi instance.
|
* within the same NiFi instance.
|
||||||
* </li>
|
* </li>
|
||||||
* <li>
|
* <li>
|
||||||
* If the last value in an Object[] argument that is passed to the logger is a
|
* If the last value in an Object... argument that is passed to the logger is a
|
||||||
* Throwable, then the logged message will include a <code>toString()</code> of
|
* Throwable, then the logged message will include a <code>toString()</code> of
|
||||||
* the Throwable; in addition, if the component's logger is set to DEBUG level
|
* the Throwable; in addition, if the component's logger is set to DEBUG level
|
||||||
* via the logback configuration, the Stacktrace will also be logged. This
|
* via the logback configuration, the Stacktrace will also be logged. This
|
||||||
|
@ -53,6 +53,7 @@ public interface ComponentLog {
|
||||||
|
|
||||||
void warn(String msg, Object... os);
|
void warn(String msg, Object... os);
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
void warn(String msg, Object[] os, Throwable t);
|
void warn(String msg, Object[] os, Throwable t);
|
||||||
|
|
||||||
void warn(String msg);
|
void warn(String msg);
|
||||||
|
@ -67,6 +68,7 @@ public interface ComponentLog {
|
||||||
|
|
||||||
void trace(String msg);
|
void trace(String msg);
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
void trace(String msg, Object[] os, Throwable t);
|
void trace(String msg, Object[] os, Throwable t);
|
||||||
|
|
||||||
default void trace(LogMessage logMessage) {
|
default void trace(LogMessage logMessage) {
|
||||||
|
@ -89,6 +91,7 @@ public interface ComponentLog {
|
||||||
|
|
||||||
void info(String msg);
|
void info(String msg);
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
void info(String msg, Object[] os, Throwable t);
|
void info(String msg, Object[] os, Throwable t);
|
||||||
|
|
||||||
default void info(LogMessage logMessage) {
|
default void info(LogMessage logMessage) {
|
||||||
|
@ -103,6 +106,7 @@ public interface ComponentLog {
|
||||||
|
|
||||||
void error(String msg);
|
void error(String msg);
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
void error(String msg, Object[] os, Throwable t);
|
void error(String msg, Object[] os, Throwable t);
|
||||||
|
|
||||||
default void error(LogMessage logMessage) {
|
default void error(LogMessage logMessage) {
|
||||||
|
@ -113,6 +117,7 @@ public interface ComponentLog {
|
||||||
|
|
||||||
void debug(String msg, Object... os);
|
void debug(String msg, Object... os);
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
void debug(String msg, Object[] os, Throwable t);
|
void debug(String msg, Object[] os, Throwable t);
|
||||||
|
|
||||||
void debug(String msg);
|
void debug(String msg);
|
||||||
|
@ -184,6 +189,7 @@ public interface ComponentLog {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
default void log(LogLevel level, String msg, Object[] os, Throwable t) {
|
default void log(LogLevel level, String msg, Object[] os, Throwable t) {
|
||||||
switch (level) {
|
switch (level) {
|
||||||
case DEBUG:
|
case DEBUG:
|
||||||
|
|
|
@ -44,7 +44,7 @@ public class NiFiLog implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void warn(Marker marker, String string, Object[] os) {
|
public void warn(Marker marker, String string, Object... os) {
|
||||||
logger.warn(marker, string, os);
|
logger.warn(marker, string, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,7 +78,7 @@ public class NiFiLog implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void warn(String string, Object[] os) {
|
public void warn(String string, Object... os) {
|
||||||
logger.warn(string, os);
|
logger.warn(string, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ public class NiFiLog implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void trace(Marker marker, String string, Object[] os) {
|
public void trace(Marker marker, String string, Object... os) {
|
||||||
logger.trace(marker, string, os);
|
logger.trace(marker, string, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,7 +123,7 @@ public class NiFiLog implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void trace(String string, Object[] os) {
|
public void trace(String string, Object... os) {
|
||||||
logger.trace(string, os);
|
logger.trace(string, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -202,7 +202,7 @@ public class NiFiLog implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void info(Marker marker, String string, Object[] os) {
|
public void info(Marker marker, String string, Object... os) {
|
||||||
logger.info(marker, string, os);
|
logger.info(marker, string, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,7 +231,7 @@ public class NiFiLog implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void info(String string, Object[] os) {
|
public void info(String string, Object... os) {
|
||||||
logger.info(string, os);
|
logger.info(string, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,7 +265,7 @@ public class NiFiLog implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void error(Marker marker, String string, Object[] os) {
|
public void error(Marker marker, String string, Object... os) {
|
||||||
logger.error(marker, string, os);
|
logger.error(marker, string, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,7 +294,7 @@ public class NiFiLog implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void error(String string, Object[] os) {
|
public void error(String string, Object... os) {
|
||||||
logger.error(string, os);
|
logger.error(string, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -319,7 +319,7 @@ public class NiFiLog implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void debug(Marker marker, String string, Object[] os) {
|
public void debug(Marker marker, String string, Object... os) {
|
||||||
logger.debug(marker, string, os);
|
logger.debug(marker, string, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -344,7 +344,7 @@ public class NiFiLog implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void debug(String string, Object[] os) {
|
public void debug(String string, Object... os) {
|
||||||
logger.debug(string, os);
|
logger.debug(string, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -124,9 +124,9 @@ public class CapturingLogger implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void trace(Marker marker, String format, Object... argArray) {
|
public void trace(Marker marker, String format, Object... arguments) {
|
||||||
traceMessages.add(new LogMessage(marker, format, null, argArray));
|
traceMessages.add(new LogMessage(marker, format, null, arguments));
|
||||||
logger.trace(marker, format, argArray);
|
logger.trace(marker, format, arguments);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -197,9 +197,9 @@ public class CapturingLogger implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void debug(Marker marker, String format, Object... argArray) {
|
public void debug(Marker marker, String format, Object... arguments) {
|
||||||
debugMessages.add(new LogMessage(marker, format, null, argArray));
|
debugMessages.add(new LogMessage(marker, format, null, arguments));
|
||||||
logger.debug(marker, format, argArray);
|
logger.debug(marker, format, arguments);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -268,9 +268,9 @@ public class CapturingLogger implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void info(Marker marker, String format, Object... argArray) {
|
public void info(Marker marker, String format, Object... arguments) {
|
||||||
infoMessages.add(new LogMessage(marker, format, null, argArray));
|
infoMessages.add(new LogMessage(marker, format, null, arguments));
|
||||||
logger.info(marker, format, argArray);
|
logger.info(marker, format, arguments);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -338,9 +338,9 @@ public class CapturingLogger implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void warn(Marker marker, String format, Object... argArray) {
|
public void warn(Marker marker, String format, Object... arguments) {
|
||||||
warnMessages.add(new LogMessage(marker, format, null, argArray));
|
warnMessages.add(new LogMessage(marker, format, null, arguments));
|
||||||
logger.warn(marker, format, argArray);
|
logger.warn(marker, format, arguments);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -420,10 +420,10 @@ public class CapturingLogger implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void error(Marker marker, String format, Object... argArray) {
|
public void error(Marker marker, String format, Object... arguments) {
|
||||||
final String message = MessageFormatter.arrayFormat(format, argArray).getMessage();
|
final String message = MessageFormatter.arrayFormat(format, arguments).getMessage();
|
||||||
errorMessages.add(new LogMessage(marker, message, null, argArray));
|
errorMessages.add(new LogMessage(marker, message, null, arguments));
|
||||||
logger.error(marker, format, argArray);
|
logger.error(marker, format, arguments);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -121,7 +121,7 @@ public class MockComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void warn(String msg, Object[] os) {
|
public void warn(String msg, Object... os) {
|
||||||
if (lastArgIsException(os)) {
|
if (lastArgIsException(os)) {
|
||||||
warn(msg, translateException(os), (Throwable) os[os.length - 1]);
|
warn(msg, translateException(os), (Throwable) os[os.length - 1]);
|
||||||
} else {
|
} else {
|
||||||
|
@ -156,7 +156,7 @@ public class MockComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void trace(String msg, Object[] os) {
|
public void trace(String msg, Object... os) {
|
||||||
msg = "{} " + msg;
|
msg = "{} " + msg;
|
||||||
os = addProcessor(os);
|
os = addProcessor(os);
|
||||||
logger.trace(msg, os);
|
logger.trace(msg, os);
|
||||||
|
@ -215,7 +215,7 @@ public class MockComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void info(String msg, Object[] os) {
|
public void info(String msg, Object... os) {
|
||||||
msg = "{} " + msg;
|
msg = "{} " + msg;
|
||||||
os = addProcessor(os);
|
os = addProcessor(os);
|
||||||
|
|
||||||
|
@ -258,7 +258,7 @@ public class MockComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void error(String msg, Object[] os) {
|
public void error(String msg, Object... os) {
|
||||||
if (lastArgIsException(os)) {
|
if (lastArgIsException(os)) {
|
||||||
error(msg, translateException(os), (Throwable) os[os.length - 1]);
|
error(msg, translateException(os), (Throwable) os[os.length - 1]);
|
||||||
} else {
|
} else {
|
||||||
|
@ -293,7 +293,7 @@ public class MockComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void debug(String msg, Object[] os) {
|
public void debug(String msg, Object... os) {
|
||||||
os = addProcessor(os);
|
os = addProcessor(os);
|
||||||
msg = "{} " + msg;
|
msg = "{} " + msg;
|
||||||
|
|
||||||
|
|
|
@ -234,7 +234,7 @@ public class SplitAvro extends AbstractProcessor {
|
||||||
final FlowFile originalFlowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, splits.size());
|
final FlowFile originalFlowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, splits.size());
|
||||||
session.transfer(originalFlowFile, REL_ORIGINAL);
|
session.transfer(originalFlowFile, REL_ORIGINAL);
|
||||||
} catch (ProcessException e) {
|
} catch (ProcessException e) {
|
||||||
getLogger().error("Failed to split {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
|
getLogger().error("Failed to split {} due to {}", flowFile, e.getMessage(), e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -553,7 +553,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
|
||||||
|
|
||||||
writer.finishListing();
|
writer.finishListing();
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
getLogger().error("Failed to list contents of bucket due to {}", new Object[] {e}, e);
|
getLogger().error("Failed to list contents of bucket due to {}", e, e);
|
||||||
writer.finishListingExceptionally(e);
|
writer.finishListingExceptionally(e);
|
||||||
session.rollback();
|
session.rollback();
|
||||||
context.yield();
|
context.yield();
|
||||||
|
@ -664,7 +664,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
|
||||||
|
|
||||||
writer.finishListing();
|
writer.finishListing();
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
getLogger().error("Failed to list contents of bucket due to {}", new Object[]{e}, e);
|
getLogger().error("Failed to list contents of bucket due to {}", e, e);
|
||||||
writer.finishListingExceptionally(e);
|
writer.finishListingExceptionally(e);
|
||||||
session.rollback();
|
session.rollback();
|
||||||
context.yield();
|
context.yield();
|
||||||
|
@ -680,7 +680,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
|
||||||
taggingResult = client.getObjectTagging(new GetObjectTaggingRequest(versionSummary.getBucketName(), versionSummary.getKey()));
|
taggingResult = client.getObjectTagging(new GetObjectTaggingRequest(versionSummary.getBucketName(), versionSummary.getKey()));
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
getLogger().warn("Failed to obtain Object Tags for S3 Object {} in bucket {}. Will list S3 Object without the object tags",
|
getLogger().warn("Failed to obtain Object Tags for S3 Object {} in bucket {}. Will list S3 Object without the object tags",
|
||||||
new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
|
versionSummary.getKey(), versionSummary.getBucketName(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return taggingResult;
|
return taggingResult;
|
||||||
|
@ -693,7 +693,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
|
||||||
objectMetadata = client.getObjectMetadata(new GetObjectMetadataRequest(versionSummary.getBucketName(), versionSummary.getKey()));
|
objectMetadata = client.getObjectMetadata(new GetObjectMetadataRequest(versionSummary.getBucketName(), versionSummary.getKey()));
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
getLogger().warn("Failed to obtain User Metadata for S3 Object {} in bucket {}. Will list S3 Object without the user metadata",
|
getLogger().warn("Failed to obtain User Metadata for S3 Object {} in bucket {}. Will list S3 Object without the user metadata",
|
||||||
new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
|
versionSummary.getKey(), versionSummary.getBucketName(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return objectMetadata;
|
return objectMetadata;
|
||||||
|
@ -1003,7 +1003,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
|
||||||
try {
|
try {
|
||||||
recordWriter.close();
|
recordWriter.close();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.error("Failed to write listing as Records due to {}", new Object[] {e}, e);
|
logger.error("Failed to write listing as Records due to {}", e, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
session.remove(flowFile);
|
session.remove(flowFile);
|
||||||
|
|
|
@ -330,8 +330,7 @@ public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor {
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
// penalize or yield
|
// penalize or yield
|
||||||
if (requestFlowFile != null) {
|
if (requestFlowFile != null) {
|
||||||
logger.error("Routing to {} due to exception: {}",
|
logger.error("Routing to {} due to exception: {}", REL_FAILURE.getName(), e, e);
|
||||||
new Object[]{REL_FAILURE.getName(), e}, e);
|
|
||||||
requestFlowFile = session.penalize(requestFlowFile);
|
requestFlowFile = session.penalize(requestFlowFile);
|
||||||
requestFlowFile = session
|
requestFlowFile = session
|
||||||
.putAttribute(requestFlowFile, EXCEPTION_CLASS, e.getClass().getName());
|
.putAttribute(requestFlowFile, EXCEPTION_CLASS, e.getClass().getName());
|
||||||
|
@ -351,8 +350,7 @@ public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor {
|
||||||
session.remove(responseFlowFile);
|
session.remove(responseFlowFile);
|
||||||
}
|
}
|
||||||
} catch (final Exception e1) {
|
} catch (final Exception e1) {
|
||||||
logger.error("Could not cleanup response flowfile due to exception: {}",
|
logger.error("Could not cleanup response flowfile due to exception: {}", e1, e1);
|
||||||
new Object[]{e1}, e1);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -202,7 +202,7 @@ public abstract class AbstractAzureCosmosDBProcessor extends AbstractProcessor {
|
||||||
this.container = null;
|
this.container = null;
|
||||||
this.cosmosClient.close();
|
this.cosmosClient.close();
|
||||||
}catch(CosmosException e) {
|
}catch(CosmosException e) {
|
||||||
logger.error("Error closing Cosmos DB client due to {}", new Object[] { e.getMessage() }, e);
|
logger.error("Error closing Cosmos DB client due to {}", e.getMessage(), e);
|
||||||
} finally {
|
} finally {
|
||||||
this.cosmosClient = null;
|
this.cosmosClient = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -195,7 +195,7 @@ public class PutAzureCosmosDBRecord extends AbstractAzureCosmosDBProcessor {
|
||||||
bulkInsert(batch);
|
bulkInsert(batch);
|
||||||
}
|
}
|
||||||
} catch (SchemaNotFoundException | MalformedRecordException | IOException | CosmosException e) {
|
} catch (SchemaNotFoundException | MalformedRecordException | IOException | CosmosException e) {
|
||||||
logger.error("PutAzureCosmoDBRecord failed with error: {}", new Object[]{e.getMessage()}, e);
|
logger.error("PutAzureCosmoDBRecord failed with error: {}", e.getMessage(), e);
|
||||||
error = true;
|
error = true;
|
||||||
} finally {
|
} finally {
|
||||||
if (!error) {
|
if (!error) {
|
||||||
|
|
|
@ -257,8 +257,7 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
|
||||||
|
|
||||||
} catch (final QueryValidationException qve) {
|
} catch (final QueryValidationException qve) {
|
||||||
logger.error("The CQL statement {} is invalid due to syntax error, authorization issue, or another "
|
logger.error("The CQL statement {} is invalid due to syntax error, authorization issue, or another "
|
||||||
+ "validation problem; routing {} to failure",
|
+ "validation problem; routing {} to failure", cql, flowFile, qve);
|
||||||
new Object[]{cql, flowFile}, qve);
|
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
|
||||||
|
|
|
@ -113,7 +113,7 @@ public class GeoEnrichIP extends AbstractEnrichIP {
|
||||||
getLogger().warn("Could not resolve the IP for value '{}', contained within the attribute '{}' in " +
|
getLogger().warn("Could not resolve the IP for value '{}', contained within the attribute '{}' in " +
|
||||||
"FlowFile '{}'. This is usually caused by issue resolving the appropriate DNS record or " +
|
"FlowFile '{}'. This is usually caused by issue resolving the appropriate DNS record or " +
|
||||||
"providing the processor with an invalid IP address ",
|
"providing the processor with an invalid IP address ",
|
||||||
new Object[]{ipAttributeValue, IP_ADDRESS_ATTRIBUTE.getDisplayName(), flowFile}, ioe);
|
ipAttributeValue, IP_ADDRESS_ATTRIBUTE.getDisplayName(), flowFile, ioe);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,7 +136,7 @@ public class GeoEnrichIP extends AbstractEnrichIP {
|
||||||
// Most name or IP resolutions failure should have been triggered in the try loop above but
|
// Most name or IP resolutions failure should have been triggered in the try loop above but
|
||||||
// environmental conditions may trigger errors during the second resolution as well.
|
// environmental conditions may trigger errors during the second resolution as well.
|
||||||
session.transfer(flowFile, REL_NOT_FOUND);
|
session.transfer(flowFile, REL_NOT_FOUND);
|
||||||
getLogger().warn("Failure while trying to find enrichment data for {} due to {}", new Object[]{flowFile, ex}, ex);
|
getLogger().warn("Failure while trying to find enrichment data for {} due to {}", flowFile, ex, ex);
|
||||||
return;
|
return;
|
||||||
} finally {
|
} finally {
|
||||||
stopWatch.stop();
|
stopWatch.stop();
|
||||||
|
|
|
@ -85,7 +85,7 @@ public class ISPEnrichIP extends AbstractEnrichIP {
|
||||||
getLogger().warn("Could not resolve the IP for value '{}', contained within the attribute '{}' in " +
|
getLogger().warn("Could not resolve the IP for value '{}', contained within the attribute '{}' in " +
|
||||||
"FlowFile '{}'. This is usually caused by issue resolving the appropriate DNS record or " +
|
"FlowFile '{}'. This is usually caused by issue resolving the appropriate DNS record or " +
|
||||||
"providing the processor with an invalid IP address ",
|
"providing the processor with an invalid IP address ",
|
||||||
new Object[]{ipAttributeValue, IP_ADDRESS_ATTRIBUTE.getDisplayName(), flowFile}, ioe);
|
ipAttributeValue, IP_ADDRESS_ATTRIBUTE.getDisplayName(), flowFile, ioe);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
final StopWatch stopWatch = new StopWatch(true);
|
final StopWatch stopWatch = new StopWatch(true);
|
||||||
|
@ -97,7 +97,7 @@ public class ISPEnrichIP extends AbstractEnrichIP {
|
||||||
// Most name or IP resolutions failure should have been triggered in the try loop above but
|
// Most name or IP resolutions failure should have been triggered in the try loop above but
|
||||||
// environmental conditions may trigger errors during the second resolution as well.
|
// environmental conditions may trigger errors during the second resolution as well.
|
||||||
session.transfer(flowFile, REL_NOT_FOUND);
|
session.transfer(flowFile, REL_NOT_FOUND);
|
||||||
getLogger().warn("Failure while trying to find enrichment data for {} due to {}", new Object[]{flowFile, ex}, ex);
|
getLogger().warn("Failure while trying to find enrichment data for {} due to {}", flowFile, ex, ex);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -327,7 +327,7 @@ public class QueryWhois extends AbstractEnrichProcessor {
|
||||||
if (whoisClient.isConnected()) whoisClient.disconnect();
|
if (whoisClient.isConnected()) whoisClient.disconnect();
|
||||||
}
|
}
|
||||||
} catch ( IOException e) {
|
} catch ( IOException e) {
|
||||||
getLogger().error("Query failed due to {}", new Object[]{e.getMessage()}, e);
|
getLogger().error("Query failed due to {}", e.getMessage(), e);
|
||||||
throw new ProcessException("Error performing Whois Lookup", e);
|
throw new ProcessException("Error performing Whois Lookup", e);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
|
|
|
@ -40,7 +40,7 @@ public class ResultProcessor {
|
||||||
if (exception == null) {
|
if (exception == null) {
|
||||||
session.transfer(updated, successRelationship);
|
session.transfer(updated, successRelationship);
|
||||||
} else {
|
} else {
|
||||||
logger.error(UNABLE_TO_PROCESS_DUE_TO, new Object[]{name, exception}, exception);
|
logger.error(UNABLE_TO_PROCESS_DUE_TO, name, exception, exception);
|
||||||
session.transfer(updated, failureRelationship);
|
session.transfer(updated, failureRelationship);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,6 +81,6 @@ public class ResultProcessorTest {
|
||||||
verify(processSession).putAttribute(flowFile, CoreAttributes.FILENAME.key(), name);
|
verify(processSession).putAttribute(flowFile, CoreAttributes.FILENAME.key(), name);
|
||||||
verify(processSession).putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), MediaType.APPLICATION_XML_UTF_8.toString());
|
verify(processSession).putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), MediaType.APPLICATION_XML_UTF_8.toString());
|
||||||
verify(processSession).transfer(flowFile, failureRelationship);
|
verify(processSession).transfer(flowFile, failureRelationship);
|
||||||
verify(componentLog).error(eq(ResultProcessor.UNABLE_TO_PROCESS_DUE_TO), any(Object[].class), eq(exception));
|
verify(componentLog).error(eq(ResultProcessor.UNABLE_TO_PROCESS_DUE_TO), any(Object[].class));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -288,7 +288,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
|
||||||
final String groupingIdentifier = getGroupId(context, flowFile, session);
|
final String groupingIdentifier = getGroupId(context, flowFile, session);
|
||||||
flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
|
flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
getLogger().error("Could not determine which Bin to add {} to; will route to failure", new Object[] {flowFile}, e);
|
getLogger().error("Could not determine which Bin to add {} to; will route to failure", flowFile, e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
session.commitAsync();
|
session.commitAsync();
|
||||||
}
|
}
|
||||||
|
|
|
@ -200,8 +200,7 @@ public abstract class AbstractListenEventBatchingProcessor<E extends Event> exte
|
||||||
batch.setFlowFile(appendedFlowFile);
|
batch.setFlowFile(appendedFlowFile);
|
||||||
|
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
getLogger().error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again",
|
getLogger().error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again", e.getMessage(), e);
|
||||||
new Object[] {e.getMessage()}, e);
|
|
||||||
errorEvents.offer(event);
|
errorEvents.offer(event);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -555,7 +555,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
||||||
context.getStateManager().clear(getStateScope(context));
|
context.getStateManager().clear(getStateScope(context));
|
||||||
|
|
||||||
} catch (final IOException re) {
|
} catch (final IOException re) {
|
||||||
getLogger().error("Failed to remove previous state from the State Manager.", new Object[]{re.getMessage()}, re);
|
getLogger().error("Failed to remove previous state from the State Manager.", re.getMessage(), re);
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -565,7 +565,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
||||||
// comparision in lastModifiedMap to the same entity.
|
// comparision in lastModifiedMap to the same entity.
|
||||||
entityList = performListing(context, IGNORE_MIN_TIMESTAMP_VALUE, ListingMode.EXECUTION);
|
entityList = performListing(context, IGNORE_MIN_TIMESTAMP_VALUE, ListingMode.EXECUTION);
|
||||||
} catch (final IOException pe) {
|
} catch (final IOException pe) {
|
||||||
getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{pe.getMessage()}, pe);
|
getLogger().error("Failed to perform listing on remote host due to {}", pe.getMessage(), pe);
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -670,7 +670,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
|
getLogger().error("Failed to perform listing on remote host due to {}", e.getMessage(), e);
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -757,7 +757,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
||||||
// track of when this last executed for consideration of the lag nanos
|
// track of when this last executed for consideration of the lag nanos
|
||||||
entityList = performListing(context, minTimestampToListMillis, ListingMode.EXECUTION);
|
entityList = performListing(context, minTimestampToListMillis, ListingMode.EXECUTION);
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
|
getLogger().error("Failed to perform listing on remote host due to {}", e.getMessage(), e);
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1125,7 +1125,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
||||||
try {
|
try {
|
||||||
return performListing(context, minTimestampToList, ListingMode.EXECUTION);
|
return performListing(context, minTimestampToList, ListingMode.EXECUTION);
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
|
getLogger().error("Failed to perform listing on remote host due to {}", e.getMessage(), e);
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
}, entity -> createAttributes(entity, context));
|
}, entity -> createAttributes(entity, context));
|
||||||
|
|
|
@ -190,7 +190,7 @@ public class RollbackOnFailure {
|
||||||
// However, keeping failed FlowFile in the incoming relationship would retry it too often.
|
// However, keeping failed FlowFile in the incoming relationship would retry it too often.
|
||||||
// So, administratively yield the process.
|
// So, administratively yield the process.
|
||||||
if (functionContext.isRollbackOnFailure()) {
|
if (functionContext.isRollbackOnFailure()) {
|
||||||
logger.warn("Administratively yielding {} after rolling back due to {}", new Object[]{context.getName(), t}, t);
|
logger.warn("Administratively yielding {} after rolling back due to {}", context.getName(), t, t);
|
||||||
context.yield();
|
context.yield();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -247,7 +247,7 @@ public class ProvenanceEventConsumer {
|
||||||
stateManager.setState(newMapOfState, Scope.LOCAL);
|
stateManager.setState(newMapOfState, Scope.LOCAL);
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
logger.error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}",
|
logger.error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}",
|
||||||
new Object[]{lastEventId, ioe, ioe, ioe.getMessage()}, ioe);
|
lastEventId, ioe, ioe, ioe.getMessage(), ioe);
|
||||||
}
|
}
|
||||||
|
|
||||||
return lastEvent.getEventId() + 1;
|
return lastEvent.getEventId() + 1;
|
||||||
|
|
|
@ -1767,7 +1767,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
|
|
||||||
return scheduler.runProcessorOnce(processor, stopCallback);
|
return scheduler.runProcessorOnce(processor, stopCallback);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
processor.getLogger().error("Error while running processor {} once.", new Object[]{processor}, e);
|
processor.getLogger().error("Error while running processor {} once.", processor, e);
|
||||||
return stopProcessor(processor);
|
return stopProcessor(processor);
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
|
|
|
@ -70,7 +70,7 @@ public class SimpleProcessLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void warn(final String msg, final Object[] os) {
|
public void warn(final String msg, final Object... os) {
|
||||||
if (isWarnEnabled()) {
|
if (isWarnEnabled()) {
|
||||||
final String componentMessage = getComponentMessage(msg);
|
final String componentMessage = getComponentMessage(msg);
|
||||||
final Object[] arguments = insertComponent(os);
|
final Object[] arguments = insertComponent(os);
|
||||||
|
@ -132,7 +132,7 @@ public class SimpleProcessLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void trace(final String msg, final Object[] os) {
|
public void trace(final String msg, final Object... os) {
|
||||||
if (isTraceEnabled()) {
|
if (isTraceEnabled()) {
|
||||||
final String componentMessage = getComponentMessage(msg);
|
final String componentMessage = getComponentMessage(msg);
|
||||||
final Object[] arguments = insertComponent(os);
|
final Object[] arguments = insertComponent(os);
|
||||||
|
@ -219,7 +219,7 @@ public class SimpleProcessLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void info(final String msg, final Object[] os) {
|
public void info(final String msg, final Object... os) {
|
||||||
if (isInfoEnabled()) {
|
if (isInfoEnabled()) {
|
||||||
final String componentMessage = getComponentMessage(msg);
|
final String componentMessage = getComponentMessage(msg);
|
||||||
final Object[] arguments = insertComponent(os);
|
final Object[] arguments = insertComponent(os);
|
||||||
|
@ -291,7 +291,7 @@ public class SimpleProcessLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void error(final String msg, final Object[] os) {
|
public void error(final String msg, final Object... os) {
|
||||||
if (isErrorEnabled()) {
|
if (isErrorEnabled()) {
|
||||||
final String componentMessage = getComponentMessage(msg);
|
final String componentMessage = getComponentMessage(msg);
|
||||||
final Object[] arguments = insertComponent(os);
|
final Object[] arguments = insertComponent(os);
|
||||||
|
@ -348,7 +348,7 @@ public class SimpleProcessLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void debug(final String msg, final Object[] os) {
|
public void debug(final String msg, final Object... os) {
|
||||||
if (isDebugEnabled()) {
|
if (isDebugEnabled()) {
|
||||||
final String componentMessage = getComponentMessage(msg);
|
final String componentMessage = getComponentMessage(msg);
|
||||||
final Object[] arguments = insertComponent(os);
|
final Object[] arguments = insertComponent(os);
|
||||||
|
@ -416,7 +416,7 @@ public class SimpleProcessLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void log(final LogLevel level, final String msg, final Object[] os) {
|
public void log(final LogLevel level, final String msg, final Object... os) {
|
||||||
switch (level) {
|
switch (level) {
|
||||||
case DEBUG:
|
case DEBUG:
|
||||||
debug(msg, os);
|
debug(msg, os);
|
||||||
|
|
|
@ -54,7 +54,7 @@ public class TerminationAwareLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void warn(String msg, Object[] os) {
|
public void warn(String msg, Object... os) {
|
||||||
if (isTerminated()) {
|
if (isTerminated()) {
|
||||||
logger.debug(getMessage(msg, LogLevel.WARN), os);
|
logger.debug(getMessage(msg, LogLevel.WARN), os);
|
||||||
return;
|
return;
|
||||||
|
@ -94,7 +94,7 @@ public class TerminationAwareLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void trace(String msg, Object[] os) {
|
public void trace(String msg, Object... os) {
|
||||||
if (isTerminated()) {
|
if (isTerminated()) {
|
||||||
logger.trace(getMessage(msg, LogLevel.TRACE), os);
|
logger.trace(getMessage(msg, LogLevel.TRACE), os);
|
||||||
return;
|
return;
|
||||||
|
@ -159,7 +159,7 @@ public class TerminationAwareLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void info(String msg, Object[] os) {
|
public void info(String msg, Object... os) {
|
||||||
if (isTerminated()) {
|
if (isTerminated()) {
|
||||||
logger.debug(getMessage(msg, LogLevel.INFO), os);
|
logger.debug(getMessage(msg, LogLevel.INFO), os);
|
||||||
return;
|
return;
|
||||||
|
@ -204,7 +204,7 @@ public class TerminationAwareLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void error(String msg, Object[] os) {
|
public void error(String msg, Object... os) {
|
||||||
if (isTerminated()) {
|
if (isTerminated()) {
|
||||||
logger.debug(getMessage(msg, LogLevel.ERROR), os);
|
logger.debug(getMessage(msg, LogLevel.ERROR), os);
|
||||||
return;
|
return;
|
||||||
|
@ -244,7 +244,7 @@ public class TerminationAwareLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void debug(String msg, Object[] os) {
|
public void debug(String msg, Object... os) {
|
||||||
if (isTerminated()) {
|
if (isTerminated()) {
|
||||||
logger.debug(getMessage(msg, LogLevel.DEBUG), os);
|
logger.debug(getMessage(msg, LogLevel.DEBUG), os);
|
||||||
return;
|
return;
|
||||||
|
@ -284,7 +284,7 @@ public class TerminationAwareLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void log(LogLevel level, String msg, Object[] os) {
|
public void log(LogLevel level, String msg, Object... os) {
|
||||||
if (isTerminated()) {
|
if (isTerminated()) {
|
||||||
logger.debug(getMessage(msg, level), os);
|
logger.debug(getMessage(msg, level), os);
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -291,7 +291,7 @@ public class ConnectableTask {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
rawSession.commitAsync(null, t -> {
|
rawSession.commitAsync(null, t -> {
|
||||||
procLog.error("Failed to commit session {} due to {}; rolling back", new Object[]{rawSession, t.toString()}, t);
|
procLog.error("Failed to commit session {} due to {}; rolling back", rawSession, t.toString(), t);
|
||||||
});
|
});
|
||||||
} catch (final TerminatedTaskException tte) {
|
} catch (final TerminatedTaskException tte) {
|
||||||
procLog.debug("Cannot commit Batch Process Session because the Task was forcefully terminated", tte);
|
procLog.debug("Cannot commit Batch Process Session because the Task was forcefully terminated", tte);
|
||||||
|
|
|
@ -35,7 +35,7 @@ public class MockComponentLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void warn(String msg, Object[] os) {
|
public void warn(String msg, Object... os) {
|
||||||
logger.warn(msg, os);
|
logger.warn(msg, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,7 +56,7 @@ public class MockComponentLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void trace(String msg, Object[] os) {
|
public void trace(String msg, Object... os) {
|
||||||
logger.trace(msg, os);
|
logger.trace(msg, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,7 +102,7 @@ public class MockComponentLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void info(String msg, Object[] os) {
|
public void info(String msg, Object... os) {
|
||||||
logger.info(msg, os);
|
logger.info(msg, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,7 +129,7 @@ public class MockComponentLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void error(String msg, Object[] os) {
|
public void error(String msg, Object... os) {
|
||||||
logger.error(msg, os);
|
logger.error(msg, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,7 +150,7 @@ public class MockComponentLogger implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void debug(String msg, Object[] os) {
|
public void debug(String msg, Object... os) {
|
||||||
logger.debug(msg, os);
|
logger.debug(msg, os);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -233,7 +233,7 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve
|
||||||
successfulFlowFiles.addAll(flowFiles);
|
successfulFlowFiles.addAll(flowFiles);
|
||||||
} catch (InterruptedException | ExecutionException e) {
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
getLogger().error("Failed to publish the messages to Google Cloud PubSub Lite topic '{}' due to {}, "
|
getLogger().error("Failed to publish the messages to Google Cloud PubSub Lite topic '{}' due to {}, "
|
||||||
+ "routing all messages from the batch to failure", new Object[]{topicName, e.getLocalizedMessage()}, e);
|
+ "routing all messages from the batch to failure", topicName, e.getLocalizedMessage(), e);
|
||||||
session.transfer(flowFiles, REL_FAILURE);
|
session.transfer(flowFiles, REL_FAILURE);
|
||||||
context.yield();
|
context.yield();
|
||||||
}
|
}
|
||||||
|
|
|
@ -263,7 +263,7 @@ public class FetchGCSObject extends AbstractGCSProcessor {
|
||||||
final Map<String, String> attributes = StorageAttributes.createAttributes(blob.blob);
|
final Map<String, String> attributes = StorageAttributes.createAttributes(blob.blob);
|
||||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||||
} catch (final StorageException | IOException e) {
|
} catch (final StorageException | IOException e) {
|
||||||
getLogger().error("Failed to fetch GCS Object due to {}", new Object[] {e}, e);
|
getLogger().error("Failed to fetch GCS Object due to {}", e, e);
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -526,7 +526,7 @@ public class PutGCSObject extends AbstractGCSProcessor {
|
||||||
}
|
}
|
||||||
} catch (StorageException e) {
|
} catch (StorageException e) {
|
||||||
getLogger().error("Failure completing upload flowfile={} bucket={} key={} reason={}",
|
getLogger().error("Failure completing upload flowfile={} bucket={} key={} reason={}",
|
||||||
new Object[]{ffFilename, bucket, key, e.getMessage()}, e);
|
ffFilename, bucket, key, e.getMessage(), e);
|
||||||
throw (e);
|
throw (e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -547,7 +547,7 @@ public class PutGCSObject extends AbstractGCSProcessor {
|
||||||
new Object[]{ff, millis});
|
new Object[]{ff, millis});
|
||||||
|
|
||||||
} catch (final ProcessException | StorageException e) {
|
} catch (final ProcessException | StorageException e) {
|
||||||
getLogger().error("Failed to put {} to Google Cloud Storage due to {}", new Object[]{flowFile, e.getMessage()}, e);
|
getLogger().error("Failed to put {} to Google Cloud Storage due to {}", flowFile, e.getMessage(), e);
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,8 +142,7 @@ public class ExecuteGraphQuery extends AbstractGraphExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Exception exception) {
|
} catch (Exception exception) {
|
||||||
getLogger().error("Failed to execute graph statement due to {}",
|
getLogger().error("Failed to execute graph statement due to {}", exception.getLocalizedMessage(), exception);
|
||||||
new Object[]{exception.getLocalizedMessage()}, exception);
|
|
||||||
session.remove(output);
|
session.remove(output);
|
||||||
if (flowFile != null) {
|
if (flowFile != null) {
|
||||||
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, String.valueOf(exception.getMessage()));
|
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, String.valueOf(exception.getMessage()));
|
||||||
|
|
|
@ -182,7 +182,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
|
||||||
session.transfer(flowFile, RELATIONSHIP_SUCCESS);
|
session.transfer(flowFile, RELATIONSHIP_SUCCESS);
|
||||||
getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
|
getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
|
||||||
} catch (ProcessException e) {
|
} catch (ProcessException e) {
|
||||||
getLogger().error("Failed to create Sequence File. Transferring {} to 'failure'", new Object[]{flowFile}, e);
|
getLogger().error("Failed to create Sequence File. Transferring {} to 'failure'", flowFile, e);
|
||||||
session.transfer(flowFile, RELATIONSHIP_FAILURE);
|
session.transfer(flowFile, RELATIONSHIP_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -201,7 +201,7 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
|
||||||
session.remove(flowFile);
|
session.remove(flowFile);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
|
getLogger().error("Error processing delete for flowfile {} due to {}", flowFile, e.getMessage(), e);
|
||||||
session.transfer(flowFile, getFailureRelationship());
|
session.transfer(flowFile, getFailureRelationship());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -284,7 +284,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().warn("Could not add to processing queue due to {}", new Object[]{e.getMessage()}, e);
|
getLogger().warn("Could not add to processing queue due to {}", e.getMessage(), e);
|
||||||
} finally {
|
} finally {
|
||||||
queueLock.unlock();
|
queueLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -470,7 +470,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
|
||||||
hdfs.setOwner(name, owner, group);
|
hdfs.setOwner(name, owner, group);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e.getMessage()}, e);
|
getLogger().warn("Could not change owner or group of {} on HDFS due to {}", name, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -492,7 +492,7 @@ class TestListHDFS {
|
||||||
// check that there are no throwables that are not of JobConf CNFE exceptions
|
// check that there are no throwables that are not of JobConf CNFE exceptions
|
||||||
.allMatch(throwable -> throwable instanceof ClassNotFoundException && throwable.getMessage().contains("JobConf")));
|
.allMatch(throwable -> throwable instanceof ClassNotFoundException && throwable.getMessage().contains("JobConf")));
|
||||||
verify(mockLogger, never()).error(anyString(), any(Object[].class));
|
verify(mockLogger, never()).error(anyString(), any(Object[].class));
|
||||||
verify(mockLogger, never()).error(anyString(), any(Object[].class), any(Throwable.class));
|
verify(mockLogger, never()).error(anyString(), any(Object[].class));
|
||||||
|
|
||||||
// assert that no files were listed
|
// assert that no files were listed
|
||||||
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
|
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
|
||||||
|
|
|
@ -189,7 +189,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (final ProcessException pe) {
|
} catch (final ProcessException pe) {
|
||||||
getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[]{flowFile, pe.toString()}, pe);
|
getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", flowFile, pe.toString(), pe);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -157,10 +157,10 @@ public class NiFiRecordSerDe extends AbstractSerDe {
|
||||||
populateRecord(result, record.getValue(field), field, schema);
|
populateRecord(result, record.getValue(field), field, schema);
|
||||||
}
|
}
|
||||||
} catch(SerDeException se) {
|
} catch(SerDeException se) {
|
||||||
log.error("Error [{}] parsing Record [{}].", new Object[]{se.toString(), record}, se);
|
log.error("Error [{}] parsing Record [{}].", se.toString(), record, se);
|
||||||
throw se;
|
throw se;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Error [{}] parsing Record [{}].", new Object[]{e.toString(), record}, e);
|
log.error("Error [{}] parsing Record [{}].", e.toString(), record, e);
|
||||||
throw new SerDeException(e);
|
throw new SerDeException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -225,7 +225,7 @@ public abstract class AbstractHive3QLProcessor extends AbstractSessionFactoryPro
|
||||||
}
|
}
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
// Log which attribute/parameter had an error, then rethrow to be handled at the top level
|
// Log which attribute/parameter had an error, then rethrow to be handled at the top level
|
||||||
getLogger().error("Error setting parameter {} to value from {} ({})", new Object[]{parameterIndex, attrName, parameterValue}, e);
|
getLogger().error("Error setting parameter {} to value from {} ({})", parameterIndex, attrName, parameterValue, e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -286,7 +286,7 @@ public abstract class AbstractHive3QLProcessor extends AbstractSessionFactoryPro
|
||||||
node = new ParseDriver().parse(normalize(query));
|
node = new ParseDriver().parse(normalize(query));
|
||||||
} catch (ParseException e) {
|
} catch (ParseException e) {
|
||||||
// If failed to parse the query, just log a message, but continue.
|
// If failed to parse the query, just log a message, but continue.
|
||||||
getLogger().debug("Failed to parse query: {} due to {}", new Object[]{query, e}, e);
|
getLogger().debug("Failed to parse query: {} due to {}", query, e, e);
|
||||||
return Collections.emptySet();
|
return Collections.emptySet();
|
||||||
}
|
}
|
||||||
final HashSet<TableName> tableNames = new HashSet<>();
|
final HashSet<TableName> tableNames = new HashSet<>();
|
||||||
|
|
|
@ -248,7 +248,7 @@ public class PutHive3QL extends AbstractHive3QLProcessor {
|
||||||
tableNames.addAll(findTableNames(hiveQL));
|
tableNames.addAll(findTableNames(hiveQL));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// If failed to parse the query, just log a warning message, but continue.
|
// If failed to parse the query, just log a warning message, but continue.
|
||||||
getLogger().warn("Failed to parse hiveQL: {} due to {}", new Object[]{hiveQL, e}, e);
|
getLogger().warn("Failed to parse hiveQL: {} due to {}", hiveQL, e, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
stmt.setQueryTimeout(context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(flowFile).asInteger());
|
stmt.setQueryTimeout(context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(flowFile).asInteger());
|
||||||
|
@ -276,14 +276,13 @@ public class PutHive3QL extends AbstractHive3QLProcessor {
|
||||||
onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> {
|
onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> {
|
||||||
switch (r.destination()) {
|
switch (r.destination()) {
|
||||||
case Failure:
|
case Failure:
|
||||||
getLogger().error("Failed to update Hive for {} due to {}; routing to failure", new Object[] {i, e}, e);
|
getLogger().error("Failed to update Hive for {} due to {}; routing to failure", i, e, e);
|
||||||
break;
|
break;
|
||||||
case Retry:
|
case Retry:
|
||||||
getLogger().error("Failed to update Hive for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
|
getLogger().error("Failed to update Hive for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", i, e, e);
|
||||||
new Object[] {i, e}, e);
|
|
||||||
break;
|
break;
|
||||||
case Self:
|
case Self:
|
||||||
getLogger().error("Failed to update Hive for {} due to {};", new Object[] {i, e}, e);
|
getLogger().error("Failed to update Hive for {} due to {};", i, e, e);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -517,7 +517,7 @@ public class PutHive3Streaming extends AbstractProcessor {
|
||||||
} else {
|
} else {
|
||||||
log.error(
|
log.error(
|
||||||
"Failed to create {} for {} - routing to failure",
|
"Failed to create {} for {} - routing to failure",
|
||||||
new Object[]{RecordReader.class.getSimpleName(), flowFile},
|
RecordReader.class.getSimpleName(), flowFile,
|
||||||
rrfe
|
rrfe
|
||||||
);
|
);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
@ -539,14 +539,14 @@ public class PutHive3Streaming extends AbstractProcessor {
|
||||||
flowFile = session.putAllAttributes(flowFile, updateAttributes);
|
flowFile = session.putAllAttributes(flowFile, updateAttributes);
|
||||||
log.error(
|
log.error(
|
||||||
"Exception while processing {} - routing to failure",
|
"Exception while processing {} - routing to failure",
|
||||||
new Object[]{flowFile},
|
flowFile,
|
||||||
e
|
e
|
||||||
);
|
);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
} catch (DiscontinuedException e) {
|
} catch (DiscontinuedException e) {
|
||||||
// The input FlowFile processing is discontinued. Keep it in the input queue.
|
// The input FlowFile processing is discontinued. Keep it in the input queue.
|
||||||
getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
|
getLogger().warn("Discontinued processing for {} due to {}", flowFile, e, e);
|
||||||
session.transfer(flowFile, Relationship.SELF);
|
session.transfer(flowFile, Relationship.SELF);
|
||||||
} catch (ConnectionError ce) {
|
} catch (ConnectionError ce) {
|
||||||
// If we can't connect to the metastore, yield the processor
|
// If we can't connect to the metastore, yield the processor
|
||||||
|
@ -579,7 +579,7 @@ public class PutHive3Streaming extends AbstractProcessor {
|
||||||
flowFile = session.putAllAttributes(flowFile, updateAttributes);
|
flowFile = session.putAllAttributes(flowFile, updateAttributes);
|
||||||
log.error(
|
log.error(
|
||||||
"Exception while trying to stream {} to hive - routing to failure",
|
"Exception while trying to stream {} to hive - routing to failure",
|
||||||
new Object[]{flowFile},
|
flowFile,
|
||||||
se
|
se
|
||||||
);
|
);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
|
|
@ -379,7 +379,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
|
||||||
st.setFetchSize(fetchSize);
|
st.setFetchSize(fetchSize);
|
||||||
} catch (SQLException se) {
|
} catch (SQLException se) {
|
||||||
// Not all drivers support this, just log the error (at debug level) and move on
|
// Not all drivers support this, just log the error (at debug level) and move on
|
||||||
logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
|
logger.debug("Cannot set fetch size to {} due to {}", fetchSize, se.getLocalizedMessage(), se);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -455,7 +455,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
|
||||||
attributes.putAll(toQueryTableAttributes(findTableNames(hqlStatement)));
|
attributes.putAll(toQueryTableAttributes(findTableNames(hqlStatement)));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// If failed to parse the query, just log a warning message, but continue.
|
// If failed to parse the query, just log a warning message, but continue.
|
||||||
getLogger().warn("Failed to parse query: {} due to {}", new Object[]{hqlStatement, e}, e);
|
getLogger().warn("Failed to parse query: {} due to {}", hqlStatement, e, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set MIME type on output document and add extension to filename
|
// Set MIME type on output document and add extension to filename
|
||||||
|
|
|
@ -357,7 +357,7 @@ public class UpdateHive3Table extends AbstractProcessor {
|
||||||
} catch (RecordReaderFactoryException rrfe) {
|
} catch (RecordReaderFactoryException rrfe) {
|
||||||
log.error(
|
log.error(
|
||||||
"Failed to create {} for {} - routing to failure",
|
"Failed to create {} for {} - routing to failure",
|
||||||
new Object[]{RecordReader.class.getSimpleName(), flowFile},
|
RecordReader.class.getSimpleName(), flowFile,
|
||||||
rrfe
|
rrfe
|
||||||
);
|
);
|
||||||
// Since we are wrapping the exceptions above there should always be a cause
|
// Since we are wrapping the exceptions above there should always be a cause
|
||||||
|
@ -467,11 +467,11 @@ public class UpdateHive3Table extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
} catch (IOException | SQLException e) {
|
} catch (IOException | SQLException e) {
|
||||||
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
|
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
|
||||||
log.error("Exception while processing {} - routing to failure", new Object[]{flowFile}, e);
|
log.error("Exception while processing {} - routing to failure", flowFile, e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
} catch (DiscontinuedException e) {
|
} catch (DiscontinuedException e) {
|
||||||
// The input FlowFile processing is discontinued. Keep it in the input queue.
|
// The input FlowFile processing is discontinued. Keep it in the input queue.
|
||||||
getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
|
getLogger().warn("Discontinued processing for {} due to {}", flowFile, e, e);
|
||||||
session.transfer(flowFile, Relationship.SELF);
|
session.transfer(flowFile, Relationship.SELF);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
|
throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
|
||||||
|
|
|
@ -168,7 +168,7 @@ public class GetHTMLElement
|
||||||
doc = parseHTMLDocumentFromFlowfile(flowFile, context, session);
|
doc = parseHTMLDocumentFromFlowfile(flowFile, context, session);
|
||||||
eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions(flowFile).getValue());
|
eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions(flowFile).getValue());
|
||||||
} catch (final Exception ex) {
|
} catch (final Exception ex) {
|
||||||
getLogger().error("Failed to extract HTML from {} due to {}; routing to {}", new Object[] {flowFile, ex, REL_INVALID_HTML}, ex);
|
getLogger().error("Failed to extract HTML from {} due to {}; routing to {}", flowFile, ex, REL_INVALID_HTML, ex);
|
||||||
session.transfer(flowFile, REL_INVALID_HTML);
|
session.transfer(flowFile, REL_INVALID_HTML);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -147,7 +147,7 @@ public class ModifyHTMLElement extends AbstractHTMLProcessor {
|
||||||
doc = parseHTMLDocumentFromFlowfile(flowFile, context, session);
|
doc = parseHTMLDocumentFromFlowfile(flowFile, context, session);
|
||||||
eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions(flowFile).getValue());
|
eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions(flowFile).getValue());
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
getLogger().error("Failed to extract HTML from {} due to {}; routing to {}", new Object[] {flowFile, ex.toString(), REL_INVALID_HTML.getName()}, ex);
|
getLogger().error("Failed to extract HTML from {} due to {}; routing to {}", flowFile, ex.toString(), REL_INVALID_HTML.getName(), ex);
|
||||||
session.transfer(flowFile, REL_INVALID_HTML);
|
session.transfer(flowFile, REL_INVALID_HTML);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -136,7 +136,7 @@ public class PutHTMLElement extends AbstractHTMLProcessor {
|
||||||
doc = parseHTMLDocumentFromFlowfile(flowFile, context, session);
|
doc = parseHTMLDocumentFromFlowfile(flowFile, context, session);
|
||||||
eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions(flowFile).getValue());
|
eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions(flowFile).getValue());
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
getLogger().error("Failed to extract HTML from {} due to {}; routing to {}", new Object[] {flowFile, ex.toString(), REL_INVALID_HTML.getName()}, ex);
|
getLogger().error("Failed to extract HTML from {} due to {}; routing to {}", flowFile, ex.toString(), REL_INVALID_HTML.getName(), ex);
|
||||||
session.transfer(flowFile, REL_INVALID_HTML);
|
session.transfer(flowFile, REL_INVALID_HTML);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,7 +120,7 @@ public abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
influxDB.set(makeConnection(username, password, influxDbUrl, connectionTimeout));
|
influxDB.set(makeConnection(username, password, influxDbUrl, connectionTimeout));
|
||||||
} catch(Exception e) {
|
} catch(Exception e) {
|
||||||
getLogger().error("Error while getting connection {}", new Object[] { e.getLocalizedMessage() },e);
|
getLogger().error("Error while getting connection {}", e.getLocalizedMessage(), e);
|
||||||
throw new RuntimeException("Error while getting connection " + e.getLocalizedMessage(),e);
|
throw new RuntimeException("Error while getting connection " + e.getLocalizedMessage(),e);
|
||||||
}
|
}
|
||||||
getLogger().info("InfluxDB connection created for host {}",
|
getLogger().info("InfluxDB connection created for host {}",
|
||||||
|
|
|
@ -238,11 +238,11 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
|
||||||
outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, exception.getMessage());
|
outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, exception.getMessage());
|
||||||
if ( exception.getCause() instanceof SocketTimeoutException ) {
|
if ( exception.getCause() instanceof SocketTimeoutException ) {
|
||||||
getLogger().error("Failed to read from InfluxDB due SocketTimeoutException to {} and retrying",
|
getLogger().error("Failed to read from InfluxDB due SocketTimeoutException to {} and retrying",
|
||||||
new Object[]{exception.getCause().getLocalizedMessage()}, exception.getCause());
|
exception.getCause().getLocalizedMessage(), exception.getCause());
|
||||||
session.transfer(outgoingFlowFile, REL_RETRY);
|
session.transfer(outgoingFlowFile, REL_RETRY);
|
||||||
} else {
|
} else {
|
||||||
getLogger().error("Failed to read from InfluxDB due to {}",
|
getLogger().error("Failed to read from InfluxDB due to {}",
|
||||||
new Object[]{exception.getLocalizedMessage()}, exception);
|
exception.getLocalizedMessage(), exception);
|
||||||
session.transfer(outgoingFlowFile, REL_FAILURE);
|
session.transfer(outgoingFlowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
context.yield();
|
context.yield();
|
||||||
|
|
|
@ -178,17 +178,17 @@ public class PutInfluxDB extends AbstractInfluxDBProcessor {
|
||||||
flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, String.valueOf(exception.getMessage()));
|
flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, String.valueOf(exception.getMessage()));
|
||||||
if ( exception.getCause() instanceof SocketTimeoutException ) {
|
if ( exception.getCause() instanceof SocketTimeoutException ) {
|
||||||
getLogger().error("Failed to insert into influxDB due SocketTimeoutException to {} and retrying",
|
getLogger().error("Failed to insert into influxDB due SocketTimeoutException to {} and retrying",
|
||||||
new Object[]{exception.getLocalizedMessage()}, exception);
|
exception.getLocalizedMessage(), exception);
|
||||||
session.transfer(flowFile, REL_RETRY);
|
session.transfer(flowFile, REL_RETRY);
|
||||||
} else {
|
} else {
|
||||||
getLogger().error("Failed to insert into influxDB due to {}",
|
getLogger().error("Failed to insert into influxDB due to {}",
|
||||||
new Object[]{exception.getLocalizedMessage()}, exception);
|
exception.getLocalizedMessage(), exception);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
context.yield();
|
context.yield();
|
||||||
} catch (Exception exception) {
|
} catch (Exception exception) {
|
||||||
getLogger().error("Failed to insert into influxDB due to {}",
|
getLogger().error("Failed to insert into influxDB due to {}",
|
||||||
new Object[]{exception.getLocalizedMessage()}, exception);
|
exception.getLocalizedMessage(), exception);
|
||||||
flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, String.valueOf(exception.getMessage()));
|
flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, String.valueOf(exception.getMessage()));
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
context.yield();
|
context.yield();
|
||||||
|
|
|
@ -204,8 +204,7 @@ class JMSConsumer extends JMSWorker {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
} catch (final MessageConversionException mce) {
|
} catch (final MessageConversionException mce) {
|
||||||
processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.",
|
processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.", message, mce);
|
||||||
new Object[] {message}, mce);
|
|
||||||
acknowledge(message, session);
|
acknowledge(message, session);
|
||||||
|
|
||||||
if (errorQueueName != null) {
|
if (errorQueueName != null) {
|
||||||
|
|
|
@ -540,11 +540,9 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements KafkaCl
|
||||||
getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
|
getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
|
||||||
+ "Will roll back session and discard any partially received data.", lease);
|
+ "Will roll back session and discard any partially received data.", lease);
|
||||||
} catch (final KafkaException kex) {
|
} catch (final KafkaException kex) {
|
||||||
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
|
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
|
||||||
new Object[]{lease, kex}, kex);
|
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
|
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);
|
||||||
new Object[]{lease, t}, t);
|
|
||||||
} finally {
|
} finally {
|
||||||
activeLeases.remove(lease);
|
activeLeases.remove(lease);
|
||||||
}
|
}
|
||||||
|
|
|
@ -483,11 +483,9 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo
|
||||||
getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
|
getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
|
||||||
+ "Will roll back session and discard any partially received data.", lease);
|
+ "Will roll back session and discard any partially received data.", lease);
|
||||||
} catch (final KafkaException kex) {
|
} catch (final KafkaException kex) {
|
||||||
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
|
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
|
||||||
new Object[]{lease, kex}, kex);
|
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
|
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);
|
||||||
new Object[]{lease, t}, t);
|
|
||||||
} finally {
|
} finally {
|
||||||
activeLeases.remove(lease);
|
activeLeases.remove(lease);
|
||||||
}
|
}
|
||||||
|
|
|
@ -222,7 +222,7 @@ public class KafkaRecordSink_2_6 extends AbstractControllerService implements Ka
|
||||||
try {
|
try {
|
||||||
producer = createProducer(kafkaProperties);
|
producer = createProducer(kafkaProperties);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().error("Could not create Kafka producer due to {}", new Object[]{e.getMessage()}, e);
|
getLogger().error("Could not create Kafka producer due to {}", e.getMessage(), e);
|
||||||
throw new InitializationException(e);
|
throw new InitializationException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -510,7 +510,7 @@ public class PutKudu extends AbstractKuduProcessor {
|
||||||
record = recordSet.next();
|
record = recordSet.next();
|
||||||
}
|
}
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
getLogger().error("Failed to push {} to Kudu", new Object[] {flowFile}, ex);
|
getLogger().error("Failed to push {} to Kudu", flowFile, ex);
|
||||||
flowFileFailures.put(flowFile, ex);
|
flowFileFailures.put(flowFile, ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -235,7 +235,7 @@ public class PutMongo extends AbstractMongoProcessor {
|
||||||
session.getProvenanceReporter().send(flowFile, getURI(context));
|
session.getProvenanceReporter().send(flowFile, getURI(context));
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Failed to insert {} into MongoDB due to {}", new Object[] {flowFile, e}, e);
|
logger.error("Failed to insert {} into MongoDB due to {}", flowFile, e, e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
context.yield();
|
context.yield();
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,7 +98,7 @@ public class MongoDBControllerService extends AbstractControllerService implemen
|
||||||
final MongoClientSettings clientSettings = builder.build();
|
final MongoClientSettings clientSettings = builder.build();
|
||||||
return MongoClients.create(clientSettings);
|
return MongoClients.create(clientSettings);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
|
getLogger().error("Failed to schedule {} due to {}", this.getClass().getName(), e, e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,7 +148,7 @@ public class MongoDBLookupService extends JsonInferenceSchemaRegistryService imp
|
||||||
return Optional.ofNullable(new MapRecord(schema, result));
|
return Optional.ofNullable(new MapRecord(schema, result));
|
||||||
}
|
}
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
getLogger().error("Error during lookup {}", new Object[]{ query.toJson() }, ex);
|
getLogger().error("Error during lookup {}", query.toJson(), ex);
|
||||||
throw new LookupFailureException(ex);
|
throw new LookupFailureException(ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,8 +101,7 @@ public class ExpressionHandler extends AbstractActionHandlerService {
|
||||||
executeSPEL(command, facts);
|
executeSPEL(command, facts);
|
||||||
}
|
}
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
getLogger().warn("Error occurred when attempting to execute expression. Action: {}, Facts - {}",
|
getLogger().warn("Error occurred when attempting to execute expression. Action: {}, Facts - {}", action, facts, ex);
|
||||||
new Object[]{action, facts}, ex);
|
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
getLogger().warn("Command attribute was not provided. Action: {}, Facts - {}",
|
getLogger().warn("Command attribute was not provided. Action: {}, Facts - {}",
|
||||||
|
|
|
@ -42,7 +42,7 @@ public class MockComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void warn(String msg, Object[] os) {
|
public void warn(String msg, Object... os) {
|
||||||
warn(msg);
|
warn(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,7 +67,7 @@ public class MockComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void trace(String msg, Object[] os) {
|
public void trace(String msg, Object... os) {
|
||||||
trace(msg);
|
trace(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,7 +117,7 @@ public class MockComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void info(String msg, Object[] os) {
|
public void info(String msg, Object... os) {
|
||||||
info(msg);
|
info(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -147,8 +147,8 @@ public class MockComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void error(String msg, Object[] os) {
|
public void error(String msg, Object... os) {
|
||||||
error(convertMessage(msg,os));
|
error(convertMessage(msg, os));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -172,8 +172,8 @@ public class MockComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void debug(String msg, Object[] os) {
|
public void debug(String msg, Object... os) {
|
||||||
debug(convertMessage(msg,os));
|
debug(convertMessage(msg, os));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -197,7 +197,7 @@ public class MockComponentLog implements ComponentLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void log(LogLevel level, String msg, Object[] os) {
|
public void log(LogLevel level, String msg, Object... os) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -227,10 +227,10 @@ public class ScriptedTransformRecord extends ScriptedRecordProcessor {
|
||||||
final long millis = System.currentTimeMillis() - startMillis;
|
final long millis = System.currentTimeMillis() - startMillis;
|
||||||
session.getProvenanceReporter().modifyContent(flowFile, "Transformed " + transformCount + " Records, Dropped " + counts.getDroppedCount() + " Records", millis);
|
session.getProvenanceReporter().modifyContent(flowFile, "Transformed " + transformCount + " Records, Dropped " + counts.getDroppedCount() + " Records", millis);
|
||||||
} catch (final ProcessException e) {
|
} catch (final ProcessException e) {
|
||||||
getLogger().error("After processing {} Records, encountered failure when attempting to transform {}", new Object[] {counts.getRecordCount(), flowFile}, e.getCause());
|
getLogger().error("After processing {} Records, encountered failure when attempting to transform {}", counts.getRecordCount(), flowFile, e.getCause());
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
getLogger().error("After processing {} Records, encountered failure when attempting to transform {}", new Object[] {counts.getRecordCount(), flowFile}, e);
|
getLogger().error("After processing {} Records, encountered failure when attempting to transform {}", counts.getRecordCount(), flowFile, e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,7 +91,7 @@ public class ScriptRunnerFactory {
|
||||||
try {
|
try {
|
||||||
additionalClasspath.add(modulePath.toURI().toURL());
|
additionalClasspath.add(modulePath.toURI().toURL());
|
||||||
} catch (MalformedURLException mue) {
|
} catch (MalformedURLException mue) {
|
||||||
log.warn("{} is not a valid file/folder, ignoring", new Object[]{modulePath.getAbsolutePath()}, mue);
|
log.warn("{} is not a valid file/folder, ignoring", modulePath.getAbsolutePath(), mue);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the path is a directory, we need to scan for JARs and add them to the classpath
|
// If the path is a directory, we need to scan for JARs and add them to the classpath
|
||||||
|
@ -109,7 +109,7 @@ public class ScriptRunnerFactory {
|
||||||
additionalClasspath.add(jarFile.toURI().toURL());
|
additionalClasspath.add(jarFile.toURI().toURL());
|
||||||
|
|
||||||
} catch (MalformedURLException mue) {
|
} catch (MalformedURLException mue) {
|
||||||
log.warn("{} is not a valid file/folder, ignoring", new Object[]{modulePath.getAbsolutePath()}, mue);
|
log.warn("{} is not a valid file/folder, ignoring", modulePath.getAbsolutePath(), mue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -288,7 +288,7 @@ public class GetSolr extends SolrProcessor {
|
||||||
|
|
||||||
return(req.process(getSolrClient()).getResponse().get("uniqueKey").toString());
|
return(req.process(getSolrClient()).getResponse().get("uniqueKey").toString());
|
||||||
} catch (SolrServerException | IOException e) {
|
} catch (SolrServerException | IOException e) {
|
||||||
getLogger().error("Solr query to retrieve uniqueKey-field failed due to {}", new Object[]{solrQuery.toString(), e}, e);
|
getLogger().error("Solr query to retrieve uniqueKey-field failed due to {}", solrQuery.toString(), e, e);
|
||||||
throw new ProcessException(e);
|
throw new ProcessException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -420,12 +420,12 @@ public class GetSolr extends SolrProcessor {
|
||||||
} catch (final SolrServerException | SchemaNotFoundException | IOException e) {
|
} catch (final SolrServerException | SchemaNotFoundException | IOException e) {
|
||||||
context.yield();
|
context.yield();
|
||||||
session.rollback();
|
session.rollback();
|
||||||
logger.error("Failed to execute query {} due to {}", new Object[]{solrQuery.toString(), e}, e);
|
logger.error("Failed to execute query {} due to {}", solrQuery.toString(), e, e);
|
||||||
throw new ProcessException(e);
|
throw new ProcessException(e);
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
context.yield();
|
context.yield();
|
||||||
session.rollback();
|
session.rollback();
|
||||||
logger.error("Failed to execute query {} due to {}", new Object[]{solrQuery.toString(), t}, t);
|
logger.error("Failed to execute query {} due to {}", solrQuery.toString(), t, t);
|
||||||
throw t;
|
throw t;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -511,7 +511,7 @@ public class QuerySolr extends SolrProcessor {
|
||||||
flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION, e.getClass().getName());
|
flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION, e.getClass().getName());
|
||||||
flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION_MESSAGE, e.getMessage());
|
flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION_MESSAGE, e.getMessage());
|
||||||
session.transfer(flowFileResponse, FAILURE);
|
session.transfer(flowFileResponse, FAILURE);
|
||||||
logger.error("Failed to execute query {} due to {}. FlowFile will be routed to relationship failure", new Object[]{solrQuery.toString(), e}, e);
|
logger.error("Failed to execute query {} due to {}. FlowFile will be routed to relationship failure", solrQuery.toString(), e, e);
|
||||||
if (flowFileOriginal != null) {
|
if (flowFileOriginal != null) {
|
||||||
flowFileOriginal = session.penalize(flowFileOriginal);
|
flowFileOriginal = session.penalize(flowFileOriginal);
|
||||||
}
|
}
|
||||||
|
|
|
@ -194,7 +194,7 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
|
||||||
// If no code was provided, assume it is in the content of the incoming flow file
|
// If no code was provided, assume it is in the content of the incoming flow file
|
||||||
code = IOUtils.toString(inputStream, charset);
|
code = IOUtils.toString(inputStream, charset);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
|
log.error("Error reading input flowfile, penalizing and routing to failure", flowFile, ioe.getMessage(), ioe);
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
|
@ -224,7 +224,7 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException | SessionManagerException e) {
|
} catch (IOException | SessionManagerException e) {
|
||||||
log.error("Failure processing flowfile {} due to {}, penalizing and routing to failure", new Object[]{flowFile, e.getMessage()}, e);
|
log.error("Failure processing flowfile {} due to {}, penalizing and routing to failure", flowFile, e.getMessage(), e);
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -380,7 +380,7 @@ public class GetSplunk extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
context.getStateManager().clear(Scope.CLUSTER);
|
context.getStateManager().clear(Scope.CLUSTER);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
getLogger().error("Unable to clear processor state due to {}", new Object[] {e.getMessage()}, e);
|
getLogger().error("Unable to clear processor state due to {}", e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -449,7 +449,7 @@ public class GetSplunk extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
getLogger().error("Unable to load data from State Manager due to {}", new Object[] {e.getMessage()}, e);
|
getLogger().error("Unable to load data from State Manager due to {}", e.getMessage(), e);
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -517,7 +517,7 @@ public class GetSplunk extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
saveState(session, new TimeRange(earliestTime, latestTime));
|
saveState(session, new TimeRange(earliestTime, latestTime));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
getLogger().error("Unable to load data from State Manager due to {}", new Object[]{e.getMessage()}, e);
|
getLogger().error("Unable to load data from State Manager due to {}", e.getMessage(), e);
|
||||||
session.rollback();
|
session.rollback();
|
||||||
context.yield();
|
context.yield();
|
||||||
}
|
}
|
||||||
|
|
|
@ -191,7 +191,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
|
||||||
new Object[] {responseMessage.getStatus(), IOUtils.toString(responseMessage.getContent(), "UTF-8")});
|
new Object[] {responseMessage.getStatus(), IOUtils.toString(responseMessage.getContent(), "UTF-8")});
|
||||||
}
|
}
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
getLogger().error("Error during communication with Splunk: {}", new Object[] {e.getMessage()}, e);
|
getLogger().error("Error during communication with Splunk: {}", e.getMessage(), e);
|
||||||
|
|
||||||
if (responseMessage != null) {
|
if (responseMessage != null) {
|
||||||
try {
|
try {
|
||||||
|
@ -264,7 +264,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
|
||||||
parameters.add(URLEncoder.encode(parameter.getKey(), "UTF-8") + '=' + URLEncoder.encode(parameter.getValue(), "UTF-8"));
|
parameters.add(URLEncoder.encode(parameter.getKey(), "UTF-8") + '=' + URLEncoder.encode(parameter.getValue(), "UTF-8"));
|
||||||
}
|
}
|
||||||
} catch (final UnsupportedEncodingException e) {
|
} catch (final UnsupportedEncodingException e) {
|
||||||
getLogger().error("Could not be initialized because of: {}", new Object[]{e.getMessage()}, e);
|
getLogger().error("Could not be initialized because of: {}", e.getMessage(), e);
|
||||||
throw new ProcessException(e);
|
throw new ProcessException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,7 @@ public class MetricsEventReportingTask extends AbstractReportingTask implements
|
||||||
|
|
||||||
fireRules(context, actionHandler, rulesEngineService, sql);
|
fireRules(context, actionHandler, rulesEngineService, sql);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().error("Error opening loading rules: {}", new Object[]{e.getMessage()}, e);
|
getLogger().error("Error opening loading rules: {}", e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -128,7 +128,7 @@ public class MetricsSqlQueryService implements MetricsQueryService {
|
||||||
try {
|
try {
|
||||||
recordSet = new ResultSetRecordSet(rs, writerSchema, defaultPrecision, defaultScale);
|
recordSet = new ResultSetRecordSet(rs, writerSchema, defaultPrecision, defaultScale);
|
||||||
} catch (final SQLException e) {
|
} catch (final SQLException e) {
|
||||||
getLogger().error("Error creating record set from query results due to {}", new Object[]{e.getMessage()}, e);
|
getLogger().error("Error creating record set from query results due to {}", e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
return recordSet;
|
return recordSet;
|
||||||
|
|
|
@ -103,7 +103,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask implements Que
|
||||||
try {
|
try {
|
||||||
recordSet = metricsQueryService.getResultSetRecordSet(queryResult);
|
recordSet = metricsQueryService.getResultSetRecordSet(queryResult);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
getLogger().error("Error creating record set from query results due to {}", new Object[]{e.getMessage()}, e);
|
getLogger().error("Error creating record set from query results due to {}", e.getMessage(), e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,7 +116,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask implements Que
|
||||||
attributes.put("reporting.task.type", this.getClass().getSimpleName());
|
attributes.put("reporting.task.type", this.getClass().getSimpleName());
|
||||||
recordSinkService.sendData(recordSet, attributes, context.getProperty(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS).asBoolean());
|
recordSinkService.sendData(recordSet, attributes, context.getProperty(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS).asBoolean());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().error("Error during transmission of query results due to {}", new Object[]{e.getMessage()}, e);
|
getLogger().error("Error during transmission of query results due to {}", e.getMessage(), e);
|
||||||
return;
|
return;
|
||||||
} finally {
|
} finally {
|
||||||
metricsQueryService.closeQuietly(queryResult);
|
metricsQueryService.closeQuietly(queryResult);
|
||||||
|
@ -124,7 +124,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask implements Que
|
||||||
final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
|
final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
|
||||||
getLogger().debug("Successfully queried and sent in {} millis", elapsedMillis);
|
getLogger().debug("Successfully queried and sent in {} millis", elapsedMillis);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().error("Error processing the query due to {}", new Object[]{e.getMessage()}, e);
|
getLogger().error("Error processing the query due to {}", e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -272,7 +272,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
|
||||||
st.setFetchSize(fetchSize);
|
st.setFetchSize(fetchSize);
|
||||||
} catch (SQLException se) {
|
} catch (SQLException se) {
|
||||||
// Not all drivers support this, just log the error (at debug level) and move on
|
// Not all drivers support this, just log the error (at debug level) and move on
|
||||||
logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
|
logger.debug("Cannot set fetch size to {} due to {}", fetchSize, se.getLocalizedMessage(), se);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
st.setQueryTimeout(queryTimeout); // timeout in seconds
|
st.setQueryTimeout(queryTimeout); // timeout in seconds
|
||||||
|
|
|
@ -321,7 +321,7 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
|
||||||
st.setFetchSize(fetchSize);
|
st.setFetchSize(fetchSize);
|
||||||
} catch (SQLException se) {
|
} catch (SQLException se) {
|
||||||
// Not all drivers support this, just log the error (at debug level) and move on
|
// Not all drivers support this, just log the error (at debug level) and move on
|
||||||
logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
|
logger.debug("Cannot set fetch size to {} due to {}", fetchSize, se.getLocalizedMessage(), se);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -327,7 +327,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (ProcessException e) {
|
} catch (ProcessException e) {
|
||||||
getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[]{flowFile, e.toString()}, e);
|
getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", flowFile, e.toString(), e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -345,7 +345,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (final ProcessException pe) {
|
} catch (final ProcessException pe) {
|
||||||
getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
|
getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", flowFile, pe.toString(), pe);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -398,7 +398,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
} catch (final ProcessException pe) {
|
} catch (final ProcessException pe) {
|
||||||
getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
|
getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
|
||||||
new Object[] { flowFile, statementType, pe.toString() }, pe);
|
flowFile, statementType, pe.toString(), pe);
|
||||||
session.remove(created);
|
session.remove(created);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -521,7 +521,7 @@ public class EnforceOrder extends AbstractProcessor {
|
||||||
|
|
||||||
private void transferToFailure(final FlowFile flowFile, final String message, final Throwable cause) {
|
private void transferToFailure(final FlowFile flowFile, final String message, final Throwable cause) {
|
||||||
if (cause != null) {
|
if (cause != null) {
|
||||||
getLogger().warn(message + " {}", new Object[]{flowFile}, cause);
|
getLogger().warn(message + " {}", flowFile, cause);
|
||||||
} else {
|
} else {
|
||||||
getLogger().warn(message + " {}", new Object[]{flowFile});
|
getLogger().warn(message + " {}", new Object[]{flowFile});
|
||||||
}
|
}
|
||||||
|
|
|
@ -302,7 +302,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
||||||
} catch (final PathNotFoundException e) {
|
} catch (final PathNotFoundException e) {
|
||||||
if (pathNotFound.equals(PATH_NOT_FOUND_WARN)) {
|
if (pathNotFound.equals(PATH_NOT_FOUND_WARN)) {
|
||||||
logger.warn("FlowFile {} could not find path {} for attribute key {}.",
|
logger.warn("FlowFile {} could not find path {} for attribute key {}.",
|
||||||
new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
|
flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (destinationIsAttribute) {
|
if (destinationIsAttribute) {
|
||||||
|
|
|
@ -242,7 +242,7 @@ public class FetchFile extends AbstractProcessor {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().error("Could not fetch file {} from file system for {} because Completion Strategy is configured to move the original file to {}, "
|
getLogger().error("Could not fetch file {} from file system for {} because Completion Strategy is configured to move the original file to {}, "
|
||||||
+ "but that directory does not exist and could not be created due to: {}",
|
+ "but that directory does not exist and could not be created due to: {}",
|
||||||
new Object[] {file, flowFile, targetDir, e.getMessage()}, e);
|
file, flowFile, targetDir, e.getMessage(), e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -267,7 +267,7 @@ public class FetchFile extends AbstractProcessor {
|
||||||
try (final FileInputStream fis = new FileInputStream(file)) {
|
try (final FileInputStream fis = new FileInputStream(file)) {
|
||||||
flowFile = session.importFrom(fis, flowFile);
|
flowFile = session.importFrom(fis, flowFile);
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().error("Could not fetch file {} from file system for {} due to {}; routing to failure", new Object[] {file, flowFile, ioe.toString()}, ioe);
|
getLogger().error("Could not fetch file {} from file system for {} due to {}; routing to failure", file, flowFile, ioe.toString(), ioe);
|
||||||
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -328,7 +328,7 @@ public class FetchFile extends AbstractProcessor {
|
||||||
// Handle completion failures
|
// Handle completion failures
|
||||||
if (completionFailureException != null) {
|
if (completionFailureException != null) {
|
||||||
getLogger().warn("Successfully fetched the content from {} for {} but failed to perform Completion Action due to {}; routing to success",
|
getLogger().warn("Successfully fetched the content from {} for {} but failed to perform Completion Action due to {}; routing to success",
|
||||||
new Object[] {file, flowFile, completionFailureException}, completionFailureException);
|
file, flowFile, completionFailureException, completionFailureException);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -186,7 +186,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
wrapper.getFileTransfer().close();
|
wrapper.getFileTransfer().close();
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().warn("Failed to close Idle Connection due to {}", new Object[] {ioe}, ioe);
|
getLogger().warn("Failed to close Idle Connection due to {}", ioe, ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -275,7 +275,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
|
||||||
return;
|
return;
|
||||||
} catch (final ProcessException | IOException e) {
|
} catch (final ProcessException | IOException e) {
|
||||||
getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure",
|
getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure",
|
||||||
new Object[]{flowFile, filename, host, port, e.toString()}, e);
|
flowFile, filename, host, port, e.toString(), e);
|
||||||
session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
|
session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
|
||||||
cleanupTransfer(transfer, true, transferQueue, host, port);
|
cleanupTransfer(transfer, true, transferQueue, host, port);
|
||||||
return;
|
return;
|
||||||
|
@ -325,7 +325,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
transfer.close();
|
transfer.close();
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
getLogger().warn("Failed to close connection to {}:{} due to {}", new Object[]{host, port, e.getMessage()}, e);
|
getLogger().warn("Failed to close connection to {}:{} due to {}", host, port, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
getLogger().debug("Returning FileTransfer to pool...");
|
getLogger().debug("Returning FileTransfer to pool...");
|
||||||
|
@ -342,7 +342,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
|
||||||
// file doesn't exist -- effectively the same as removing it. Move on.
|
// file doesn't exist -- effectively the same as removing it. Move on.
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}",
|
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}",
|
||||||
new Object[]{flowFile, host, port, filename, ioe}, ioe);
|
flowFile, host, port, filename, ioe, ioe);
|
||||||
}
|
}
|
||||||
} else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
|
} else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
|
||||||
final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
|
final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
@ -360,7 +360,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
|
||||||
|
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
|
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
|
||||||
new Object[]{flowFile, host, port, filename, ioe}, ioe);
|
flowFile, host, port, filename, ioe, ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -559,7 +559,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
|
||||||
} catch (final ProcessException pe) {
|
} catch (final ProcessException pe) {
|
||||||
// Log the cause of the ProcessException if it is available
|
// Log the cause of the ProcessException if it is available
|
||||||
Throwable t = (pe.getCause() == null ? pe : pe.getCause());
|
Throwable t = (pe.getCause() == null ? pe : pe.getCause());
|
||||||
logger.error("Error during processing: {}", new Object[]{t.getMessage()}, t);
|
logger.error("Error during processing: {}", t.getMessage(), t);
|
||||||
session.rollback();
|
session.rollback();
|
||||||
context.yield();
|
context.yield();
|
||||||
}
|
}
|
||||||
|
|
|
@ -223,7 +223,7 @@ public abstract class GetFileTransfer extends AbstractProcessor {
|
||||||
return;
|
return;
|
||||||
} catch (final FlowFileAccessException e) {
|
} catch (final FlowFileAccessException e) {
|
||||||
context.yield();
|
context.yield();
|
||||||
logger.error("Unable to retrieve file {} due to {}", new Object[]{file.getFullPathFileName(), e.getCause()}, e);
|
logger.error("Unable to retrieve file {} due to {}", file.getFullPathFileName(), e.getCause(), e);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
transfer.close();
|
transfer.close();
|
||||||
|
@ -261,7 +261,7 @@ public abstract class GetFileTransfer extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
fileTransfer.deleteFile(receivedFlowFile, null, remoteFilename);
|
fileTransfer.deleteFile(receivedFlowFile, null, remoteFilename);
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
getLogger().error("Failed to remove remote file {} due to {}. This file may be duplicated in a subsequent run", new Object[] {remoteFilename, e}, e);
|
getLogger().error("Failed to remove remote file {} due to {}. This file may be duplicated in a subsequent run", remoteFilename, e, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -631,7 +631,7 @@ public class HandleHttpRequest extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
part.delete();
|
part.delete();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().error("Couldn't delete underlying storage for {}", new Object[]{part}, e);
|
getLogger().error("Couldn't delete underlying storage for {}", part, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -468,7 +468,7 @@ public class ListDatabaseTables extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
recordWriter.close();
|
recordWriter.close();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.error("Failed to write listing as Records due to {}", new Object[] {e}, e);
|
logger.error("Failed to write listing as Records due to {}", e, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
session.remove(flowFile);
|
session.remove(flowFile);
|
||||||
|
|
|
@ -617,7 +617,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
|
||||||
getLogger().debug("The following file is not readable: {}", new Object[]{path.toString()});
|
getLogger().debug("The following file is not readable: {}", new Object[]{path.toString()});
|
||||||
return FileVisitResult.SKIP_SUBTREE;
|
return FileVisitResult.SKIP_SUBTREE;
|
||||||
} else {
|
} else {
|
||||||
getLogger().error("Error during visiting file {}: {}", new Object[]{path.toString(), e.getMessage()}, e);
|
getLogger().error("Error during visiting file {}: {}", path.toString(), e.getMessage(), e);
|
||||||
return FileVisitResult.TERMINATE;
|
return FileVisitResult.TERMINATE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -625,7 +625,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
|
||||||
@Override
|
@Override
|
||||||
public FileVisitResult postVisitDirectory(final Path dir, final IOException e) {
|
public FileVisitResult postVisitDirectory(final Path dir, final IOException e) {
|
||||||
if (e != null) {
|
if (e != null) {
|
||||||
getLogger().error("Error during visiting directory {}: {}", new Object[]{dir.toString(), e.getMessage()}, e);
|
getLogger().error("Error during visiting directory {}: {}", dir.toString(), e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
return FileVisitResult.CONTINUE;
|
return FileVisitResult.CONTINUE;
|
||||||
|
|
|
@ -937,7 +937,7 @@ public class MergeContent extends BinFiles {
|
||||||
out.closeEntry();
|
out.closeEntry();
|
||||||
unmerged.remove(flowFile);
|
unmerged.remove(flowFile);
|
||||||
} catch (ZipException e) {
|
} catch (ZipException e) {
|
||||||
getLogger().error("Encountered exception merging {}", new Object[] {flowFile}, e);
|
getLogger().error("Encountered exception merging {}", flowFile, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -230,7 +230,7 @@ public class Notify extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
delta = Integer.parseInt(deltaStr);
|
delta = Integer.parseInt(deltaStr);
|
||||||
} catch (final NumberFormatException e) {
|
} catch (final NumberFormatException e) {
|
||||||
logger.error("Failed to calculate delta for FlowFile {} due to {}", new Object[] {flowFile, e}, e);
|
logger.error("Failed to calculate delta for FlowFile {} due to {}", flowFile, e, e);
|
||||||
session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(false)), REL_FAILURE);
|
session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(false)), REL_FAILURE);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -161,27 +161,27 @@ public class PutRecord extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (RetryableIOException rioe) {
|
} catch (RetryableIOException rioe) {
|
||||||
getLogger().warn("Error during transmission of records due to {}, routing to retry", new Object[]{rioe.getMessage()}, rioe);
|
getLogger().warn("Error during transmission of records due to {}, routing to retry", rioe.getMessage(), rioe);
|
||||||
session.transfer(flowFile, REL_RETRY);
|
session.transfer(flowFile, REL_RETRY);
|
||||||
return;
|
return;
|
||||||
} catch (SchemaNotFoundException snfe) {
|
} catch (SchemaNotFoundException snfe) {
|
||||||
throw new ProcessException("Error determining schema of flowfile records: " + snfe.getMessage(), snfe);
|
throw new ProcessException("Error determining schema of flowfile records: " + snfe.getMessage(), snfe);
|
||||||
} catch (MalformedRecordException e) {
|
} catch (MalformedRecordException e) {
|
||||||
getLogger().error("Error reading records from {} due to {}, routing to failure", new Object[]{flowFile, e.getMessage()}, e);
|
getLogger().error("Error reading records from {} due to {}, routing to failure", flowFile, e.getMessage(), e);
|
||||||
session.penalize(flowFile);
|
session.penalize(flowFile);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
// The cause might be a MalformedRecordException (RecordReader wraps it in an IOException), send to failure in that case
|
// The cause might be a MalformedRecordException (RecordReader wraps it in an IOException), send to failure in that case
|
||||||
if (ioe.getCause() instanceof MalformedRecordException) {
|
if (ioe.getCause() instanceof MalformedRecordException) {
|
||||||
getLogger().error("Error reading records from {} due to {}, routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
|
getLogger().error("Error reading records from {} due to {}, routing to failure", flowFile, ioe.getMessage(), ioe);
|
||||||
session.penalize(flowFile);
|
session.penalize(flowFile);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
throw new ProcessException("Error reading from flowfile input stream: " + ioe.getMessage(), ioe);
|
throw new ProcessException("Error reading from flowfile input stream: " + ioe.getMessage(), ioe);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().error("Error during transmission of records due to {}, routing to failure", new Object[]{e.getMessage()}, e);
|
getLogger().error("Error during transmission of records due to {}, routing to failure", e.getMessage(), e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -368,7 +368,7 @@ public class ReplaceText extends AbstractProcessor {
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
} catch (IllegalAttributeException | AttributeExpressionLanguageException e) {
|
} catch (IllegalAttributeException | AttributeExpressionLanguageException e) {
|
||||||
logger.warn("Transferred {} to 'failure' due to {}", new Object[] { flowFile, e.toString() }, e);
|
logger.warn("Transferred {} to 'failure' due to {}", flowFile, e.toString(), e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -285,7 +285,7 @@ public class SampleRecord extends AbstractProcessor {
|
||||||
attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
|
attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
|
||||||
attributes.putAll(writeResult.getAttributes());
|
attributes.putAll(writeResult.getAttributes());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().error("Error during transmission of records due to {}, routing to failure", new Object[]{e.getMessage()}, e);
|
getLogger().error("Error during transmission of records due to {}, routing to failure", e.getMessage(), e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
session.remove(sampledFlowFile);
|
session.remove(sampledFlowFile);
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -799,7 +799,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
cleanup(context);
|
cleanup(context);
|
||||||
tfo.setState(new TailFileState(filename, file, fileChannel, position, timestamp, file.length(), checksum, tfo.getState().getBuffer()));
|
tfo.setState(new TailFileState(filename, file, fileChannel, position, timestamp, file.length(), checksum, tfo.getState().getBuffer()));
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[]{file, ioe.toString()}, ioe);
|
getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", file, ioe.toString(), ioe);
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -983,11 +983,11 @@ public class TailFile extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
reader.position(newPosition);
|
reader.position(newPosition);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
getLogger().warn("Couldn't reposition the reader for {} due to {}", new Object[]{ file, ex }, ex);
|
getLogger().warn("Couldn't reposition the reader for {} due to {}", file, ex, ex);
|
||||||
try {
|
try {
|
||||||
reader.close();
|
reader.close();
|
||||||
} catch (IOException ex2) {
|
} catch (IOException ex2) {
|
||||||
getLogger().warn("Failed to close reader for {} due to {}", new Object[]{ file, ex2 }, ex2);
|
getLogger().warn("Failed to close reader for {} due to {}", file, ex2, ex2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -353,7 +353,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
|
||||||
} catch (ProcessException rrfe) {
|
} catch (ProcessException rrfe) {
|
||||||
log.error(
|
log.error(
|
||||||
"Failed to create {} for {} - routing to failure",
|
"Failed to create {} for {} - routing to failure",
|
||||||
new Object[]{RecordReader.class.getSimpleName(), flowFile},
|
RecordReader.class.getSimpleName(), flowFile,
|
||||||
rrfe
|
rrfe
|
||||||
);
|
);
|
||||||
// Since we are wrapping the exceptions above there should always be a cause
|
// Since we are wrapping the exceptions above there should always be a cause
|
||||||
|
@ -447,11 +447,11 @@ public class UpdateDatabaseTable extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
} catch (IOException | SQLException e) {
|
} catch (IOException | SQLException e) {
|
||||||
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
|
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
|
||||||
log.error("Exception while processing {} - routing to failure", new Object[]{flowFile}, e);
|
log.error("Exception while processing {} - routing to failure", flowFile, e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
} catch (DiscontinuedException e) {
|
} catch (DiscontinuedException e) {
|
||||||
// The input FlowFile processing is discontinued. Keep it in the input queue.
|
// The input FlowFile processing is discontinued. Keep it in the input queue.
|
||||||
getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
|
getLogger().warn("Discontinued processing for {} due to {}", flowFile, e, e);
|
||||||
session.transfer(flowFile, Relationship.SELF);
|
session.transfer(flowFile, Relationship.SELF);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
|
throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
|
||||||
|
|
|
@ -516,7 +516,7 @@ public class ValidateCsv extends AbstractProcessor {
|
||||||
valid.set(false);
|
valid.set(false);
|
||||||
if(isWholeFFValidation) {
|
if(isWholeFFValidation) {
|
||||||
validationError.set(e.getLocalizedMessage());
|
validationError.set(e.getLocalizedMessage());
|
||||||
logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
|
logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", flowFile, e);
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
// we append the invalid line to the flow file that will be routed to invalid relationship
|
// we append the invalid line to the flow file that will be routed to invalid relationship
|
||||||
|
@ -544,7 +544,7 @@ public class ValidateCsv extends AbstractProcessor {
|
||||||
|
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
valid.set(false);
|
valid.set(false);
|
||||||
logger.error("Failed to validate {} against schema due to {}", new Object[]{flowFile}, e);
|
logger.error("Failed to validate {} against schema due to {}", flowFile, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -473,14 +473,14 @@ public class Wait extends AbstractProcessor {
|
||||||
targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
|
targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
|
||||||
} catch (final NumberFormatException e) {
|
} catch (final NumberFormatException e) {
|
||||||
transferToFailure.accept(flowFile);
|
transferToFailure.accept(flowFile);
|
||||||
logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e}, e);
|
logger.error("Failed to parse targetCount when processing {} due to {}", flowFile, e, e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
releasableFlowFileCount = Integer.valueOf(context.getProperty(RELEASABLE_FLOWFILE_COUNT).evaluateAttributeExpressions(flowFile).getValue());
|
releasableFlowFileCount = Integer.valueOf(context.getProperty(RELEASABLE_FLOWFILE_COUNT).evaluateAttributeExpressions(flowFile).getValue());
|
||||||
} catch (final NumberFormatException e) {
|
} catch (final NumberFormatException e) {
|
||||||
transferToFailure.accept(flowFile);
|
transferToFailure.accept(flowFile);
|
||||||
logger.error("Failed to parse releasableFlowFileCount when processing {} due to {}", new Object[] {flowFile, e}, e);
|
logger.error("Failed to parse releasableFlowFileCount when processing {} due to {}", flowFile, e, e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -183,7 +183,7 @@ public class FTPTransfer implements FileTransfer {
|
||||||
client.disconnect();
|
client.disconnect();
|
||||||
}
|
}
|
||||||
} catch (final Exception ex) {
|
} catch (final Exception ex) {
|
||||||
logger.warn("Failed to close FTPClient due to {}", new Object[] {ex.toString()}, ex);
|
logger.warn("Failed to close FTPClient due to {}", ex.toString(), ex);
|
||||||
}
|
}
|
||||||
client = null;
|
client = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -658,7 +658,7 @@ public class SFTPTransfer implements FileTransfer {
|
||||||
sftpClient.close();
|
sftpClient.close();
|
||||||
}
|
}
|
||||||
} catch (final Exception ex) {
|
} catch (final Exception ex) {
|
||||||
logger.warn("Failed to close SFTPClient due to {}", new Object[] {ex.toString()}, ex);
|
logger.warn("Failed to close SFTPClient due to {}", ex.toString(), ex);
|
||||||
}
|
}
|
||||||
sftpClient = null;
|
sftpClient = null;
|
||||||
|
|
||||||
|
@ -667,7 +667,7 @@ public class SFTPTransfer implements FileTransfer {
|
||||||
sshClient.disconnect();
|
sshClient.disconnect();
|
||||||
}
|
}
|
||||||
} catch (final Exception ex) {
|
} catch (final Exception ex) {
|
||||||
logger.warn("Failed to close SSHClient due to {}", new Object[] {ex.toString()}, ex);
|
logger.warn("Failed to close SSHClient due to {}", ex.toString(), ex);
|
||||||
}
|
}
|
||||||
sshClient = null;
|
sshClient = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,7 +90,7 @@ public class HBase_2_ListLookupService extends AbstractHBaseLookupService implem
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey") }, e);
|
getLogger().error("Error occurred loading {}", coordinates.get("rowKey"), e);
|
||||||
throw new LookupFailureException(e);
|
throw new LookupFailureException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,7 @@ public class HBase_2_RecordLookupService extends AbstractHBaseLookupService impl
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey") }, e);
|
getLogger().error("Error occurred loading {}", coordinates.get("rowKey"), e);
|
||||||
throw new LookupFailureException(e);
|
throw new LookupFailureException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -251,7 +251,7 @@ public class IPLookupService extends AbstractControllerService implements Record
|
||||||
inetAddress = InetAddress.getByName(ipAddress);
|
inetAddress = InetAddress.getByName(ipAddress);
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().warn("Could not resolve the IP for value '{}'. This is usually caused by issue resolving the appropriate DNS record or " +
|
getLogger().warn("Could not resolve the IP for value '{}'. This is usually caused by issue resolving the appropriate DNS record or " +
|
||||||
"providing the service with an invalid IP address", new Object[] {coordinates}, ioe);
|
"providing the service with an invalid IP address", coordinates, ioe);
|
||||||
|
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
|
@ -179,7 +179,7 @@ public class AttributeRollingWindow extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().error("Ran into an error while processing {}.", new Object[] { flowFile}, e);
|
getLogger().error("Ran into an error while processing {}.", flowFile, e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -189,7 +189,7 @@ public class AttributeRollingWindow extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
state = new HashMap<>(session.getState(SCOPE).toMap());
|
state = new HashMap<>(session.getState(SCOPE).toMap());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
getLogger().error("Failed to get the initial state when processing {}; transferring FlowFile back to its incoming queue", new Object[]{flowFile}, e);
|
getLogger().error("Failed to get the initial state when processing {}; transferring FlowFile back to its incoming queue", flowFile, e);
|
||||||
session.transfer(flowFile);
|
session.transfer(flowFile);
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
|
@ -237,7 +237,7 @@ public class AttributeRollingWindow extends AbstractProcessor {
|
||||||
session.setState(state, SCOPE);
|
session.setState(state, SCOPE);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
getLogger().error("Failed to set the state after successfully processing {} due a failure when setting the state. Transferring to '{}'",
|
getLogger().error("Failed to set the state after successfully processing {} due a failure when setting the state. Transferring to '{}'",
|
||||||
new Object[]{flowFile, REL_FAILED_SET_STATE.getName()}, e);
|
flowFile, REL_FAILED_SET_STATE.getName(), e);
|
||||||
|
|
||||||
session.transfer(flowFile, REL_FAILED_SET_STATE);
|
session.transfer(flowFile, REL_FAILED_SET_STATE);
|
||||||
context.yield();
|
context.yield();
|
||||||
|
@ -261,7 +261,7 @@ public class AttributeRollingWindow extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
state = new HashMap<>(session.getState(SCOPE).toMap());
|
state = new HashMap<>(session.getState(SCOPE).toMap());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
getLogger().error("Failed to get the initial state when processing {}; transferring FlowFile back to its incoming queue", new Object[]{flowFile}, e);
|
getLogger().error("Failed to get the initial state when processing {}; transferring FlowFile back to its incoming queue", flowFile, e);
|
||||||
session.transfer(flowFile);
|
session.transfer(flowFile);
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
|
@ -353,7 +353,7 @@ public class AttributeRollingWindow extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
session.setState(state, SCOPE);
|
session.setState(state, SCOPE);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
getLogger().error("Failed to get the initial state when processing {}; transferring FlowFile back to its incoming queue", new Object[]{flowFile}, e);
|
getLogger().error("Failed to get the initial state when processing {}; transferring FlowFile back to its incoming queue", flowFile, e);
|
||||||
session.transfer(flowFile);
|
session.transfer(flowFile);
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -461,7 +461,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
|
||||||
stateWorkingAttributes = null;
|
stateWorkingAttributes = null;
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.error("Failed to get the initial state when processing {}; transferring FlowFile back to its incoming queue", new Object[]{incomingFlowFile}, e);
|
logger.error("Failed to get the initial state when processing {}; transferring FlowFile back to its incoming queue", incomingFlowFile, e);
|
||||||
session.transfer(incomingFlowFile);
|
session.transfer(incomingFlowFile);
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
|
@ -525,7 +525,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.error("Failed to set the state after successfully processing {} due a failure when setting the state. This is normally due to multiple threads running at " +
|
logger.error("Failed to set the state after successfully processing {} due a failure when setting the state. This is normally due to multiple threads running at " +
|
||||||
"once; transferring to '{}'", new Object[]{incomingFlowFile, REL_FAILED_SET_STATE.getName()}, e);
|
"once; transferring to '{}'", incomingFlowFile, REL_FAILED_SET_STATE.getName(), e);
|
||||||
|
|
||||||
flowFilesToTransfer.remove(incomingFlowFile);
|
flowFilesToTransfer.remove(incomingFlowFile);
|
||||||
if (flowFilesToTransfer.size() > 0){
|
if (flowFilesToTransfer.size() > 0){
|
||||||
|
|
|
@ -189,7 +189,7 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
|
||||||
webSocketService.deregisterProcessor(endpointId, this);
|
webSocketService.deregisterProcessor(endpointId, this);
|
||||||
webSocketService = null;
|
webSocketService = null;
|
||||||
} catch (WebSocketConfigurationException e) {
|
} catch (WebSocketConfigurationException e) {
|
||||||
logger.warn("Failed to deregister processor {} due to: {}", new Object[]{this, e}, e);
|
logger.warn("Failed to deregister processor {} due to: {}", this, e, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -283,7 +283,7 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
|
||||||
try {
|
try {
|
||||||
maintainSessions();
|
maintainSessions();
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
getLogger().warn("Failed to maintain sessions due to {}", new Object[]{e}, e);
|
getLogger().warn("Failed to maintain sessions due to {}", e, e);
|
||||||
}
|
}
|
||||||
}, sessionMaintenanceInterval, sessionMaintenanceInterval, TimeUnit.MILLISECONDS);
|
}, sessionMaintenanceInterval, sessionMaintenanceInterval, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
@ -319,7 +319,7 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
|
||||||
try {
|
try {
|
||||||
sessionMaintenanceScheduler.shutdown();
|
sessionMaintenanceScheduler.shutdown();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
getLogger().warn("Failed to shutdown session maintainer due to {}", new Object[]{e}, e);
|
getLogger().warn("Failed to shutdown session maintainer due to {}", e, e);
|
||||||
}
|
}
|
||||||
sessionMaintenanceScheduler = null;
|
sessionMaintenanceScheduler = null;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue