NO-JIRA: Cleanup tests

This commit is contained in:
Clebert Suconic 2017-03-24 18:32:38 -04:00
parent f2e0891b0e
commit 07f7916ed3
7 changed files with 1 additions and 1455 deletions

View File

@ -1,203 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
public class ExclusiveConsumerStartupDestinationTest extends EmbeddedBrokerTestSupport {
private static final String VM_BROKER_URL = "vm://localhost";
@Override
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(false);
PolicyMap map = new PolicyMap();
PolicyEntry entry = new PolicyEntry();
entry.setAllConsumersExclusiveByDefault(true);
map.setDefaultEntry(entry);
answer.setDestinationPolicy(map);
return answer;
}
protected String getBrokerConfigUri() {
return "org/apache/activemq/broker/exclusive-consumer-startup-destination.xml";
}
private Connection createConnection(final boolean start) throws JMSException {
ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL);
Connection conn = cf.createConnection();
if (start) {
conn.start();
}
return conn;
}
public void testExclusiveConsumerSelectedCreatedFirst() throws JMSException, InterruptedException {
Connection conn = createConnection(true);
Session exclusiveSession = null;
Session fallbackSession = null;
Session senderSession = null;
try {
exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE1");
MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE1");
MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE1");
MessageProducer producer = senderSession.createProducer(senderQueue);
Message msg = senderSession.createTextMessage("test");
producer.send(msg);
// TODO need two send a 2nd message - bug AMQ-1024
// producer.send(msg);
Thread.sleep(100);
// Verify exclusive consumer receives the message.
assertNotNull(exclusiveConsumer.receive(100));
assertNull(fallbackConsumer.receive(100));
} finally {
fallbackSession.close();
senderSession.close();
conn.close();
}
}
//Exclusive consumer not implemented yet.
public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException, InterruptedException {
Connection conn = createConnection(true);
Session exclusiveSession1 = null;
Session exclusiveSession2 = null;
Session fallbackSession = null;
Session senderSession = null;
try {
exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// This creates the exclusive consumer first which avoids AMQ-1024
// bug.
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2");
MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue);
ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE2");
MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE2");
MessageProducer producer = senderSession.createProducer(senderQueue);
Message msg = senderSession.createTextMessage("test");
producer.send(msg);
Thread.sleep(100);
// Verify exclusive consumer receives the message.
assertNotNull(exclusiveConsumer1.receive(100));
assertNull(exclusiveConsumer2.receive(100));
assertNull(fallbackConsumer.receive(100));
// Close the exclusive consumer to verify the non-exclusive consumer
// takes over
exclusiveConsumer1.close();
producer.send(msg);
producer.send(msg);
assertNotNull("Should have received a message", exclusiveConsumer2.receive(100));
assertNull("Should not have received a message", fallbackConsumer.receive(100));
} finally {
fallbackSession.close();
senderSession.close();
conn.close();
}
}
public void testFailoverToNonExclusiveConsumer() throws JMSException, InterruptedException {
Connection conn = createConnection(true);
Session exclusiveSession = null;
Session fallbackSession = null;
Session senderSession = null;
try {
exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// This creates the exclusive consumer first which avoids AMQ-1024
// bug.
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3");
MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE3");
MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE3");
MessageProducer producer = senderSession.createProducer(senderQueue);
Message msg = senderSession.createTextMessage("test");
producer.send(msg);
Thread.sleep(100);
// Verify exclusive consumer receives the message.
assertNotNull(exclusiveConsumer.receive(100));
assertNull(fallbackConsumer.receive(100));
// Close the exclusive consumer to verify the non-exclusive consumer
// takes over
exclusiveConsumer.close();
producer.send(msg);
assertNotNull(fallbackConsumer.receive(100));
} finally {
fallbackSession.close();
senderSession.close();
conn.close();
}
}
}

View File

@ -1,357 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
public class ExclusiveConsumerTest extends TestCase {
private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true";
public ExclusiveConsumerTest(String name) {
super(name);
}
@Override
protected void setUp() throws Exception {
super.setUp();
}
@Override
protected void tearDown() throws Exception {
TcpTransportFactory.clearService();
super.tearDown();
}
private Connection createConnection(final boolean start) throws JMSException {
ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL);
Connection conn = cf.createConnection();
if (start) {
conn.start();
}
return conn;
}
public void testExclusiveConsumerSelectedCreatedFirst() throws JMSException, InterruptedException {
Connection conn = createConnection(true);
Session exclusiveSession = null;
Session fallbackSession = null;
Session senderSession = null;
try {
exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE1?consumer.exclusive=true");
MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE1");
MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE1");
MessageProducer producer = senderSession.createProducer(senderQueue);
Message msg = senderSession.createTextMessage("test");
producer.send(msg);
// TODO need two send a 2nd message - bug AMQ-1024
// producer.send(msg);
Thread.sleep(100);
// Verify exclusive consumer receives the message.
assertNotNull(exclusiveConsumer.receive(100));
assertNull(fallbackConsumer.receive(100));
} finally {
fallbackSession.close();
senderSession.close();
conn.close();
}
}
public void testExclusiveConsumerSelectedCreatedAfter() throws JMSException, InterruptedException {
Connection conn = createConnection(true);
Session exclusiveSession = null;
Session fallbackSession = null;
Session senderSession = null;
try {
exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE5");
MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE5?consumer.exclusive=true");
MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE5");
MessageProducer producer = senderSession.createProducer(senderQueue);
Message msg = senderSession.createTextMessage("test");
producer.send(msg);
Thread.sleep(100);
// Verify exclusive consumer receives the message.
assertNotNull(exclusiveConsumer.receive(100));
assertNull(fallbackConsumer.receive(100));
} finally {
fallbackSession.close();
senderSession.close();
conn.close();
}
}
public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException, InterruptedException {
Connection conn = createConnection(true);
Session exclusiveSession1 = null;
Session exclusiveSession2 = null;
Session fallbackSession = null;
Session senderSession = null;
try {
exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// This creates the exclusive consumer first which avoids AMQ-1024
// bug.
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2?consumer.exclusive=true");
MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue);
ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE2");
MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE2");
MessageProducer producer = senderSession.createProducer(senderQueue);
Message msg = senderSession.createTextMessage("test");
producer.send(msg);
Thread.sleep(100);
// Verify exclusive consumer receives the message.
assertNotNull(exclusiveConsumer1.receive(100));
assertNull(exclusiveConsumer2.receive(100));
assertNull(fallbackConsumer.receive(100));
// Close the exclusive consumer to verify the non-exclusive consumer
// takes over
exclusiveConsumer1.close();
producer.send(msg);
producer.send(msg);
assertNotNull(exclusiveConsumer2.receive(100));
assertNull(fallbackConsumer.receive(100));
} finally {
fallbackSession.close();
senderSession.close();
conn.close();
}
}
public void testFailoverToAnotherExclusiveConsumerCreatedAfter() throws JMSException, InterruptedException {
Connection conn = createConnection(true);
Session exclusiveSession1 = null;
Session exclusiveSession2 = null;
Session fallbackSession = null;
Session senderSession = null;
try {
exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// This creates the exclusive consumer first which avoids AMQ-1024
// bug.
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE6?consumer.exclusive=true");
MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE6");
MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue);
ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE6");
MessageProducer producer = senderSession.createProducer(senderQueue);
Message msg = senderSession.createTextMessage("test");
producer.send(msg);
Thread.sleep(100);
// Verify exclusive consumer receives the message.
assertNotNull(exclusiveConsumer1.receive(100));
assertNull(exclusiveConsumer2.receive(100));
assertNull(fallbackConsumer.receive(100));
// Close the exclusive consumer to verify the non-exclusive consumer
// takes over
exclusiveConsumer1.close();
producer.send(msg);
producer.send(msg);
assertNotNull(exclusiveConsumer2.receive(1000));
assertNull(fallbackConsumer.receive(100));
} finally {
fallbackSession.close();
senderSession.close();
conn.close();
}
}
public void testFailoverToNonExclusiveConsumer() throws JMSException, InterruptedException {
Connection conn = createConnection(true);
Session exclusiveSession = null;
Session fallbackSession = null;
Session senderSession = null;
try {
exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// This creates the exclusive consumer first which avoids AMQ-1024
// bug.
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3?consumer.exclusive=true");
MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE3");
MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE3");
MessageProducer producer = senderSession.createProducer(senderQueue);
Message msg = senderSession.createTextMessage("test");
producer.send(msg);
Thread.sleep(100);
// Verify exclusive consumer receives the message.
assertNotNull(exclusiveConsumer.receive(100));
assertNull(fallbackConsumer.receive(100));
// Close the exclusive consumer to verify the non-exclusive consumer
// takes over
exclusiveConsumer.close();
producer.send(msg);
assertNotNull(fallbackConsumer.receive(100));
} finally {
fallbackSession.close();
senderSession.close();
conn.close();
}
}
public void testFallbackToExclusiveConsumer() throws JMSException, InterruptedException {
Connection conn = createConnection(true);
Session exclusiveSession = null;
Session fallbackSession = null;
Session senderSession = null;
try {
exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// This creates the exclusive consumer first which avoids AMQ-1024
// bug.
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE4?consumer.exclusive=true");
MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE4");
MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE4");
MessageProducer producer = senderSession.createProducer(senderQueue);
Message msg = senderSession.createTextMessage("test");
producer.send(msg);
Thread.sleep(100);
// Verify exclusive consumer receives the message.
assertNotNull(exclusiveConsumer.receive(100));
assertNull(fallbackConsumer.receive(100));
// Close the exclusive consumer to verify the non-exclusive consumer
// takes over
exclusiveConsumer.close();
producer.send(msg);
// Verify other non-exclusive consumer receices the message.
assertNotNull(fallbackConsumer.receive(100));
// Create exclusive consumer to determine if it will start receiving
// the messages.
exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
producer.send(msg);
assertNotNull(exclusiveConsumer.receive(100));
assertNull(fallbackConsumer.receive(100));
} finally {
fallbackSession.close();
senderSession.close();
conn.close();
}
}
}

