ARTEMIS-1900 fix race in STOMP auto-create
This commit is contained in:
parent
ec87cc9a3b
commit
dc29a55e1b
|
@ -275,7 +275,6 @@ public final class StompConnection implements RemotingConnection {
|
|||
AddressInfo addressInfo = manager.getServer().getAddressInfo(simpleQueue);
|
||||
AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
|
||||
RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType;
|
||||
boolean checkAnycast = false;
|
||||
/**
|
||||
* If the address doesn't exist then it is created if possible.
|
||||
* If the address does exist but doesn't support the routing-type then the address is updated if possible.
|
||||
|
@ -284,8 +283,6 @@ public final class StompConnection implements RemotingConnection {
|
|||
if (addressSettings.isAutoCreateAddresses()) {
|
||||
session.createAddress(simpleQueue, effectiveAddressRoutingType, true);
|
||||
}
|
||||
|
||||
checkAnycast = true;
|
||||
} else if (!addressInfo.getRoutingTypes().contains(effectiveAddressRoutingType)) {
|
||||
if (addressSettings.isAutoCreateAddresses()) {
|
||||
EnumSet<RoutingType> routingTypes = EnumSet.noneOf(RoutingType.class);
|
||||
|
@ -295,12 +292,10 @@ public final class StompConnection implements RemotingConnection {
|
|||
routingTypes.add(effectiveAddressRoutingType);
|
||||
manager.getServer().updateAddressInfo(simpleQueue, routingTypes);
|
||||
}
|
||||
|
||||
checkAnycast = true;
|
||||
}
|
||||
|
||||
// only auto create the queue if the address is ANYCAST
|
||||
if (checkAnycast && effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues()) {
|
||||
if (effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues() && manager.getServer().locateQueue(simpleQueue) == null) {
|
||||
session.createQueue(simpleQueue, simpleQueue, routingType == null ? addressSettings.getDefaultQueueRoutingType() : routingType, null, false, true, true);
|
||||
}
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
|
|
|
@ -186,8 +186,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
|
|||
// Public --------------------------------------------------------
|
||||
|
||||
public boolean send(final StompConnection connection, final StompFrame frame) {
|
||||
if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQServerLogger.LOGGER.trace("sent " + frame);
|
||||
if (ActiveMQStompProtocolLogger.LOGGER.isTraceEnabled()) {
|
||||
ActiveMQStompProtocolLogger.LOGGER.trace("sent " + frame);
|
||||
}
|
||||
|
||||
invokeInterceptors(this.outgoingInterceptors, frame, connection);
|
||||
|
|
|
@ -252,11 +252,13 @@ public class StompSession implements SessionCallback {
|
|||
String destination,
|
||||
String selector,
|
||||
String ack) throws Exception {
|
||||
SimpleString address = SimpleString.toSimpleString(destination);
|
||||
SimpleString queueName = SimpleString.toSimpleString(destination);
|
||||
SimpleString selectorSimple = SimpleString.toSimpleString(selector);
|
||||
boolean pubSub = false;
|
||||
final int receiveCredits = ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO) ? -1 : consumerCredits;
|
||||
|
||||
Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(SimpleString.toSimpleString(destination))).getRoutingTypes();
|
||||
Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
|
||||
boolean topic = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
|
||||
if (topic) {
|
||||
// subscribes to a topic
|
||||
|
@ -267,14 +269,14 @@ public class StompSession implements SessionCallback {
|
|||
}
|
||||
queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
|
||||
if (manager.getServer().locateQueue(queueName) == null) {
|
||||
session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector), false, true);
|
||||
session.createQueue(address, queueName, selectorSimple, false, true);
|
||||
}
|
||||
} else {
|
||||
queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
|
||||
session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector), true, false);
|
||||
session.createQueue(address, queueName, selectorSimple, true, false);
|
||||
}
|
||||
}
|
||||
final ServerConsumer consumer = topic ? session.createConsumer(consumerID, queueName, null, false, false, 0) : session.createConsumer(consumerID, queueName, SimpleString.toSimpleString(selector), false, false, 0);
|
||||
final ServerConsumer consumer = session.createConsumer(consumerID, queueName, topic ? null : selectorSimple, false, false, 0);
|
||||
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, pubSub);
|
||||
subscriptions.put(consumerID, subscription);
|
||||
session.start();
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.stomp;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
|
||||
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class StompTestMultiThreaded extends StompTestBase {
|
||||
|
||||
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
private static final SimpleString QUEUE = new SimpleString("x");
|
||||
|
||||
class SomeConsumer extends Thread {
|
||||
|
||||
private final StompClientConnection conn;
|
||||
|
||||
boolean failed = false;
|
||||
|
||||
SomeConsumer() throws Exception {
|
||||
URI uri = createStompClientUri(scheme, "localhost", 61614);
|
||||
this.conn = StompClientConnectionFactory.createClientConnection(uri);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
conn.connect(defUser, defPass);
|
||||
if (!subscribe(conn, UUID.randomUUID().toString(), Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, "/queue/" + QUEUE, true).getCommand().equals(Stomp.Responses.RECEIPT)) {
|
||||
failed = true;
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
failed = true;
|
||||
} finally {
|
||||
try {
|
||||
conn.disconnect();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTwoConcurrentSubscribers() throws Exception {
|
||||
server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoDeleteAddresses(false).setAutoDeleteQueues(false));
|
||||
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://localhost:61614?protocols=STOMP&anycastPrefix=/queue/").start();
|
||||
|
||||
int nThreads = 2;
|
||||
|
||||
SomeConsumer[] consumers = new SomeConsumer[nThreads];
|
||||
for (int j = 0; j < 1000; j++) {
|
||||
|
||||
for (int i = 0; i < nThreads; i++) {
|
||||
consumers[i] = new SomeConsumer();
|
||||
}
|
||||
|
||||
for (int i = 0; i < nThreads; i++) {
|
||||
consumers[i].start();
|
||||
}
|
||||
|
||||
for (SomeConsumer consumer : consumers) {
|
||||
consumer.join();
|
||||
Assert.assertFalse(consumer.failed);
|
||||
}
|
||||
|
||||
// delete queue here so it can be auto-created again during the next loop iteration
|
||||
server.getActiveMQServer().locateQueue(QUEUE).deleteQueue();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue