This commit is contained in:
Clebert Suconic 2017-09-13 18:27:39 -04:00
commit 621a62fcb5
1 changed files with 5 additions and 15 deletions

View File

@ -99,33 +99,24 @@ public class AMQConsumer {
} }
} }
String physicalName = session.convertWildcard(openwireDestination.getPhysicalName()); SimpleString destinationName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName()));
SimpleString address;
if (openwireDestination.isTopic()) { if (openwireDestination.isTopic()) {
if (openwireDestination.isTemporary()) { SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), destinationName.toString(), info.getSubscriptionName(), selector, destinationName);
address = new SimpleString(physicalName);
} else {
address = new SimpleString(physicalName);
}
SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address);
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1); serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
//only advisory topic consumers need this. //only advisory topic consumers need this.
((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck); ((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck);
} else { } else {
SimpleString queueName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName()));
try { try {
session.getCoreServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); session.getCoreServer().createQueue(destinationName, RoutingType.ANYCAST, destinationName, null, true, false);
} catch (ActiveMQQueueExistsException e) { } catch (ActiveMQQueueExistsException e) {
// ignore // ignore
} }
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1); serverConsumer = session.getCoreSession().createConsumer(nativeId, destinationName, selector, info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString()); AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(destinationName.toString());
if (addrSettings != null) { if (addrSettings != null) {
//see PolicyEntry //see PolicyEntry
if (info.getPrefetchSize() != 0 && addrSettings.getQueuePrefetch() == 0) { if (info.getPrefetchSize() != 0 && addrSettings.getQueuePrefetch() == 0) {
@ -136,7 +127,6 @@ public class AMQConsumer {
session.getConnection().dispatch(cc); session.getConnection().dispatch(cc);
} }
} }
} }
serverConsumer.setProtocolData(this); serverConsumer.setProtocolData(this);