This commit is contained in:
Michael Andre Pearce 2019-01-17 22:50:41 +00:00
commit 10f5b184ee
6 changed files with 60 additions and 44 deletions

View File

@ -1033,17 +1033,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} else {
Bindings bindings = server.getPostOffice().getBindingsForAddress(new SimpleString(dest.getPhysicalName()));
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(new SimpleString(dest.getPhysicalName()));
for (Binding binding : bindings.getBindings()) {
Queue b = (Queue) binding.getBindable();
if (b.getConsumerCount() > 0) {
throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
Queue b = (Queue) binding.getBindable();
if (b.getConsumerCount() > 0) {
throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
}
if (b.isDurable()) {
throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
}
b.deleteQueue();
}
if (b.isDurable()) {
throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
}
b.deleteQueue();
}
}

View File

@ -1069,8 +1069,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
final Bindings bindings = server.getPostOffice().getBindingsForAddress(new SimpleString(address));
return bindings.getBindings().stream().map(Binding::toManagementString).collect(Collectors.joining(","));
final Bindings bindings = server.getPostOffice().lookupBindingsForAddress(new SimpleString(address));
return bindings == null ? "" : bindings.getBindings().stream().map(Binding::toManagementString).collect(Collectors.joining(","));
} finally {
blockOnIO();
}

View File

@ -123,14 +123,18 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public String[] getQueueNames() throws Exception {
clearIO();
try {
Bindings bindings = server.getPostOffice().getBindingsForAddress(addressInfo.getName());
List<String> queueNames = new ArrayList<>();
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
queueNames.add(binding.getUniqueName().toString());
Bindings bindings = postOffice.lookupBindingsForAddress(addressInfo.getName());
if (bindings != null) {
List<String> queueNames = new ArrayList<>();
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
queueNames.add(binding.getUniqueName().toString());
}
}
return queueNames.toArray(new String[queueNames.size()]);
} else {
return new String[0];
}
return queueNames.toArray(new String[queueNames.size()]);
} catch (Throwable t) {
throw new IllegalStateException(t.getMessage());
} finally {
@ -142,13 +146,17 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public String[] getBindingNames() throws Exception {
clearIO();
try {
Bindings bindings = server.getPostOffice().getBindingsForAddress(addressInfo.getName());
String[] bindingNames = new String[bindings.getBindings().size()];
int i = 0;
for (Binding binding : bindings.getBindings()) {
bindingNames[i++] = binding.getUniqueName().toString();
Bindings bindings = postOffice.lookupBindingsForAddress(addressInfo.getName());
if (bindings != null) {
String[] bindingNames = new String[bindings.getBindings().size()];
int i = 0;
for (Binding binding : bindings.getBindings()) {
bindingNames[i++] = binding.getUniqueName().toString();
}
return bindingNames;
} else {
return new String[0];
}
return bindingNames;
} catch (Throwable t) {
throw new IllegalStateException(t.getMessage());
} finally {
@ -227,10 +235,12 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
clearIO();
long totalMsgs = 0;
try {
Bindings bindings = server.getPostOffice().getBindingsForAddress(addressInfo.getName());
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
totalMsgs += ((QueueBinding) binding).getQueue().getMessageCount();
Bindings bindings = postOffice.lookupBindingsForAddress(addressInfo.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
totalMsgs += ((QueueBinding) binding).getQueue().getMessageCount();
}
}
}
return totalMsgs;

View File

@ -639,12 +639,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public List<Queue> listQueuesForAddress(SimpleString address) throws Exception {
Bindings bindingsForAddress = getBindingsForAddress(address);
Bindings bindingsForAddress = lookupBindingsForAddress(address);
List<Queue> queues = new ArrayList<>();
for (Binding b : bindingsForAddress.getBindings()) {
if (b instanceof QueueBinding) {
Queue q = ((QueueBinding) b).getQueue();
queues.add(q);
if (bindingsForAddress != null) {
for (Binding b : bindingsForAddress.getBindings()) {
if (b instanceof QueueBinding) {
Queue q = ((QueueBinding) b).getQueue();
queues.add(q);
}
}
}
return queues;
@ -779,7 +781,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public boolean isAddressBound(final SimpleString address) throws Exception {
Bindings bindings = getBindingsForAddress(address);
Bindings bindings = lookupBindingsForAddress(address);
return bindings != null && !bindings.getBindings().isEmpty();
}

View File

@ -2999,9 +2999,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
SimpleString expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();
if (expiryAddress != null) {
Bindings bindingList = postOffice.getBindingsForAddress(expiryAddress);
Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress);
if (bindingList.getBindings().isEmpty()) {
if (bindingList == null || bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
acknowledge(tx, ref, AckReason.EXPIRED, null);
} else {
@ -3027,9 +3027,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final MessageReference ref,
final SimpleString deadLetterAddress) throws Exception {
if (deadLetterAddress != null) {
Bindings bindingList = postOffice.getBindingsForAddress(deadLetterAddress);
Bindings bindingList = postOffice.lookupBindingsForAddress(deadLetterAddress);
if (bindingList.getBindings().isEmpty()) {
if (bindingList == null || bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress);
ref.acknowledge(tx, AckReason.KILLED, null);
} else {

View File

@ -115,16 +115,18 @@ public class ScaleDownHandler {
// perform a loop per address
for (SimpleString address : postOffice.getAddresses()) {
logger.debug("Scaling down address " + address);
Bindings bindings = postOffice.getBindingsForAddress(address);
Bindings bindings = postOffice.lookupBindingsForAddress(address);
// It will get a list of queues on this address, ordered by the number of messages
Set<Queue> queues = new TreeSet<>(new OrderQueueByNumberOfReferencesComparator());
for (Binding binding : bindings.getBindings()) {
if (binding instanceof LocalQueueBinding) {
Queue queue = ((LocalQueueBinding) binding).getQueue();
// as part of scale down we will cancel any scheduled message and pass it to theWhile we scan for the queues we will also cancel any scheduled messages and deliver them right away
queue.deliverScheduledMessages();
queues.add(queue);
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof LocalQueueBinding) {
Queue queue = ((LocalQueueBinding) binding).getQueue();
// as part of scale down we will cancel any scheduled message and pass it to theWhile we scan for the queues we will also cancel any scheduled messages and deliver them right away
queue.deliverScheduledMessages();
queues.add(queue);
}
}
}