mirror of https://github.com/apache/activemq.git
Fix slf4j logger use
This commit is contained in:
parent
64bf06f39b
commit
1a2de52c5f
|
@ -287,7 +287,7 @@ public class BrokerService implements Service {
|
||||||
} else {
|
} else {
|
||||||
ret = Security.addProvider(bouncycastle);
|
ret = Security.addProvider(bouncycastle);
|
||||||
}
|
}
|
||||||
LOG.info("Loaded the Bouncy Castle security provider at position: " + ret);
|
LOG.info("Loaded the Bouncy Castle security provider at position: {}", ret);
|
||||||
} catch(Throwable e) {
|
} catch(Throwable e) {
|
||||||
// No BouncyCastle found so we use the default Java Security Provider
|
// No BouncyCastle found so we use the default Java Security Provider
|
||||||
}
|
}
|
||||||
|
@ -309,7 +309,7 @@ public class BrokerService implements Service {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
LOG.warn("Error reading broker version ", ie);
|
LOG.warn("Error reading broker version", ie);
|
||||||
}
|
}
|
||||||
BROKER_VERSION = version;
|
BROKER_VERSION = version;
|
||||||
}
|
}
|
||||||
|
@ -651,7 +651,7 @@ public class BrokerService implements Service {
|
||||||
stop();
|
stop();
|
||||||
}
|
}
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
|
LOG.warn("Failed to stop broker after failure in start. This exception will be ignored", ex);
|
||||||
}
|
}
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -752,7 +752,7 @@ public class BrokerService implements Service {
|
||||||
brokerId = broker.getBrokerId();
|
brokerId = broker.getBrokerId();
|
||||||
|
|
||||||
// need to log this after creating the broker so we have its id and name
|
// need to log this after creating the broker so we have its id and name
|
||||||
LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId });
|
LOG.info("Apache ActiveMQ {} ({}, {}) is starting", getBrokerVersion(), getBrokerName(), brokerId);
|
||||||
broker.start();
|
broker.start();
|
||||||
|
|
||||||
if (isUseJmx()) {
|
if (isUseJmx()) {
|
||||||
|
@ -779,7 +779,7 @@ public class BrokerService implements Service {
|
||||||
|
|
||||||
startAllConnectors();
|
startAllConnectors();
|
||||||
|
|
||||||
LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
|
LOG.info("Apache ActiveMQ {} ({}, {}) started", getBrokerVersion(), getBrokerName(), brokerId);
|
||||||
LOG.info("For help or more information please see: http://activemq.apache.org");
|
LOG.info("For help or more information please see: http://activemq.apache.org");
|
||||||
|
|
||||||
getBroker().brokerServiceStarted();
|
getBroker().brokerServiceStarted();
|
||||||
|
@ -842,7 +842,7 @@ public class BrokerService implements Service {
|
||||||
}.start();
|
}.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} );
|
LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", getBrokerVersion(), getBrokerName(), brokerId);
|
||||||
|
|
||||||
removeShutdownHook();
|
removeShutdownHook();
|
||||||
if (this.scheduler != null) {
|
if (this.scheduler != null) {
|
||||||
|
@ -902,9 +902,9 @@ public class BrokerService implements Service {
|
||||||
this.destinationFactory = null;
|
this.destinationFactory = null;
|
||||||
|
|
||||||
if (startDate != null) {
|
if (startDate != null) {
|
||||||
LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()});
|
LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", getBrokerVersion(), getBrokerName(), brokerId, getUptime());
|
||||||
}
|
}
|
||||||
LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
|
LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", getBrokerVersion(), getBrokerName(), brokerId);
|
||||||
|
|
||||||
synchronized (shutdownHooks) {
|
synchronized (shutdownHooks) {
|
||||||
for (Runnable hook : shutdownHooks) {
|
for (Runnable hook : shutdownHooks) {
|
||||||
|
@ -966,9 +966,8 @@ public class BrokerService implements Service {
|
||||||
if (pollInterval <= 0) {
|
if (pollInterval <= 0) {
|
||||||
pollInterval = 30;
|
pollInterval = 30;
|
||||||
}
|
}
|
||||||
LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{
|
LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}",
|
||||||
connectorName, queueName, timeout, pollInterval
|
connectorName, queueName, timeout, pollInterval);
|
||||||
});
|
|
||||||
TransportConnector connector;
|
TransportConnector connector;
|
||||||
for (int i = 0; i < transportConnectors.size(); i++) {
|
for (int i = 0; i < transportConnectors.size(); i++) {
|
||||||
connector = transportConnectors.get(i);
|
connector = transportConnectors.get(i);
|
||||||
|
@ -2061,10 +2060,8 @@ public class BrokerService implements Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (storeLimit > 0 && storeLimit < maxJournalFileSize) {
|
if (storeLimit > 0 && storeLimit < maxJournalFileSize) {
|
||||||
LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
|
LOG.error("Store limit is {} mb, whilst the max journal file size for the store is {} mb, the store will not accept any data when used.",
|
||||||
" mb, whilst the max journal file size for the store is: " +
|
(storeLimit / (1024 * 1024)), (maxJournalFileSize / (1024 * 1024)));
|
||||||
maxJournalFileSize / (1024 * 1024) + " mb, " +
|
|
||||||
"the store will not accept any data when used.");
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2094,10 +2091,8 @@ public class BrokerService implements Service {
|
||||||
long storeLimit = usage.getTempUsage().getLimit();
|
long storeLimit = usage.getTempUsage().getLimit();
|
||||||
|
|
||||||
if (storeLimit > 0 && storeLimit < maxJournalFileSize) {
|
if (storeLimit > 0 && storeLimit < maxJournalFileSize) {
|
||||||
LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
|
LOG.error("Temporary Store limit {} mb, whilst the max journal file size for the temporary store is {} mb, the temp store will not accept any data when used.",
|
||||||
" mb, whilst the max journal file size for the temporary store is: " +
|
(storeLimit / (1024 * 1024)), (maxJournalFileSize / (1024 * 1024)));
|
||||||
maxJournalFileSize / (1024 * 1024) + " mb, " +
|
|
||||||
"the temp store will not accept any data when used.");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2112,8 +2107,8 @@ public class BrokerService implements Service {
|
||||||
long totalSpace = storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getTotalSpace();
|
long totalSpace = storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getTotalSpace();
|
||||||
long totalUsableSpace = (storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getUsableSpace()) + storeCurrent;
|
long totalUsableSpace = (storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getUsableSpace()) + storeCurrent;
|
||||||
if (totalUsableSpace < 0 || totalSpace < 0) {
|
if (totalUsableSpace < 0 || totalSpace < 0) {
|
||||||
|
LOG.error("File system space reported by {} was negative, possibly a huge file system, set a sane usage.total to provide some guidance", dir);
|
||||||
final String message = "File system space reported by: " + dir + " was negative, possibly a huge file system, set a sane usage.total to provide some guidance";
|
final String message = "File system space reported by: " + dir + " was negative, possibly a huge file system, set a sane usage.total to provide some guidance";
|
||||||
LOG.error(message);
|
|
||||||
throw new ConfigurationException(message);
|
throw new ConfigurationException(message);
|
||||||
}
|
}
|
||||||
//compute byte value of the percent limit
|
//compute byte value of the percent limit
|
||||||
|
@ -2134,11 +2129,11 @@ public class BrokerService implements Service {
|
||||||
|
|
||||||
//To prevent changing too often, check threshold
|
//To prevent changing too often, check threshold
|
||||||
if (newLimit - storeLimit >= diskUsageCheckRegrowThreshold) {
|
if (newLimit - storeLimit >= diskUsageCheckRegrowThreshold) {
|
||||||
LOG.info("Usable disk space has been increased, attempting to regrow " + storeName + " limit to "
|
LOG.info("Usable disk space has been increased, attempting to regrow {} limit to {}% of the parition size",
|
||||||
+ percentLimit + "% of the partition size.");
|
storeName, percentLimit);
|
||||||
storeUsage.setLimit(newLimit);
|
storeUsage.setLimit(newLimit);
|
||||||
LOG.info(storeName + " limit has been increased to " + newLimit * 100 / totalSpace
|
LOG.info("{} limit has been increase to {}% ({} mb) of the partition size.",
|
||||||
+ "% (" + newLimit / oneMeg + " mb) of the partition size.");
|
(newLimit * 100 / totalSpace), (newLimit / oneMeg));
|
||||||
}
|
}
|
||||||
|
|
||||||
//check if the limit is too large for the amount of usable space
|
//check if the limit is too large for the amount of usable space
|
||||||
|
@ -2155,16 +2150,17 @@ public class BrokerService implements Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (percentLimit > 0) {
|
if (percentLimit > 0) {
|
||||||
LOG.warn(storeName + " limit has been set to "
|
LOG.warn("{} limit has been set to {}% ({} mb) of the partition size but there is not enough usable space." +
|
||||||
+ percentLimit + "% (" + bytePercentLimit / oneMeg + " mb)"
|
"The current store limit (which may have been adjusted by a previous usage limit check) is set to ({} mb) " +
|
||||||
+ " of the partition size but there is not enough usable space."
|
"but only {}% ({} mb) is available - resetting limit",
|
||||||
+ " The current store limit (which may have been adjusted by a"
|
storeName,
|
||||||
+ " previous usage limit check) is set to (" + storeLimit / oneMeg + " mb)"
|
percentLimit,
|
||||||
+ " but only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)"
|
(bytePercentLimit / oneMeg),
|
||||||
+ " is available - resetting limit");
|
(storeLimit / oneMeg),
|
||||||
|
(totalUsableSpace * 100 / totalSpace),
|
||||||
|
(totalUsableSpace / oneMeg));
|
||||||
} else {
|
} else {
|
||||||
LOG.warn(message + " - resetting to maximum available disk space: " +
|
LOG.warn("{} - resetting to maximum available disk space: {} mb", message, (totalUsableSpace / oneMeg));
|
||||||
totalUsableSpace / oneMeg + " mb");
|
|
||||||
}
|
}
|
||||||
storeUsage.setLimit(totalUsableSpace);
|
storeUsage.setLimit(totalUsableSpace);
|
||||||
}
|
}
|
||||||
|
@ -2210,7 +2206,8 @@ public class BrokerService implements Service {
|
||||||
|
|
||||||
if (adjustUsageLimits) {
|
if (adjustUsageLimits) {
|
||||||
usage.getMemoryUsage().setPercentOfJvmHeap(70);
|
usage.getMemoryUsage().setPercentOfJvmHeap(70);
|
||||||
LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
|
LOG.warn("{} mb - resetting to 70% of maximum available: {}",
|
||||||
|
message, (usage.getMemoryUsage().getLimit() / (1024 * 1024)));
|
||||||
} else {
|
} else {
|
||||||
LOG.error(message);
|
LOG.error(message);
|
||||||
throw new ConfigurationException(message);
|
throw new ConfigurationException(message);
|
||||||
|
@ -2244,10 +2241,12 @@ public class BrokerService implements Service {
|
||||||
long schedulerLimit = usage.getJobSchedulerUsage().getLimit();
|
long schedulerLimit = usage.getJobSchedulerUsage().getLimit();
|
||||||
long dirFreeSpace = schedulerDir.getUsableSpace();
|
long dirFreeSpace = schedulerDir.getUsableSpace();
|
||||||
if (schedulerLimit > dirFreeSpace) {
|
if (schedulerLimit > dirFreeSpace) {
|
||||||
LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) +
|
LOG.warn("Job Scheduler Store limit is {} mb, whilst the data directory: {} " +
|
||||||
" mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
|
"only has {} mb of usage space - resetting to {} mb.",
|
||||||
" only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " +
|
schedulerLimit / (1024 * 1024),
|
||||||
dirFreeSpace / (1024 * 1024) + " mb.");
|
schedulerDir.getAbsolutePath(),
|
||||||
|
dirFreeSpace / (1024 * 1024),
|
||||||
|
dirFreeSpace / (1042 * 1024));
|
||||||
usage.getJobSchedulerUsage().setLimit(dirFreeSpace);
|
usage.getJobSchedulerUsage().setLimit(dirFreeSpace);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2339,7 +2338,7 @@ public class BrokerService implements Service {
|
||||||
ObjectName objectName = createNetworkConnectorObjectName(connector);
|
ObjectName objectName = createNetworkConnectorObjectName(connector);
|
||||||
getManagementContext().unregisterMBean(objectName);
|
getManagementContext().unregisterMBean(objectName);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e);
|
LOG.warn("Network Connector could not be unregistered from JMX due {}. This exception is ignored.", e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2423,7 +2422,7 @@ public class BrokerService implements Service {
|
||||||
regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
|
regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
|
||||||
getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
|
getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
|
||||||
} catch(MalformedObjectNameException me){
|
} catch(MalformedObjectNameException me){
|
||||||
LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me);
|
LOG.warn("Cannot create ManagedRegionBroker due {}", me.getMessage(), me);
|
||||||
throw new IOException(me);
|
throw new IOException(me);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -2615,7 +2614,7 @@ public class BrokerService implements Service {
|
||||||
|
|
||||||
protected void logError(String message, Throwable e) {
|
protected void logError(String message, Throwable e) {
|
||||||
if (useLoggingForShutdownErrors) {
|
if (useLoggingForShutdownErrors) {
|
||||||
LOG.error("Failed to shut down: " + e);
|
LOG.error("Failed to shut down", e);
|
||||||
} else {
|
} else {
|
||||||
System.err.println("Failed to shut down: " + e);
|
System.err.println("Failed to shut down: " + e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -144,20 +144,17 @@ public class ProducerBrokerExchange {
|
||||||
long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
|
long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
|
||||||
if (producerSequenceId <= lastStoredForMessageProducer) {
|
if (producerSequenceId <= lastStoredForMessageProducer) {
|
||||||
canDispatch = false;
|
canDispatch = false;
|
||||||
LOG.warn("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
|
LOG.warn("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}",
|
||||||
(LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastStoredForMessageProducer
|
(LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastStoredForMessageProducer);
|
||||||
});
|
|
||||||
}
|
}
|
||||||
} else if (producerSequenceId <= lastSendSequenceNumber.get()) {
|
} else if (producerSequenceId <= lastSendSequenceNumber.get()) {
|
||||||
canDispatch = false;
|
canDispatch = false;
|
||||||
if (messageSend.isInTransaction()) {
|
if (messageSend.isInTransaction()) {
|
||||||
LOG.warn("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{
|
LOG.warn("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}",
|
||||||
(LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber
|
(LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber);
|
||||||
});
|
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{
|
LOG.debug("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}",
|
||||||
(LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber
|
(LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber);
|
||||||
});
|
|
||||||
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -238,9 +238,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
if (!stopping.get() && status.get() != PENDING_STOP) {
|
if (!stopping.get() && status.get() != PENDING_STOP) {
|
||||||
transportException.set(e);
|
transportException.set(e);
|
||||||
if (TRANSPORTLOG.isDebugEnabled()) {
|
if (TRANSPORTLOG.isDebugEnabled()) {
|
||||||
TRANSPORTLOG.debug(this + " failed: " + e, e);
|
TRANSPORTLOG.debug("{} failed: {}", this, e.getMessage(), e);
|
||||||
} else if (TRANSPORTLOG.isWarnEnabled() && !suppressed(e)) {
|
} else if (TRANSPORTLOG.isWarnEnabled() && !suppressed(e)) {
|
||||||
TRANSPORTLOG.warn(this + " failed: " + e);
|
TRANSPORTLOG.warn("{} failed", this, e);
|
||||||
}
|
}
|
||||||
stopAsync(e);
|
stopAsync(e);
|
||||||
}
|
}
|
||||||
|
@ -303,9 +303,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
inServiceException = true;
|
inServiceException = true;
|
||||||
try {
|
try {
|
||||||
if (SERVICELOG.isDebugEnabled()) {
|
if (SERVICELOG.isDebugEnabled()) {
|
||||||
SERVICELOG.debug("Async error occurred: " + e, e);
|
SERVICELOG.debug("Async error occurred: {}", e.getMessage(), e);
|
||||||
} else {
|
} else {
|
||||||
SERVICELOG.warn("Async error occurred: " + e);
|
SERVICELOG.warn("Async error occurred", e);
|
||||||
}
|
}
|
||||||
ConnectionError ce = new ConnectionError();
|
ConnectionError ce = new ConnectionError();
|
||||||
ce.setException(e);
|
ce.setException(e);
|
||||||
|
@ -334,12 +334,15 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
|
if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
|
||||||
SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
|
SERVICELOG.debug("Error occurred while processing {} command: {}, exception: {}",
|
||||||
+ " command: " + command + ", exception: " + e, e);
|
(responseRequired ? "sync" : "async"),
|
||||||
|
command,
|
||||||
|
e.getMessage(),
|
||||||
|
e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) {
|
if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) {
|
||||||
LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause());
|
LOG.info("Suppressing reply to: {} on: {}, cause: {}", command, e, e.getCause());
|
||||||
responseRequired = false;
|
responseRequired = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -377,7 +380,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException) && isInTransaction(command)) {
|
if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException) && isInTransaction(command)) {
|
||||||
Transaction transaction = getActiveTransaction(command);
|
Transaction transaction = getActiveTransaction(command);
|
||||||
if (transaction != null && !transaction.isRollbackOnly()) {
|
if (transaction != null && !transaction.isRollbackOnly()) {
|
||||||
LOG.debug("on async exception, force rollback of transaction for: " + command, e);
|
LOG.debug("on async exception, force rollback of transaction for: {}", command, e);
|
||||||
transaction.setRollbackOnly(e);
|
transaction.setRollbackOnly(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -399,7 +402,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch(Exception ignored){
|
} catch(Exception ignored){
|
||||||
LOG.trace("failed to find active transaction for command: " + command, ignored);
|
LOG.trace("failed to find active transaction for command: {}", command, ignored);
|
||||||
}
|
}
|
||||||
return transaction;
|
return transaction;
|
||||||
}
|
}
|
||||||
|
@ -814,7 +817,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
registerConnectionState(info.getConnectionId(), state);
|
registerConnectionState(info.getConnectionId(), state);
|
||||||
LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info });
|
LOG.debug("Setting up new connection id: {}, address: {}, info: {}",
|
||||||
|
info.getConnectionId(), getRemoteAddress(), info);
|
||||||
this.faultTolerantConnection = info.isFaultTolerant();
|
this.faultTolerantConnection = info.isFaultTolerant();
|
||||||
// Setup the context.
|
// Setup the context.
|
||||||
String clientId = info.getClientId();
|
String clientId = info.getClientId();
|
||||||
|
@ -847,7 +851,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
brokerConnectionStates.remove(info.getConnectionId());
|
brokerConnectionStates.remove(info.getConnectionId());
|
||||||
}
|
}
|
||||||
unregisterConnectionState(info.getConnectionId());
|
unregisterConnectionState(info.getConnectionId());
|
||||||
LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}", info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage());
|
LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}",
|
||||||
|
info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage());
|
||||||
//AMQ-6561 - stop for all exceptions on addConnection
|
//AMQ-6561 - stop for all exceptions on addConnection
|
||||||
// close this down - in case the peer of this transport doesn't play nice
|
// close this down - in case the peer of this transport doesn't play nice
|
||||||
delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
|
delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
|
||||||
|
@ -982,7 +987,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
throw e;
|
throw e;
|
||||||
} else {
|
} else {
|
||||||
if (TRANSPORTLOG.isDebugEnabled()) {
|
if (TRANSPORTLOG.isDebugEnabled()) {
|
||||||
TRANSPORTLOG.debug("Unexpected exception on asyncDispatch, command of type: " + command.getDataStructureType(), e);
|
TRANSPORTLOG.debug("Unexpected exception on asyncDispatch, command of type: {}",
|
||||||
|
command.getDataStructureType(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -245,8 +245,8 @@ public abstract class AbstractRegion implements Region {
|
||||||
rc.add(sub);
|
rc.add(sub);
|
||||||
} catch (SecurityException e) {
|
} catch (SecurityException e) {
|
||||||
if (sub.isWildcard()) {
|
if (sub.isWildcard()) {
|
||||||
LOG.debug("Subscription denied for " + sub + " to destination " +
|
LOG.debug("Subscription denied for {} to destination {}: {}",
|
||||||
dest.getActiveMQDestination() + ": " + e.getMessage());
|
sub, dest.getActiveMQDestination(), e.getMessage());
|
||||||
} else {
|
} else {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
@ -338,7 +338,8 @@ public abstract class AbstractRegion implements Region {
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
||||||
LOG.debug("{} adding consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() });
|
LOG.debug("{} adding consumer: {} for destination: {}",
|
||||||
|
broker.getBrokerName(), info.getConsumerId(), info.getDestination());
|
||||||
ActiveMQDestination destination = info.getDestination();
|
ActiveMQDestination destination = info.getDestination();
|
||||||
if (destination != null && !destination.isPattern() && !destination.isComposite()) {
|
if (destination != null && !destination.isPattern() && !destination.isComposite()) {
|
||||||
// lets auto-create the destination
|
// lets auto-create the destination
|
||||||
|
@ -406,15 +407,16 @@ public abstract class AbstractRegion implements Region {
|
||||||
removeList.add(dest);
|
removeList.add(dest);
|
||||||
} catch (SecurityException e){
|
} catch (SecurityException e){
|
||||||
if (sub.isWildcard()) {
|
if (sub.isWildcard()) {
|
||||||
LOG.debug("Subscription denied for " + sub + " to destination " +
|
LOG.debug("Subscription denied for {} to destination {}: {}",
|
||||||
dest.getActiveMQDestination() + ": " + e.getMessage());
|
sub, dest.getActiveMQDestination(), e.getMessage());
|
||||||
} else {
|
} else {
|
||||||
// remove partial subscriptions
|
// remove partial subscriptions
|
||||||
for (Destination remove : removeList) {
|
for (Destination remove : removeList) {
|
||||||
try {
|
try {
|
||||||
remove.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
|
remove.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
LOG.error("Error unsubscribing " + sub + " from " + remove + ": " + ex.getMessage(), ex);
|
LOG.error("Error unsubscribing {} from {}: {}",
|
||||||
|
sub, remove, ex.getMessage(), ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
subscriptions.remove(info.getConsumerId());
|
subscriptions.remove(info.getConsumerId());
|
||||||
|
@ -460,7 +462,8 @@ public abstract class AbstractRegion implements Region {
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
||||||
LOG.debug("{} removing consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() });
|
LOG.debug("{} removing consumer: {} for destination: {}",
|
||||||
|
broker.getBrokerName(), info.getConsumerId(), info.getDestination());
|
||||||
|
|
||||||
Subscription sub = subscriptions.remove(info.getConsumerId());
|
Subscription sub = subscriptions.remove(info.getConsumerId());
|
||||||
// The sub could be removed elsewhere - see ConnectionSplitBroker
|
// The sub could be removed elsewhere - see ConnectionSplitBroker
|
||||||
|
@ -685,7 +688,8 @@ public abstract class AbstractRegion implements Region {
|
||||||
entry.configurePrefetch(sub);
|
entry.configurePrefetch(sub);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize()});
|
LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}",
|
||||||
|
control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize());
|
||||||
try {
|
try {
|
||||||
lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
|
lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -731,7 +731,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
if (node != QueueMessageReference.NULL_MESSAGE) {
|
if (node != QueueMessageReference.NULL_MESSAGE) {
|
||||||
nodeDest.getDestinationStatistics().getDispatched().increment();
|
nodeDest.getDestinationStatistics().getDispatched().increment();
|
||||||
incrementPrefetchCounter(node);
|
incrementPrefetchCounter(node);
|
||||||
LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
|
LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}",
|
||||||
|
info.getConsumerId(), message.getMessageId(), message.getDestination(),
|
||||||
|
getSubscriptionStatistics().getDispatched().getCount(), dispatched.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (node instanceof QueueMessageReference) {
|
if (node instanceof QueueMessageReference) {
|
||||||
|
@ -753,7 +755,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
if (node != QueueMessageReference.NULL_MESSAGE) {
|
if (node != QueueMessageReference.NULL_MESSAGE) {
|
||||||
nodeDest.getDestinationStatistics().getDispatched().increment();
|
nodeDest.getDestinationStatistics().getDispatched().increment();
|
||||||
incrementPrefetchCounter(node);
|
incrementPrefetchCounter(node);
|
||||||
LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
|
LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}",
|
||||||
|
info.getConsumerId(), message.getMessageId(), message.getDestination(),
|
||||||
|
getSubscriptionStatistics().getDispatched().getCount(), dispatched.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -232,7 +232,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping");
|
LOG.debug("{} Producer Flow Control Timeout Task is stopping", getName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -302,7 +302,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
public boolean recoverMessage(Message message) {
|
public boolean recoverMessage(Message message) {
|
||||||
recoveredAccumulator++;
|
recoveredAccumulator++;
|
||||||
if ((recoveredAccumulator % 10000) == 0) {
|
if ((recoveredAccumulator % 10000) == 0) {
|
||||||
LOG.info("cursor for {} has recovered {} messages. {}% complete", new Object[]{ getActiveMQDestination().getQualifiedName(), recoveredAccumulator, new Integer((int) (recoveredAccumulator * 100 / totalMessageCount))});
|
LOG.info("cursor for {} has recovered {} messages. {}% complete",
|
||||||
|
getActiveMQDestination().getQualifiedName(), recoveredAccumulator,
|
||||||
|
new Integer((int) (recoveredAccumulator * 100 / totalMessageCount)));
|
||||||
}
|
}
|
||||||
// Message could have expired while it was being
|
// Message could have expired while it was being
|
||||||
// loaded..
|
// loaded..
|
||||||
|
@ -421,7 +423,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
|
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
|
||||||
LOG.debug("{} add sub: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), sub, getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount() });
|
LOG.debug("{} add sub: {}, dequeues: {}, dispatched: {}, inflight: {}",
|
||||||
|
getActiveMQDestination().getQualifiedName(),
|
||||||
|
sub,
|
||||||
|
getDestinationStatistics().getDequeues().getCount(),
|
||||||
|
getDestinationStatistics().getDispatched().getCount(),
|
||||||
|
getDestinationStatistics().getInflight().getCount());
|
||||||
|
|
||||||
super.addSubscription(context, sub);
|
super.addSubscription(context, sub);
|
||||||
// synchronize with dispatch method so that no new messages are sent
|
// synchronize with dispatch method so that no new messages are sent
|
||||||
|
@ -1234,7 +1241,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
messagesLock.readLock().unlock();
|
messagesLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()});
|
LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%",
|
||||||
|
max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage());
|
||||||
return (alreadyPagedIn == 0 || (alreadyPagedIn < max)
|
return (alreadyPagedIn == 0 || (alreadyPagedIn < max)
|
||||||
&& (alreadyPagedIn < messagesInQueue)
|
&& (alreadyPagedIn < messagesInQueue)
|
||||||
&& messages.hasSpace());
|
&& messages.hasSpace());
|
||||||
|
@ -1951,7 +1959,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
}finally {
|
}finally {
|
||||||
consumersLock.readLock().unlock();
|
consumersLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
LOG.debug("{} Message {} sent to {}", new Object[]{ broker.getBrokerName(), msg.getMessageId(), this.destination });
|
LOG.debug("{} Message {} sent to {}",broker.getBrokerName(), msg.getMessageId(), this.destination);
|
||||||
wakeup();
|
wakeup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2023,18 +2031,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("{} toPageIn: {}, force:{}, Inflight: {}, pagedInMessages.size {}, pagedInPendingDispatch.size {}, enqueueCount: {}, dequeueCount: {}, memUsage:{}, maxPageSize:{}",
|
LOG.debug("{} toPageIn: {}, force:{}, Inflight: {}, pagedInMessages.size {}, pagedInPendingDispatch.size {}, enqueueCount: {}, dequeueCount: {}, memUsage:{}, maxPageSize:{}",
|
||||||
new Object[]{
|
this,
|
||||||
this,
|
toPageIn,
|
||||||
toPageIn,
|
force,
|
||||||
force,
|
destinationStatistics.getInflight().getCount(),
|
||||||
destinationStatistics.getInflight().getCount(),
|
pagedInMessages.size(),
|
||||||
pagedInMessages.size(),
|
pagedInPendingSize,
|
||||||
pagedInPendingSize,
|
destinationStatistics.getEnqueues().getCount(),
|
||||||
destinationStatistics.getEnqueues().getCount(),
|
destinationStatistics.getDequeues().getCount(),
|
||||||
destinationStatistics.getDequeues().getCount(),
|
getMemoryUsage().getUsage(),
|
||||||
getMemoryUsage().getUsage(),
|
maxPageSize);
|
||||||
maxPageSize
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (toPageIn > 0 && (force || (haveRealConsumer() && pagedInPendingSize < maxPageSize))) {
|
if (toPageIn > 0 && (force || (haveRealConsumer() && pagedInPendingSize < maxPageSize))) {
|
||||||
|
|
|
@ -595,7 +595,8 @@ public class RegionBroker extends EmptyBroker {
|
||||||
brokerInfos.put(info.getBrokerId(), existing);
|
brokerInfos.put(info.getBrokerId(), existing);
|
||||||
}
|
}
|
||||||
existing.incrementRefCount();
|
existing.incrementRefCount();
|
||||||
LOG.debug("{} addBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size() });
|
LOG.debug("{} addBroker: {} brokerInfo size: {}",
|
||||||
|
getBrokerName(), info.getBrokerName(), brokerInfos.size());
|
||||||
addBrokerInClusterUpdate(info);
|
addBrokerInClusterUpdate(info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -606,7 +607,8 @@ public class RegionBroker extends EmptyBroker {
|
||||||
if (existing != null && existing.decrementRefCount() == 0) {
|
if (existing != null && existing.decrementRefCount() == 0) {
|
||||||
brokerInfos.remove(info.getBrokerId());
|
brokerInfos.remove(info.getBrokerId());
|
||||||
}
|
}
|
||||||
LOG.debug("{} removeBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size()});
|
LOG.debug("{} removeBroker: {} brokerInfo size: {}",
|
||||||
|
getBrokerName(), info.getBrokerName(), brokerInfos.size());
|
||||||
// When stopping don't send cluster updates since we are the one's tearing down
|
// When stopping don't send cluster updates since we are the one's tearing down
|
||||||
// our own bridges.
|
// our own bridges.
|
||||||
if (!brokerService.isStopping()) {
|
if (!brokerService.isStopping()) {
|
||||||
|
|
|
@ -84,7 +84,10 @@ public class TempQueue extends Queue{
|
||||||
@Override
|
@Override
|
||||||
public void dispose(ConnectionContext context) throws IOException {
|
public void dispose(ConnectionContext context) throws IOException {
|
||||||
if (this.destinationStatistics.getMessages().getCount() > 0) {
|
if (this.destinationStatistics.getMessages().getCount() > 0) {
|
||||||
LOG.info("{} on dispose, purge of {} pending messages: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), this.destinationStatistics.getMessages().getCount(), messages });
|
LOG.info("{} on dispose, purge of {} pending messages: {}",
|
||||||
|
getActiveMQDestination().getQualifiedName(),
|
||||||
|
this.destinationStatistics.getMessages().getCount(),
|
||||||
|
messages);
|
||||||
// we may want to capture these message ids in an advisory
|
// we may want to capture these message ids in an advisory
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -894,10 +894,11 @@ public class Topic extends BaseDestination implements Task {
|
||||||
try {
|
try {
|
||||||
durableTopicSubscription.dispatchPending();
|
durableTopicSubscription.dispatchPending();
|
||||||
} catch (IOException exception) {
|
} catch (IOException exception) {
|
||||||
LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}, exception: {}", new Object[]{
|
LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}, exception: {}",
|
||||||
durableTopicSubscription,
|
durableTopicSubscription,
|
||||||
destination,
|
destination,
|
||||||
durableTopicSubscription.pending, exception });
|
durableTopicSubscription.pending,
|
||||||
|
exception);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,12 +138,10 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
}
|
}
|
||||||
if (!warnedAboutWait) {
|
if (!warnedAboutWait) {
|
||||||
LOG.info("{}: Pending message cursor [{}] is full, temp usage ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
|
LOG.info("{}: Pending message cursor [{}] is full, temp usage ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
|
||||||
new Object[]{
|
toString(),
|
||||||
toString(),
|
matched,
|
||||||
matched,
|
matched.getSystemUsage().getTempUsage().getPercentUsage(),
|
||||||
matched.getSystemUsage().getTempUsage().getPercentUsage(),
|
matched.getSystemUsage().getMemoryUsage().getPercentUsage());
|
||||||
matched.getSystemUsage().getMemoryUsage().getPercentUsage()
|
|
||||||
});
|
|
||||||
warnedAboutWait = true;
|
warnedAboutWait = true;
|
||||||
}
|
}
|
||||||
matchedListMutex.wait(20);
|
matchedListMutex.wait(20);
|
||||||
|
@ -189,9 +187,8 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
// lets avoid an infinite loop if we are given a bad eviction strategy
|
// lets avoid an infinite loop if we are given a bad eviction strategy
|
||||||
// for a bad strategy lets just not evict
|
// for a bad strategy lets just not evict
|
||||||
if (messagesToEvict == 0) {
|
if (messagesToEvict == 0) {
|
||||||
LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates", new Object[]{
|
LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates",
|
||||||
destination, messageEvictionStrategy, list.size()
|
destination, messageEvictionStrategy, list.size());
|
||||||
});
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.activemq.broker.region.IndirectMessageReference;
|
||||||
import org.apache.activemq.broker.region.MessageReference;
|
import org.apache.activemq.broker.region.MessageReference;
|
||||||
import org.apache.activemq.broker.region.QueueMessageReference;
|
import org.apache.activemq.broker.region.QueueMessageReference;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
|
|
||||||
import org.apache.activemq.openwire.OpenWireFormat;
|
import org.apache.activemq.openwire.OpenWireFormat;
|
||||||
import org.apache.activemq.store.PList;
|
import org.apache.activemq.store.PList;
|
||||||
import org.apache.activemq.store.PListEntry;
|
import org.apache.activemq.store.PListEntry;
|
||||||
|
@ -443,8 +442,9 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
|
||||||
long start = 0;
|
long start = 0;
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
start = System.currentTimeMillis();
|
start = System.currentTimeMillis();
|
||||||
LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[] { name, memoryList.size(),
|
LOG.trace("{}, flushToDisk() mem list size: {} {}",
|
||||||
(systemUsage != null ? systemUsage.getMemoryUsage() : "") });
|
name, memoryList.size(),
|
||||||
|
(systemUsage != null ? systemUsage.getMemoryUsage() : ""));
|
||||||
}
|
}
|
||||||
for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
|
for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
|
||||||
MessageReference node = iterator.next();
|
MessageReference node = iterator.next();
|
||||||
|
@ -461,7 +461,10 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
|
||||||
}
|
}
|
||||||
memoryList.clear();
|
memoryList.clear();
|
||||||
setCacheEnabled(false);
|
setCacheEnabled(false);
|
||||||
LOG.trace("{}, flushToDisk() done - {} ms {}", new Object[]{ name, (System.currentTimeMillis() - start), (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
|
LOG.trace("{}, flushToDisk() done - {} ms {}",
|
||||||
|
name,
|
||||||
|
(System.currentTimeMillis() - start),
|
||||||
|
(systemUsage != null ? systemUsage.getMemoryUsage() : ""));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -199,7 +199,8 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
||||||
if (prioritizedMessages && immediatePriorityDispatch && tsp.isPaging()) {
|
if (prioritizedMessages && immediatePriorityDispatch && tsp.isPaging()) {
|
||||||
if (msg.getPriority() > tsp.getLastRecoveredPriority()) {
|
if (msg.getPriority() > tsp.getLastRecoveredPriority()) {
|
||||||
tsp.recoverMessage(node.getMessage(), true);
|
tsp.recoverMessage(node.getMessage(), true);
|
||||||
LOG.trace("cached high priority ({}) message: {}, current paged batch priority: {}, cache size: {}", new Object[]{ msg.getPriority(), msg.getMessageId(), tsp.getLastRecoveredPriority(), tsp.batchList.size()});
|
LOG.trace("cached high priority ({}) message: {}, current paged batch priority: {}, cache size: {}",
|
||||||
|
msg.getPriority(), msg.getMessageId(), tsp.getLastRecoveredPriority(), tsp.batchList.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -173,7 +173,10 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
|
||||||
slowConsumers.remove(entry.getKey());
|
slowConsumers.remove(entry.getKey());
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
LOG.trace("Not yet time to abort consumer {}: slow duration = {}, slow count = {}", new Object[]{ entry.getKey().getConsumerInfo().getConsumerId(), entry.getValue().markCount * getCheckPeriod(), entry.getValue().slowCount });
|
LOG.trace("Not yet time to abort consumer {}: slow duration = {}, slow count = {}",
|
||||||
|
entry.getKey().getConsumerInfo().getConsumerId(),
|
||||||
|
entry.getValue().markCount * getCheckPeriod(),
|
||||||
|
entry.getValue().slowCount);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,7 +110,7 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
|
||||||
slowConsumers.remove(entry.getKey());
|
slowConsumers.remove(entry.getKey());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.info("sub: " + entry.getKey().getConsumerInfo().getConsumerId() + " is no longer slow");
|
LOG.info("sub: {} is no longer slow", entry.getKey().getConsumerInfo().getConsumerId());
|
||||||
slowConsumers.remove(entry.getKey());
|
slowConsumers.remove(entry.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -130,7 +130,7 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
|
||||||
|
|
||||||
Connection connection = connectionContext.getConnection();
|
Connection connection = connectionContext.getConnection();
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext);
|
LOG.debug("slowConsumer abort ignored, no connection in context: {}", connectionContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!abortMap.containsKey(connection)) {
|
if (!abortMap.containsKey(connection)) {
|
||||||
|
@ -152,9 +152,9 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
for (Subscription subscription : subscriptions) {
|
for (Subscription subscription : subscriptions) {
|
||||||
LOG.trace("Connection {} being aborted because of slow consumer: {} on destination: {}",
|
LOG.trace("Connection {} being aborted because of slow consumer: {} on destination: {}",
|
||||||
new Object[] { connection.getConnectionId(),
|
connection.getConnectionId(),
|
||||||
subscription.getConsumerInfo().getConsumerId(),
|
subscription.getConsumerInfo().getConsumerId(),
|
||||||
subscription.getActiveMQDestination() });
|
subscription.getActiveMQDestination());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,7 +224,7 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
|
||||||
toAbort.put(sub, entry);
|
toAbort.put(sub, entry);
|
||||||
abortSubscription(toAbort, abortSubscriberConnection);
|
abortSubscription(toAbort, abortSubscriberConnection);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: " + sub);
|
LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: {}", sub);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,12 +57,12 @@ public class PriorityNetworkDispatchPolicy extends SimpleDispatchPolicy {
|
||||||
// higher priority matching sub exists
|
// higher priority matching sub exists
|
||||||
highestPrioritySub = false;
|
highestPrioritySub = false;
|
||||||
LOG.debug("ignoring lower priority: {} [{}, {}] in favour of: {} [{}, {}]",
|
LOG.debug("ignoring lower priority: {} [{}, {}] in favour of: {} [{}, {}]",
|
||||||
new Object[]{ candidate,
|
candidate,
|
||||||
candidate.getConsumerInfo().getNetworkConsumerIds(),
|
candidate.getConsumerInfo().getNetworkConsumerIds(),
|
||||||
candidate.getConsumerInfo().getNetworkConsumerIds(),
|
candidate.getConsumerInfo().getNetworkConsumerIds(),
|
||||||
sub,
|
sub,
|
||||||
sub.getConsumerInfo().getNetworkConsumerIds(),
|
sub.getConsumerInfo().getNetworkConsumerIds(),
|
||||||
sub.getConsumerInfo().getNetworkConsumerIds() });
|
sub.getConsumerInfo().getNetworkConsumerIds());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,14 +89,14 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
|
||||||
@Override
|
@Override
|
||||||
public void dispatchSync(Command message) {
|
public void dispatchSync(Command message) {
|
||||||
if (message instanceof ExceptionResponse) {
|
if (message instanceof ExceptionResponse) {
|
||||||
LOG.warn("Unexpected response: " + message);
|
LOG.warn("Unexpected response: {}", message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dispatchAsync(Command command) {
|
public void dispatchAsync(Command command) {
|
||||||
if (command instanceof ExceptionResponse) {
|
if (command instanceof ExceptionResponse) {
|
||||||
LOG.warn("Unexpected response: " + command);
|
LOG.warn("Unexpected response: {}", command);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,7 +107,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serviceException(Throwable error) {
|
public void serviceException(Throwable error) {
|
||||||
LOG.warn("Unexpected exception: " + error, error);
|
LOG.warn("Unexpected exception", error);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -152,7 +152,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serviceExceptionAsync(IOException e) {
|
public void serviceExceptionAsync(IOException e) {
|
||||||
LOG.warn("Unexpected async ioexception: " + e, e);
|
LOG.warn("Unexpected async ioexception", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -299,7 +299,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
|
||||||
|
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
if (now >= nextWarn) {
|
if (now >= nextWarn) {
|
||||||
LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)");
|
LOG.info("{}: {} (blocking for: {}s)", usage, logMessage, (now - start) / 1000);
|
||||||
nextWarn = now + 30000l;
|
nextWarn = now + 30000l;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -405,7 +405,8 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
|
||||||
messageSend.setExpiration(expiration);
|
messageSend.setExpiration(expiration);
|
||||||
}
|
}
|
||||||
messageSend.setTimestamp(newTimeStamp);
|
messageSend.setTimestamp(newTimeStamp);
|
||||||
LOG.debug("Set message {} timestamp from {} to {}", new Object[]{ messageSend.getMessageId(), oldTimestamp, newTimeStamp });
|
LOG.debug("Set message {} timestamp from {} to {}",
|
||||||
|
messageSend.getMessageId(), oldTimestamp, newTimeStamp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -156,7 +156,8 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
|
||||||
if (isLogAll() || isLogConsumerEvents()) {
|
if (isLogAll() || isLogConsumerEvents()) {
|
||||||
LOG.info("Acknowledging message for client ID: {}{}", consumerExchange.getConnectionContext().getClientId(), (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
|
LOG.info("Acknowledging message for client ID: {}{}", consumerExchange.getConnectionContext().getClientId(), (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
|
||||||
if (ack.getMessageCount() > 1) {
|
if (ack.getMessageCount() > 1) {
|
||||||
LOG.trace("Message count: {}, First Message Id: {}, Last Message Id: {}", new Object[]{ ack.getMessageCount(), ack.getFirstMessageId(), ack.getLastMessageId() });
|
LOG.trace("Message count: {}, First Message Id: {}, Last Message Id: {}",
|
||||||
|
ack.getMessageCount(), ack.getFirstMessageId(), ack.getLastMessageId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
super.acknowledge(consumerExchange, ack);
|
super.acknowledge(consumerExchange, ack);
|
||||||
|
|
|
@ -172,9 +172,8 @@ public class RedeliveryPlugin extends BrokerPluginSupport {
|
||||||
private void scheduleRedelivery(ConnectionContext context, MessageReference messageReference, long delay, int redeliveryCount) throws Exception {
|
private void scheduleRedelivery(ConnectionContext context, MessageReference messageReference, long delay, int redeliveryCount) throws Exception {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
Destination regionDestination = (Destination) messageReference.getRegionDestination();
|
Destination regionDestination = (Destination) messageReference.getRegionDestination();
|
||||||
LOG.trace("redelivery #{} of: {} with delay: {}, dest: {}", new Object[]{
|
LOG.trace("redelivery #{} of: {} with delay: {}, dest: {}",
|
||||||
redeliveryCount, messageReference.getMessageId(), delay, regionDestination.getActiveMQDestination()
|
redeliveryCount, messageReference.getMessageId(), delay, regionDestination.getActiveMQDestination());
|
||||||
});
|
|
||||||
}
|
}
|
||||||
final Message old = messageReference.getMessage();
|
final Message old = messageReference.getMessage();
|
||||||
Message message = old.copy();
|
Message message = old.copy();
|
||||||
|
|
|
@ -125,7 +125,8 @@ public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
|
||||||
message.setExpiration(expiration);
|
message.setExpiration(expiration);
|
||||||
}
|
}
|
||||||
message.setTimestamp(newTimeStamp);
|
message.setTimestamp(newTimeStamp);
|
||||||
LOG.debug("Set message {} timestamp from {} to {}", new Object[]{ message.getMessageId(), oldTimestamp, newTimeStamp });
|
LOG.debug("Set message {} timestamp from {} to {}",
|
||||||
|
message.getMessageId(), oldTimestamp, newTimeStamp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
super.send(producerExchange, message);
|
super.send(producerExchange, message);
|
||||||
|
|
|
@ -120,7 +120,8 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
|
||||||
if (match) {
|
if (match) {
|
||||||
LOG.trace("Replaying [{}] for [{}] back to origin in the absence of a local consumer", message.getMessageId(), message.getDestination());
|
LOG.trace("Replaying [{}] for [{}] back to origin in the absence of a local consumer", message.getMessageId(), message.getDestination());
|
||||||
} else {
|
} else {
|
||||||
LOG.trace("Suppressing replay of [{}] for [{}] back to origin {}", new Object[]{ message.getMessageId(), message.getDestination(), Arrays.asList(message.getBrokerPath())} );
|
LOG.trace("Suppressing replay of [{}] for [{}] back to origin {}",
|
||||||
|
message.getMessageId(), message.getDestination(), Arrays.asList(message.getBrokerPath()));
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
@ -129,9 +130,8 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
|
||||||
}
|
}
|
||||||
|
|
||||||
if (match && rateLimitExceeded()) {
|
if (match && rateLimitExceeded()) {
|
||||||
LOG.trace("Throttled network consumer rejecting [{}] for [{}] {}>{}/{}", new Object[]{
|
LOG.trace("Throttled network consumer rejecting [{}] for [{}] {}>{}/{}",
|
||||||
message.getMessageId(), message.getDestination(), matchCount, rateLimit, rateDuration
|
message.getMessageId(), message.getDestination(), matchCount, rateLimit, rateDuration);
|
||||||
});
|
|
||||||
match = false;
|
match = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,17 +149,15 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
|
||||||
if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) {
|
if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) {
|
||||||
|
|
||||||
if (!isSelectorAware()) {
|
if (!isSelectorAware()) {
|
||||||
LOG.trace("Not replaying [{}] for [{}] to origin due to existing local consumer: {}", new Object[]{
|
LOG.trace("Not replaying [{}] for [{}] to origin due to existing local consumer: {}",
|
||||||
message.getMessageId(), message.getDestination(), sub.getConsumerInfo()
|
message.getMessageId(), message.getDestination(), sub.getConsumerInfo());
|
||||||
});
|
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
if (sub.matches(message, mec)) {
|
if (sub.matches(message, mec)) {
|
||||||
LOG.trace("Not replaying [{}] for [{}] to origin due to existing selector matching local consumer: {}", new Object[]{
|
LOG.trace("Not replaying [{}] for [{}] to origin due to existing selector matching local consumer: {}",
|
||||||
message.getMessageId(), message.getDestination(), sub.getConsumerInfo()
|
message.getMessageId(), message.getDestination(), sub.getConsumerInfo());
|
||||||
});
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
} catch (Exception ignored) {}
|
} catch (Exception ignored) {}
|
||||||
|
|
|
@ -70,9 +70,8 @@ public class ConduitBridge extends DemandForwardingBridge {
|
||||||
for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
|
for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
|
||||||
DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
|
DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
|
||||||
if (canConduit(ds) && filter.matches(info.getDestination())) {
|
if (canConduit(ds) && filter.matches(info.getDestination())) {
|
||||||
LOG.debug("{} {} with ids {} matched (add interest) {}", new Object[]{
|
LOG.debug("{} {} with ids {} matched (add interest) {}",
|
||||||
configuration.getBrokerName(), info, info.getNetworkConsumerIds(), ds
|
configuration.getBrokerName(), info, info.getNetworkConsumerIds(), ds);
|
||||||
});
|
|
||||||
// add the interest in the subscription
|
// add the interest in the subscription
|
||||||
if (!info.isDurable()) {
|
if (!info.isDurable()) {
|
||||||
ds.add(info.getConsumerId());
|
ds.add(info.getConsumerId());
|
||||||
|
@ -118,9 +117,8 @@ public class ConduitBridge extends DemandForwardingBridge {
|
||||||
|
|
||||||
for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
|
for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
|
||||||
if (ds.remove(id)) {
|
if (ds.remove(id)) {
|
||||||
LOG.debug("{} on {} from {} removed interest for: {} from {}", new Object[]{
|
LOG.debug("{} on {} from {} removed interest for: {} from {}",
|
||||||
configuration.getBrokerName(), localBroker, remoteBrokerName, id, ds
|
configuration.getBrokerName(), localBroker, remoteBrokerName, id, ds);
|
||||||
});
|
|
||||||
}
|
}
|
||||||
if (ds.isEmpty()) {
|
if (ds.isEmpty()) {
|
||||||
tmpList.add(ds);
|
tmpList.add(ds);
|
||||||
|
@ -129,9 +127,8 @@ public class ConduitBridge extends DemandForwardingBridge {
|
||||||
|
|
||||||
for (DemandSubscription ds : tmpList) {
|
for (DemandSubscription ds : tmpList) {
|
||||||
removeSubscription(ds);
|
removeSubscription(ds);
|
||||||
LOG.debug("{} on {} from {} removed {}", new Object[]{
|
LOG.debug("{} on {} from {} removed {}",
|
||||||
configuration.getBrokerName(), localBroker, remoteBrokerName, ds
|
configuration.getBrokerName(), localBroker, remoteBrokerName, ds);
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -283,9 +283,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
try {
|
try {
|
||||||
// local start complete
|
// local start complete
|
||||||
if (startedLatch.getCount() < 2) {
|
if (startedLatch.getCount() < 2) {
|
||||||
LOG.trace("{} unregister bridge ({}) to {}", new Object[]{
|
LOG.trace("{} unregister bridge ({}) to {}",
|
||||||
configuration.getBrokerName(), this, remoteBrokerName
|
configuration.getBrokerName(), this, remoteBrokerName);
|
||||||
});
|
|
||||||
brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
|
brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
|
||||||
brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
|
brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
|
||||||
}
|
}
|
||||||
|
@ -408,9 +407,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
// and if so just stop now before registering anything.
|
// and if so just stop now before registering anything.
|
||||||
remoteBrokerId = remoteBrokerInfo.getBrokerId();
|
remoteBrokerId = remoteBrokerInfo.getBrokerId();
|
||||||
if (localBrokerId.equals(remoteBrokerId)) {
|
if (localBrokerId.equals(remoteBrokerId)) {
|
||||||
LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
|
LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}",
|
||||||
configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
|
configuration.getBrokerName(), remoteBrokerName, remoteBrokerId);
|
||||||
});
|
|
||||||
ServiceSupport.dispose(localBroker);
|
ServiceSupport.dispose(localBroker);
|
||||||
ServiceSupport.dispose(remoteBroker);
|
ServiceSupport.dispose(remoteBroker);
|
||||||
// the bridge is left in a bit of limbo, but it won't get retried
|
// the bridge is left in a bit of limbo, but it won't get retried
|
||||||
|
@ -552,12 +550,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
// new peer broker (a consumer can work with remote broker also)
|
// new peer broker (a consumer can work with remote broker also)
|
||||||
brokerService.getBroker().addBroker(null, remoteBrokerInfo);
|
brokerService.getBroker().addBroker(null, remoteBrokerInfo);
|
||||||
|
|
||||||
LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{
|
LOG.info("Network connection between {} and {} ({}) has been established.",
|
||||||
localBroker, remoteBroker, remoteBrokerName
|
localBroker, remoteBroker, remoteBrokerName);
|
||||||
});
|
LOG.trace("{} register bridge ({}) to {}",
|
||||||
LOG.trace("{} register bridge ({}) to {}", new Object[]{
|
configuration.getBrokerName(), this, remoteBrokerName);
|
||||||
configuration.getBrokerName(), this, remoteBrokerName
|
|
||||||
});
|
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
|
LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
|
||||||
}
|
}
|
||||||
|
@ -924,7 +920,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
try {
|
try {
|
||||||
remoteBroker.oneway(ack);
|
remoteBroker.oneway(ack);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed to send advisory ack " + ack, e);
|
LOG.warn("Failed to send advisory ack {}", ack, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -945,25 +941,22 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
}
|
}
|
||||||
|
|
||||||
if (path != null && networkTTL > -1 && path.length >= networkTTL) {
|
if (path != null && networkTTL > -1 && path.length >= networkTTL) {
|
||||||
LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
|
LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}",
|
||||||
configuration.getBrokerName(), remoteBrokerName, networkTTL, info
|
configuration.getBrokerName(), remoteBrokerName, networkTTL, info);
|
||||||
});
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (contains(path, localBrokerPath[0])) {
|
if (contains(path, localBrokerPath[0])) {
|
||||||
// Ignore this consumer as it's a consumer we locally sent to the broker.
|
// Ignore this consumer as it's a consumer we locally sent to the broker.
|
||||||
LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
|
LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}",
|
||||||
configuration.getBrokerName(), remoteBrokerName, info
|
configuration.getBrokerName(), remoteBrokerName, info);
|
||||||
});
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!isPermissableDestination(info.getDestination())) {
|
if (!isPermissableDestination(info.getDestination())) {
|
||||||
// ignore if not in the permitted or in the excluded list
|
// ignore if not in the permitted or in the excluded list
|
||||||
LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
|
LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}",
|
||||||
configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
|
configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info);
|
||||||
});
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -981,9 +974,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
final DestinationInfo destInfo = (DestinationInfo) data;
|
final DestinationInfo destInfo = (DestinationInfo) data;
|
||||||
BrokerId[] path = destInfo.getBrokerPath();
|
BrokerId[] path = destInfo.getBrokerPath();
|
||||||
if (path != null && networkTTL > -1 && path.length >= networkTTL) {
|
if (path != null && networkTTL > -1 && path.length >= networkTTL) {
|
||||||
LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
|
LOG.debug("{} Ignoring destination {} restricted to {} network hops only",
|
||||||
configuration.getBrokerName(), destInfo, networkTTL
|
configuration.getBrokerName(), destInfo, networkTTL);
|
||||||
});
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
|
if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
|
||||||
|
@ -997,9 +989,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
|
tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
|
||||||
}
|
}
|
||||||
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
|
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
|
||||||
LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
|
LOG.trace("{} bridging {} destination on {} from {}, destination: {}",
|
||||||
configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
|
configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo);
|
||||||
});
|
|
||||||
if (destInfo.isRemoveOperation()) {
|
if (destInfo.isRemoveOperation()) {
|
||||||
// not synced with addSubs so we will need to ignore any potential new subs with a timeout!=0
|
// not synced with addSubs so we will need to ignore any potential new subs with a timeout!=0
|
||||||
destInfo.setTimeout(1);
|
destInfo.setTimeout(1);
|
||||||
|
@ -1106,7 +1097,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
|
LOG.info("Network connection between {} and {} shutdown due to a local error: {}", localBroker, remoteBroker, error);
|
||||||
LOG.debug("The local Exception was: {}", error, error);
|
LOG.debug("The local Exception was: {}", error, error);
|
||||||
|
|
||||||
brokerService.getTaskRunnerFactory().execute(new Runnable() {
|
brokerService.getTaskRunnerFactory().execute(new Runnable() {
|
||||||
|
@ -1137,7 +1128,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("failed to fire forward failure advisory, cause: {}", (Object)e);
|
LOG.warn("failed to fire forward failure advisory, cause: {}", e);
|
||||||
LOG.debug("detail", e);
|
LOG.debug("detail", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1155,7 +1146,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
|
|
||||||
protected void removeSubscription(final DemandSubscription sub) throws IOException {
|
protected void removeSubscription(final DemandSubscription sub) throws IOException {
|
||||||
if (sub != null) {
|
if (sub != null) {
|
||||||
LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
|
LOG.trace("{} remove local subscription: {} for remote {}", configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId());
|
||||||
|
|
||||||
// ensure not available for conduit subs pending removal
|
// ensure not available for conduit subs pending removal
|
||||||
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
|
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
|
||||||
|
@ -1206,9 +1197,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
|
if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
|
||||||
|
|
||||||
if (suppressMessageDispatch(md, sub)) {
|
if (suppressMessageDispatch(md, sub)) {
|
||||||
LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
|
LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}",
|
||||||
configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
|
configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage());
|
||||||
});
|
|
||||||
// still ack as it may be durable
|
// still ack as it may be durable
|
||||||
try {
|
try {
|
||||||
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
|
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
|
||||||
|
@ -1219,9 +1209,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
}
|
}
|
||||||
|
|
||||||
Message message = configureMessage(md);
|
Message message = configureMessage(md);
|
||||||
LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
|
LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}",
|
||||||
configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId())
|
configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId()));
|
||||||
});
|
|
||||||
if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
|
if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
|
||||||
try {
|
try {
|
||||||
// never request b/c they are eventually acked async
|
// never request b/c they are eventually acked async
|
||||||
|
@ -1498,18 +1487,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
boolean suppress = false;
|
boolean suppress = false;
|
||||||
|
|
||||||
if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
|
if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
|
||||||
LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{
|
LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}",
|
||||||
configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds()
|
configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds());
|
||||||
});
|
|
||||||
suppress = true;
|
suppress = true;
|
||||||
} else {
|
} else {
|
||||||
// remove the existing lower priority duplicate and allow this candidate
|
// remove the existing lower priority duplicate and allow this candidate
|
||||||
try {
|
try {
|
||||||
removeDuplicateSubscription(existingSub);
|
removeDuplicateSubscription(existingSub);
|
||||||
|
|
||||||
LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{
|
LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}",
|
||||||
configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds()
|
configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds());
|
||||||
});
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
|
LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
|
||||||
}
|
}
|
||||||
|
@ -1588,7 +1575,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
priority -= info.getBrokerPath().length + 1;
|
priority -= info.getBrokerPath().length + 1;
|
||||||
}
|
}
|
||||||
result.getLocalInfo().setPriority(priority);
|
result.getLocalInfo().setPriority(priority);
|
||||||
LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
|
LOG.debug("{} using priority: {} for subscription: {}", configuration.getBrokerName(), priority, info);
|
||||||
}
|
}
|
||||||
configureDemandSubscription(info, result);
|
configureDemandSubscription(info, result);
|
||||||
return result;
|
return result;
|
||||||
|
@ -1641,14 +1628,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
|
|
||||||
protected void removeDemandSubscription(ConsumerId id) throws IOException {
|
protected void removeDemandSubscription(ConsumerId id) throws IOException {
|
||||||
DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
|
DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
|
||||||
LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{
|
LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}",
|
||||||
configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub
|
configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub);
|
||||||
});
|
|
||||||
if (sub != null) {
|
if (sub != null) {
|
||||||
removeSubscription(sub);
|
removeSubscription(sub);
|
||||||
LOG.debug("{} removed sub on {} from {}: {}", new Object[]{
|
LOG.debug("{} removed sub on {} from {}: {}",
|
||||||
configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo()
|
configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo());
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1964,9 +1949,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
|
long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
|
||||||
if (producerSequenceId <= lastStoredForMessageProducer) {
|
if (producerSequenceId <= lastStoredForMessageProducer) {
|
||||||
result = false;
|
result = false;
|
||||||
LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
|
LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}",
|
||||||
(LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
|
(LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer);
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
|
|
|
@ -114,7 +114,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
|
||||||
try {
|
try {
|
||||||
connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX);
|
connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX);
|
||||||
} catch (URISyntaxException e) {
|
} catch (URISyntaxException e) {
|
||||||
LOG.warn("could not apply query parameters: {} to: {}", new Object[]{ parameters, connectUri }, e);
|
LOG.warn("could not apply query parameters: {} to: {}",parameters, connectUri, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Establishing network connection from {} to {}", localURI, connectUri);
|
LOG.info("Establishing network connection from {} to {}", localURI, connectUri);
|
||||||
|
@ -166,7 +166,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ServiceSupport.dispose(localTransport);
|
ServiceSupport.dispose(localTransport);
|
||||||
ServiceSupport.dispose(remoteTransport);
|
ServiceSupport.dispose(remoteTransport);
|
||||||
LOG.warn("Could not start network bridge between: {} and: {} due to: {}", new Object[]{ localURI, uri, e.getMessage() });
|
LOG.warn("Could not start network bridge between: {} and: {} due to: {}", localURI, uri, e.getMessage());
|
||||||
LOG.debug("Start failure exception: ", e);
|
LOG.debug("Start failure exception: ", e);
|
||||||
try {
|
try {
|
||||||
// Will remove bridge and active event.
|
// Will remove bridge and active event.
|
||||||
|
|
|
@ -290,14 +290,14 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
protected synchronized void addConnector(SearchResult result) throws Exception {
|
protected synchronized void addConnector(SearchResult result) throws Exception {
|
||||||
String uuid = toUUID(result);
|
String uuid = toUUID(result);
|
||||||
if (uuidMap.containsKey(uuid)) {
|
if (uuidMap.containsKey(uuid)) {
|
||||||
LOG.warn("connector already regsitered for UUID [{}]", uuid);
|
LOG.warn("connector already registered for UUID [{}]", uuid);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
URI connectorURI = toURI(result);
|
URI connectorURI = toURI(result);
|
||||||
if (connectorMap.containsKey(connectorURI)) {
|
if (connectorMap.containsKey(connectorURI)) {
|
||||||
int referenceCount = referenceMap.get(connectorURI) + 1;
|
int referenceCount = referenceMap.get(connectorURI) + 1;
|
||||||
LOG.warn("connector reference added for URI [{}], UUID [{}], total reference(s) [{}]", new Object[]{ connectorURI, uuid, referenceCount });
|
LOG.warn("connector reference added for URI [{}], UUID [{}], total reference(s) [{}]",connectorURI, uuid, referenceCount);
|
||||||
referenceMap.put(connectorURI, referenceCount);
|
referenceMap.put(connectorURI, referenceCount);
|
||||||
uuidMap.put(uuid, connectorURI);
|
uuidMap.put(uuid, connectorURI);
|
||||||
return;
|
return;
|
||||||
|
@ -357,7 +357,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
int referenceCount = referenceMap.get(connectorURI) - 1;
|
int referenceCount = referenceMap.get(connectorURI) - 1;
|
||||||
referenceMap.put(connectorURI, referenceCount);
|
referenceMap.put(connectorURI, referenceCount);
|
||||||
uuidMap.remove(uuid);
|
uuidMap.remove(uuid);
|
||||||
LOG.debug("connector referenced removed for URI [{}], UUID[{}], remaining reference(s) [{}]", new Object[]{ connectorURI, uuid, referenceCount });
|
LOG.debug("connector referenced removed for URI [{}], UUID[{}], remaining reference(s) [{}]", connectorURI, uuid, referenceCount);
|
||||||
|
|
||||||
if (referenceCount > 0) {
|
if (referenceCount > 0) {
|
||||||
return;
|
return;
|
||||||
|
@ -434,7 +434,7 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
|
||||||
String uuidNew = event.getNewBinding().getName();
|
String uuidNew = event.getNewBinding().getName();
|
||||||
URI connectorURI = uuidMap.remove(uuidOld);
|
URI connectorURI = uuidMap.remove(uuidOld);
|
||||||
uuidMap.put(uuidNew, connectorURI);
|
uuidMap.put(uuidNew, connectorURI);
|
||||||
LOG.debug("connector reference renamed for URI [{}], Old UUID [{}], New UUID [{}]", new Object[]{ connectorURI, uuidOld, uuidNew });
|
LOG.debug("connector reference renamed for URI [{}], Old UUID [{}], New UUID [{}]", connectorURI, uuidOld, uuidNew);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -149,7 +149,7 @@ public abstract class DestinationBridge implements Service, MessageListener {
|
||||||
return;
|
return;
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.info("failed to forward message on attempt: {} reason: {} message: {}", new Object[]{ attempt, e, message });
|
LOG.info("failed to forward message on attempt: {} reason: {} message: {}", attempt, e, message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -626,7 +626,7 @@ public abstract class JmsConnector implements Service {
|
||||||
|
|
||||||
return;
|
return;
|
||||||
} catch(Exception e) {
|
} catch(Exception e) {
|
||||||
LOG.debug("Failed to establish initial {} connection for JmsConnector [{}]", new Object[]{ (local ? "local" : "foreign"), attempt }, e);
|
LOG.debug("Failed to establish initial {} connection for JmsConnector [{}]", (local ? "local" : "foreign"), attempt, e);
|
||||||
} finally {
|
} finally {
|
||||||
attempt++;
|
attempt++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -237,7 +237,7 @@ public class SimpleCachedLDAPAuthorizationMap implements AuthorizationMap {
|
||||||
currentContext.search(queueSearchBase, getFilterForPermissionType(permissionType),
|
currentContext.search(queueSearchBase, getFilterForPermissionType(permissionType),
|
||||||
constraints), DestinationType.QUEUE, permissionType);
|
constraints), DestinationType.QUEUE, permissionType);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Policy not applied!. Error processing policy under '{}' with filter '{}'", new Object[]{ queueSearchBase, getFilterForPermissionType(permissionType) }, e);
|
LOG.error("Policy not applied!. Error processing policy under '{}' with filter '{}'", queueSearchBase, getFilterForPermissionType(permissionType), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -247,7 +247,7 @@ public class SimpleCachedLDAPAuthorizationMap implements AuthorizationMap {
|
||||||
currentContext.search(topicSearchBase, getFilterForPermissionType(permissionType),
|
currentContext.search(topicSearchBase, getFilterForPermissionType(permissionType),
|
||||||
constraints), DestinationType.TOPIC, permissionType);
|
constraints), DestinationType.TOPIC, permissionType);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Policy not applied!. Error processing policy under '{}' with filter '{}'", new Object[]{ topicSearchBase, getFilterForPermissionType(permissionType) }, e);
|
LOG.error("Policy not applied!. Error processing policy under '{}' with filter '{}'", topicSearchBase, getFilterForPermissionType(permissionType), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,7 +257,7 @@ public class SimpleCachedLDAPAuthorizationMap implements AuthorizationMap {
|
||||||
currentContext.search(tempSearchBase, getFilterForPermissionType(permissionType),
|
currentContext.search(tempSearchBase, getFilterForPermissionType(permissionType),
|
||||||
constraints), DestinationType.TEMP, permissionType);
|
constraints), DestinationType.TEMP, permissionType);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Policy not applied!. Error processing policy under '{}' with filter '{}'", new Object[]{ tempSearchBase, getFilterForPermissionType(permissionType) }, e);
|
LOG.error("Policy not applied!. Error processing policy under '{}' with filter '{}'", tempSearchBase, getFilterForPermissionType(permissionType), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -405,7 +405,7 @@ public class SimpleCachedLDAPAuthorizationMap implements AuthorizationMap {
|
||||||
try {
|
try {
|
||||||
memberAttributes = context.getAttributes(memberDn, new String[] { "objectClass", groupNameAttribute, userNameAttribute });
|
memberAttributes = context.getAttributes(memberDn, new String[] { "objectClass", groupNameAttribute, userNameAttribute });
|
||||||
} catch (NamingException e) {
|
} catch (NamingException e) {
|
||||||
LOG.error("Policy not applied! Unknown member {} in policy entry {}", new Object[]{ memberDn, result.getNameInNamespace() }, e);
|
LOG.error("Policy not applied! Unknown member {} in policy entry {}", memberDn, result.getNameInNamespace(), e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -419,7 +419,7 @@ public class SimpleCachedLDAPAuthorizationMap implements AuthorizationMap {
|
||||||
group = true;
|
group = true;
|
||||||
Attribute name = memberAttributes.get(groupNameAttribute);
|
Attribute name = memberAttributes.get(groupNameAttribute);
|
||||||
if (name == null) {
|
if (name == null) {
|
||||||
LOG.error("Policy not applied! Group {} does not have name attribute {} under entry {}", new Object[]{ memberDn, groupNameAttribute, result.getNameInNamespace() });
|
LOG.error("Policy not applied! Group {} does not have name attribute {} under entry {}", memberDn, groupNameAttribute, result.getNameInNamespace());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -430,7 +430,7 @@ public class SimpleCachedLDAPAuthorizationMap implements AuthorizationMap {
|
||||||
user = true;
|
user = true;
|
||||||
Attribute name = memberAttributes.get(userNameAttribute);
|
Attribute name = memberAttributes.get(userNameAttribute);
|
||||||
if (name == null) {
|
if (name == null) {
|
||||||
LOG.error("Policy not applied! User {} does not have name attribute {} under entry {}", new Object[]{ memberDn, userNameAttribute, result.getNameInNamespace() });
|
LOG.error("Policy not applied! User {} does not have name attribute {} under entry {}", memberDn, userNameAttribute, result.getNameInNamespace());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -901,9 +901,9 @@ public class SimpleCachedLDAPAuthorizationMap implements AuthorizationMap {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (InvalidNameException e) {
|
} catch (InvalidNameException e) {
|
||||||
LOG.error("Policy not applied! Error parsing DN for object rename for rename of {} to {}", new Object[]{ oldBinding.getName(), newBinding.getName() }, e);
|
LOG.error("Policy not applied! Error parsing DN for object rename for rename of {} to {}", oldBinding.getName(), newBinding.getName(), e);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Policy not applied! Error processing object rename for rename of {} to {}", new Object[]{ oldBinding.getName(), newBinding.getName() }, e);
|
LOG.error("Policy not applied! Error processing object rename for rename of {} to {}", oldBinding.getName(), newBinding.getName(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue