This commit is contained in:
Clebert Suconic 2017-09-13 18:21:44 -04:00
commit 02b6b8c8eb
3 changed files with 53 additions and 1 deletions

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.reader.MessageUtil;
@ -82,10 +83,13 @@ public class AMQConsumer {
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
boolean preAck = false;
if (info.isNoLocal()) {
if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) {
//tell the connection to add the property
this.session.getConnection().setNoLocal(true);
} else {
preAck = true;
}
String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + this.getId().getConnectionId() + "'";
if (selector == null) {
@ -110,6 +114,8 @@ public class AMQConsumer {
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
//only advisory topic consumers need this.
((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck);
} else {
SimpleString queueName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName()));
try {

View File

@ -137,7 +137,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private final SessionCallback callback;
private final boolean preAcknowledge;
private boolean preAcknowledge;
private final ManagementService managementService;
@ -1139,6 +1139,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
};
public void setPreAcknowledge(boolean preAcknowledge) {
this.preAcknowledge = preAcknowledge;
}
/**
* Internal encapsulation of the logic on sending LargeMessages.
* This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent

View File

@ -60,6 +60,7 @@ import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
@ -1585,6 +1586,47 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
assertNull(transaction);
}
@Test
public void testTempQueueLeak() throws Exception {
final Connection[] connections = new Connection[20];
try {
for (int i = 0; i < connections.length; i++) {
connections[i] = factory.createConnection();
connections[i].start();
}
Session session = connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < connections.length; i++) {
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
temporaryQueue.delete();
}
Object[] addressResources = server.getManagementService().getResources(AddressControl.class);
AddressControl addressControl = null;
for (Object addressResource : addressResources) {
if (((AddressControl) addressResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) {
addressControl = (AddressControl) addressResource;
}
}
assertNotNull("addressControl for temp advisory", addressControl);
//sleep a bit to allow message count to go down.
Thread.sleep(50);
assertEquals(0, addressControl.getMessageCount());
} finally {
for (Connection conn : connections) {
if (conn != null) {
conn.close();
}
}
}
}
private void checkQueueEmpty(String qName) {
PostOffice po = server.getPostOffice();
LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));