This commit is contained in:
Clebert Suconic 2018-09-25 18:07:32 -04:00
commit 2453978f41
6 changed files with 102 additions and 14 deletions

View File

@ -104,6 +104,18 @@ public interface AddressControl {
@Attribute(desc = "number of messages added to all the queues for this address") @Attribute(desc = "number of messages added to all the queues for this address")
long getMessageCount(); long getMessageCount();
/**
* Returns the number of messages routed to one or more bindings
*/
@Attribute(desc = "number of messages routed to one or more bindings")
long getRoutedMessageCount();
/**
* Returns the number of messages not routed to any bindings
*/
@Attribute(desc = "number of messages not routed to any bindings")
long getUnRoutedMessageCount();
/** /**
* @param headers the message headers and properties to set. Can only * @param headers the message headers and properties to set. Can only

View File

@ -267,6 +267,16 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
return getMessageCount(DurabilityType.ALL); return getMessageCount(DurabilityType.ALL);
} }
@Override
public long getRoutedMessageCount() {
return addressInfo.getRoutedMessageCount();
}
@Override
public long getUnRoutedMessageCount() {
return addressInfo.getUnRoutedMessageCount();
}
@Override @Override
public String sendMessage(final Map<String, String> headers, public String sendMessage(final Map<String, String> headers,

View File

@ -842,12 +842,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
throw new IllegalStateException("Message cannot be routed more than once"); throw new IllegalStateException("Message cannot be routed more than once");
} }
setPagingStore(context.getAddress(message), message); final SimpleString address = context.getAddress(message);
setPagingStore(address, message);
AtomicBoolean startedTX = new AtomicBoolean(false); AtomicBoolean startedTX = new AtomicBoolean(false);
final SimpleString address = context.getAddress(message);
applyExpiryDelay(message, address); applyExpiryDelay(message, address);
if (!checkDuplicateID(message, context, rejectDuplicates, startedTX)) { if (!checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
@ -856,23 +856,24 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.cleanupInternalProperties(); message.cleanupInternalProperties();
Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress(message)); Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
AddressInfo addressInfo = addressManager.getAddressInfo(address);
// TODO auto-create queues here?
// first check for the auto-queue creation thing
if (bindings == null) {
// There is no queue with this address, we will check if it needs to be created
// if (queueCreator.create(address)) {
// TODO: this is not working!!!!
// reassign bindings if it was created
// bindings = addressManager.getBindingsForRoutingAddress(address);
// }
}
if (bindingMove != null) { if (bindingMove != null) {
bindingMove.route(message, context); bindingMove.route(message, context);
if (addressInfo != null) {
addressInfo.incrementRoutedMessageCount();
}
} else if (bindings != null) { } else if (bindings != null) {
bindings.route(message, context); bindings.route(message, context);
if (addressInfo != null) {
addressInfo.incrementRoutedMessageCount();
}
} else { } else {
if (addressInfo != null) {
addressInfo.incrementUnRoutedMessageCount();
}
// this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS) // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Couldn't find any bindings for address=" + address + " on message=" + message); logger.debug("Couldn't find any bindings for address=" + address + " on message=" + message);

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
public class AddressInfo { public class AddressInfo {
@ -36,6 +37,14 @@ public class AddressInfo {
private boolean internal = false; private boolean internal = false;
private volatile long routedMessageCount = 0;
private static final AtomicLongFieldUpdater<AddressInfo> routedMessageCountUpdater = AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "routedMessageCount");
private volatile long unRoutedMessageCount = 0;
private static final AtomicLongFieldUpdater<AddressInfo> unRoutedMessageCountUpdater = AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "unRoutedMessageCount");
public AddressInfo(SimpleString name) { public AddressInfo(SimpleString name) {
this(name, EnumSet.noneOf(RoutingType.class)); this(name, EnumSet.noneOf(RoutingType.class));
} }
@ -155,4 +164,20 @@ public class AddressInfo {
return this; return this;
} }
public long incrementRoutedMessageCount() {
return routedMessageCountUpdater.incrementAndGet(this);
}
public long incrementUnRoutedMessageCount() {
return unRoutedMessageCountUpdater.incrementAndGet(this);
}
public long getRoutedMessageCount() {
return routedMessageCountUpdater.get(this);
}
public long getUnRoutedMessageCount() {
return unRoutedMessageCountUpdater.get(this);
}
} }

View File

@ -340,6 +340,36 @@ public class AddressControlTest extends ManagementTestBase {
assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 2, 2000, 100)); assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 2, 2000, 100));
} }
@Test
public void testGetRoutedMessageCounts() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
session.createAddress(address, RoutingType.ANYCAST, false);
AddressControl addressControl = createManagementControl(address);
assertEquals(0, addressControl.getMessageCount());
ClientProducer producer = session.createProducer(address.toString());
producer.send(session.createMessage(false));
assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 0, 2000, 100));
assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100));
session.createQueue(address, RoutingType.ANYCAST, address);
producer.send(session.createMessage(false));
assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 1, 2000, 100));
assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100));
session.createQueue(address, RoutingType.ANYCAST, address.concat('2'));
producer.send(session.createMessage(false));
assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 2, 2000, 100));
assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100));
session.deleteQueue(address);
session.deleteQueue(address.concat('2'));
producer.send(session.createMessage(false));
assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 2, 2000, 100));
assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 2, 2000, 100));
}
@Test @Test
public void testSendMessage() throws Exception { public void testSendMessage() throws Exception {
SimpleString address = RandomUtil.randomSimpleString(); SimpleString address = RandomUtil.randomSimpleString();

View File

@ -103,6 +103,16 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
return (long) proxy.retrieveAttributeValue("messageCount"); return (long) proxy.retrieveAttributeValue("messageCount");
} }
@Override
public long getRoutedMessageCount() {
return (long) proxy.retrieveAttributeValue("routedMessageCount");
}
@Override
public long getUnRoutedMessageCount() {
return (long) proxy.retrieveAttributeValue("unRoutedMessageCount");
}
@Override @Override
public String sendMessage(Map<String, String> headers, public String sendMessage(Map<String, String> headers,
int type, int type,