ARTEMIS-2753 Expanding wildcard fix to Advisories
This commit is contained in:
parent
4bbd7ba4d1
commit
9571ca16db
|
@ -139,12 +139,7 @@ public class AMQConsumer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SimpleString destinationName;
|
SimpleString destinationName = new SimpleString(session.convertWildcard(openwireDestination));
|
||||||
if (openwireDestination.isTemporary()) {
|
|
||||||
destinationName = new SimpleString(openwireDestination.getPhysicalName());
|
|
||||||
} else {
|
|
||||||
destinationName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName()));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (openwireDestination.isTopic()) {
|
if (openwireDestination.isTopic()) {
|
||||||
SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), destinationName.toString(), info.getSubscriptionName(), selector, destinationName);
|
SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), destinationName.toString(), info.getSubscriptionName(), selector, destinationName);
|
||||||
|
|
|
@ -183,12 +183,7 @@ public class AMQSession implements SessionCallback {
|
||||||
}
|
}
|
||||||
if (openWireDest.isQueue()) {
|
if (openWireDest.isQueue()) {
|
||||||
openWireDest = protocolManager.virtualTopicConsumerToFQQN(openWireDest);
|
openWireDest = protocolManager.virtualTopicConsumerToFQQN(openWireDest);
|
||||||
SimpleString queueName;
|
SimpleString queueName = new SimpleString(convertWildcard(openWireDest));
|
||||||
if (!openWireDest.isTemporary()) {
|
|
||||||
queueName = new SimpleString(convertWildcard(openWireDest.getPhysicalName()));
|
|
||||||
} else {
|
|
||||||
queueName = new SimpleString(openWireDest.getPhysicalName());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) {
|
if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) {
|
||||||
throw new InvalidDestinationException("Destination doesn't exist: " + queueName);
|
throw new InvalidDestinationException("Destination doesn't exist: " + queueName);
|
||||||
|
@ -522,8 +517,12 @@ public class AMQSession implements SessionCallback {
|
||||||
connection.enableTtl();
|
connection.enableTtl();
|
||||||
}
|
}
|
||||||
|
|
||||||
public String convertWildcard(String physicalName) {
|
public String convertWildcard(ActiveMQDestination openWireDest) {
|
||||||
return OPENWIRE_WILDCARD.convert(physicalName, server.getConfiguration().getWildcardConfiguration());
|
if (openWireDest.isTemporary() || AdvisorySupport.isAdvisoryTopic(openWireDest)) {
|
||||||
|
return openWireDest.getPhysicalName();
|
||||||
|
} else {
|
||||||
|
return OPENWIRE_WILDCARD.convert(openWireDest.getPhysicalName(), server.getConfiguration().getWildcardConfiguration());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public ServerSession getCoreSession() {
|
public ServerSession getCoreSession() {
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||||
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
|
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -63,7 +64,7 @@ public class AMQConsumerTest {
|
||||||
Mockito.when(session.getConnection()).thenReturn(Mockito.mock(OpenWireConnection.class));
|
Mockito.when(session.getConnection()).thenReturn(Mockito.mock(OpenWireConnection.class));
|
||||||
Mockito.when(session.getCoreServer()).thenReturn(Mockito.mock(ActiveMQServer.class));
|
Mockito.when(session.getCoreServer()).thenReturn(Mockito.mock(ActiveMQServer.class));
|
||||||
Mockito.when(session.getCoreSession()).thenReturn(coreSession);
|
Mockito.when(session.getCoreSession()).thenReturn(coreSession);
|
||||||
Mockito.when(session.convertWildcard(ArgumentMatchers.any(String.class))).thenReturn("");
|
Mockito.when(session.convertWildcard(ArgumentMatchers.any(ActiveMQDestination.class))).thenReturn("");
|
||||||
|
|
||||||
ConsumerInfo info = new ConsumerInfo();
|
ConsumerInfo info = new ConsumerInfo();
|
||||||
info.setPrefetchSize(prefetchSize);
|
info.setPrefetchSize(prefetchSize);
|
||||||
|
|
Loading…
Reference in New Issue