This closes #3208
This commit is contained in:
commit
663c1d67d4
|
@ -1033,7 +1033,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
public RoutingStatus route(final Message message,
|
public RoutingStatus route(final Message message,
|
||||||
final RoutingContext context,
|
final RoutingContext context,
|
||||||
final boolean direct) throws Exception {
|
final boolean direct) throws Exception {
|
||||||
return route(message, context, direct, true, null);
|
return route(message, context, direct, true, null, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1043,6 +1043,21 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
boolean rejectDuplicates,
|
boolean rejectDuplicates,
|
||||||
final Binding bindingMove) throws Exception {
|
final Binding bindingMove) throws Exception {
|
||||||
|
|
||||||
|
return route(message, context, direct, rejectDuplicates, bindingMove, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The route can call itelf sending to DLA.
|
||||||
|
* if a DLA still not found, it should then use previous semantics.
|
||||||
|
* */
|
||||||
|
private RoutingStatus route(final Message message,
|
||||||
|
final RoutingContext context,
|
||||||
|
final boolean direct,
|
||||||
|
boolean rejectDuplicates,
|
||||||
|
final Binding bindingMove, boolean sendToDLA) throws Exception {
|
||||||
|
|
||||||
|
|
||||||
RoutingStatus result;
|
RoutingStatus result;
|
||||||
// Sanity check
|
// Sanity check
|
||||||
if (message.getRefCount() > 0) {
|
if (message.getRefCount() > 0) {
|
||||||
|
@ -1102,7 +1117,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
|
|
||||||
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
|
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
|
||||||
|
|
||||||
boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
|
|
||||||
|
if (sendToDLA) {
|
||||||
|
// it's already been through here once, giving up now
|
||||||
|
sendToDLA = false;
|
||||||
|
} else {
|
||||||
|
sendToDLA = addressSettings.isSendToDLAOnNoRoute();
|
||||||
|
}
|
||||||
|
|
||||||
if (sendToDLA) {
|
if (sendToDLA) {
|
||||||
// Send to the DLA for the address
|
// Send to the DLA for the address
|
||||||
|
@ -1123,7 +1144,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
|
|
||||||
message.reencode();
|
message.reencode();
|
||||||
|
|
||||||
route(message, context.getTransaction(), false);
|
route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, sendToDLA);
|
||||||
result = RoutingStatus.NO_BINDINGS_DLA;
|
result = RoutingStatus.NO_BINDINGS_DLA;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -304,7 +304,7 @@ public class HierarchicalObjectRepository<T> implements HierarchicalRepository<T
|
||||||
clearCache();
|
clearCache();
|
||||||
wildcardMatches.remove(modMatch);
|
wildcardMatches.remove(modMatch);
|
||||||
} else {
|
} else {
|
||||||
cache.remove(modMatch);
|
clearCache();
|
||||||
exactMatches.remove(modMatch);
|
exactMatches.remove(modMatch);
|
||||||
}
|
}
|
||||||
onChange();
|
onChange();
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.addressing;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class SendDLQNoRouteTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
private ActiveMQServer server;
|
||||||
|
|
||||||
|
private ClientSessionFactory sessionFactory;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
server = createServer(true);
|
||||||
|
server.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout = 20_000)
|
||||||
|
public void testDLQNoRoute() throws Exception {
|
||||||
|
AddressSettings addressSettings = new AddressSettings().setSendToDLAOnNoRoute(true);
|
||||||
|
addressSettings.setDeadLetterAddress(SimpleString.toSimpleString("DLA"));
|
||||||
|
server.getAddressSettingsRepository().addMatch("#", addressSettings);
|
||||||
|
|
||||||
|
AddressInfo info = new AddressInfo(SimpleString.toSimpleString("info")).addRoutingType(RoutingType.MULTICAST);
|
||||||
|
server.addAddressInfo(info);
|
||||||
|
|
||||||
|
AddressInfo dla = new AddressInfo(SimpleString.toSimpleString("DLA")).addRoutingType(RoutingType.MULTICAST);
|
||||||
|
server.addAddressInfo(dla);
|
||||||
|
|
||||||
|
|
||||||
|
ServerLocator locator = createNonHALocator(false);
|
||||||
|
|
||||||
|
ClientSessionFactory factory = locator.createSessionFactory();
|
||||||
|
ClientSession session = factory.createSession(true, true);
|
||||||
|
ClientProducer producer = session.createProducer("info");
|
||||||
|
|
||||||
|
producer.send(session.createMessage(true));
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -1313,7 +1313,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
public void testRemoveAddressSettingsEffective() throws Exception {
|
public void testRemoveAddressSettingsEffective() throws Exception {
|
||||||
ActiveMQServerControl serverControl = createManagementControl();
|
ActiveMQServerControl serverControl = createManagementControl();
|
||||||
String addr = "test";
|
String addr = "test";
|
||||||
String root = "#";
|
String root = "test.#";
|
||||||
|
|
||||||
String DLA = "someDLA";
|
String DLA = "someDLA";
|
||||||
String expiryAddress = "someExpiry";
|
String expiryAddress = "someExpiry";
|
||||||
|
@ -1322,8 +1322,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
long maxExpiryDelay = 20000;
|
long maxExpiryDelay = 20000;
|
||||||
boolean lastValueQueue = true;
|
boolean lastValueQueue = true;
|
||||||
int deliveryAttempts = 1;
|
int deliveryAttempts = 1;
|
||||||
long maxSizeBytes = 20;
|
long maxSizeBytes = 10 * 1024 * 1024;
|
||||||
int pageSizeBytes = 10;
|
int pageSizeBytes = 1024 * 1024;
|
||||||
int pageMaxCacheSize = 7;
|
int pageMaxCacheSize = 7;
|
||||||
long redeliveryDelay = 4;
|
long redeliveryDelay = 4;
|
||||||
double redeliveryMultiplier = 1;
|
double redeliveryMultiplier = 1;
|
||||||
|
|
Loading…
Reference in New Issue