mirror of https://github.com/apache/nifi.git

commit 7deac6afac (parent aac90fb375)
NIFI-13266 Removed String concatenation in logging messages (#8940)
Signed-off-by: David Handermann <exceptionfactory@apache.org>
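The diff below applies one pattern throughout the code base: replace string concatenation in log statements with SLF4J-style parameterized logging. As a quick orientation, here is a minimal sketch of the before/after shape; it is illustrative only and not taken from the NiFi sources (class, logger, and variable names are hypothetical). Two properties of SLF4J make the rewritten calls equivalent: message formatting is deferred until the log level is known to be enabled, and a trailing Throwable that is not consumed by a placeholder is still recorded as the exception, stack trace included.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParameterizedLoggingExample {

    private static final Logger LOG = LoggerFactory.getLogger(ParameterizedLoggingExample.class);

    public static void main(String[] args) {
        String restBase = "https://c2.example.org/api";
        String restPath = "/heartbeat";
        Exception failure = new IllegalArgumentException("unresolvable path");

        // Before: the message string is built eagerly, even when ERROR logging is disabled.
        LOG.error("Unable to convert restBase=" + restBase + " and restPath=" + restPath + " to absolute url", failure);

        // After: the {} placeholders are filled in only when the event is actually logged,
        // and the trailing Throwable is still treated as the exception to report.
        LOG.error("Unable to convert restBase={} and restPath={} to absolute url", restBase, restPath, failure);
    }
}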
@@ -74,7 +74,7 @@ public class ProxyAwareC2UrlProvider implements C2UrlProvider {
try {
return Optional.of(c2RestPathBase.resolve(stripStart(path, SLASH)).toString()); // leading slash needs to be removed for proper URL creation
} catch (Exception e) {
LOG.error("Unable to convert restBase=" + c2RestPathBase + " and restPath=" + path + " to absolute url", e);
LOG.error("Unable to convert restBase={} and restPath={} to absolute url", c2RestPathBase, path, e);
return Optional.empty();
}
}
@@ -182,7 +182,7 @@ public class TransferDebugOperationHandler implements C2OperationHandler {
gzipInputStream.transferTo(fileOutputStream);
return targetFile;
} catch (IOException e) {
LOG.error("Error during filtering gzip file content: " + sourceFile.toAbsolutePath(), e);
LOG.error("Error during filtering gzip file content: {}", sourceFile.toAbsolutePath(), e);
throw e;
}
}
@@ -192,7 +192,7 @@ public class TransferDebugOperationHandler implements C2OperationHandler {
Files.write(targetFile, (Iterable<String>) fileStream.filter(contentFilter)::iterator);
return targetFile;
} catch (IOException e) {
LOG.error("Error during filtering uncompressed file content: " + sourceFile.toAbsolutePath(), e);
LOG.error("Error during filtering uncompressed file content: {}", sourceFile.toAbsolutePath(), e);
throw e;
}
}
@@ -95,7 +95,7 @@ public class StopRunner implements CommandRunner {
status = ERROR.getStatusCode();
}
} catch (IOException e) {
CMD_LOGGER.warn("An error has occurred while stopping MiNiFi. Force killing process with pid=" + minifiPid, e);
CMD_LOGGER.warn("An error has occurred while stopping MiNiFi. Force killing process with pid={}", minifiPid, e);
killProcessTree(minifiPid);
} finally {
if (lockFile.exists() && !lockFile.delete()) {
@@ -302,10 +302,10 @@ public class RestChangeIngestor implements ChangeIngestor {

private void logRequest(Request request) {
logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
logger.info("request method = " + request.getMethod());
logger.info("request url = " + request.getHttpURI());
logger.info("context path = " + request.getContext().getContextPath());
logger.info("request content type = " + request.getHeaders().get(HttpHeader.CONTENT_TYPE));
logger.info("request method = {}", request.getMethod());
logger.info("request url = {}", request.getHttpURI());
logger.info("context path = {}", request.getContext().getContextPath());
logger.info("request content type = {}", request.getHeaders().get(HttpHeader.CONTENT_TYPE));
logger.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
}
}
@@ -68,7 +68,7 @@ public class PeriodicStatusReporterManager implements QueryableStatusAggregator
try {
periodicStatusReporter.stop();
} catch (Exception exception) {
LOGGER.error("Could not successfully stop periodic status reporter " + periodicStatusReporter.getClass() + " due to ", exception);
LOGGER.error("Could not successfully stop periodic status reporter {}", periodicStatusReporter.getClass(), exception);
}
}
}
@@ -126,12 +126,12 @@ public class UnixProcessUtils implements ProcessUtils {
if (retries == 0) {
throw new IOException("Failed to stop process. Process is still running after killing attempt with pid=" + pid);
}
LOGGER.warn("Process is still running after killing attempt with pid=" + pid);
LOGGER.warn("Process is still running after killing attempt with pid={}", pid);
retries--;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
DEFAULT_LOGGER.warn("Thread interrupted while waiting for killing process with pid=" + pid);
DEFAULT_LOGGER.warn("Thread interrupted while waiting for killing process with pid={}", pid);
}
}
}
@@ -61,9 +61,9 @@ public class LogUtil {
int occurrences = 0;
while ((line = bufferedReader.readLine()) != null) {
if (expectedLogEntry.pattern.matcher(line).find()) {
logger.info("Found expected: " + line);
logger.info("Found expected: {}", line);
if (++occurrences >= expectedLogEntry.numOccurrences) {
logger.info("Found target " + occurrences + " times");
logger.info("Found target {} times", occurrences);
satisfied = true;
break;
}
@@ -65,9 +65,9 @@ public class UpdatePropertiesPropertyProvider implements OperandPropertiesProvid
try (FileInputStream fis = new FileInputStream(bootstrapFile)) {
props.load(fis);
} catch (FileNotFoundException e) {
LOGGER.error("The bootstrap configuration file " + bootstrapConfigFileLocation + " doesn't exists", e);
LOGGER.error("The bootstrap configuration file {} doesn't exist", bootstrapConfigFileLocation, e);
} catch (IOException e) {
LOGGER.error("Failed to load properties from " + bootstrapConfigFileLocation, e);
LOGGER.error("Failed to load properties from {}", bootstrapConfigFileLocation, e);
}
return props.entrySet().stream()
.collect(Collectors.toMap(entry -> (String) entry.getKey(), entry -> (String) entry.getValue()));
@@ -128,7 +128,7 @@ public final class StatusConfigReporter {
break;
}
} catch (Exception e) {
logger.error("Hit exception while requesting status for item '" + item + "'", e);
logger.error("Hit exception while requesting status for item '{}'", item, e);
errorsGeneratingReport.add("Unable to get status for request '" + item + "' due to:" + e);
}
}

@@ -167,7 +167,7 @@ public final class StatusConfigReporter {
Collection<ValidationResult> validationResults = flowController.getFlowManager().getGroup(rootGroupId).getProcessor(processorStatus.getId()).getValidationErrors();
processorStatusBeanList.add(parseProcessorStatusRequest(processorStatus, requestItem.options, flowController, validationResults));
} else {
logger.warn("Status for processor with key " + requestItem.identifier + " was requested but one does not exist");
logger.warn("Status for processor with key {} was requested but one does not exist", requestItem.identifier);
throw new StatusRequestException("No processor with key " + requestItem.identifier + " to report status on");
}
}

@@ -189,7 +189,7 @@ public final class StatusConfigReporter {
if (connectionStatusMap.containsKey(requestItem.identifier)) {
connectionStatusList.add(parseConnectionStatusRequest(connectionStatusMap.get(requestItem.identifier), requestItem.options, logger));
} else {
logger.warn("Status for connection with key " + requestItem.identifier + " was requested but one does not exist");
logger.warn("Status for connection with key {} was requested but one does not exist", requestItem.identifier);
throw new StatusRequestException("No connection with key " + requestItem.identifier + " to report status on");
}
}

@@ -215,7 +215,7 @@ public final class StatusConfigReporter {
RemoteProcessGroupStatus remoteProcessGroupStatus = remoteProcessGroupStatusMap.get(requestItem.identifier);
remoteProcessGroupStatusList.add(parseRemoteProcessGroupStatusRequest(remoteProcessGroupStatus, requestItem.options, flowController));
} else {
logger.warn("Status for Remote Process Group with key " + requestItem.identifier + " was requested but one does not exist");
logger.warn("Status for Remote Process Group with key {} was requested but one does not exist", requestItem.identifier);
throw new StatusRequestException("No Remote Process Group with key " + requestItem.identifier + " to report status on");
}
}
@@ -227,7 +227,7 @@ public class MiNiFi {
NiFiProperties properties = getValidatedMiNifiProperties();
new MiNiFi(properties);
} catch (final Throwable t) {
logger.error("Failure to launch MiNiFi due to " + t, t);
logger.error("Failure to launch MiNiFi", t);
}
}
@@ -234,7 +234,7 @@ public class BootstrapListener implements BootstrapCommunicator {

executor.submit(() -> handleBootstrapRequest(socket));
} catch (Throwable t) {
logger.error("Failed to process request from Bootstrap due to " + t, t);
logger.error("Failed to process request from Bootstrap", t);
}
}
}

@@ -252,7 +252,7 @@ public class BootstrapListener implements BootstrapCommunicator {
}

} catch (Throwable t) {
logger.error("Failed to process request from Bootstrap due to " + t, t);
logger.error("Failed to process request from Bootstrap", t);
} finally {
try {
socket.close();
@@ -652,7 +652,7 @@ public class RunNiFi {

final Properties sysProps = (Properties) getSystemPropertiesMethod.invoke(virtualMachine);
for (Entry<Object, Object> syspropEntry : sysProps.entrySet()) {
logger.info(syspropEntry.getKey().toString() + " = " + syspropEntry.getValue().toString());
logger.info("{} = {}", syspropEntry.getKey(), syspropEntry.getValue());
}
} catch (Throwable t) {
throw new RuntimeException(t);

@@ -1020,7 +1020,7 @@ public class RunNiFi {
public void start(final boolean monitor) throws IOException {
final Integer port = getCurrentPort(cmdLogger);
if (port != null) {
cmdLogger.info("Apache NiFi is already running, listening to Bootstrap on port " + port);
cmdLogger.info("Apache NiFi is already running, listening to Bootstrap on port {}", port);
return;
}
@@ -51,13 +51,13 @@ public class JsonPathEvaluator extends JsonPathBaseEvaluator {
// it is valid for a path not to be found, keys may not be there
// do not spam the error log for this, instead we can log debug if enabled
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("PathNotFoundException for JsonPath " + compiledJsonPath.getPath(), pnf);
LOGGER.debug("PathNotFoundException for JsonPath {}", compiledJsonPath.getPath(), pnf);
}
return EMPTY_RESULT;
} catch (Exception e) {
// a failure for something *other* than path not found however, should at least be
// logged.
LOGGER.error("Exception while reading JsonPath " + compiledJsonPath.getPath(), e);
LOGGER.error("Exception while reading JsonPath {}", compiledJsonPath.getPath(), e);
return EMPTY_RESULT;
}
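The JsonPathEvaluator hunk above keeps its isDebugEnabled() guard around the rewritten call. A small self-contained sketch (illustrative only, not NiFi code; both helper methods are hypothetical) of when such a guard still earns its keep with parameterized logging: the format string is rendered lazily, but the arguments themselves are evaluated before debug() is invoked.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeferredFormattingExample {

    private static final Logger LOG = LoggerFactory.getLogger(DeferredFormattingExample.class);

    static String cheapValue() {
        return "$.missing.key"; // stands in for something like compiledJsonPath.getPath()
    }

    static String expensiveValue() {
        // stands in for an argument that is costly to compute
        return String.join(",", java.util.Collections.nCopies(10_000, "x"));
    }

    public static void main(String[] args) {
        Exception pathNotFound = new RuntimeException("path not found");

        // Cheap argument: no guard needed, the message is only formatted when DEBUG is enabled.
        LOG.debug("PathNotFoundException for JsonPath {}", cheapValue(), pathNotFound);

        // Expensive argument: the guard avoids computing the value when DEBUG is disabled,
        // because arguments are evaluated before the call regardless of the log level.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Diagnostics: {}", expensiveValue());
        }
    }
}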
@@ -51,7 +51,7 @@ public class JsonPathPutEvaluator extends JsonPathUpdateEvaluator {
try {
result = documentContext.put(compiledJsonPath, key, value).jsonString();
} catch (Exception e) {
LOGGER.error("Failed to put value " + value + " at key " + key + " at path " + compiledJsonPath + " with error " + e.getLocalizedMessage(), e);
LOGGER.error("Failed to put value {} at key {} at path {}", value, key, compiledJsonPath, e);
// assume the path did not match anything in the document
return EMPTY_RESULT;
}
@@ -60,7 +60,7 @@ public abstract class JsonPathUpdateEvaluator extends JsonPathBaseEvaluator {
LOGGER.debug("JSON Path not found: {}", compiledJsonPath.getPath(), pnf);
result = documentContext.jsonString();
} catch (Exception e) {
LOGGER.error("Failed to update attribute " + e.getLocalizedMessage(), e);
LOGGER.error("Failed to update attribute", e);
// assume the path did not match anything in the document
return EMPTY_RESULT;
}
@@ -411,7 +411,7 @@ public class DataTypeUtils {
return Optional.of(type);
}
} catch (Exception e) {
logger.error("Exception thrown while checking if '" + valueAsString + "' is compatible with '" + type + "'", e);
logger.error("Exception thrown while checking if '{}' is compatible with '{}'", valueAsString, type, e);
}
}
@@ -256,7 +256,7 @@ public abstract class AbstractKerberosUser implements KerberosUser {

if (principal.getName().equals("krbtgt/" + principal.getRealm() + "@" + principal.getRealm())) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Found TGS principal: " + principal.getName());
LOGGER.trace("Found TGS principal: {}", principal.getName());
}
return true;
}
@@ -341,7 +341,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
protocol.shutdown(peer);
} catch (final TransmissionDisabledException e) {
// User disabled transmission.... do nothing.
logger.debug(this + " Transmission Disabled by User");
logger.debug("{} Transmission Disabled by User", this);
} catch (IOException e1) {
}
}

@@ -351,7 +351,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
peer.close();
} catch (final TransmissionDisabledException e) {
// User disabled transmission.... do nothing.
logger.debug(this + " Transmission Disabled by User");
logger.debug("{} Transmission Disabled by User", this);
} catch (IOException e1) {
}
}
@@ -357,7 +357,7 @@ public class SiteToSiteRestApiClient implements Closeable {
return getController();
} catch (IOException e) {
lastException = e;
logger.warn("Failed to get controller from " + clusterUrl + " due to " + e);
logger.warn("Failed to get controller from {}", clusterUrl, e);
if (logger.isDebugEnabled()) {
logger.debug("", e);
}
@@ -77,7 +77,7 @@ public abstract class AbstractChannelReader implements Runnable {
key.cancel();
key.channel().close();
} catch (final IOException ioe) {
LOGGER.warn("Unable to cleanly close stream due to " + ioe);
LOGGER.warn("Unable to cleanly close stream", ioe);
} finally {
consumer.signalEndOfStream();
}

@@ -126,7 +126,7 @@ public abstract class AbstractChannelReader implements Runnable {
}
} catch (final Exception ioe) {
closeStream();
LOGGER.error("Closed channel reader " + this + " due to " + ioe);
LOGGER.error("Closed channel reader {}", this, ioe);
} finally {
if (buffer != null) {
buffer.clear();
@@ -110,7 +110,7 @@ public final class ChannelDispatcher implements Runnable {
}
itr.remove(); // do this so that the next select operation returns a positive value; otherwise, it will return 0.
if (reader != null && LOGGER.isDebugEnabled()) {
LOGGER.debug(this + " New Connection established. Server channel: " + channel + " Reader: " + reader);
LOGGER.debug("{} New Connection established. Server channel: {} Reader: {}", this, channel, reader);
}
}
}

@@ -145,7 +145,7 @@ public final class ChannelDispatcher implements Runnable {
reader.setScheduledFuture(readerFuture);
}
if (reader != null && LOGGER.isDebugEnabled()) {
LOGGER.debug(this + " New Connection established. Server channel: " + channel + " Reader: " + reader);
LOGGER.debug("{} New Connection established. Server channel: {} Reader: {}", this, channel, reader);
}
}
@@ -108,10 +108,8 @@ public final class ChannelListener {
ssChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
final int actualReceiveBufSize = ssChannel.getOption(StandardSocketOptions.SO_RCVBUF);
if (actualReceiveBufSize < receiveBufferSize) {
LOGGER.warn(this + " attempted to set TCP Receive Buffer Size to "
+ receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize
+ "bytes. You may want to consider changing the Operating System's "
+ "maximum receive buffer");
LOGGER.warn("{} attempted to set TCP Receive Buffer Size to {} bytes but could only set to {} bytes. You may want to consider changing the Operating System's maximum receive buffer",
this, receiveBufferSize, actualReceiveBufSize);
}
}
ssChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);

@@ -177,10 +175,8 @@ public final class ChannelListener {
dChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
final int actualReceiveBufSize = dChannel.getOption(StandardSocketOptions.SO_RCVBUF);
if (actualReceiveBufSize < receiveBufferSize) {
LOGGER.warn(this + " attempted to set UDP Receive Buffer Size to "
+ receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize
+ "bytes. You may want to consider changing the Operating System's "
+ "maximum receive buffer");
LOGGER.warn("{} attempted to set UDP Receive Buffer Size to {} bytes but could only set to {} bytes. You may want to consider changing the Operating System's maximum receive buffer",
this, receiveBufferSize, actualReceiveBufSize);
}
}
dChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);

@@ -219,9 +215,11 @@ public final class ChannelListener {
LOGGER.warn("Interrupted while trying to shutdown executor");
}
final int currentBufferPoolSize = bufferPool.size();
final String warning = (currentBufferPoolSize != initialBufferPoolSize) ? "Initial buffer count=" + initialBufferPoolSize
+ " Current buffer count=" + currentBufferPoolSize
+ " Could indicate a buffer leak. Ensure all consumers are executed until they complete." : "";
LOGGER.info("Channel listener shutdown. " + warning);
if (currentBufferPoolSize != initialBufferPoolSize) {
LOGGER.info("Channel listener shutdown. Initial buffer count={} Current buffer count={} Could indicate a buffer leak. Ensure all consumers are executed until they complete.",
initialBufferPoolSize, currentBufferPoolSize);
} else {
LOGGER.info("Channel listener shutdown.");
}
}
}
@@ -114,11 +114,11 @@ public abstract class SocketListener {
SocketUtils.closeQuietly(socket);
continue;
} catch (final SocketException se) {
logger.warn("Failed to communicate with " + (socket == null ? "Unknown Host" : socket.getInetAddress().getHostName()) + " due to " + se, se);
logger.warn("Failed to communicate with {}", (socket == null ? "Unknown Host" : socket.getInetAddress().getHostName()), se);
SocketUtils.closeQuietly(socket);
continue;
} catch (final Throwable t) {
logger.warn("Socket Listener encountered exception: " + t, t);
logger.warn("Socket Listener encountered exception", t);
SocketUtils.closeQuietly(socket);
continue;
}

@@ -130,14 +130,14 @@ public abstract class SocketListener {
try {
dispatchRequest(finalSocket);
} catch (final Throwable t) {
logger.warn("Dispatching socket request encountered exception due to: " + t, t);
logger.warn("Dispatching socket request encountered exception", t);
} finally {
SocketUtils.closeQuietly(finalSocket);
}
}
});
} catch (final Throwable t) {
logger.error("Socket Listener encountered exception: " + t, t);
logger.error("Socket Listener encountered exception", t);
SocketUtils.closeQuietly(socket);
}
}

@@ -146,7 +146,7 @@ public abstract class SocketListener {
t.setName("Cluster Socket Listener");
t.start();

logger.info("Now listening for connections from nodes on port " + port);
logger.info("Now listening for connections from nodes on port {}", port);
}

public boolean isRunning() {
@@ -124,12 +124,12 @@ public class FileUtils {
}
}
if (!isGone && logger != null) {
logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath());
logger.warn("File appears to exist but unable to delete file: {}", file.getAbsolutePath());
}
}
} catch (final Throwable t) {
if (logger != null) {
logger.warn("Unable to delete file: '" + file.getAbsolutePath() + "' due to " + t);
logger.warn("Unable to delete file: '{}'", file.getAbsolutePath(), t);
}
}
return isGone;

@@ -167,11 +167,11 @@ public class FileUtils {
}
}
if (!isGone && logger != null) {
logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath());
logger.warn("File appears to exist but unable to delete file: {}", file.getAbsolutePath());
}
} catch (final Throwable t) {
if (null != logger) {
logger.warn("Unable to delete file given from path: '" + file.getPath() + "' due to " + t);
logger.warn("Unable to delete file given from path: '{}'", file.getPath(), t);
}
}
}

@@ -392,7 +392,7 @@ public class FileUtils {
FileUtils.deleteFile(destination, null, 5);
throw new IOException("Could not remove file " + source.getAbsolutePath());
} else {
logger.warn("Configured to delete source file when renaming/move not successful. However, unable to delete file at: " + source.getAbsolutePath());
logger.warn("Configured to delete source file when renaming/move not successful. However, unable to delete file at: {}", source.getAbsolutePath());
}
}
} finally {
@@ -116,7 +116,7 @@ public class ClassLoaderUtils {
if (files != null) {
for (File classpathResource : files) {
if (classpathResource.isDirectory()) {
LOGGER.warn("Recursive directories are not supported, skipping " + classpathResource.getAbsolutePath());
LOGGER.warn("Recursive directories are not supported, skipping {}", classpathResource.getAbsolutePath());
} else {
additionalClasspath.add(classpathResource.toURI().toURL());
}

@@ -163,7 +163,7 @@ public class ClassLoaderUtils {
lastModified = file.lastModified();
}
} catch (URISyntaxException e) {
LOGGER.error("Error getting last modified date for " + url);
LOGGER.error("Error getting last modified date for {}", url);
}
return lastModified;
}
@@ -175,7 +175,7 @@ public final class WebUtils {
verifyContextPath(allowedContextPaths, contextPath);
return contextPath;
} catch (UriBuilderException e) {
logger.error("Error determining context path on " + jspDisplayName + ": " + e.getMessage());
logger.error("Error determining context path on {}", jspDisplayName, e);
return EMPTY;
}
}
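The WebUtils hunk above also switches from appending e.getMessage() to passing the exception itself, a change repeated in several later hunks (the AWS DynamoDB and Azure handlers, for example). A short illustrative example of the practical difference, not taken from the NiFi sources and using hypothetical names:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThrowableLoggingExample {

    private static final Logger LOG = LoggerFactory.getLogger(ThrowableLoggingExample.class);

    public static void main(String[] args) {
        Exception e = new IllegalStateException("context path rejected");

        // Only the exception message reaches the log; the stack trace is lost.
        LOG.error("Error determining context path on " + "myPage.jsp" + ": " + e.getMessage());

        // Passing the exception as the final argument records the formatted message
        // and the full stack trace.
        LOG.error("Error determining context path on {}", "myPage.jsp", e);
    }
}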
@ -168,6 +168,7 @@ public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T
|
|||
// Record Identifier. It keeps only the most up-to-date version of the Record. This allows
|
||||
// us to write the snapshot very quickly without having to re-process the journal files.
|
||||
// For each update, then, we will update the record in the map.
|
||||
final String logMessage = "Received Record (ID={}) with UpdateType of {} but no indicator of where the Record is to be {}; these records may be {} when the repository is restored!";
|
||||
for (final T record : records) {
|
||||
final Object recordId = serdeFactory.getRecordIdentifier(record);
|
||||
final UpdateType updateType = serdeFactory.getUpdateType(record);
|
||||
|
@ -179,9 +180,7 @@ public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T
|
|||
case SWAP_OUT:
|
||||
final String location = serdeFactory.getLocation(record);
|
||||
if (location == null) {
|
||||
logger.error("Received Record (ID=" + recordId + ") with UpdateType of SWAP_OUT but "
|
||||
+ "no indicator of where the Record is to be Swapped Out to; these records may be "
|
||||
+ "lost when the repository is restored!");
|
||||
logger.error(logMessage, recordId, UpdateType.SWAP_OUT, "Swapped Out to", "lost");
|
||||
} else {
|
||||
recordMap.remove(recordId);
|
||||
this.swapLocations.add(location);
|
||||
|
@ -190,9 +189,7 @@ public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T
|
|||
case SWAP_IN:
|
||||
final String swapLocation = serdeFactory.getLocation(record);
|
||||
if (swapLocation == null) {
|
||||
logger.error("Received Record (ID=" + recordId + ") with UpdateType of SWAP_IN but no "
|
||||
+ "indicator of where the Record is to be Swapped In from; these records may be duplicated "
|
||||
+ "when the repository is restored!");
|
||||
logger.error(logMessage, recordId, UpdateType.SWAP_IN, "Swapped In from", "duplicated");
|
||||
} else {
|
||||
swapLocations.remove(swapLocation);
|
||||
}
|
||||
|
@ -297,7 +294,7 @@ public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T
|
|||
// If the snapshot file exists, delete it
|
||||
if (snapshotFile.exists()) {
|
||||
if (!snapshotFile.delete()) {
|
||||
logger.warn("Unable to delete existing Snapshot file " + snapshotFile);
|
||||
logger.warn("Unable to delete existing Snapshot file {}", snapshotFile);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -98,25 +98,24 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
|
|||
public void dispose() {
|
||||
logger.debug("Deleting Journal {} because it is now encapsulated in the latest Snapshot", journalFile.getName());
|
||||
if (!journalFile.delete() && journalFile.exists()) {
|
||||
logger.warn("Unable to delete expired journal file " + journalFile + "; this file should be deleted manually.");
|
||||
logger.warn("Unable to delete expired journal file {}; this file should be deleted manually.", journalFile);
|
||||
}
|
||||
|
||||
if (overflowDirectory.exists()) {
|
||||
final File[] overflowFiles = overflowDirectory.listFiles();
|
||||
if (overflowFiles == null) {
|
||||
logger.warn("Unable to obtain listing of files that exist in 'overflow directory' " + overflowDirectory
|
||||
+ " - this directory and any files within it can now be safely removed manually");
|
||||
logger.warn("Unable to obtain listing of files that exist in 'overflow directory' {} - this directory and any files within it can now be safely removed manually", overflowDirectory);
|
||||
return;
|
||||
}
|
||||
|
||||
for (final File overflowFile : overflowFiles) {
|
||||
if (!overflowFile.delete() && overflowFile.exists()) {
|
||||
logger.warn("After expiring journal file " + journalFile + ", unable to remove 'overflow file' " + overflowFile + " - this file should be removed manually");
|
||||
logger.warn("After expiring journal file {}, unable to remove 'overflow file' {} - this file should be removed manually", journalFile, overflowFile);
|
||||
}
|
||||
}
|
||||
|
||||
if (!overflowDirectory.delete()) {
|
||||
logger.warn("After expiring journal file " + journalFile + ", unable to remove 'overflow directory' " + overflowDirectory + " - this file should be removed manually");
|
||||
logger.warn("After expiring journal file {}, unable to remove 'overflow directory' {} - this file should be removed manually", journalFile, overflowDirectory);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -334,7 +333,7 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
|
|||
|
||||
if (overflowFile != null) {
|
||||
if (!overflowFile.delete() && overflowFile.exists()) {
|
||||
logger.warn("Failed to cleanup temporary overflow file " + overflowFile + " - this file should be cleaned up manually.");
|
||||
logger.warn("Failed to cleanup temporary overflow file {} - this file should be cleaned up manually.", overflowFile);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ final class AMQPConsumer extends AMQPWorker {
|
|||
this.autoAcknowledge = autoAcknowledge;
|
||||
this.responseQueue = new LinkedBlockingQueue<>(10);
|
||||
|
||||
processorLog.info("Successfully connected AMQPConsumer to " + connection.toString() + " and '" + queueName + "' queue");
|
||||
processorLog.info("Successfully connected AMQPConsumer to {} and '{}' queue", connection, queueName);
|
||||
|
||||
final Channel channel = getChannel();
|
||||
consumer = new DefaultConsumer(channel) {
|
||||
|
|
|
@ -44,7 +44,7 @@ final class AMQPPublisher extends AMQPWorker {
|
|||
getChannel().addReturnListener(new UndeliverableMessageLogger());
|
||||
this.connectionString = connection.toString();
|
||||
|
||||
processorLog.info("Successfully connected AMQPPublisher to " + this.connectionString);
|
||||
processorLog.info("Successfully connected AMQPPublisher to {}", this.connectionString);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -66,8 +66,7 @@ final class AMQPPublisher extends AMQPWorker {
|
|||
if (exchange.length() == 0) {
|
||||
processorLog.debug("The 'exchangeName' is not specified. Messages will be sent to default exchange");
|
||||
}
|
||||
processorLog.debug("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange
|
||||
+ "' exchange with '" + routingKey + "' as a routing key.");
|
||||
processorLog.debug("Successfully connected AMQPPublisher to {} and '{}' exchange with '{}' as a routing key.", this.connectionString, exchange, routingKey);
|
||||
}
|
||||
|
||||
try {
|
||||
|
|
|
@ -52,7 +52,7 @@ abstract class AMQPWorker implements AutoCloseable {
|
|||
try {
|
||||
this.channel = connection.createChannel();
|
||||
} catch (IOException e) {
|
||||
processorLog.error("Failed to create Channel for " + connection, e);
|
||||
processorLog.error("Failed to create Channel for {}", connection, e);
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ abstract class AMQPWorker implements AutoCloseable {
|
|||
|
||||
if (channel.isOpen()) {
|
||||
if (processorLog.isDebugEnabled()) {
|
||||
processorLog.debug("Closing AMQP channel for " + this.channel.getConnection().toString());
|
||||
processorLog.debug("Closing AMQP channel for {}", this.channel.getConnection());
|
||||
}
|
||||
|
||||
this.channel.close();
|
||||
|
|
|
@ -215,7 +215,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
|
|||
try {
|
||||
updater.accept(attributeValue);
|
||||
} catch (final Exception e) {
|
||||
getLogger().warn("Failed to update AMQP Message Property " + attribute, e);
|
||||
getLogger().warn("Failed to update AMQP Message Property {}", attribute, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -279,7 +279,7 @@ public class JASN1Reader extends AbstractConfigurableComponent implements Record
|
|||
|
||||
Exception parseException = null;
|
||||
for (String asn1File : asnFilePaths) {
|
||||
logger.info("Parsing " + asn1File);
|
||||
logger.info("Parsing {}", asn1File);
|
||||
try {
|
||||
AsnModel model = getJavaModelFromAsn1File(asn1File);
|
||||
modulesByName.putAll(model.modulesByName);
|
||||
|
|
|
@ -243,7 +243,7 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAwsSyncProcessor
|
|||
flowFile = session.putAttribute(flowFile, DYNAMODB_KEY_ERROR_UNPROCESSED, itemKeys.toString());
|
||||
session.transfer(flowFile, REL_UNPROCESSED);
|
||||
|
||||
getLogger().error("Unprocessed key " + itemKeys + " for flow file " + flowFile);
|
||||
getLogger().error("Unprocessed key {} for flow file {}", itemKeys, flowFile);
|
||||
|
||||
keysToFlowFileMap.remove(itemKeys);
|
||||
}
|
||||
|
@ -252,7 +252,7 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAwsSyncProcessor
|
|||
try {
|
||||
validateRangeKeyValue(rangeKeyName, rangeKeyValue);
|
||||
} catch (final IllegalArgumentException e) {
|
||||
getLogger().error(e.getMessage() + ": " + flowFile, e);
|
||||
getLogger().error("{}", flowFile, e);
|
||||
flowFile = session.putAttribute(flowFile, DYNAMODB_RANGE_KEY_VALUE_ERROR, "range key '" + rangeKeyName
|
||||
+ "'/value '" + rangeKeyValue + "' inconsistency error");
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
|
@ -283,7 +283,7 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAwsSyncProcessor
|
|||
try {
|
||||
validateHashKeyValue(hashKeyValue);
|
||||
} catch (final IllegalArgumentException e) {
|
||||
getLogger().error(e.getMessage() + ": " + flowFile, e);
|
||||
getLogger().error("{}", flowFile, e);
|
||||
flowFile = session.putAttribute(flowFile, DYNAMODB_HASH_KEY_VALUE_ERROR, "hash key " + hashKeyName + "/value '" + hashKeyValue + "' inconsistency error");
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
isConsistent = false;
|
||||
|
|
|
@ -369,7 +369,7 @@ public class AWSCredentialsProviderControllerService extends AbstractControllerS
|
|||
this.context = context;
|
||||
|
||||
credentialsProvider = createCredentialsProvider(context);
|
||||
getLogger().debug("Using credentials provider: " + credentialsProvider.getClass());
|
||||
getLogger().debug("Using credentials provider: {}", credentialsProvider.getClass());
|
||||
}
|
||||
|
||||
private AWSCredentialsProvider createCredentialsProvider(final PropertyContext propertyContext) {
|
||||
|
|
|
@ -159,19 +159,19 @@ public class DeleteDynamoDB extends AbstractDynamoDBProcessor {
|
|||
|
||||
// All non unprocessed items are successful
|
||||
for (final FlowFile flowFile : keysToFlowFileMap.values()) {
|
||||
getLogger().debug("Successfully deleted item from dynamodb : " + table);
|
||||
getLogger().debug("Successfully deleted item from dynamodb : {}", table);
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
}
|
||||
} catch (final AwsServiceException exception) {
|
||||
getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
|
||||
getLogger().error("Could not process flowFiles due to service exception", exception);
|
||||
List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
|
||||
session.transfer(failedFlowFiles, REL_FAILURE);
|
||||
} catch (final SdkException exception) {
|
||||
getLogger().error("Could not process flowFiles due to SDK exception : " + exception.getMessage());
|
||||
getLogger().error("Could not process flowFiles due to SDK exception", exception);
|
||||
List<FlowFile> failedFlowFiles = processSdkException(session, flowFiles, exception);
|
||||
session.transfer(failedFlowFiles, REL_FAILURE);
|
||||
} catch (final Exception exception) {
|
||||
getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
|
||||
getLogger().error("Could not process flowFiles", exception);
|
||||
List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception);
|
||||
session.transfer(failedFlowFiles, REL_FAILURE);
|
||||
}
|
||||
|
|
|
@ -269,15 +269,15 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
|
|||
}
|
||||
|
||||
} catch (final AwsServiceException exception) {
|
||||
getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
|
||||
getLogger().error("Could not process flowFiles due to service exception", exception);
|
||||
List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
|
||||
session.transfer(failedFlowFiles, REL_FAILURE);
|
||||
} catch (final SdkException exception) {
|
||||
getLogger().error("Could not process flowFiles due to SDK exception : " + exception.getMessage());
|
||||
getLogger().error("Could not process flowFiles due to SDK exception", exception);
|
||||
List<FlowFile> failedFlowFiles = processSdkException(session, flowFiles, exception);
|
||||
session.transfer(failedFlowFiles, REL_FAILURE);
|
||||
} catch (final Exception exception) {
|
||||
getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
|
||||
getLogger().error("Could not process flowFiles", exception);
|
||||
List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception);
|
||||
session.transfer(failedFlowFiles, REL_FAILURE);
|
||||
}
|
||||
|
|
|
@ -185,19 +185,19 @@ public class PutDynamoDB extends AbstractDynamoDBProcessor {
|
|||
|
||||
// Handle any remaining flowfiles
|
||||
for (final FlowFile flowFile : keysToFlowFileMap.values()) {
|
||||
getLogger().debug("Successful posted items to dynamodb : " + table);
|
||||
getLogger().debug("Successful posted items to dynamodb : {}", table);
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
}
|
||||
} catch (final AwsServiceException exception) {
|
||||
getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
|
||||
getLogger().error("Could not process flowFiles due to service exception", exception);
|
||||
List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
|
||||
session.transfer(failedFlowFiles, REL_FAILURE);
|
||||
} catch (final SdkException exception) {
|
||||
getLogger().error("Could not process flowFiles due to SDK exception : " + exception.getMessage());
|
||||
getLogger().error("Could not process flowFiles due to SDK exception", exception);
|
||||
List<FlowFile> failedFlowFiles = processSdkException(session, flowFiles, exception);
|
||||
session.transfer(failedFlowFiles, REL_FAILURE);
|
||||
} catch (Exception exception) {
|
||||
getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
|
||||
getLogger().error("Could not process flowFiles", exception);
|
||||
List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception);
|
||||
session.transfer(failedFlowFiles, REL_FAILURE);
|
||||
}
|
||||
|
|
|
@ -210,7 +210,7 @@ public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
|
|||
final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
|
||||
result = handler.handle(reader.createRecordSet(), alreadyProcessedChunks);
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Error while reading records: " + e.getMessage(), e);
|
||||
getLogger().error("Error while reading records", e);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
@ -234,7 +234,6 @@ public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
|
|||
) {
|
||||
final Throwable error = result.getThrowable();
|
||||
final Throwable cause = error.getCause();
|
||||
final String message = error.getMessage();
|
||||
|
||||
if (cause instanceof ProvisionedThroughputExceededException) {
|
||||
// When DynamoDB returns with {@code ProvisionedThroughputExceededException}, the client reached it's write limitation and
|
||||
|
@ -243,13 +242,13 @@ public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
|
|||
context.yield();
|
||||
session.transfer(outgoingFlowFile, REL_UNPROCESSED);
|
||||
} else if (cause instanceof AwsServiceException) {
|
||||
getLogger().error("Could not process FlowFile due to server exception: " + message, error);
|
||||
getLogger().error("Could not process FlowFile due to server exception", error);
|
||||
session.transfer(processServiceException(session, Collections.singletonList(outgoingFlowFile), (AwsServiceException) cause), REL_FAILURE);
|
||||
} else if (cause instanceof SdkException) {
|
||||
getLogger().error("Could not process FlowFile due to client exception: " + message, error);
|
||||
getLogger().error("Could not process FlowFile due to client exception", error);
|
||||
session.transfer(processSdkException(session, Collections.singletonList(outgoingFlowFile), (SdkException) cause), REL_FAILURE);
|
||||
} else {
|
||||
getLogger().error("Could not process FlowFile: " + message, error);
|
||||
getLogger().error("Could not process FlowFile", error);
|
||||
session.transfer(outgoingFlowFile, REL_FAILURE);
|
||||
}
|
||||
}
|
||||
|
@ -378,7 +377,7 @@ public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
|
|||
|
||||
sortKeyValue = itemCounter;
|
||||
} else if (SORT_NONE.getValue().equals(sortKeyStrategy)) {
|
||||
logger.debug("No " + SORT_KEY_STRATEGY.getDisplayName() + " was applied");
|
||||
logger.debug("No {} was applied", SORT_KEY_STRATEGY.getDisplayName());
|
||||
sortKeyValue = null;
|
||||
} else {
|
||||
throw new ProcessException("Unknown " + SORT_KEY_STRATEGY.getDisplayName() + " \"" + sortKeyStrategy + "\"");
|
||||
|
|
|
@ -271,7 +271,7 @@ public abstract class AbstractKinesisRecordProcessor implements ShardRecordProce
|
|||
}
|
||||
|
||||
private void checkpointWithRetries(final RecordProcessorCheckpointer checkpointer) {
|
||||
log.debug("Checkpointing shard " + kinesisShardId);
|
||||
log.debug("Checkpointing shard {}", kinesisShardId);
|
||||
try {
|
||||
for (int i = 0; i < numRetries; i++) {
|
||||
if (attemptCheckpoint(checkpointer, i)) {
|
||||
|
|
|
@ -684,8 +684,7 @@ public class PutS3Object extends AbstractS3Processor {
|
|||
ffFilename, bucket, key);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
getLogger().error("IOException initiating cache state while processing flow files: " +
|
||||
e.getMessage());
|
||||
getLogger().error("IOException initiating cache state while processing flow files", e);
|
||||
throw (e);
|
||||
}
|
||||
|
||||
|
@ -720,12 +719,10 @@ public class PutS3Object extends AbstractS3Processor {
|
|||
try {
|
||||
persistLocalState(cacheKey, currentState);
|
||||
} catch (Exception e) {
|
||||
getLogger().info("Exception saving cache state while processing flow file: " +
|
||||
e.getMessage());
|
||||
getLogger().info("Exception saving cache state while processing flow file", e);
|
||||
throw (new ProcessException("Exception saving cache state", e));
|
||||
}
|
||||
getLogger().info("Success initiating upload flowfile={} available={} position={} " +
|
||||
"length={} bucket={} key={} uploadId={}",
|
||||
getLogger().info("Success initiating upload flowfile={} available={} position={} length={} bucket={} key={} uploadId={}",
|
||||
ffFilename, in.available(), currentState.getFilePosition(),
|
||||
currentState.getContentLength(), bucket, key,
|
||||
currentState.getUploadId());
|
||||
|
@ -782,8 +779,7 @@ public class PutS3Object extends AbstractS3Processor {
|
|||
try {
|
||||
persistLocalState(cacheKey, currentState);
|
||||
} catch (Exception e) {
|
||||
getLogger().info("Exception saving cache state processing flow file: " +
|
||||
e.getMessage());
|
||||
getLogger().info("Exception saving cache state processing flow file", e);
|
||||
}
|
||||
int available = 0;
|
||||
try {
|
||||
|
@ -838,7 +834,7 @@ public class PutS3Object extends AbstractS3Processor {
|
|||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
getLogger().error("Error during upload of flow files: " + e.getMessage());
|
||||
getLogger().error("Error during upload of flow files", e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
|
|
|
@ -147,8 +147,7 @@ public class AzureKeyVaultSecretsParameterProvider extends AbstractParameterProv
|
|||
}
|
||||
final String parameterGroupName = tags.get(GROUP_NAME_TAG);
|
||||
if (parameterGroupName == null) {
|
||||
getLogger().debug("Secret with parameter name [{}] not recognized as a valid parameter since it " +
|
||||
"does not have the [{}] tag", parameterName, GROUP_NAME_TAG);
|
||||
getLogger().debug("Secret with parameter name [{}] not recognized as a valid parameter since it does not have the [{}] tag", parameterName, GROUP_NAME_TAG);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -594,7 +594,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
|
|||
errorMessage = "Receive Events failed";
|
||||
}
|
||||
|
||||
getLogger().error(errorMessage + ". Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]",
|
||||
getLogger().error("{}. Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]",
|
||||
errorMessage,
|
||||
partitionContext.getFullyQualifiedNamespace(),
|
||||
partitionContext.getEventHubName(),
|
||||
partitionContext.getConsumerGroup(),
|
||||
|
|
|
@ -295,7 +295,7 @@ public class ComponentStateCheckpointStore implements CheckpointStore {
|
|||
|
||||
private void debug(String message, Object... arguments) {
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("[clientId={}] " + message, ArrayUtils.addFirst(arguments, clientId));
|
||||
LOGGER.debug("[clientId={}] {}", ArrayUtils.addFirst(arguments, clientId), message);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ abstract class AbstractStorageClientFactory<CREDENTIAL, CLIENT> {
|
|||
*/
|
||||
public CLIENT getStorageClient(final CREDENTIAL credentialsDetails) {
|
||||
return clientCache.get(credentialsDetails, __ -> {
|
||||
logger.debug(credentialsDetails.getClass().getSimpleName() + " is not found in the cache with the given credentials. Creating it.");
|
||||
logger.debug("{} is not found in the cache with the given credentials. Creating it.", credentialsDetails.getClass().getSimpleName());
|
||||
return createStorageClient(credentialsDetails, proxyOptions);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ public class AzureCosmosDBClientService extends AbstractControllerService implem
|
|||
try {
|
||||
cosmosClient.close();
|
||||
} catch (CosmosException e) {
|
||||
getLogger().error("Closing cosmosClient Failed: " + e.getMessage(), e);
|
||||
getLogger().error("Closing cosmosClient Failed", e);
|
||||
} finally {
|
||||
this.cosmosClient = null;
|
||||
}
|
||||
|
|
|
@ -330,7 +330,7 @@ public class AzureLogAnalyticsProvenanceReportingTask extends AbstractAzureLogAn
|
|||
final HttpPost httpPost = new HttpPost(dataCollectorEndpoint);
|
||||
httpPost.addHeader("Content-Type", "application/json");
|
||||
httpPost.addHeader("Log-Type", logName);
|
||||
getLogger().debug("Sending " + batchSize + " events of length " + str.length() + " to azure log analytics " + logName);
|
||||
getLogger().debug("Sending {} events of length {} to azure log analytics {}", batchSize, str.length(), logName);
|
||||
try {
|
||||
sendToLogAnalytics(httpPost, workspaceId, linuxPrimaryKey, str);
|
||||
|
||||
|
|
|
@ -714,7 +714,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
|||
// If there's no exception, the listener callback might not have been executed yet, so try again later. Otherwise clean up and start over next time
|
||||
if (e != null) {
|
||||
// Communications failure, disconnect and try next time
|
||||
log.error("Binlog connector communications failure: " + e.getMessage(), e);
|
||||
log.error("Binlog connector communications failure", e);
|
||||
try {
|
||||
stop();
|
||||
} catch (CDCException ioe) {
|
||||
|
@ -1312,7 +1312,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
|||
getLogger().trace("Closing the pooled JDBC connection.");
|
||||
connection.close();
|
||||
} catch (SQLException e) {
|
||||
getLogger().warn("Failed to close JDBC connection due to " + e, e);
|
||||
getLogger().warn("Failed to close JDBC connection", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -144,14 +144,14 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
|
|||
completeSchema = postJsonResponse("/subjects/" + searchName, schemaJson, "schema id: " + schemaId);
|
||||
break;
|
||||
} catch (SchemaNotFoundException e) {
|
||||
logger.debug("Could not find schema in registry by subject name " + searchName, e);
|
||||
logger.debug("Could not find schema in registry by subject name {}", searchName, e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} catch (SchemaNotFoundException e) {
|
||||
logger.debug("Could not find schema metadata in registry by id and subjects in: " + schemaPath);
|
||||
logger.debug("Could not find schema metadata in registry by id and subjects in: {}", schemaPath);
|
||||
}
|
||||
|
||||
// Get all couples (subject name, version) for a given schema ID
|
||||
|
@ -179,7 +179,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
|
|||
}
|
||||
}
|
||||
} catch (SchemaNotFoundException e) {
|
||||
logger.debug("Could not find schema metadata in registry by id and versions in: " + schemaPath);
|
||||
logger.debug("Could not find schema metadata in registry by id and versions in: {}", schemaPath);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -102,13 +102,13 @@ public class ChunkHeader extends Block {
|
|||
while (offset > 0) {
|
||||
int token = new BinaryReader(binaryReader, offset - 10).read();
|
||||
if (token != 0x0c) {
|
||||
log.warn("Unexpected token when parsing template at offset " + offset);
|
||||
log.warn("Unexpected token when parsing template at offset {}", offset);
|
||||
break;
|
||||
}
|
||||
BinaryReader templateReader = new BinaryReader(binaryReader, offset - 4);
|
||||
int pointer = NumberUtil.intValueMax(templateReader.readDWord(), Integer.MAX_VALUE, "Invalid pointer.");
|
||||
if (offset != pointer) {
|
||||
log.warn("Invalid pointer when parsing template at offset " + offset);
|
||||
log.warn("Invalid pointer when parsing template at offset {}", offset);
|
||||
break;
|
||||
}
|
||||
TemplateNode templateNode = new TemplateNode(templateReader, this);
|
||||
|
|
|
@ -183,7 +183,7 @@ public class Bin {
|
|||
final String index = flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE);
|
||||
if (index == null || index.isEmpty() || !binIndexSet.add(index)) {
|
||||
// Do not accept flowfile with duplicate fragment index value
|
||||
logger.warn("Duplicate or missing value for '" + FRAGMENT_INDEX_ATTRIBUTE + "' in defragment mode. Flowfile {} not allowed in Bin", flowFile);
|
||||
logger.warn("Duplicate or missing value for '{}' in defragment mode. Flowfile {} not allowed in Bin", FRAGMENT_INDEX_ATTRIBUTE, flowFile);
|
||||
successiveFailedOfferings++;
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -84,9 +84,8 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
|
|||
datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
|
||||
final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
|
||||
if (actualReceiveBufSize < maxBufferSize) {
|
||||
logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
|
||||
+ actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's "
|
||||
+ "maximum receive buffer");
|
||||
logger.warn("Attempted to set Socket Buffer Size to {} bytes but could only set to {} bytes. You may want to consider changing the Operating System's maximum receive buffer",
|
||||
maxBufferSize, actualReceiveBufSize);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -251,7 +251,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
|
|||
destinationRelationship = REL_REJECT;
|
||||
transferFile = false;
|
||||
penalizeFile = false;
|
||||
logger.warn("Could not determine a unique name after 99 attempts for. Switching resolution mode to REJECT for " + flowFile);
|
||||
logger.warn("Could not determine a unique name after 99 attempts for. Switching resolution mode to REJECT for {}", flowFile);
|
||||
}
|
||||
break;
|
||||
case FileTransfer.CONFLICT_RESOLUTION_IGNORE:
|
||||
|
|
|
@ -406,7 +406,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen
|
|||
try {
|
||||
statsThread.interrupt();
|
||||
} catch (Exception e) {
|
||||
getLogger().warn("Error interrupting thread: " + e.getMessage(), e);
|
||||
getLogger().warn("Error interrupting thread", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,14 +34,14 @@ public final class HDFSResourceHelper {
|
|||
try {
|
||||
interruptStatisticsThread(fileSystem);
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Error stopping FileSystem statistics thread: " + e.getMessage());
|
||||
LOGGER.warn("Error stopping FileSystem statistics thread", e);
|
||||
LOGGER.debug("", e);
|
||||
} finally {
|
||||
if (fileSystem != null) {
|
||||
try {
|
||||
fileSystem.close();
|
||||
} catch (IOException e) {
|
||||
LOGGER.warn("Error close FileSystem: " + e.getMessage(), e);
|
||||
LOGGER.warn("Error close FileSystem", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ public final class HDFSResourceHelper {
|
|||
try {
|
||||
statsThread.interrupt();
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Error interrupting thread: " + e.getMessage(), e);
|
||||
LOGGER.warn("Error interrupting thread", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -645,8 +645,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
|||
upperBoundExclusiveTimestamp = currentTime - listingLagMillis;
|
||||
|
||||
if (getLogger().isTraceEnabled()) {
|
||||
getLogger().trace("interval: " + lowerBoundInclusiveTimestamp + " - " + upperBoundExclusiveTimestamp);
|
||||
getLogger().trace("entityList: " + entityList.stream().map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", ")));
|
||||
getLogger().trace("interval: {} - {}", lowerBoundInclusiveTimestamp, upperBoundExclusiveTimestamp);
|
||||
getLogger().trace("entityList: {}", entityList.stream().map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", ")));
|
||||
}
|
||||
entityList
|
||||
.stream()
|
||||
|
@ -658,7 +658,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
|||
);
|
||||
|
||||
if (getLogger().isTraceEnabled()) {
|
||||
getLogger().trace("orderedEntries: " +
|
||||
getLogger().trace("orderedEntries: {}",
|
||||
orderedEntries.values().stream()
|
||||
.flatMap(List::stream)
|
||||
.map(entity -> entity.getName() + "_" + entity.getTimestamp())
|
||||
|
@ -692,13 +692,12 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
|||
|
||||
try {
|
||||
if (getLogger().isTraceEnabled()) {
|
||||
getLogger().info("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp);
|
||||
getLogger().info("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: {} = {}", lastListedLatestEntryTimestampMillis, upperBoundExclusiveTimestamp);
|
||||
}
|
||||
lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp;
|
||||
persist(upperBoundExclusiveTimestamp, upperBoundExclusiveTimestamp, latestIdentifiersProcessed, session, getStateScope(context));
|
||||
} catch (final IOException ioe) {
|
||||
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
|
||||
+ "if another node begins executing this Processor, data duplication may occur.", ioe);
|
||||
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or if another node begins executing this Processor, data duplication may occur.", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -948,7 +948,7 @@ public class AvroTypeUtil {
|
|||
|
||||
values.put(fieldName, coercedValue);
|
||||
} catch (Exception ex) {
|
||||
logger.debug("fail to convert field " + fieldName, ex );
|
||||
logger.debug("fail to convert field {}", fieldName, ex );
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -126,7 +126,7 @@ public class SocketChannelRecordReaderDispatcher implements Runnable, Closeable
|
|||
recordReaders.offer(socketChannelRecordReader);
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("Error dispatching connection: " + e.getMessage(), e);
|
||||
logger.error("Error dispatching connection", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -166,7 +166,7 @@ public class ProvenanceEventConsumer {
|
|||
try {
|
||||
state = stateManager.getState(Scope.LOCAL).toMap();
|
||||
} catch (IOException e) {
|
||||
logger.error("Failed to get state at start up due to:" + e.getMessage(), e);
|
||||
logger.error("Failed to get state at start up", e);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -202,7 +202,7 @@ public class ProvenanceEventConsumer {
|
|||
rawEvents = eventAccess.getProvenanceEvents(firstEventId, batchSize);
|
||||
filteredEvents = filterEvents(componentMapHolder, rawEvents);
|
||||
} catch (final IOException ioe) {
|
||||
logger.error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
|
||||
logger.error("Failed to retrieve Provenance Events from repository", ioe);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -226,7 +226,7 @@ public class ProvenanceEventConsumer {
|
|||
rawEvents = eventAccess.getProvenanceEvents(firstEventId, batchSize);
|
||||
filteredEvents = filterEvents(componentMapHolder, rawEvents);
|
||||
} catch (final IOException ioe) {
|
||||
logger.error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
|
||||
logger.error("Failed to retrieve Provenance Events from repository", ioe);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
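A related detail in the ProvenanceEventConsumer hunks above: the old messages appended e.getMessage() even though the exception was already passed as the last argument, so the same text showed up twice in the log. A short sketch of the before/after; the class name below is invented, and only the message text mirrors the hunk.

    import java.io.IOException;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Invented class name; illustrates why the "due to: " + e.getMessage() suffix was dropped.
    class StateLoader {

        private static final Logger logger = LoggerFactory.getLogger(StateLoader.class);

        void load() {
            try {
                throw new IOException("state file not readable");
            } catch (final IOException e) {
                // Redundant: the concatenated text repeats what the Throwable already carries,
                // and the concatenation runs even when ERROR is disabled.
                // logger.error("Failed to get state at start up due to:" + e.getMessage(), e);

                // Sufficient: the static message plus the Throwable gives the same
                // information once, including the stack trace.
                logger.error("Failed to get state at start up", e);
            }
        }
    }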
|
@ -339,7 +339,7 @@ public class GeohashRecord extends AbstractProcessor {
|
|||
}
|
||||
} catch (IllegalArgumentException e) {
|
||||
//lat/lon/geohash values out of range or is not valid
|
||||
getLogger().warn("Unable to " + (encode ? "encode" : "decode"), e);
|
||||
getLogger().warn("Unable to {}", (encode ? "encode" : "decode"), e);
|
||||
}
|
||||
|
||||
routingStrategyExecutor.writeFlowFiles(record, writer, notMatchedWriter, updated);
|
||||
|
|
|
@ -249,7 +249,7 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
|
|||
graphOutputStream.close();
|
||||
session.transfer(graph, GRAPH);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Error processing record at index " + records, e);
|
||||
getLogger().error("Error processing record at index {}", records, e);
|
||||
// write failed records to a flowfile destined for the failure relationship
|
||||
failedWriter.write(record);
|
||||
session.remove(graph);
|
||||
|
|
|
@ -235,7 +235,7 @@ public class Neo4JCypherClientService extends AbstractControllerService implemen
|
|||
try {
|
||||
neo4JDriver = getDriver(context);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Error while getting connection " + e.getLocalizedMessage(), e);
|
||||
getLogger().error("Error while getting connection", e);
|
||||
throw new ProcessException("Error while getting connection" + e.getLocalizedMessage(), e);
|
||||
}
|
||||
getLogger().info("Neo4JCypherExecutor connection created for url {}", connectionUrl);
|
||||
|
|
|
@ -261,13 +261,13 @@ public class ExecuteGroovyScript extends AbstractProcessor {
|
|||
//compile if needed
|
||||
getGroovyScript();
|
||||
} catch (Throwable t) {
|
||||
getLogger().error("Load script failed: " + t);
|
||||
getLogger().error("Load script failed", t);
|
||||
throw new ProcessException("Load script failed: " + t, t);
|
||||
}
|
||||
try {
|
||||
callScriptStatic("onStart", context);
|
||||
} catch (Throwable t) {
|
||||
getLogger().error("onStart failed: " + t);
|
||||
getLogger().error("onStart failed", t);
|
||||
throw new ProcessException("onStart failed: " + t, t);
|
||||
}
|
||||
}
|
||||
|
@ -361,7 +361,7 @@ public class ExecuteGroovyScript extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
} catch (Throwable ei) {
|
||||
getLogger().warn("Failed to set autocommit=false for `" + e.getKey() + "`", ei);
|
||||
getLogger().warn("Failed to set autocommit=false for `{}`", e.getKey(), ei);
|
||||
}
|
||||
e.setValue(sql);
|
||||
}
|
||||
|
@ -394,7 +394,7 @@ public class ExecuteGroovyScript extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
} catch (Throwable ei) {
|
||||
getLogger().warn("Failed to set autocommit=true for `" + e.getKey() + "`", ei);
|
||||
getLogger().warn("Failed to set autocommit=true for `{}`", e.getKey(), ei);
|
||||
}
|
||||
try {
|
||||
sql.close();
|
||||
|
|
|
@ -120,7 +120,7 @@ public class HDFSExternalResourceProvider implements ExternalResourceProvider {
|
|||
.collect(Collectors.toList());
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("The following NARs were found: " + String.join(", ", result.stream().map(d -> d.getLocation()).collect(Collectors.toList())));
|
||||
LOGGER.debug("The following NARs were found: {}", String.join(", ", result.stream().map(d -> d.getLocation()).collect(Collectors.toList())));
|
||||
}
|
||||
|
||||
return result;
|
||||
|
|
|
@ -486,7 +486,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
|
|||
files.add(canonicalFile);
|
||||
|
||||
if (getLogger().isDebugEnabled()) {
|
||||
getLogger().debug(this + " selected file at path: " + canonicalFile.toString());
|
||||
getLogger().debug("{} selected file at path: {}", this, canonicalFile);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
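The GetHDFS hunk above keeps its isDebugEnabled() guard even after switching to placeholders. With parameterized logging the guard is optional when the arguments are cheap, but it still earns its keep when building an argument is expensive, as in the earlier trace logging that joins a stream of entity names. A small sketch of both cases; class and variable names are illustrative only.

    import java.util.List;
    import java.util.stream.Collectors;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative names only; not taken from the NiFi codebase.
    class FileSelectionLogger {

        private static final Logger logger = LoggerFactory.getLogger(FileSelectionLogger.class);

        void logSelection(final Object selectedFile, final List<String> entryNames) {
            // Cheap arguments: the placeholder form alone is enough, because the message
            // is only assembled when DEBUG is actually enabled.
            logger.debug("{} selected file at path: {}", this, selectedFile);

            // Expensive arguments: the stream is joined before the logger can check the
            // level, so an explicit guard still avoids the work when TRACE is off.
            if (logger.isTraceEnabled()) {
                logger.trace("entries: {}", entryNames.stream().collect(Collectors.joining(", ")));
            }
        }
    }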
@ -107,7 +107,7 @@ public class GetHDFSSequenceFile extends GetHDFS {
|
|||
logger.debug("Reading file");
|
||||
flowFiles = getFlowFiles(conf, hdfs, reader, file);
|
||||
if (!keepSourceFiles && !hdfs.delete(file, false)) {
|
||||
logger.warn("Unable to delete path " + file.toString() + " from HDFS. Will likely be picked up over and over...");
|
||||
logger.warn("Unable to delete path {} from HDFS. Will likely be picked up over and over...", file);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
final String errorString = "Error retrieving file {} from HDFS due to {}";
|
||||
|
|
|
@ -514,7 +514,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
|
|||
files.add(canonicalFile);
|
||||
|
||||
if (getLogger().isDebugEnabled()) {
|
||||
getLogger().debug(this + " selected file at path: " + canonicalFile.toString());
|
||||
getLogger().debug("{} selected file at path: {}", this, canonicalFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -104,14 +104,14 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement
|
|||
cache = hazelcastCacheManager.getCache(
|
||||
context.getProperty(HAZELCAST_CACHE_NAME).evaluateAttributeExpressions().getValue(),
|
||||
context.getProperty(HAZELCAST_ENTRY_TTL).asTimePeriod(TimeUnit.MILLISECONDS));
|
||||
getLogger().debug("Enable Hazelcast cache client for cache " + cache.name());
|
||||
getLogger().debug("Enable Hazelcast cache client for cache {}", cache.name());
|
||||
}
|
||||
|
||||
@OnDisabled
|
||||
public void onDisabled() {
|
||||
if (cache != null) {
|
||||
// The cache state will be preserved until the Service is not stopped!
|
||||
getLogger().debug("Disable Hazelcast cache client for cache " + cache.name());
|
||||
getLogger().debug("Disable Hazelcast cache client for cache {}", cache.name());
|
||||
cache = null;
|
||||
}
|
||||
}
|
||||
|
@ -135,11 +135,12 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement
|
|||
|
||||
if (oldValue == null && (!entry.getRevision().isPresent() || entry.getRevision().get() < STARTING_REVISION)) {
|
||||
cache.put(key, serialize(entry.getValue(), valueSerializer, STARTING_REVISION));
|
||||
getLogger().debug("Entry with key " + key + " was added during replace");
|
||||
getLogger().debug("Entry with key {} was added during replace", key);
|
||||
return true;
|
||||
} else if (oldValue != null && Objects.equals(entry.getRevision().get(), parseRevision(oldValue))) {
|
||||
cache.put(key, serialize(entry.getValue(), valueSerializer, entry.getRevision().get() + 1));
|
||||
getLogger().debug("Entry with key " + key + " was updated during replace, with revision " + entry.getRevision().get() + 1);
|
||||
final long newRevision = entry.getRevision().get() + 1;
|
||||
cache.put(key, serialize(entry.getValue(), valueSerializer, newRevision));
|
||||
getLogger().debug("Entry with key {} was updated during replace, with revision {}", key, newRevision);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
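The HazelcastMapCacheClient hunk above fixes more than formatting: in the old line, the "+ 1" after entry.getRevision().get() was evaluated as string concatenation, so revision 5 was logged as "51"; computing newRevision first logs the intended sum. A stand-alone demonstration of the precedence, with made-up values.

    // Stand-alone demonstration of the string-concatenation precedence pitfall.
    public class RevisionLogDemo {
        public static void main(final String[] args) {
            final long revision = 5L;

            // Left to right: the long is appended first, then "1" -> "updated with revision 51"
            System.out.println("updated with revision " + revision + 1);

            // Compute (or parenthesize) first -> "updated with revision 6"
            final long newRevision = revision + 1;
            System.out.println("updated with revision " + newRevision);
        }
    }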
@ -68,7 +68,7 @@ abstract class IMapBasedHazelcastCacheManager extends AbstractControllerService
|
|||
try {
|
||||
instance = getInstance(context);
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Could not create Hazelcast instance. Reason: " + e.getMessage(), e);
|
||||
getLogger().error("Could not create Hazelcast instance.", e);
|
||||
|
||||
// In case of bind exception, we provide a more specific error message to avoid ambiguity
|
||||
if (e.getCause() instanceof BindException && e.getCause().getMessage().equals("Address already in use")) {
|
||||
|
|
|
@ -86,7 +86,7 @@ public class PutHBaseCell extends AbstractPutHBase {
|
|||
try {
|
||||
timestamp = Long.valueOf(timestampValue);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Invalid timestamp value: " + timestampValue, e);
|
||||
getLogger().error("Invalid timestamp value: {}", timestampValue, e);
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -172,7 +172,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
|
|||
try {
|
||||
timestamp = Long.valueOf(timestampValue);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Invalid timestamp value: " + timestampValue, e);
|
||||
getLogger().error("Invalid timestamp value: {}", timestampValue, e);
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -402,7 +402,7 @@ public class PutHBaseRecord extends AbstractPutHBase {
|
|||
}
|
||||
|
||||
if (timestamp == null) {
|
||||
getLogger().warn("The value of timestamp field " + timestampFieldName + " was null, record will be inserted with latest timestamp");
|
||||
getLogger().warn("The value of timestamp field {} was null, record will be inserted with latest timestamp", timestampFieldName);
|
||||
}
|
||||
} else {
|
||||
timestamp = null;
|
||||
|
|
|
@ -115,7 +115,7 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme
|
|||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Privileged action failed with kerberos user " + kerberosUser, e);
|
||||
getLogger().error("Privileged action failed with kerberos user {}", kerberosUser, e);
|
||||
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -177,7 +177,7 @@ public class IcebergRecordConverter {
|
|||
if (mappedFieldName.isEmpty()) {
|
||||
if (UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) {
|
||||
if (logger != null) {
|
||||
logger.warn("Cannot find field with name '" + name + "' in the record schema, using the target schema for datatype and a null value");
|
||||
logger.warn("Cannot find field with name '{}' in the record schema, using the target schema for datatype and a null value", name);
|
||||
}
|
||||
}
|
||||
// If the field is missing, use the expected type from the schema (converted to a DataType)
|
||||
|
|
|
@ -62,9 +62,9 @@ public class JMSConnectionFactoryHandler extends CachedJMSConnectionFactoryHandl
|
|||
public ConnectionFactory createConnectionFactory() {
|
||||
try {
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Configuring " + getClass().getSimpleName() + " for '"
|
||||
+ context.getProperty(JMS_CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue() + "' to be connected to '"
|
||||
+ context.getProperty(JMS_BROKER_URI).evaluateAttributeExpressions().getValue() + "'");
|
||||
logger.info("Configuring {} for '{}' to be connected to '{}'", getClass().getSimpleName(),
|
||||
context.getProperty(JMS_CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue(),
|
||||
context.getProperty(JMS_BROKER_URI).evaluateAttributeExpressions().getValue());
|
||||
}
|
||||
|
||||
final ConnectionFactory connectionFactory = createConnectionFactoryInstance();
|
||||
|
@ -72,7 +72,7 @@ public class JMSConnectionFactoryHandler extends CachedJMSConnectionFactoryHandl
|
|||
|
||||
return connectionFactory;
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to configure " + getClass().getSimpleName(), e);
|
||||
logger.error("Failed to configure {}", getClass().getSimpleName(), e);
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -222,7 +222,7 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
|
|||
connectionFactoryProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());
|
||||
worker = buildTargetResource(context);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Failed to rebuild: " + connectionFactoryProvider);
|
||||
getLogger().error("Failed to rebuild: {}", connectionFactoryProvider);
|
||||
worker = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -277,7 +277,7 @@ class JMSConsumer extends JMSWorker {
|
|||
destinationName = (destination instanceof Queue) ? ((Queue) destination).getQueueName()
|
||||
: ((Topic) destination).getTopicName();
|
||||
} catch (JMSException e) {
|
||||
this.processLog.warn("Failed to retrieve Destination name for '" + headerName + "' header", e);
|
||||
this.processLog.warn("Failed to retrieve Destination name for '{}' header", headerName, e);
|
||||
}
|
||||
}
|
||||
return destinationName;
|
||||
|
|
|
@ -132,8 +132,7 @@ class JMSPublisher extends JMSWorker {
|
|||
.setProperty(message, entry.getKey(), entry.getValue());
|
||||
}
|
||||
} catch (NumberFormatException ne) {
|
||||
this.processLog.warn("Incompatible value for attribute " + entry.getKey()
|
||||
+ " [" + entry.getValue() + "] is not a number. Ignoring this attribute.");
|
||||
this.processLog.warn("Incompatible value for attribute {} [{}] is not a number. Ignoring this attribute.", entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -283,7 +283,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
|
|||
|
||||
private void handleException(ProcessContext context, ProcessSession processSession, JMSPublisher publisher, FlowFile flowFile, Exception e) {
|
||||
processSession.transfer(flowFile, REL_FAILURE);
|
||||
this.getLogger().error("Failed while sending message to JMS via " + publisher, e);
|
||||
this.getLogger().error("Failed while sending message to JMS via {}", publisher, e);
|
||||
context.yield();
|
||||
publisher.setValid(false);
|
||||
}
|
||||
|
|
|
@ -615,13 +615,13 @@ public class ConsumerPool implements Closeable {
|
|||
try {
|
||||
consumer.unsubscribe();
|
||||
} catch (Exception e) {
|
||||
logger.warn("Failed while unsubscribing " + consumer, e);
|
||||
logger.warn("Failed while unsubscribing {}", consumer, e);
|
||||
}
|
||||
|
||||
try {
|
||||
consumer.close();
|
||||
} catch (Exception e) {
|
||||
logger.warn("Failed while closing " + consumer, e);
|
||||
logger.warn("Failed while closing {}", consumer, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -116,8 +116,8 @@ public class KerberosProvider implements LoginIdentityProvider {
|
|||
|
||||
final Authentication authentication = provider.authenticate(token);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Ran provider.authenticate() and returned authentication for " +
|
||||
"principal {} with name {} and is authenticated {}", authentication.getPrincipal(), authentication.getName(), authentication.isAuthenticated());
|
||||
logger.debug("Ran provider.authenticate() and returned authentication for principal {} with name {} and is authenticated {}",
|
||||
authentication.getPrincipal(), authentication.getName(), authentication.isAuthenticated());
|
||||
}
|
||||
|
||||
return new AuthenticationResponse(authentication.getName(), identity, expiration, issuer);
|
||||
|
|
|
@ -199,7 +199,7 @@ public class KuduLookupService extends AbstractControllerService implements Reco
|
|||
getLogger().debug("Kudu connection successfully initialized");
|
||||
}
|
||||
} catch (final Exception ex) {
|
||||
getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex);
|
||||
getLogger().error("Exception occurred while interacting with Kudu", ex);
|
||||
throw new InitializationException(ex);
|
||||
}
|
||||
|
||||
|
|
|
@ -680,10 +680,10 @@ public class LdapUserGroupProvider implements UserGroupProvider {
|
|||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("-------------------------------------");
|
||||
logger.debug("Loaded the following users from LDAP:");
|
||||
userList.forEach((user) -> logger.debug(" - " + user));
|
||||
userList.forEach((user) -> logger.debug(" - {}", user));
|
||||
logger.debug("--------------------------------------");
|
||||
logger.debug("Loaded the following groups from LDAP:");
|
||||
groupList.forEach((group) -> logger.debug(" - " + group));
|
||||
groupList.forEach((group) -> logger.debug(" - {}", group));
|
||||
logger.debug("--------------------------------------");
|
||||
}
|
||||
|
||||
|
|
|
@ -416,7 +416,7 @@ public class RepositoryConfiguration {
|
|||
// which must be kept intact
|
||||
if (maxAttrChars < 36) {
|
||||
maxAttrChars = 36;
|
||||
logger.warn("Found max attribute length property set to " + maxAttrLength + " but minimum length is 36; using 36 instead");
|
||||
logger.warn("Found max attribute length property set to {} but minimum length is 36; using 36 instead", maxAttrLength);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
maxAttrChars = defaultMaxAttrChars;
|
||||
|
|
|
@ -439,8 +439,8 @@ public class IndexDirectoryManager {
|
|||
// We didn't find the old index directory. Just add the new index directory.
|
||||
final long timestamp = DirectoryUtils.getIndexTimestamp(indexDirectory);
|
||||
if (timestamp < 0) {
|
||||
logger.debug("Attempted to replace old index directory {} with new index directory but the old index directory did not " +
|
||||
"exist and could not determine timestamp for new index directory", indexDirectory);
|
||||
logger.debug("Attempted to replace old index directory {} with new index directory but the old index directory did not exist and could not determine timestamp for new index directory",
|
||||
indexDirectory);
|
||||
} else {
|
||||
final String partitionName = getPartitionName(indexDirectory);
|
||||
if (partitionName == null) {
|
||||
|
|
|
@ -43,11 +43,11 @@ public class LuceneCacheWarmer implements Runnable {
|
|||
try {
|
||||
final File[] indexDirs = storageDir.listFiles(DirectoryUtils.INDEX_FILE_FILTER);
|
||||
if (indexDirs == null) {
|
||||
logger.info("Cannot warm Lucene Index Cache for " + storageDir + " because the directory could not be read");
|
||||
logger.info("Cannot warm Lucene Index Cache for {} because the directory could not be read", storageDir);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info("Beginning warming of Lucene Index Cache for " + storageDir);
|
||||
logger.info("Beginning warming of Lucene Index Cache for {}", storageDir);
|
||||
final long startNanos = System.nanoTime();
|
||||
for (final File indexDir : indexDirs) {
|
||||
final long indexStartNanos = System.nanoTime();
|
||||
|
@ -69,7 +69,7 @@ public class LuceneCacheWarmer implements Runnable {
|
|||
final long warmSecs = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startNanos);
|
||||
logger.info("Finished warming all Lucene Indexes for {} in {} seconds", storageDir, warmSecs);
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to warm Lucene Index Cache for " + storageDir, e);
|
||||
logger.error("Failed to warm Lucene Index Cache for {}", storageDir, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -495,7 +495,7 @@ public class LuceneEventIndex implements EventIndex {
|
|||
try {
|
||||
eventOption = eventStore.getEvent(eventId);
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to retrieve Provenance Event with ID " + eventId + " to calculate data lineage due to: " + e, e);
|
||||
logger.error("Failed to retrieve Provenance Event with ID {} to calculate data lineage", eventId, e);
|
||||
final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.emptySet(), 1, user == null ? null : user.getIdentity());
|
||||
result.getResult().setError("Failed to retrieve Provenance Event with ID " + eventId + ". See logs for more information.");
|
||||
return result;
|
||||
|
@ -862,8 +862,8 @@ public class LuceneEventIndex implements EventIndex {
|
|||
FileUtils.deleteFile(indexDirectory, true);
|
||||
logger.debug("Successfully deleted directory {}", indexDirectory);
|
||||
} catch (final IOException e) {
|
||||
logger.warn("The Lucene Index located at " + indexDirectory + " has expired and contains no Provenance Events that still exist in the respository. "
|
||||
+ "However, the directory could not be deleted.", e);
|
||||
logger.warn("The Lucene Index located at {} has expired and contains no Provenance Events that still exist in the repository. However, the directory could not be deleted.",
|
||||
indexDirectory, e);
|
||||
}
|
||||
|
||||
directoryManager.removeDirectory(indexDirectory);
|
||||
|
|
|
@ -102,7 +102,7 @@ public class QueryTask implements Runnable {
|
|||
return;
|
||||
} catch (final IOException ioe) {
|
||||
queryResult.setError("Failed to query index " + indexDir + "; see logs for more details");
|
||||
logger.error("Failed to query index " + indexDir, ioe);
|
||||
logger.error("Failed to query index {}", indexDir, ioe);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -135,7 +135,7 @@ public class QueryTask implements Runnable {
|
|||
|
||||
topDocs = searcher.getIndexSearcher().search(query, maxResults, sort);
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to query Lucene for index " + indexDir, e);
|
||||
logger.error("Failed to query Lucene for index {}", indexDir, e);
|
||||
queryResult.setError("Failed to query Lucene for index " + indexDir + " due to " + e);
|
||||
return;
|
||||
} finally {
|
||||
|
@ -170,7 +170,7 @@ public class QueryTask implements Runnable {
|
|||
indexDir, query, eventsAndTotalHits.getKey().size(), eventsAndTotalHits.getValue(), millis);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to query events against index " + indexDir, e);
|
||||
logger.error("Failed to query events against index {}", indexDir, e);
|
||||
queryResult.setError("Failed to complete query due to " + e);
|
||||
} finally {
|
||||
indexManager.returnIndexSearcher(searcher);
|
||||
|
|
|
@ -102,7 +102,7 @@ public class DocsReader {
|
|||
}
|
||||
|
||||
if (record == null) {
|
||||
logger.warn("Failed to read Provenance Event for '" + d + "'. The event file may be missing or corrupted");
|
||||
logger.warn("Failed to read Provenance Event for '{}'. The event file may be missing or corrupted", d);
|
||||
}
|
||||
|
||||
return record;
|
||||
|
|
|
@ -293,7 +293,7 @@ public abstract class CompressableRecordReader implements RecordReader {
|
|||
// cause that would need to be sorted out in this case, the Provenance Repository should be
|
||||
// resilient enough to handle this. Otherwise, we end up throwing an Exception, which may
|
||||
// prevent iterating over additional events in the repository.
|
||||
logger.error("Failed to read Provenance Event from " + filename + "; will skip this event and continue reading subsequent events", e);
|
||||
logger.error("Failed to read Provenance Event from {}; will skip this event and continue reading subsequent events", filename, e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -248,7 +248,7 @@ public abstract class PartitionedEventStore implements EventStore {
|
|||
try {
|
||||
partition.purgeOldEvents(maxFileLife, TimeUnit.MILLISECONDS);
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to purge expired events from " + partition, e);
|
||||
logger.error("Failed to purge expired events from {}", partition, e);
|
||||
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY,
|
||||
"Failed to purge expired events from Provenance Repository. See logs for more information.");
|
||||
}
|
||||
|
@ -271,7 +271,7 @@ public abstract class PartitionedEventStore implements EventStore {
|
|||
final long removed = partition.purgeOldestEvents();
|
||||
currentSize -= removed;
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to purge oldest events from " + partition, e);
|
||||
logger.error("Failed to purge oldest events from {}", partition, e);
|
||||
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY,
|
||||
"Failed to purge oldest events from Provenance Repository. See logs for more information.");
|
||||
}
|
||||
|
|
|
@ -74,7 +74,7 @@ public class RecordWriterLease {
|
|||
try {
|
||||
writer.close();
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Failed to close " + writer, e);
|
||||
logger.warn("Failed to close {}", writer, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -121,7 +121,7 @@ public class RecordWriterLease {
|
|||
try {
|
||||
writer.close();
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Failed to close " + writer, e);
|
||||
logger.warn("Failed to close {}", writer, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -684,7 +684,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
|
|||
try {
|
||||
eventIndex.commitChanges(partitionName);
|
||||
} catch (final IOException e) {
|
||||
logger.error("Failed to re-index Provenance Events for partition " + partitionName, e);
|
||||
logger.error("Failed to re-index Provenance Events for partition {}", partitionName, e);
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
|
|
|
@ -245,7 +245,7 @@ public class SimpleRedisDistributedMapCacheClientService extends AbstractControl
|
|||
try {
|
||||
redisConnection.close();
|
||||
} catch (Exception e) {
|
||||
getLogger().warn("Error closing connection: " + e.getMessage(), e);
|
||||
getLogger().warn("Error closing connection", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -360,7 +360,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
|
|||
try {
|
||||
redisConnection.close();
|
||||
} catch (Exception e) {
|
||||
logger.warn("Error closing connection: " + e.getMessage(), e);
|
||||
logger.warn("Error closing connection", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -388,7 +388,7 @@ public class RedisUtils {
|
|||
JedisConnectionFactory connectionFactory;
|
||||
|
||||
if (RedisType.STANDALONE == redisMode) {
|
||||
LOGGER.info("Connecting to Redis in standalone mode at " + connectionString);
|
||||
LOGGER.info("Connecting to Redis in standalone mode at {}", connectionString);
|
||||
final String[] hostAndPortSplit = connectionString.split("[:]");
|
||||
final String host = hostAndPortSplit[0].trim();
|
||||
final Integer port = Integer.parseInt(hostAndPortSplit[1].trim());
|
||||
|
@ -404,10 +404,10 @@ public class RedisUtils {
|
|||
enrichRedisConfiguration(sentinelConfiguration, dbIndex, username, password, sentinelUsername, sentinelPassword);
|
||||
|
||||
LOGGER.info("Connecting to Redis in sentinel mode...");
|
||||
LOGGER.info("Redis master = " + sentinelMaster);
|
||||
LOGGER.info("Redis master = {}", sentinelMaster);
|
||||
|
||||
for (final String sentinel : sentinels) {
|
||||
LOGGER.info("Redis sentinel at " + sentinel);
|
||||
LOGGER.info("Redis sentinel at {}", sentinel);
|
||||
}
|
||||
|
||||
connectionFactory = new JedisConnectionFactory(sentinelConfiguration, jedisClientConfiguration);
|
||||
|
@ -422,7 +422,7 @@ public class RedisUtils {
|
|||
|
||||
LOGGER.info("Connecting to Redis in clustered mode...");
|
||||
for (final String clusterNode : clusterNodes) {
|
||||
LOGGER.info("Redis cluster node at " + clusterNode);
|
||||
LOGGER.info("Redis cluster node at {}", clusterNode);
|
||||
}
|
||||
|
||||
connectionFactory = new JedisConnectionFactory(clusterConfiguration, jedisClientConfiguration);
|
||||
|
|
|
@ -429,7 +429,7 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
|
|||
}
|
||||
});
|
||||
} catch (final Exception e) {
|
||||
logger.error("Unable to initialize scripted LookupService: " + e.getLocalizedMessage(), e);
|
||||
logger.error("Unable to initialize scripted LookupService", e);
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -435,7 +435,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
|
|||
}
|
||||
});
|
||||
} catch (final Exception e) {
|
||||
logger.error("Unable to initialize scripted Processor: " + e.getLocalizedMessage(), e);
|
||||
logger.error("Unable to initialize scripted Processor", e);
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
@ -628,11 +628,11 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
|
|||
invocable.invokeMethod(obj, methodName, params);
|
||||
} catch (final NoSuchMethodException nsme) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Configured script Processor does not contain the method " + methodName);
|
||||
logger.debug("Configured script Processor does not contain the method {}", methodName);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
// An error occurred during onScheduled, propagate it up
|
||||
logger.error("Error while executing the scripted processor's method " + methodName, e);
|
||||
logger.error("Error while executing the scripted processor's method {}", methodName, e);
|
||||
if (e instanceof ProcessException) {
|
||||
throw (ProcessException) e;
|
||||
}
|
||||
|
|
|
@ -235,7 +235,7 @@ public class ScriptedPartitionRecord extends ScriptedRecordProcessor {
|
|||
|
||||
return true;
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to partition records due to: " + e.getMessage(), e);
|
||||
getLogger().error("Failed to partition records", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -197,7 +197,7 @@ public abstract class ScriptedRouterProcessor<T> extends ScriptedRecordProcessor
|
|||
session.adjustCounter("Records Processed", counts.getRecordCount(), true);
|
||||
return true;
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to route records due to: " + e.getMessage(), e);
|
||||
getLogger().error("Failed to route records", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
Some files were not shown because too many files have changed in this diff