View File

@ -1,151 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Test;
import org.apache.activemq.command.ActiveMQQueue;
/**
* Test cases used to test the JMS message exclusive consumers.
*/
public class JMSExclusiveConsumerTest extends JmsTestSupport {
public int deliveryMode;
public static Test suite() {
return suite(JMSExclusiveConsumerTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
public void initCombosForTestRoundRobinDispatchOnNonExclusive() {
addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
}
/**
* Shows that by default messages are round robined across a set of
* consumers.
*
* @throws Exception
*/
public void testRoundRobinDispatchOnNonExclusive() throws Exception {
// Receive a message with the JMS API
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(deliveryMode);
MessageConsumer consumer1 = session.createConsumer(destination);
MessageConsumer consumer2 = session.createConsumer(destination);
// Send the messages
producer.send(session.createTextMessage("1st"));
producer.send(session.createTextMessage("2nd"));
Message m;
m = consumer2.receive(1000);
assertNotNull(m);
m = consumer1.receive(1000);
assertNotNull(m);
assertNull(consumer1.receiveNoWait());
assertNull(consumer2.receiveNoWait());
}
public void initCombosForTestDispatchExclusive() {
addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
}
/**
* Shows that if the "?consumer.exclusive=true" option is added to
* destination, then all messages are routed to 1 consumer.
*
* @throws Exception
*/
public void testDispatchExclusive() throws Exception {
// Receive a message with the JMS API
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST?consumer.exclusive=true");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(deliveryMode);
MessageConsumer consumer1 = session.createConsumer(destination);
MessageConsumer consumer2 = session.createConsumer(destination);
// Send the messages
producer.send(session.createTextMessage("1st"));
producer.send(session.createTextMessage("2nd"));
producer.send(session.createTextMessage("3nd"));
Message m;
m = consumer2.receive(1000);
if (m != null) {
// Consumer 2 should get all the messages.
for (int i = 0; i < 2; i++) {
m = consumer2.receive(1000);
assertNotNull(m);
}
} else {
// Consumer 1 should get all the messages.
for (int i = 0; i < 3; i++) {
m = consumer1.receive(1000);
assertNotNull(m);
}
}
assertNull(consumer1.receiveNoWait());
assertNull(consumer2.receiveNoWait());
}
public void testMixExclusiveWithNonExclusive() throws Exception {
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.FOO?consumer.exclusive=true");
ActiveMQQueue nonExclusiveQueue = new ActiveMQQueue("TEST.FOO?consumer.exclusive=false");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer nonExCon = session.createConsumer(nonExclusiveQueue);
MessageConsumer exCon = session.createConsumer(exclusiveQueue);
MessageProducer prod = session.createProducer(exclusiveQueue);
prod.send(session.createMessage());
prod.send(session.createMessage());
prod.send(session.createMessage());
Message m;
for (int i = 0; i < 3; i++) {
m = exCon.receive(1000);
assertNotNull(m);
m = nonExCon.receive(1000);
assertNull(m);
}
}
}

View File

@ -22,18 +22,11 @@ import javax.jms.MessageProducer;
import javax.jms.QueueBrowser; import javax.jms.QueueBrowser;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import java.util.ArrayList;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.List;
import junit.framework.Test; import junit.framework.Test;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
@ -109,191 +102,6 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
addCombinationValues("isUseCache", new Boolean[]{Boolean.TRUE, Boolean.FALSE}); addCombinationValues("isUseCache", new Boolean[]{Boolean.TRUE, Boolean.FALSE});
} }
public void testBatchSendBrowseReceive() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
connection.start();
TextMessage[] outbound = new TextMessage[10];
for (int i = 0; i < 10; i++) {
outbound[i] = session.createTextMessage(i + " Message");
}
// lets consume any outstanding messages from previous test runs
while (consumer.receive(1000) != null) {
}
consumer.close();
for (int i = 0; i < outbound.length; i++) {
producer.send(outbound[i]);
}
QueueBrowser browser = session.createBrowser(destination);
Enumeration<?> enumeration = browser.getEnumeration();
for (int i = 0; i < outbound.length; i++) {
assertTrue("should have a", enumeration.hasMoreElements());
assertEquals(outbound[i], enumeration.nextElement());
}
browser.close();
for (int i = 0; i < outbound.length; i++) {
producer.send(outbound[i]);
}
// verify second batch is visible to browse
browser = session.createBrowser(destination);
enumeration = browser.getEnumeration();
for (int j = 0; j < 2; j++) {
for (int i = 0; i < outbound.length; i++) {
assertTrue("should have a", enumeration.hasMoreElements());
assertEquals("j=" + j + ", i=" + i, outbound[i].getText(), ((TextMessage) enumeration.nextElement()).getText());
}
}
browser.close();
consumer = session.createConsumer(destination);
for (int i = 0; i < outbound.length * 2; i++) {
assertNotNull("Got message: " + i, consumer.receive(2000));
}
consumer.close();
}
public void initCombosForTestBatchSendJmxBrowseReceive() {
addCombinationValues("isUseCache", new Boolean[]{Boolean.TRUE, Boolean.FALSE});
}
public void testBatchSendJmxBrowseReceive() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
connection.start();
TextMessage[] outbound = new TextMessage[10];
for (int i = 0; i < 10; i++) {
outbound[i] = session.createTextMessage(i + " Message");
}
// lets consume any outstanding messages from previous test runs
while (consumer.receive(1000) != null) {
}
consumer.close();
for (int i = 0; i < outbound.length; i++) {
producer.send(outbound[i]);
}
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
LOG.info("Create QueueView MBean...");
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
long concount = proxy.getConsumerCount();
LOG.info("Consumer Count :" + concount);
long messcount = proxy.getQueueSize();
LOG.info("current number of messages in the queue :" + messcount);
// lets browse
CompositeData[] compdatalist = proxy.browse();
if (compdatalist.length == 0) {
fail("There is no message in the queue:");
}
String[] messageIDs = new String[compdatalist.length];
for (int i = 0; i < compdatalist.length; i++) {
CompositeData cdata = compdatalist[i];
if (i == 0) {
LOG.info("Columns: " + cdata.getCompositeType().keySet());
}
messageIDs[i] = (String) cdata.get("JMSMessageID");
LOG.info("message " + i + " : " + cdata.values());
}
TabularData table = proxy.browseAsTable();
LOG.info("Found tabular data: " + table);
assertTrue("Table should not be empty!", table.size() > 0);
assertEquals("Queue size", outbound.length, proxy.getQueueSize());
assertEquals("Queue size", outbound.length, compdatalist.length);
assertEquals("Queue size", outbound.length, table.size());
LOG.info("Send another 10");
for (int i = 0; i < outbound.length; i++) {
producer.send(outbound[i]);
}
LOG.info("Browse again");
messcount = proxy.getQueueSize();
LOG.info("current number of messages in the queue :" + messcount);
compdatalist = proxy.browse();
if (compdatalist.length == 0) {
fail("There is no message in the queue:");
}
messageIDs = new String[compdatalist.length];
for (int i = 0; i < compdatalist.length; i++) {
CompositeData cdata = compdatalist[i];
if (i == 0) {
LOG.info("Columns: " + cdata.getCompositeType().keySet());
}
messageIDs[i] = (String) cdata.get("JMSMessageID");
LOG.info("message " + i + " : " + cdata.values());
}
table = proxy.browseAsTable();
LOG.info("Found tabular data: " + table);
assertTrue("Table should not be empty!", table.size() > 0);
assertEquals("Queue size", outbound.length * 2, proxy.getQueueSize());
assertEquals("Queue size", outbound.length * 2, compdatalist.length);
assertEquals("Queue size", outbound.length * 2, table.size());
consumer = session.createConsumer(destination);
for (int i = 0; i < outbound.length * 2; i++) {
assertNotNull("Got message: " + i, consumer.receive(2000));
}
consumer.close();
}
//ref: https://issues.apache.org/jira/browse/ARTEMIS-384
public void testBrowseReceive() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
connection.start();
// create consumer
MessageConsumer consumer = session.createConsumer(destination);
// lets consume any outstanding messages from previous test runs
while (consumer.receive(1000) != null) {
}
Message[] outbound = new Message[]{session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")};
MessageProducer producer = session.createProducer(destination);
producer.send(outbound[0]);
// create browser first
QueueBrowser browser = session.createBrowser(destination);
Enumeration<?> enumeration = browser.getEnumeration();
// browse the first message
assertTrue("should have received the first message", enumeration.hasMoreElements());
assertEquals(outbound[0], enumeration.nextElement());
// Receive the first message.
assertEquals(outbound[0], consumer.receive(1000));
consumer.close();
browser.close();
producer.close();
}
public void testLargeNumberOfMessages() throws Exception { public void testLargeNumberOfMessages() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -331,101 +139,6 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
producer.close(); producer.close();
} }
public void testQueueBrowserWith2Consumers() throws Exception {
final int numMessages = 1000;
connection.setAlwaysSyncSend(false);
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
ActiveMQQueue destinationPrefetch10 = new ActiveMQQueue("TEST?jms.prefetchSize=10");
ActiveMQQueue destinationPrefetch1 = new ActiveMQQueue("TEST?jms.prefetchsize=1");
connection.start();
ActiveMQConnection connection2 = (ActiveMQConnection) factory.createConnection(userName, password);
connection2.start();
connections.add(connection2);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destinationPrefetch10);
// lets consume any outstanding messages from previous test runs
while (consumer.receive(1000) != null) {
}
for (int i = 0; i < numMessages; i++) {
TextMessage message = session.createTextMessage("Message: " + i);
producer.send(message);
}
QueueBrowser browser = session2.createBrowser(destinationPrefetch1);
Enumeration<Message> browserView = browser.getEnumeration();
List<Message> messages = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
Message m1 = consumer.receive(5000);
assertNotNull("m1 is null for index: " + i, m1);
messages.add(m1);
}
int i = 0;
for (; i < numMessages && browserView.hasMoreElements(); i++) {
Message m1 = messages.get(i);
Message m2 = browserView.nextElement();
assertNotNull("m2 is null for index: " + i, m2);
assertEquals(m1.getJMSMessageID(), m2.getJMSMessageID());
}
// currently browse max page size is ignored for a queue browser consumer
// only guarantee is a page size - but a snapshot of pagedinpending is
// used so it is most likely more
assertTrue("got at least our expected minimum in the browser: ", i > BaseDestination.MAX_PAGE_SIZE);
assertFalse("nothing left in the browser", browserView.hasMoreElements());
assertNull("consumer finished", consumer.receiveNoWait());
}
public void testBrowseClose() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
connection.start();
TextMessage[] outbound = new TextMessage[]{session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")};
// create consumer
MessageConsumer consumer = session.createConsumer(destination);
// lets consume any outstanding messages from previous test runs
while (consumer.receive(1000) != null) {
}
MessageProducer producer = session.createProducer(destination);
producer.send(outbound[0]);
producer.send(outbound[1]);
producer.send(outbound[2]);
// create browser first
QueueBrowser browser = session.createBrowser(destination);
Enumeration<?> enumeration = browser.getEnumeration();
// browse some messages
assertEquals(outbound[0], enumeration.nextElement());
assertEquals(outbound[1], enumeration.nextElement());
//assertEquals(outbound[2], (Message) enumeration.nextElement());
browser.close();
// Receive the first message.
TextMessage msg = (TextMessage) consumer.receive(1000);
assertEquals("Expected " + outbound[0].getText() + " but received " + msg.getText(), outbound[0], msg);
msg = (TextMessage) consumer.receive(1000);
assertEquals("Expected " + outbound[1].getText() + " but received " + msg.getText(), outbound[1], msg);
msg = (TextMessage) consumer.receive(1000);
assertEquals("Expected " + outbound[2].getText() + " but received " + msg.getText(), outbound[2], msg);
consumer.close();
producer.close();
}
@Override @Override
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService brokerService = super.createBroker(); BrokerService brokerService = super.createBroker();

