NO-JIRA Adding Advisory OpenWire tests
This commit is contained in:
parent
a9b02fb68c
commit
6982244bf4
|
@ -0,0 +1,148 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.openwire;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.management.AddressControl;
|
||||||
|
import org.apache.activemq.artemis.api.core.management.QueueControl;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TemporaryQueue;
|
||||||
|
import javax.jms.TemporaryTopic;
|
||||||
|
|
||||||
|
public class AdvisoryOpenWireTest extends BasicOpenWireTest {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
//this system property is used to construct the executor in
|
||||||
|
//org.apache.activemq.transport.AbstractInactivityMonitor.createExecutor()
|
||||||
|
//and affects the pool's shutdown time. (default is 30 sec)
|
||||||
|
//set it to 2 to make tests shutdown quicker.
|
||||||
|
System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", "2");
|
||||||
|
this.realStore = true;
|
||||||
|
super.setUp();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTempTopicLeak() throws Exception {
|
||||||
|
Connection connection = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection = factory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
TemporaryTopic temporaryTopic = session.createTemporaryTopic();
|
||||||
|
temporaryTopic.delete();
|
||||||
|
|
||||||
|
Object[] queueResources = server.getManagementService().getResources(QueueControl.class);
|
||||||
|
|
||||||
|
for (Object queueResource : queueResources) {
|
||||||
|
|
||||||
|
if (((QueueControl) queueResource).getAddress().equals("ActiveMQ.Advisory.TempTopic")) {
|
||||||
|
QueueControl queueControl = (QueueControl) queueResource;
|
||||||
|
Wait.waitFor(() -> queueControl.getMessageCount() == 0);
|
||||||
|
assertNotNull("addressControl for temp advisory", queueControl);
|
||||||
|
assertEquals(0, queueControl.getMessageCount());
|
||||||
|
assertEquals(2, queueControl.getMessagesAdded());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTempQueueLeak() throws Exception {
|
||||||
|
Connection connection = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection = factory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
|
||||||
|
temporaryQueue.delete();
|
||||||
|
|
||||||
|
Object[] queueResources = server.getManagementService().getResources(QueueControl.class);
|
||||||
|
|
||||||
|
for (Object queueResource : queueResources) {
|
||||||
|
|
||||||
|
if (((QueueControl) queueResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) {
|
||||||
|
QueueControl queueControl = (QueueControl) queueResource;
|
||||||
|
Wait.waitFor(() -> queueControl.getMessageCount() == 0);
|
||||||
|
assertNotNull("addressControl for temp advisory", queueControl);
|
||||||
|
assertEquals(0, queueControl.getMessageCount());
|
||||||
|
assertEquals(2, queueControl.getMessagesAdded());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTempQueueLeakManyConnections() throws Exception {
|
||||||
|
final Connection[] connections = new Connection[20];
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < connections.length; i++) {
|
||||||
|
connections[i] = factory.createConnection();
|
||||||
|
connections[i].start();
|
||||||
|
}
|
||||||
|
|
||||||
|
Session session = connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
for (int i = 0; i < connections.length; i++) {
|
||||||
|
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
|
||||||
|
temporaryQueue.delete();
|
||||||
|
}
|
||||||
|
|
||||||
|
Object[] addressResources = server.getManagementService().getResources(AddressControl.class);
|
||||||
|
|
||||||
|
for (Object addressResource : addressResources) {
|
||||||
|
|
||||||
|
if (((AddressControl) addressResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) {
|
||||||
|
AddressControl addressControl = (AddressControl) addressResource;
|
||||||
|
Wait.waitFor(() -> addressControl.getMessageCount() == 0);
|
||||||
|
assertNotNull("addressControl for temp advisory", addressControl);
|
||||||
|
assertEquals(0, addressControl.getMessageCount());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//sleep a bit to allow message count to go down.
|
||||||
|
} finally {
|
||||||
|
for (Connection conn : connections) {
|
||||||
|
if (conn != null) {
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -60,7 +60,6 @@ import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.ActiveMQSession;
|
import org.apache.activemq.ActiveMQSession;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.management.AddressControl;
|
|
||||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||||
|
@ -1653,46 +1652,6 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
||||||
assertNull(transaction);
|
assertNull(transaction);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testTempQueueLeak() throws Exception {
|
|
||||||
final Connection[] connections = new Connection[20];
|
|
||||||
|
|
||||||
try {
|
|
||||||
for (int i = 0; i < connections.length; i++) {
|
|
||||||
connections[i] = factory.createConnection();
|
|
||||||
connections[i].start();
|
|
||||||
}
|
|
||||||
|
|
||||||
Session session = connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
||||||
|
|
||||||
for (int i = 0; i < connections.length; i++) {
|
|
||||||
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
|
|
||||||
temporaryQueue.delete();
|
|
||||||
}
|
|
||||||
|
|
||||||
Object[] addressResources = server.getManagementService().getResources(AddressControl.class);
|
|
||||||
|
|
||||||
for (Object addressResource : addressResources) {
|
|
||||||
|
|
||||||
if (((AddressControl) addressResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) {
|
|
||||||
AddressControl addressControl = (AddressControl) addressResource;
|
|
||||||
Wait.waitFor(() -> addressControl.getMessageCount() == 0);
|
|
||||||
assertNotNull("addressControl for temp advisory", addressControl);
|
|
||||||
assertEquals(0, addressControl.getMessageCount());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
//sleep a bit to allow message count to go down.
|
|
||||||
} finally {
|
|
||||||
for (Connection conn : connections) {
|
|
||||||
if (conn != null) {
|
|
||||||
conn.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checkQueueEmpty(String qName) {
|
private void checkQueueEmpty(String qName) {
|
||||||
PostOffice po = server.getPostOffice();
|
PostOffice po = server.getPostOffice();
|
||||||
LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));
|
LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));
|
||||||
|
|
Loading…
Reference in New Issue