This closes #3401
This commit is contained in:
commit
cfc56a84ad
|
@ -1149,13 +1149,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
// it's already been through here once, giving up now
|
// it's already been through here once, giving up now
|
||||||
sendToDLA = false;
|
sendToDLA = false;
|
||||||
} else {
|
} else {
|
||||||
sendToDLA = addressSettings.isSendToDLAOnNoRoute();
|
sendToDLA = addressSettings != null ? addressSettings.isSendToDLAOnNoRoute() : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sendToDLA) {
|
if (sendToDLA) {
|
||||||
// Send to the DLA for the address
|
// Send to the DLA for the address
|
||||||
|
|
||||||
SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
|
SimpleString dlaAddress = addressSettings != null ? addressSettings.getDeadLetterAddress() : null;
|
||||||
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
|
logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
|
||||||
|
@ -1218,22 +1218,25 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
|
|
||||||
// HORNETQ-1029
|
// HORNETQ-1029
|
||||||
private void applyExpiryDelay(Message message, SimpleString address) {
|
private void applyExpiryDelay(Message message, SimpleString address) {
|
||||||
long expirationOverride = addressSettingsRepository.getMatch(address.toString()).getExpiryDelay();
|
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
|
||||||
|
if (settings != null) {
|
||||||
|
long expirationOverride = settings.getExpiryDelay();
|
||||||
|
|
||||||
// A -1 <expiry-delay> means don't do anything
|
// A -1 <expiry-delay> means don't do anything
|
||||||
if (expirationOverride >= 0) {
|
if (expirationOverride >= 0) {
|
||||||
// only override the expiration on messages where the expiration hasn't been set by the user
|
// only override the expiration on messages where the expiration hasn't been set by the user
|
||||||
if (message.getExpiration() == 0) {
|
if (message.getExpiration() == 0) {
|
||||||
message.setExpiration(System.currentTimeMillis() + expirationOverride);
|
message.setExpiration(System.currentTimeMillis() + expirationOverride);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
long minExpiration = addressSettingsRepository.getMatch(address.toString()).getMinExpiryDelay();
|
long minExpiration = settings.getMinExpiryDelay();
|
||||||
long maxExpiration = addressSettingsRepository.getMatch(address.toString()).getMaxExpiryDelay();
|
long maxExpiration = settings.getMaxExpiryDelay();
|
||||||
|
|
||||||
if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) {
|
if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) {
|
||||||
message.setExpiration(System.currentTimeMillis() + maxExpiration);
|
message.setExpiration(System.currentTimeMillis() + maxExpiration);
|
||||||
} else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) {
|
} else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) {
|
||||||
message.setExpiration(System.currentTimeMillis() + minExpiration);
|
message.setExpiration(System.currentTimeMillis() + minExpiration);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -934,7 +934,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
pageMaxCache = merged.pageMaxCache;
|
pageMaxCache = merged.pageMaxCache;
|
||||||
}
|
}
|
||||||
if (pageSizeBytes == null) {
|
if (pageSizeBytes == null) {
|
||||||
pageSizeBytes = merged.getPageSizeBytes();
|
pageSizeBytes = merged.pageSizeBytes;
|
||||||
}
|
}
|
||||||
if (messageCounterHistoryDayLimit == null) {
|
if (messageCounterHistoryDayLimit == null) {
|
||||||
messageCounterHistoryDayLimit = merged.messageCounterHistoryDayLimit;
|
messageCounterHistoryDayLimit = merged.messageCounterHistoryDayLimit;
|
||||||
|
|
|
@ -111,4 +111,9 @@ public class Match<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return value.toString();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -284,6 +284,24 @@ public class AddressSettingsTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test3LevelHierarchyPageSizeBytes() throws Exception {
|
||||||
|
ActiveMQServer server = createServer(true);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
AddressSettings level1 = new AddressSettings().setPageSizeBytes(100 * 1024);
|
||||||
|
AddressSettings level2 = new AddressSettings();
|
||||||
|
AddressSettings level3 = new AddressSettings();
|
||||||
|
server.getAddressSettingsRepository().clear();
|
||||||
|
server.getAddressSettingsRepository().setDefault(null);
|
||||||
|
HierarchicalRepository<AddressSettings> repos = server.getAddressSettingsRepository();
|
||||||
|
repos.addMatch("test.foo.bar", level3);
|
||||||
|
repos.addMatch("test.foo.#", level2);
|
||||||
|
repos.addMatch("test.#", level1);
|
||||||
|
|
||||||
|
assertEquals(100 * 1024, server.getAddressSettingsRepository().getMatch("test.foo.bar").getPageSizeBytes());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOverrideHierarchyWithDLA() throws Exception {
|
public void testOverrideHierarchyWithDLA() throws Exception {
|
||||||
ActiveMQServer server = createServer(false);
|
ActiveMQServer server = createServer(false);
|
||||||
|
|
Loading…
Reference in New Issue