View File

@ -97,67 +97,6 @@ public class BrokerTest extends BrokerTestSupport {
assertNoMessagesLeft(connection2); assertNoMessagesLeft(connection2);
} }
public void initCombosForTestQueueBrowserWith2Consumers() {
addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
}
//https://issues.apache.org/jira/browse/ARTEMIS-384
public void testQueueBrowserWith2Consumers() throws Exception {
ActiveMQDestination destination = new ActiveMQQueue("TEST");
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo);
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
consumerInfo1.setPrefetchSize(10);
connection1.request(consumerInfo1);
// Send the messages
connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode));
//as the messages are sent async - need to synchronize the last
//one to ensure they arrive in the order we want
connection1.request(createMessage(producerInfo, destination, deliveryMode));
// Setup a second connection with a queue browser.
StubConnection connection2 = createConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
consumerInfo2.setPrefetchSize(1);
consumerInfo2.setBrowser(true);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
connection2.request(consumerInfo2);
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 4; i++) {
Message m1 = receiveMessage(connection1);
assertNotNull("m1 is null for index: " + i, m1);
messages.add(m1);
}
for (int i = 0; i < 4; i++) {
Message m1 = messages.get(i);
Message m2 = receiveMessage(connection2);
assertNotNull("m2 is null for index: " + i, m2);
assertEquals(m1.getMessageId(), m2.getMessageId());
connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE));
}
assertNoMessagesLeft(connection1);
assertNoMessagesLeft(connection2);
}
/* /*
* change the order of the above test * change the order of the above test
*/ */
@ -212,73 +151,9 @@ public class BrokerTest extends BrokerTestSupport {
assertNoMessagesLeft(connection2); assertNoMessagesLeft(connection2);
} }
public void testQueueBrowserWith2ConsumersInterleaved() throws Exception {
ActiveMQDestination destination = new ActiveMQQueue("TEST");
deliveryMode = DeliveryMode.NON_PERSISTENT;
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo);
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
consumerInfo1.setPrefetchSize(10);
connection1.request(consumerInfo1);
// Send the messages
connection1.request(createMessage(producerInfo, destination, deliveryMode));
// Setup a second connection with a queue browser.
StubConnection connection2 = createConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
consumerInfo2.setPrefetchSize(1);
consumerInfo2.setBrowser(true);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
connection2.request(consumerInfo2);
connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode));
//as the messages are sent async - need to synchronize the last
//one to ensure they arrive in the order we want
connection1.request(createMessage(producerInfo, destination, deliveryMode));
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 4; i++) {
Message m1 = receiveMessage(connection1);
assertNotNull("m1 is null for index: " + i, m1);
messages.add(m1);
}
for (int i = 0; i < 4; i++) {
Message m1 = messages.get(i);
Message m2 = receiveMessage(connection2);
assertNotNull("m2 is null for index: " + i, m2);
assertEquals(m1.getMessageId(), m2.getMessageId());
connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE));
}
assertNoMessagesLeft(connection1);
assertNoMessagesLeft(connection2);
}
public void initCombosForTestConsumerPrefetchAndStandardAck() {
addCombinationValues("deliveryMode", new Object[]{
// Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[]{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
}
public void testConsumerPrefetchAndStandardAck() throws Exception { public void testConsumerPrefetchAndStandardAck() throws Exception {
destinationType = (byte)1;
// Start a producer and consumer // Start a producer and consumer
StubConnection connection = createConnection(); StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo(); ConnectionInfo connectionInfo = createConnectionInfo();

View File

@ -1,157 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.SocketProxy;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SoWriteTimeoutClientTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutClientTest.class);
private String messageTextPrefix = "";
private EmbeddedJMS server;
@BeforeClass
public static void beforeTest() throws Exception {
//this thread keeps alive in original test too. Exclude it.
ThreadLeakCheckRule.addKownThread("WriteTimeoutFilter-Timeout");
}
@AfterClass
public static void afterTest() throws Exception {
ThreadLeakCheckRule.removeKownThread("WriteTimeoutFilter-Timeout");
}
@Before
public void setUp() throws Exception {
Configuration config = this.createConfig(0);
server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
server.start();
}
@After
public void tearDown() throws Exception {
server.stop();
}
@Test
public void testSendWithClientWriteTimeout() throws Exception {
final ActiveMQQueue dest = new ActiveMQQueue("testClientWriteTimeout");
messageTextPrefix = initMessagePrefix(80 * 1024);
URI tcpBrokerUri = new URI(newURI(0));
LOG.info("consuming using uri: " + tcpBrokerUri);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
c.start();
Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(dest);
SocketProxy proxy = new SocketProxy();
try {
proxy.setTarget(tcpBrokerUri);
proxy.open();
ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?soWriteTimeout=4000&sleep=500)?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=6638400");
final Connection pc = pFactory.createConnection();
try {
pc.start();
proxy.pause();
final int messageCount = 20;
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendMessages(pc, dest, messageCount);
} catch (Exception ignored) {
ignored.printStackTrace();
}
}
});
// wait for timeout and reconnect
TimeUnit.SECONDS.sleep(8);
proxy.goOn();
for (int i = 0; i < messageCount; i++) {
TextMessage m = (TextMessage) consumer.receive(10000);
Assert.assertNotNull("Got message " + i + " after reconnect", m);
}
Assert.assertNull(consumer.receive(5000));
} finally {
pc.close();
}
} finally {
proxy.close();
c.close();
}
}
protected void sendMessages(Connection connection, Destination destination, int count) throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
sendMessages(session, destination, count);
session.close();
}
protected void sendMessages(Session session, Destination destination, int count) throws JMSException {
MessageProducer producer = session.createProducer(destination);
sendMessages(session, producer, count);
producer.close();
}
protected void sendMessages(Session session, MessageProducer producer, int count) throws JMSException {
for (int i = 0; i < count; i++) {
producer.send(session.createTextMessage(messageTextPrefix + i));
}
}
private String initMessagePrefix(int i) {
byte[] content = new byte[i];
return new String(content);
}
}

View File

@ -1,174 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SoWriteTimeoutTest extends JmsTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutTest.class);
final int receiveBufferSize = 16 * 1024;
public String brokerTransportScheme = "nio";
@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
broker.setPersistent(true);
broker.setDeleteAllMessagesOnStartup(true);
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setConcurrentStoreAndDispatchQueues(false);
broker.setPersistenceAdapter(adapter);
broker.addConnector("tcp://localhost:61616");
broker.addConnector(brokerTransportScheme + "://localhost:0?wireFormat.maxInactivityDuration=0&transport.soWriteTimeout=1000&transport.sleep=1000");
if ("nio".equals(brokerTransportScheme)) {
broker.addConnector("stomp+" + brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize=" + receiveBufferSize + "&trace=true");
}
return broker;
}
public void initCombosForTestWriteTimeout() {
addCombinationValues("brokerTransportScheme", new Object[]{"tcp", "nio"});
}
public void testWriteTimeout() throws Exception {
Destination dest = new ActiveMQQueue("testWriteTimeout");
messageTextPrefix = initMessagePrefix(8 * 1024);
sendMessages(dest, 500);
URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri());
LOG.info("consuming using uri: " + tcpBrokerUri);
SocketProxy proxy = new SocketProxy();
proxy.setTarget(tcpBrokerUri);
proxy.setReceiveBufferSize(receiveBufferSize);
proxy.open();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(proxy.getUrl());
Connection c = factory.createConnection();
c.start();
Session session = c.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(dest);
proxy.pause();
// writes should back up... writeTimeout will kick in and abort the connection
TimeUnit.SECONDS.sleep(10);
proxy.goOn();
assertNotNull("can receive buffered messages", consumer.receive(500));
try {
session.commit();
fail("expect commit to fail as server has aborted writeTimeout connection");
} catch (JMSException expected) {
}
}
public void testWriteTimeoutStompNio() throws Exception {
ActiveMQQueue dest = new ActiveMQQueue("testWriteTimeout");
messageTextPrefix = initMessagePrefix(8 * 1024);
sendMessages(dest, 500);
URI stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(2).getConnectUri());
LOG.info("consuming using uri: " + stompBrokerUri);
SocketProxy proxy = new SocketProxy();
proxy.setTarget(new URI("tcp://localhost:" + stompBrokerUri.getPort()));
proxy.setReceiveBufferSize(receiveBufferSize);
proxy.open();
StompConnection stompConnection = new StompConnection();
stompConnection.open(new Socket("localhost", proxy.getUrl().getPort()));
stompConnection.getStompSocket().setTcpNoDelay(true);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:" + dest.getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// ensure dispatch has started before pause
frame = stompConnection.receiveFrame();
System.out.println("frame: " + frame);
assertTrue(frame.startsWith("MESSAGE"));
proxy.pause();
// writes should back up... writeTimeout will kick in and abort the connection
TimeUnit.SECONDS.sleep(1);
// see the blocked threads
//dumpAllThreads("blocked on write");
// abort should be done after this
TimeUnit.SECONDS.sleep(10);
proxy.goOn();
// get a buffered message
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
// verify connection is dead
try {
for (int i = 0; i < 200; i++) {
stompConnection.send(dest.getPhysicalName(), "ShouldBeDeadConnectionText" + i);
}
fail("expected send to fail with timeout out connection");
} catch (SocketException expected) {
LOG.info("got exception on send after timeout: " + expected);
}
}
private String initMessagePrefix(int i) {
byte[] content = new byte[i];
return new String(content);
}
@Override
protected void setUp() throws Exception {
BrokerService.disableWrapper = true;
setAutoFail(true);
super.setUp();
}
public static Test suite() {
return suite(SoWriteTimeoutTest.class);
}
}