ARTEMIS-1419 OpenWire advisory message never deleted

By default, every openwire connection will create a queue
under the multicast address ActiveMQ.Advisory.TempQueue.
If a openwire client is create temporary queues these queues
will fill up with messages for as long as the associated
openwire connection is alive. It appears these messages
do not get consumed from the queues.

The reason behind is that advisory messages don't require
acknowledgement so the messages stay at the queue.
This commit is contained in:
Howard Gao 2017-09-13 20:50:56 +08:00 committed by Clebert Suconic
parent 17083d69e0
commit e4fb722ad8
3 changed files with 53 additions and 1 deletions

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.reader.MessageUtil;
@ -82,10 +83,13 @@ public class AMQConsumer {
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception { public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector()); SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
boolean preAck = false;
if (info.isNoLocal()) { if (info.isNoLocal()) {
if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) { if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) {
//tell the connection to add the property //tell the connection to add the property
this.session.getConnection().setNoLocal(true); this.session.getConnection().setNoLocal(true);
} else {
preAck = true;
} }
String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + this.getId().getConnectionId() + "'"; String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + this.getId().getConnectionId() + "'";
if (selector == null) { if (selector == null) {
@ -110,6 +114,8 @@ public class AMQConsumer {
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1); serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
//only advisory topic consumers need this.
((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck);
} else { } else {
SimpleString queueName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName())); SimpleString queueName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName()));
try { try {

View File

@ -137,7 +137,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private final SessionCallback callback; private final SessionCallback callback;
private final boolean preAcknowledge; private boolean preAcknowledge;
private final ManagementService managementService; private final ManagementService managementService;
@ -1139,6 +1139,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
}; };
public void setPreAcknowledge(boolean preAcknowledge) {
this.preAcknowledge = preAcknowledge;
}
/** /**
* Internal encapsulation of the logic on sending LargeMessages. * Internal encapsulation of the logic on sending LargeMessages.
* This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent * This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent

View File

@ -60,6 +60,7 @@ import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession; import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
@ -1585,6 +1586,47 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
assertNull(transaction); assertNull(transaction);
} }
@Test
public void testTempQueueLeak() throws Exception {
final Connection[] connections = new Connection[20];
try {
for (int i = 0; i < connections.length; i++) {
connections[i] = factory.createConnection();
connections[i].start();
}
Session session = connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < connections.length; i++) {
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
temporaryQueue.delete();
}
Object[] addressResources = server.getManagementService().getResources(AddressControl.class);
AddressControl addressControl = null;
for (Object addressResource : addressResources) {
if (((AddressControl) addressResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) {
addressControl = (AddressControl) addressResource;
}
}
assertNotNull("addressControl for temp advisory", addressControl);
//sleep a bit to allow message count to go down.
Thread.sleep(50);
assertEquals(0, addressControl.getMessageCount());
} finally {
for (Connection conn : connections) {
if (conn != null) {
conn.close();
}
}
}
}
private void checkQueueEmpty(String qName) { private void checkQueueEmpty(String qName) {
PostOffice po = server.getPostOffice(); PostOffice po = server.getPostOffice();
LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName)); LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));