Clean up tests, remove hard coded ports, add test timeouts, move data
dirs into target so they get removed on clean.
This commit is contained in:
Timothy Bish 2015-07-09 17:35:32 -04:00
parent 257a79de07
commit ba1bd264d9
17 changed files with 587 additions and 461 deletions

View File

@ -16,6 +16,13 @@
*/
package org.apache.activemq.ra;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.beans.IntrospectionException;
import java.beans.PropertyDescriptor;
import java.util.Arrays;
@ -26,13 +33,11 @@ import javax.jms.Session;
import javax.jms.Topic;
import javax.resource.spi.InvalidPropertyException;
import junit.framework.TestCase;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.Before;
import org.junit.Test;
/**
*
*/
public class ActiveMQActivationSpecTest extends TestCase {
public class ActiveMQActivationSpecTest {
private static final String DESTINATION = "defaultQueue";
private static final String DESTINATION_TYPE = Queue.class.getName();
@ -46,12 +51,8 @@ public class ActiveMQActivationSpecTest extends TestCase {
private PropertyDescriptor clientIdProperty;
private PropertyDescriptor subscriptionNameProperty;
public ActiveMQActivationSpecTest(String name) {
super(name);
}
protected void setUp() throws Exception {
super.setUp();
@Before
public void setUp() throws Exception {
activationSpec = new ActiveMQActivationSpec();
activationSpec.setDestination(DESTINATION);
@ -65,39 +66,46 @@ public class ActiveMQActivationSpecTest extends TestCase {
subscriptionNameProperty = new PropertyDescriptor("subscriptionName", ActiveMQActivationSpec.class);
}
@Test(timeout = 60000)
public void testDefaultContructionValidation() throws IntrospectionException {
PropertyDescriptor[] expected = {destinationTypeProperty, destinationProperty};
assertActivationSpecInvalid(new ActiveMQActivationSpec(), expected);
}
@Test(timeout = 60000)
public void testMinimalSettings() {
assertEquals(DESTINATION, activationSpec.getDestination());
assertEquals(DESTINATION_TYPE, activationSpec.getDestinationType());
assertActivationSpecValid();
}
@Test(timeout = 60000)
public void testNoDestinationTypeFailure() {
activationSpec.setDestinationType(null);
PropertyDescriptor[] expected = {destinationTypeProperty};
assertActivationSpecInvalid(expected);
}
@Test(timeout = 60000)
public void testInvalidDestinationTypeFailure() {
activationSpec.setDestinationType("foobar");
PropertyDescriptor[] expected = {destinationTypeProperty};
assertActivationSpecInvalid(expected);
}
@Test(timeout = 60000)
public void testQueueDestinationType() {
activationSpec.setDestinationType(Queue.class.getName());
assertActivationSpecValid();
}
@Test(timeout = 60000)
public void testTopicDestinationType() {
activationSpec.setDestinationType(Topic.class.getName());
assertActivationSpecValid();
}
@Test(timeout = 60000)
public void testSuccessfulCreateQueueDestination() {
activationSpec.setDestinationType(Queue.class.getName());
activationSpec.setDestination(DESTINATION);
@ -108,6 +116,7 @@ public class ActiveMQActivationSpecTest extends TestCase {
assertTrue("Destination is not a Queue", destination instanceof Queue);
}
@Test(timeout = 60000)
public void testSuccessfulCreateTopicDestination() {
activationSpec.setDestinationType(Topic.class.getName());
activationSpec.setDestination(DESTINATION);
@ -118,6 +127,7 @@ public class ActiveMQActivationSpecTest extends TestCase {
assertTrue("Destination is not a Topic", destination instanceof Topic);
}
@Test(timeout = 60000)
public void testCreateDestinationIncorrectType() {
activationSpec.setDestinationType(null);
activationSpec.setDestination(DESTINATION);
@ -125,6 +135,7 @@ public class ActiveMQActivationSpecTest extends TestCase {
assertNull("ActiveMQDestination should not have been created", destination);
}
@Test(timeout = 60000)
public void testCreateDestinationIncorrectDestinationName() {
activationSpec.setDestinationType(Topic.class.getName());
activationSpec.setDestination(null);
@ -132,7 +143,9 @@ public class ActiveMQActivationSpecTest extends TestCase {
assertNull("ActiveMQDestination should not have been created", destination);
}
//----------- acknowledgeMode tests
//----------- acknowledgeMode tests
@Test(timeout = 60000)
public void testDefaultAcknowledgeModeSetCorrectly() {
assertEquals("Incorrect default value", ActiveMQActivationSpec.AUTO_ACKNOWLEDGE_MODE,
activationSpec.getAcknowledgeMode());
@ -140,14 +153,16 @@ public class ActiveMQActivationSpecTest extends TestCase {
activationSpec.getAcknowledgeModeForSession());
}
@Test(timeout = 60000)
public void testInvalidAcknowledgeMode() {
activationSpec.setAcknowledgeMode("foobar");
PropertyDescriptor[] expected = {acknowledgeModeProperty};
assertActivationSpecInvalid(expected);
assertEquals("Incorrect acknowledge mode", ActiveMQActivationSpec.INVALID_ACKNOWLEDGE_MODE,
assertEquals("Incorrect acknowledge mode", ActiveMQActivationSpec.INVALID_ACKNOWLEDGE_MODE,
activationSpec.getAcknowledgeModeForSession());
}
@Test(timeout = 60000)
public void testNoAcknowledgeMode() {
activationSpec.setAcknowledgeMode(null);
PropertyDescriptor[] expected = {acknowledgeModeProperty};
@ -156,6 +171,7 @@ public class ActiveMQActivationSpecTest extends TestCase {
activationSpec.getAcknowledgeModeForSession());
}
@Test(timeout = 60000)
public void testSettingAutoAcknowledgeMode() {
activationSpec.setAcknowledgeMode(ActiveMQActivationSpec.AUTO_ACKNOWLEDGE_MODE);
assertActivationSpecValid();
@ -163,6 +179,7 @@ public class ActiveMQActivationSpecTest extends TestCase {
activationSpec.getAcknowledgeModeForSession());
}
@Test(timeout = 60000)
public void testSettingDupsOkAcknowledgeMode() {
activationSpec.setAcknowledgeMode(ActiveMQActivationSpec.DUPS_OK_ACKNOWLEDGE_MODE);
assertActivationSpecValid();
@ -170,39 +187,46 @@ public class ActiveMQActivationSpecTest extends TestCase {
activationSpec.getAcknowledgeModeForSession());
}
//----------- subscriptionDurability tests
//----------- subscriptionDurability tests
@Test(timeout = 60000)
public void testDefaultSubscriptionDurabilitySetCorrectly() {
assertEquals("Incorrect default value", ActiveMQActivationSpec.NON_DURABLE_SUBSCRIPTION,
activationSpec.getSubscriptionDurability());
assertEquals("Incorrect default value", ActiveMQActivationSpec.NON_DURABLE_SUBSCRIPTION, activationSpec.getSubscriptionDurability());
}
@Test(timeout = 60000)
public void testInvalidSubscriptionDurability() {
activationSpec.setSubscriptionDurability("foobar");
PropertyDescriptor[] expected = {subscriptionDurabilityProperty};
assertActivationSpecInvalid(expected);
}
@Test(timeout = 60000)
public void testNullSubscriptionDurability() {
activationSpec.setSubscriptionDurability(null);
PropertyDescriptor[] expected = {subscriptionDurabilityProperty};
assertActivationSpecInvalid(expected);
}
@Test(timeout = 60000)
public void testSettingNonDurableSubscriptionDurability() {
activationSpec.setSubscriptionDurability(ActiveMQActivationSpec.NON_DURABLE_SUBSCRIPTION);
assertActivationSpecValid();
}
//----------- durable subscriber tests
//----------- durable subscriber tests
@Test(timeout = 60000)
public void testValidDurableSubscriber() {
activationSpec.setDestinationType(Topic.class.getName());
activationSpec.setSubscriptionDurability(ActiveMQActivationSpec.DURABLE_SUBSCRIPTION);
activationSpec.setClientId("foobar");
activationSpec.setSubscriptionName("foobar");
assertActivationSpecValid();
assertActivationSpecValid();
assertTrue(activationSpec.isDurableSubscription());
}
@Test(timeout = 60000)
public void testDurableSubscriberWithQueueDestinationTypeFailure() {
activationSpec.setDestinationType(Queue.class.getName());
activationSpec.setSubscriptionDurability(ActiveMQActivationSpec.DURABLE_SUBSCRIPTION);
@ -211,7 +235,8 @@ public class ActiveMQActivationSpecTest extends TestCase {
PropertyDescriptor[] expected = {subscriptionDurabilityProperty};
assertActivationSpecInvalid(expected);
}
@Test(timeout = 60000)
public void testDurableSubscriberNoClientIdNoSubscriptionNameFailure() {
activationSpec.setDestinationType(Topic.class.getName());
activationSpec.setSubscriptionDurability(ActiveMQActivationSpec.DURABLE_SUBSCRIPTION);
@ -221,8 +246,9 @@ public class ActiveMQActivationSpecTest extends TestCase {
assertNull(activationSpec.getSubscriptionName());
PropertyDescriptor[] expected = {clientIdProperty, subscriptionNameProperty};
assertActivationSpecInvalid(expected);
}
}
@Test(timeout = 60000)
public void testDurableSubscriberEmptyClientIdEmptySubscriptionNameFailure() {
activationSpec.setDestinationType(Topic.class.getName());
activationSpec.setSubscriptionDurability(ActiveMQActivationSpec.DURABLE_SUBSCRIPTION);
@ -232,34 +258,36 @@ public class ActiveMQActivationSpecTest extends TestCase {
assertNull(activationSpec.getSubscriptionName());
PropertyDescriptor[] expected = {clientIdProperty, subscriptionNameProperty};
assertActivationSpecInvalid(expected);
}
public void testSetEmptyStringButGetNullValue() {
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
activationSpec.setDestinationType(EMPTY_STRING);
assertNull("Property not null", activationSpec.getDestinationType());
activationSpec.setMessageSelector(EMPTY_STRING);
assertNull("Property not null", activationSpec.getMessageSelector());
activationSpec.setDestination(EMPTY_STRING);
assertNull("Property not null", activationSpec.getDestination());
activationSpec.setUserName(EMPTY_STRING);
assertNull("Property not null", activationSpec.getUserName());
activationSpec.setPassword(EMPTY_STRING);
assertNull("Property not null", activationSpec.getPassword());
activationSpec.setClientId(EMPTY_STRING);
assertNull("Property not null", activationSpec.getClientId());
activationSpec.setSubscriptionName(EMPTY_STRING);
assertNull("Property not null", activationSpec.getSubscriptionName());
}
//----------- helper methods
@Test(timeout = 60000)
public void testSetEmptyStringButGetNullValue() {
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
activationSpec.setDestinationType(EMPTY_STRING);
assertNull("Property not null", activationSpec.getDestinationType());
activationSpec.setMessageSelector(EMPTY_STRING);
assertNull("Property not null", activationSpec.getMessageSelector());
activationSpec.setDestination(EMPTY_STRING);
assertNull("Property not null", activationSpec.getDestination());
activationSpec.setUserName(EMPTY_STRING);
assertNull("Property not null", activationSpec.getUserName());
activationSpec.setPassword(EMPTY_STRING);
assertNull("Property not null", activationSpec.getPassword());
activationSpec.setClientId(EMPTY_STRING);
assertNull("Property not null", activationSpec.getClientId());
activationSpec.setSubscriptionName(EMPTY_STRING);
assertNull("Property not null", activationSpec.getSubscriptionName());
}
//----------- helper methods
private void assertActivationSpecValid() {
try {
activationSpec.validate();
@ -267,7 +295,7 @@ public class ActiveMQActivationSpecTest extends TestCase {
fail("InvalidPropertyException should not be thrown");
}
}
private void assertActivationSpecInvalid(PropertyDescriptor[] expected) {
assertActivationSpecInvalid(activationSpec, expected);
}
@ -278,14 +306,14 @@ public class ActiveMQActivationSpecTest extends TestCase {
fail("InvalidPropertyException should have been thrown");
} catch (InvalidPropertyException e) {
PropertyDescriptor[] actual = e.getInvalidPropertyDescriptors();
assertEquals(expected, actual);
assertDescriptorsAreEqual(expected, actual);
}
}
private static void assertEquals(PropertyDescriptor[] expected, PropertyDescriptor[] actual) {
private static void assertDescriptorsAreEqual(PropertyDescriptor[] expected, PropertyDescriptor[] actual) {
/*
* This is kind of ugly. I originally created two HashSets and did an assertEquals(set1, set2)
* but because of a bug in the PropertyDescriptor class, it incorrectly fails. The problem is that the
* This is kind of ugly. I originally created two HashSets and did an assertEquals(set1, set2)
* but because of a bug in the PropertyDescriptor class, it incorrectly fails. The problem is that the
* PropertyDescriptor class implements the equals() method but not the hashCode() method and almost all
* of the java collection classes use hashCode() for testing equality. The one exception I found was
* the ArrayList class which uses equals() for testing equality. Since Arrays.asList(...) returns an
@ -299,7 +327,7 @@ public class ActiveMQActivationSpecTest extends TestCase {
List<PropertyDescriptor> actualList = Arrays.asList(actual);
assertTrue("Incorrect PropertyDescriptors returned", expectedList.containsAll(actualList));
}
public void testSelfEquality() {
assertEquality(activationSpec, activationSpec);
}
@ -313,11 +341,10 @@ public class ActiveMQActivationSpecTest extends TestCase {
assertTrue("ActiveMQActivationSpecs are not equal", rightSpec.equals(leftSpec));
assertTrue("HashCodes are not equal", leftSpec.hashCode() == rightSpec.hashCode());
}
private void assertNonEquality(ActiveMQActivationSpec leftSpec, ActiveMQActivationSpec rightSpec) {
assertFalse("ActiveMQActivationSpecs are equal", leftSpec.equals(rightSpec));
assertFalse("ActiveMQActivationSpecs are equal", rightSpec.equals(leftSpec));
assertFalse("HashCodes are equal", leftSpec.hashCode() == rightSpec.hashCode());
}
}

View File

@ -1,59 +1,51 @@
/*
* Copyright 2008 hak8fe.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* under the License.
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.ra;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Timer;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.XATerminator;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQTopicSubscriber;
import org.junit.Before;
import org.junit.Test;
/**
*
* @author hak8fe
*/
public class ActiveMQConnectionFactoryTest extends TestCase {
ActiveMQManagedConnectionFactory mcf;
ActiveMQConnectionRequestInfo info;
String url = "vm://localhost";
String user = "defaultUser";
String pwd = "defaultPasswd";
public ActiveMQConnectionFactoryTest(String testName) {
super(testName);
}
public class ActiveMQConnectionFactoryTest {
@Override
protected void setUp() throws Exception {
super.setUp();
private ActiveMQManagedConnectionFactory mcf;
private ActiveMQConnectionRequestInfo info;
private String url = "vm://localhost?broker.persistent=false";
private String user = "defaultUser";
private String pwd = "defaultPasswd";
@Before
public void setUp() throws Exception {
mcf = new ActiveMQManagedConnectionFactory();
info = new ActiveMQConnectionRequestInfo();
info.setServerUrl(url);
@ -62,31 +54,27 @@ public class ActiveMQConnectionFactoryTest extends TestCase {
info.setAllPrefetchValues(new Integer(100));
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
}
public void testSerializability() throws Exception
{
@Test(timeout = 60000)
public void testSerializability() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(mcf, new ConnectionManagerAdapter(), info);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(factory);
oos.close();
byte[] byteArray = bos.toByteArray();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(byteArray));
ActiveMQConnectionFactory deserializedFactory = (ActiveMQConnectionFactory) ois.readObject();
ois.close();
Connection con = deserializedFactory.createConnection("defaultUser", "defaultPassword");
ActiveMQConnection connection = ((ActiveMQConnection)((ManagedConnectionProxy)con).getManagedConnection().getPhysicalConnection());
ActiveMQConnection connection = ((ActiveMQConnection) ((ManagedConnectionProxy) con).getManagedConnection().getPhysicalConnection());
assertEquals(100, connection.getPrefetchPolicy().getQueuePrefetch());
assertNotNull("Connection object returned by ActiveMQConnectionFactory.createConnection() is null", con);
}
@Test(timeout = 60000)
public void testOptimizeDurablePrefetch() throws Exception {
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ra.setServerUrl(url);
@ -103,11 +91,11 @@ public class ActiveMQConnectionFactoryTest extends TestCase {
TopicSubscriber sub = sess.createDurableSubscriber(sess.createTopic("TEST"), "x");
con.start();
assertEquals(0, ((ActiveMQTopicSubscriber)sub).getPrefetchNumber());
assertEquals(0, ((ActiveMQTopicSubscriber) sub).getPrefetchNumber());
}
@Test(timeout = 60000)
public void testGetXAResource() throws Exception {
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ra.setServerUrl(url);
ra.setUserName(user);

View File

@ -16,86 +16,95 @@
*/
package org.apache.activemq.ra;
import junit.framework.TestCase;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
*
*/
public class ActiveMQResourceAdapterJavaBeanEqualityTest extends TestCase {
import org.junit.Before;
import org.junit.Test;
public class ActiveMQResourceAdapterJavaBeanEqualityTest {
private ActiveMQResourceAdapter raOne;
private ActiveMQResourceAdapter raTwo;
public ActiveMQResourceAdapterJavaBeanEqualityTest(String name) {
super(name);
}
protected void setUp() throws Exception {
super.setUp();
@Before
public void setUp() throws Exception {
raOne = new ActiveMQResourceAdapter();
raTwo = new ActiveMQResourceAdapter();
}
@Test(timeout = 60000)
public void testSelfEquality() {
assertEquality(raOne, raOne);
}
@Test(timeout = 60000)
public void testEmptyEquality() {
assertEquality(raOne, raTwo);
}
@Test(timeout = 60000)
public void testNullEqualityFailure() {
assertFalse(raOne.equals(null));
}
@Test(timeout = 60000)
public void testServerUrlEquality() {
raOne.setServerUrl("one");
raTwo.setServerUrl("one");
assertEquality(raOne, raTwo);
}
@Test(timeout = 60000)
public void testServerUrlInequality() {
raOne.setServerUrl("one");
raTwo.setServerUrl("two");
assertNonEquality(raOne, raTwo);
}
@Test(timeout = 60000)
public void testServerUrlInequalityDifferentCase() {
raOne.setServerUrl("one");
raTwo.setServerUrl("ONE");
assertNonEquality(raOne, raTwo);
}
@Test(timeout = 60000)
public void testNullServerUrlInequality() {
raOne.setServerUrl("one");
raTwo.setServerUrl(null);
assertNonEquality(raOne, raTwo);
}
@Test(timeout = 60000)
public void testBrokerXMLConfigEquality() {
raOne.setBrokerXmlConfig("one");
raTwo.setBrokerXmlConfig("one");
assertEquality(raOne, raTwo);
}
@Test(timeout = 60000)
public void testBrokerXMLConfigInequality() {
raOne.setBrokerXmlConfig("one");
raTwo.setBrokerXmlConfig("two");
assertNonEquality(raOne, raTwo);
}
@Test(timeout = 60000)
public void testBrokerXMLConfigInequalityDifferentCase() {
raOne.setBrokerXmlConfig("one");
raTwo.setBrokerXmlConfig("ONE");
assertNonEquality(raOne, raTwo);
}
@Test(timeout = 60000)
public void testNullBrokerXMLConfigInequality() {
raOne.setBrokerXmlConfig("one");
raTwo.setBrokerXmlConfig(null);
assertNonEquality(raOne, raTwo);
}
@Test(timeout = 60000)
public void testPasswordNotPartOfEquality() {
raOne.setClientid("one");
raTwo.setClientid("one");
@ -115,5 +124,4 @@ public class ActiveMQResourceAdapterJavaBeanEqualityTest extends TestCase {
assertFalse("ActiveMQResourceAdapters are equal", rightRa.equals(leftRa));
assertFalse("HashCodes are equal", leftRa.hashCode() == rightRa.hashCode());
}
}

View File

@ -19,39 +19,40 @@ package org.apache.activemq.ra;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.ConnectionEventListener;
/**
*
*/
public class ConnectionEventListenerAdapter implements ConnectionEventListener {
/**
* @see javax.resource.spi.ConnectionEventListener#connectionClosed(javax.resource.spi.ConnectionEvent)
*/
@Override
public void connectionClosed(ConnectionEvent arg0) {
}
/**
* @see javax.resource.spi.ConnectionEventListener#localTransactionStarted(javax.resource.spi.ConnectionEvent)
*/
@Override
public void localTransactionStarted(ConnectionEvent arg0) {
}
/**
* @see javax.resource.spi.ConnectionEventListener#localTransactionCommitted(javax.resource.spi.ConnectionEvent)
*/
@Override
public void localTransactionCommitted(ConnectionEvent arg0) {
}
/**
* @see javax.resource.spi.ConnectionEventListener#localTransactionRolledback(javax.resource.spi.ConnectionEvent)
*/
@Override
public void localTransactionRolledback(ConnectionEvent arg0) {
}
/**
* @see javax.resource.spi.ConnectionEventListener#connectionErrorOccurred(javax.resource.spi.ConnectionEvent)
*/
@Override
public void connectionErrorOccurred(ConnectionEvent arg0) {
}
}

View File

@ -34,21 +34,19 @@ import org.slf4j.LoggerFactory;
/**
* A simple implementation of a ConnectionManager that can be extended so that
* it can see how the RA connections are interacting with it.
*
*
*/
public class ConnectionManagerAdapter implements ConnectionManager, ConnectionEventListener {
private static final long serialVersionUID = 5205646563916645831L;
private static final Logger LOG = LoggerFactory.getLogger(ConnectionManagerAdapter.class);
ArrayList<ConnectionEventListener> listners = new ArrayList<ConnectionEventListener>();
ArrayList<ManagedConnection> connections = new ArrayList<ManagedConnection>();
private ArrayList<ConnectionEventListener> listners = new ArrayList<ConnectionEventListener>();
private ArrayList<ManagedConnection> connections = new ArrayList<ManagedConnection>();
/**
* Adds a listener to all connections created by this connection manager.
* This listener will be added to all previously created connections.
*
*
* @param l
*/
public void addConnectionEventListener(ConnectionEventListener l) {
@ -63,6 +61,7 @@ public class ConnectionManagerAdapter implements ConnectionManager, ConnectionEv
* @see javax.resource.spi.ConnectionManager#allocateConnection(javax.resource.spi.ManagedConnectionFactory,
* javax.resource.spi.ConnectionRequestInfo)
*/
@Override
public Object allocateConnection(ManagedConnectionFactory connectionFactory, ConnectionRequestInfo info) throws ResourceException {
Subject subject = null;
ManagedConnection connection = connectionFactory.createManagedConnection(subject, info);
@ -78,6 +77,7 @@ public class ConnectionManagerAdapter implements ConnectionManager, ConnectionEv
/**
* @see javax.resource.spi.ConnectionEventListener#connectionClosed(javax.resource.spi.ConnectionEvent)
*/
@Override
public void connectionClosed(ConnectionEvent event) {
connections.remove(event.getSource());
try {
@ -85,6 +85,7 @@ public class ConnectionManagerAdapter implements ConnectionManager, ConnectionEv
} catch (ResourceException e) {
LOG.warn("Error occured during the cleanup of a managed connection: ", e);
}
try {
((ManagedConnection)event.getSource()).destroy();
} catch (ResourceException e) {
@ -95,24 +96,28 @@ public class ConnectionManagerAdapter implements ConnectionManager, ConnectionEv
/**
* @see javax.resource.spi.ConnectionEventListener#localTransactionStarted(javax.resource.spi.ConnectionEvent)
*/
@Override
public void localTransactionStarted(ConnectionEvent event) {
}
/**
* @see javax.resource.spi.ConnectionEventListener#localTransactionCommitted(javax.resource.spi.ConnectionEvent)
*/
@Override
public void localTransactionCommitted(ConnectionEvent event) {
}
/**
* @see javax.resource.spi.ConnectionEventListener#localTransactionRolledback(javax.resource.spi.ConnectionEvent)
*/
@Override
public void localTransactionRolledback(ConnectionEvent event) {
}
/**
* @see javax.resource.spi.ConnectionEventListener#connectionErrorOccurred(javax.resource.spi.ConnectionEvent)
*/
@Override
public void connectionErrorOccurred(ConnectionEvent event) {
LOG.warn("Managed connection experiened an error: ", event.getException());
try {
@ -120,11 +125,11 @@ public class ConnectionManagerAdapter implements ConnectionManager, ConnectionEv
} catch (ResourceException e) {
LOG.warn("Error occured during the cleanup of a managed connection: ", e);
}
try {
((ManagedConnection)event.getSource()).destroy();
} catch (ResourceException e) {
LOG.warn("Error occured during the destruction of a managed connection: ", e);
}
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.ra;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@ -44,21 +46,24 @@ import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FailoverManagedClusterTest extends TestCase {
public class FailoverManagedClusterTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverManagedClusterTest.class);
long txGenerator = System.currentTimeMillis();
private static final String MASTER_BIND_ADDRESS = "tcp://0.0.0.0:61616";
private static final String SLAVE_BIND_ADDRESS = "tcp://0.0.0.0:61617";
private static final String MASTER_BIND_ADDRESS = "tcp://localhost:0";
private static final String SLAVE_BIND_ADDRESS = "tcp://localhost:0";
private static final String KAHADB_DIRECTORY = "target/activemq-data/";
private String masterConnectionUri;
private String slaveConnectionUri;
@ -69,16 +74,16 @@ public class FailoverManagedClusterTest extends TestCase {
private BrokerService slave;
private final CountDownLatch slaveThreadStarted = new CountDownLatch(1);
@Override
protected void setUp() throws Exception {
@Before
public void setUp() throws Exception {
createAndStartMaster();
createAndStartSlave();
brokerUri = "failover://(" + masterConnectionUri + "," + slaveConnectionUri + ")?randomize=false";
}
@Override
protected void tearDown() throws Exception {
@After
public void tearDown() throws Exception {
if (slave != null) {
slave.stop();
}
@ -92,6 +97,7 @@ public class FailoverManagedClusterTest extends TestCase {
master = new BrokerService();
master.setDeleteAllMessagesOnStartup(true);
master.setUseJmx(false);
master.setDataDirectory(KAHADB_DIRECTORY);
master.setBrokerName("BROKER");
masterConnectionUri = master.addConnector(MASTER_BIND_ADDRESS).getPublishableConnectString();
master.start();
@ -101,6 +107,7 @@ public class FailoverManagedClusterTest extends TestCase {
private void createAndStartSlave() throws Exception {
slave = new BrokerService();
slave.setUseJmx(false);
slave.setDataDirectory(KAHADB_DIRECTORY);
slave.setBrokerName("BROKER");
slaveConnectionUri = slave.addConnector(SLAVE_BIND_ADDRESS).getPublishableConnectString();
@ -120,6 +127,7 @@ public class FailoverManagedClusterTest extends TestCase {
}).start();
}
@Test(timeout = 60000)
public void testFailover() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
@ -184,7 +192,6 @@ public class FailoverManagedClusterTest extends TestCase {
assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
}
private static final class StubBootstrapContext implements BootstrapContext {
@Override
public WorkManager getWorkManager() {
@ -271,7 +278,6 @@ public class FailoverManagedClusterTest extends TestCase {
public void onMessage(Message message) {
messageCount++;
}
}
public Xid createXid() throws IOException {
@ -298,5 +304,4 @@ public class FailoverManagedClusterTest extends TestCase {
}
};
}
}

View File

@ -20,17 +20,18 @@ import java.util.HashSet;
import javax.resource.spi.ManagedConnection;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class FailoverManagedConnectionTest extends TestCase {
public class FailoverManagedConnectionTest {
private static final String BROKER_TRANSPORT = "tcp://localhost:61616";
private static final String BROKER_URL = "failover://" + BROKER_TRANSPORT;
private static final String KAHADB_DIRECTORY = "target/activemq-data/";
private ActiveMQManagedConnectionFactory managedConnectionFactory;
private ManagedConnection managedConnection;
private ManagedConnectionProxy proxy;
@ -38,10 +39,11 @@ public class FailoverManagedConnectionTest extends TestCase {
private HashSet<ManagedConnection> connections;
private ActiveMQConnectionRequestInfo connectionInfo;
protected void setUp() throws Exception {
@Before
public void setUp() throws Exception {
createAndStartBroker();
connectionInfo = new ActiveMQConnectionRequestInfo();
connectionInfo.setServerUrl(BROKER_URL);
connectionInfo.setUserName(ActiveMQConnectionFactory.DEFAULT_USER);
@ -49,54 +51,56 @@ public class FailoverManagedConnectionTest extends TestCase {
managedConnectionFactory = new ActiveMQManagedConnectionFactory();
managedConnection = managedConnectionFactory.createManagedConnection(null, connectionInfo);
connections = new HashSet<ManagedConnection>();
connections.add(managedConnection);
}
@After
public void tearDown() throws Exception {
stopBroker();
}
private void createAndStartBroker() throws Exception {
broker = new BrokerService();
broker.addConnector(BROKER_TRANSPORT);
broker.setDataDirectory(KAHADB_DIRECTORY);
broker.start();
broker.waitUntilStarted();
}
public void testFailoverBeforeClose() throws Exception {
@Test(timeout = 60000)
public void testFailoverBeforeClose() throws Exception {
createConnectionAndProxyAndSession();
stopBroker();
cleanupConnectionAndProxyAndSession();
createAndStartBroker();
for (int i=0; i<2; i++) {
createConnectionAndProxyAndSession();
cleanupConnectionAndProxyAndSession();
}
}
private void cleanupConnectionAndProxyAndSession() throws Exception {
proxy.close();
managedConnection.cleanup();
}
private void createConnectionAndProxyAndSession() throws Exception {
managedConnection =
managedConnectionFactory.matchManagedConnections(connections, null, connectionInfo);
proxy =
(ManagedConnectionProxy) managedConnection.getConnection(null, null);
managedConnection = managedConnectionFactory.matchManagedConnections(connections, null, connectionInfo);
proxy = (ManagedConnectionProxy) managedConnection.getConnection(null, null);
proxy.createSession(false, 0);
}
private void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}
}

View File

@ -1,45 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.ra;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.resource.spi.ManagedConnection;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import javax.resource.spi.ManagedConnection;
import javax.resource.ResourceException;
import org.apache.activemq.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.JmsQueueTransactionTest;
/**
* @version $Rev$ $Date$
*/
public class JmsXAQueueTransactionTest extends JmsQueueTransactionTest {
private static final String DEFAULT_HOST = "vm://localhost";
private static final String KAHADB_DIRECTORY = "target/activemq-data/";
private static final String DEFAULT_HOST = "vm://localhost?broker.dataDirectory=" + KAHADB_DIRECTORY;
private ConnectionManagerAdapter connectionManager = new ConnectionManagerAdapter();
private ActiveMQManagedConnectionFactory managedConnectionFactory;
@ -63,7 +58,6 @@ public class JmsXAQueueTransactionTest extends JmsQueueTransactionTest {
return (ConnectionFactory)managedConnectionFactory.createConnectionFactory(connectionManager);
}
/**
* Recreates the connection.
*
@ -134,19 +128,20 @@ public class JmsXAQueueTransactionTest extends JmsQueueTransactionTest {
final byte[] bs = baos.toByteArray();
return new Xid() {
@Override
public int getFormatId() {
return 86;
}
@Override
public byte[] getGlobalTransactionId() {
return bs;
}
@Override
public byte[] getBranchQualifier() {
return bs;
}
};
}
}

View File

@ -1,47 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.ra;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import javax.jms.Connection;
import javax.resource.ResourceException;
import javax.resource.spi.ManagedConnection;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import javax.resource.spi.ManagedConnection;
import javax.resource.ResourceException;
import org.apache.activemq.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.JmsQueueTransactionTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @version $Rev$ $Date$
*/
public class JmsXARollback2CxTransactionTest extends JmsQueueTransactionTest {
protected static final Logger LOG = LoggerFactory.getLogger(JmsXARollback2CxTransactionTest.class);
private static final String DEFAULT_HOST = "vm://localhost?create=false";
private ConnectionManagerAdapter connectionManager = new ConnectionManagerAdapter();
@ -163,19 +160,20 @@ public class JmsXARollback2CxTransactionTest extends JmsQueueTransactionTest {
final byte[] bs = baos.toByteArray();
return new Xid() {
@Override
public int getFormatId() {
return 86;
}
@Override
public byte[] getGlobalTransactionId() {
return bs;
}
@Override
public byte[] getBranchQualifier() {
return bs;
}
};
}
}

View File

@ -16,6 +16,12 @@
*/
package org.apache.activemq.ra;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@ -52,7 +58,6 @@ import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.junit.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
@ -69,53 +74,61 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.spi.ErrorHandler;
import org.apache.log4j.spi.Filter;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
public class MDBTest {
private static final Logger LOG = LoggerFactory.getLogger(MDBTest.class);
long txGenerator = System.currentTimeMillis();
private long txGenerator = System.currentTimeMillis();
private AtomicInteger id = new AtomicInteger(0);
private static final class StubBootstrapContext implements BootstrapContext {
@Override
public WorkManager getWorkManager() {
return new WorkManager() {
@Override
public void doWork(Work work) throws WorkException {
new Thread(work).start();
}
@Override
public void doWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
new Thread(work).start();
}
@Override
public long startWork(Work work) throws WorkException {
new Thread(work).start();
return 0;
}
@Override
public long startWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
new Thread(work).start();
return 0;
}
@Override
public void scheduleWork(Work work) throws WorkException {
new Thread(work).start();
}
@Override
public void scheduleWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
new Thread(work).start();
}
};
}
@Override
public XATerminator getXATerminator() {
return null;
}
@Override
public Timer createTimer() throws UnavailableException {
return null;
}
@ -126,6 +139,7 @@ public class MDBTest {
public XAResource xaresource;
public Xid xid;
@Override
public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
try {
if (xid == null) {
@ -138,6 +152,7 @@ public class MDBTest {
}
}
@Override
public void afterDelivery() throws ResourceException {
try {
xaresource.end(xid, 0);
@ -150,17 +165,18 @@ public class MDBTest {
}
}
@Override
public void release() {
LOG.info("In release, messageCount: " + messageCount + ", xid:" + xid);
}
@Override
public void onMessage(Message message) {
messageCount++;
}
}
@Test
@Test(timeout = 90000)
public void testDestinationInJndi() throws Exception{
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
Connection connection = factory.createConnection();
@ -177,6 +193,7 @@ public class MDBTest {
final CountDownLatch messageDelivered = new CountDownLatch(1);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
@Override
public void onMessage(Message message) {
super.onMessage(message);
messageDelivered.countDown();
@ -191,11 +208,13 @@ public class MDBTest {
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
@Override
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
@ -225,7 +244,7 @@ public class MDBTest {
adapter.stop();
}
@Test
@Test(timeout = 90000)
public void testMessageDelivery() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
@ -243,6 +262,7 @@ public class MDBTest {
final CountDownLatch messageDelivered = new CountDownLatch(1);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
@Override
public void onMessage(Message message) {
super.onMessage(message);
messageDelivered.countDown();
@ -256,11 +276,13 @@ public class MDBTest {
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
@Override
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
@ -288,11 +310,10 @@ public class MDBTest {
// Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
adapter.stop();
}
//https://issues.apache.org/jira/browse/AMQ-5811
@Test
@Test(timeout = 90000)
public void testAsyncStop() throws Exception {
for (int repeat = 0; repeat < 10; repeat++) {
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
@ -308,6 +329,7 @@ public class MDBTest {
final StubMessageEndpoint endpoint = new StubMessageEndpoint()
{
@Override
public void onMessage(Message message)
{
super.onMessage(message);
@ -321,11 +343,13 @@ public class MDBTest {
activationSpecs[i].validate();
endpointFactories[i] = new MessageEndpointFactory() {
@Override
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
@ -356,6 +380,7 @@ public class MDBTest {
final ActiveMQActivationSpec activationSpec = activationSpecs[i];
threads[i] = new Thread() {
@Override
public void run() {
adapter.endpointDeactivation(endpointFactory, activationSpec);
}
@ -364,7 +389,7 @@ public class MDBTest {
return threads;
}
@Test
@Test(timeout = 90000)
public void testErrorOnNoMessageDeliveryBrokerZeroPrefetchConfig() throws Exception {
final BrokerService brokerService = new BrokerService();
@ -438,8 +463,8 @@ public class MDBTest {
return false;
}
};
LogManager.getRootLogger().addAppender(testAppender);
LogManager.getRootLogger().addAppender(testAppender);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
Connection connection = factory.createConnection();
@ -455,6 +480,7 @@ public class MDBTest {
final CountDownLatch messageDelivered = new CountDownLatch(1);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
@Override
public void onMessage(Message message) {
super.onMessage(message);
messageDelivered.countDown();
@ -468,11 +494,13 @@ public class MDBTest {
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
@Override
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
@ -524,6 +552,7 @@ public class MDBTest {
final AtomicBoolean failed = new AtomicBoolean(false);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
@Override
public void onMessage(Message message) {
super.onMessage(message);
try {
@ -544,7 +573,8 @@ public class MDBTest {
} catch (InterruptedException ignored) {
}
};
@Override
public void afterDelivery() throws ResourceException {
try {
if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) {
@ -569,11 +599,13 @@ public class MDBTest {
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
@Override
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
@ -601,10 +633,9 @@ public class MDBTest {
// Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
adapter.stop();
}
@Test
@Test(timeout = 90000)
public void testOrderOfMessageExceptionReDelivery() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
@ -618,6 +649,7 @@ public class MDBTest {
final List<Integer> orderedReceipt = new ArrayList<Integer>();
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
@Override
public void onMessage(Message message) {
super.onMessage(message);
if (messageCount == 2) {
@ -630,6 +662,7 @@ public class MDBTest {
}
};
@Override
public void afterDelivery() throws ResourceException {
try {
if (messageCount == 2) {
@ -658,11 +691,13 @@ public class MDBTest {
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
@Override
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
@ -701,10 +736,9 @@ public class MDBTest {
// Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
adapter.stop();
}
@Test
@Test(timeout = 90000)
public void testXaTimeoutRedelivery() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
@ -718,6 +752,7 @@ public class MDBTest {
final CountDownLatch messageDelivered = new CountDownLatch(2);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
@Override
public void onMessage(Message message) {
super.onMessage(message);
try {
@ -742,6 +777,7 @@ public class MDBTest {
}
};
@Override
public void afterDelivery() throws ResourceException {
try {
xaresource.end(xid, XAResource.TMSUCCESS);
@ -759,11 +795,13 @@ public class MDBTest {
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
@Override
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
@ -776,7 +814,6 @@ public class MDBTest {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
// Send the broker a message to that endpoint
@ -790,10 +827,8 @@ public class MDBTest {
// Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
adapter.stop();
}
AtomicInteger id = new AtomicInteger(0);
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
@ -804,23 +839,25 @@ public class MDBTest {
return new Xid() {
final int lid = id.get();
@Override
public int getFormatId() {
return 86;
}
@Override
public byte[] getGlobalTransactionId() {
return bs;
}
@Override
public byte[] getBranchQualifier() {
return bs;
}
@Override
public String toString() {
return "DummyIdXID:" + lid;
}
};
}
}

View File

@ -16,6 +16,13 @@
*/
package org.apache.activemq.ra;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
@ -36,23 +43,18 @@ import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionFactory;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Before;
import org.junit.Test;
/**
*
*/
public class ManagedConnectionFactoryTest extends TestCase {
public class ManagedConnectionFactoryTest {
private static final String DEFAULT_HOST = "vm://localhost?broker.persistent=false&broker.schedulerSupport=false";
private static final String REMOTE_HOST = "vm://remotehost?broker.persistent=false&broker.schedulerSupport=false";
private ActiveMQManagedConnectionFactory managedConnectionFactory;
/**
* @see junit.framework.TestCase#setUp()
*/
protected void setUp() throws Exception {
@Before
public void setUp() throws Exception {
managedConnectionFactory = new ActiveMQManagedConnectionFactory();
managedConnectionFactory.setServerUrl(DEFAULT_HOST);
managedConnectionFactory.setUserName(ActiveMQConnectionFactory.DEFAULT_USER);
@ -60,17 +62,19 @@ public class ManagedConnectionFactoryTest extends TestCase {
managedConnectionFactory.setUseSessionArgs(false);
}
@Test(timeout = 60000)
public void testConnectionFactoryAllocation() throws ResourceException, JMSException {
// Make sure that the ConnectionFactory is asking the connection manager
// to
// allocate the connection.
// to allocate the connection.
final boolean allocateRequested[] = new boolean[] {
false
};
Object cf = managedConnectionFactory.createConnectionFactory(new ConnectionManagerAdapter() {
private static final long serialVersionUID = 1699499816530099939L;
@Override
public Object allocateConnection(ManagedConnectionFactory connectionFactory, ConnectionRequestInfo info) throws ResourceException {
allocateRequested[0] = true;
return super.allocateConnection(connectionFactory, info);
@ -94,9 +98,9 @@ public class ManagedConnectionFactoryTest extends TestCase {
assertFalse("transacted attribute is ignored, only transacted with xa or local tx", session.getTransacted());
connection.close();
}
@Test(timeout = 60000)
public void testConnectionSessionArgs() throws ResourceException, JMSException {
ActiveMQConnectionRequestInfo connectionRequestInfo = new ActiveMQConnectionRequestInfo();
connectionRequestInfo.setServerUrl(DEFAULT_HOST);
@ -112,6 +116,7 @@ public class ManagedConnectionFactoryTest extends TestCase {
connection.close();
}
@Test(timeout = 60000)
public void testConnectionFactoryConnectionMatching() throws ResourceException, JMSException {
ActiveMQConnectionRequestInfo ri1 = new ActiveMQConnectionRequestInfo();
@ -150,6 +155,7 @@ public class ManagedConnectionFactoryTest extends TestCase {
}
}
@Test(timeout = 60000)
public void testConnectionFactoryIsSerializableAndReferenceable() throws ResourceException, JMSException {
Object cf = managedConnectionFactory.createConnectionFactory(new ConnectionManagerAdapter());
assertTrue(cf != null);
@ -157,39 +163,40 @@ public class ManagedConnectionFactoryTest extends TestCase {
assertTrue(cf instanceof Referenceable);
}
@Test(timeout = 60000)
public void testImplementsQueueAndTopicConnectionFactory() throws Exception {
Object cf = managedConnectionFactory.createConnectionFactory(new ConnectionManagerAdapter());
assertTrue(cf instanceof QueueConnectionFactory);
assertTrue(cf instanceof TopicConnectionFactory);
}
@Test(timeout = 60000)
public void testSerializability() throws Exception {
managedConnectionFactory.setLogWriter(new PrintWriter(new ByteArrayOutputStream()));
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(managedConnectionFactory);
oos.close();
byte[] byteArray = bos.toByteArray();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(byteArray));
ActiveMQManagedConnectionFactory deserializedFactory = (ActiveMQManagedConnectionFactory) ois.readObject();
ois.close();
assertNull(
"[logWriter] property of deserialized ActiveMQManagedConnectionFactory is not null",
"[logWriter] property of deserialized ActiveMQManagedConnectionFactory is not null",
deserializedFactory.getLogWriter());
assertNotNull(
"ConnectionRequestInfo of deserialized ActiveMQManagedConnectionFactory is null",
"ConnectionRequestInfo of deserialized ActiveMQManagedConnectionFactory is null",
deserializedFactory.getInfo());
assertEquals(
"[serverUrl] property of deserialized ConnectionRequestInfo object is not [" + DEFAULT_HOST + "]",
"[serverUrl] property of deserialized ConnectionRequestInfo object is not [" + DEFAULT_HOST + "]",
DEFAULT_HOST,
deserializedFactory.getInfo().getServerUrl());
assertNotNull(
"Log instance of deserialized ActiveMQManagedConnectionFactory is null",
deserializedFactory.log);
}
}
}

View File

@ -16,6 +16,11 @@
*/
package org.apache.activemq.ra;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
@ -28,13 +33,11 @@ import javax.jms.TopicConnectionFactory;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Before;
import org.junit.Test;
/**
*
*/
public class ManagedConnectionTest extends TestCase {
public class ManagedConnectionTest {
private static final String DEFAULT_HOST = "vm://localhost?broker.persistent=false";
@ -44,10 +47,8 @@ public class ManagedConnectionTest extends TestCase {
private ManagedConnectionProxy connection;
private ActiveMQManagedConnection managedConnection;
/**
* @see junit.framework.TestCase#setUp()
*/
protected void setUp() throws Exception {
@Before
public void setUp() throws Exception {
managedConnectionFactory = new ActiveMQManagedConnectionFactory();
managedConnectionFactory.setServerUrl(DEFAULT_HOST);
@ -60,12 +61,14 @@ public class ManagedConnectionTest extends TestCase {
}
@Test(timeout = 60000)
public void testConnectionCloseEvent() throws ResourceException, JMSException {
final boolean test[] = new boolean[] {
false
};
connectionManager.addConnectionEventListener(new ConnectionEventListenerAdapter() {
@Override
public void connectionClosed(ConnectionEvent arg0) {
test[0] = true;
}
@ -74,12 +77,14 @@ public class ManagedConnectionTest extends TestCase {
assertTrue(test[0]);
}
@Test(timeout = 60000)
public void testLocalTransactionCommittedEvent() throws ResourceException, JMSException {
final boolean test[] = new boolean[] {
false
};
connectionManager.addConnectionEventListener(new ConnectionEventListenerAdapter() {
@Override
public void localTransactionCommitted(ConnectionEvent arg0) {
test[0] = true;
}
@ -95,12 +100,14 @@ public class ManagedConnectionTest extends TestCase {
}
@Test(timeout = 60000)
public void testLocalTransactionRollbackEvent() throws ResourceException, JMSException {
final boolean test[] = new boolean[] {
false
};
connectionManager.addConnectionEventListener(new ConnectionEventListenerAdapter() {
@Override
public void localTransactionRolledback(ConnectionEvent arg0) {
test[0] = true;
}
@ -113,12 +120,14 @@ public class ManagedConnectionTest extends TestCase {
assertTrue(test[0]);
}
@Test(timeout = 60000)
public void testLocalTransactionStartedEvent() throws ResourceException, JMSException {
final boolean test[] = new boolean[] {
false
};
connectionManager.addConnectionEventListener(new ConnectionEventListenerAdapter() {
@Override
public void localTransactionStarted(ConnectionEvent arg0) {
test[0] = true;
}
@ -136,6 +145,7 @@ public class ManagedConnectionTest extends TestCase {
* A managed connection that has been clean up should throw exceptions when
* it used.
*/
@Test(timeout = 60000)
public void testCleanup() throws ResourceException, JMSException {
// Do some work and close it...
@ -143,13 +153,14 @@ public class ManagedConnectionTest extends TestCase {
doWork(session);
connection.close();
try {
// This should throw expection
// This should throw exception
doWork(session);
fail("Using a session after the connection is closed should throw exception.");
} catch (JMSException e) {
}
}
@Test(timeout = 60000)
public void testSessionCloseIndependance() throws ResourceException, JMSException {
Session session1 = connection.createSession(true, 0);
@ -159,7 +170,7 @@ public class ManagedConnectionTest extends TestCase {
doWork(session1);
session1.close();
try {
// This should throw expection
// This should throw exception
doWork(session1);
fail("Using a session after the connection is closed should throw exception.");
} catch (JMSException e) {
@ -169,7 +180,7 @@ public class ManagedConnectionTest extends TestCase {
doWork(session2);
session2.close();
try {
// This should throw expection
// This should throw exception
doWork(session2);
fail("Using a session after the connection is closed should throw exception.");
} catch (JMSException e) {
@ -178,7 +189,7 @@ public class ManagedConnectionTest extends TestCase {
/**
* Does some work so that we can test commit/rollback etc.
*
*
* @throws JMSException
*/
public void doWork(Session session) throws JMSException {
@ -187,6 +198,7 @@ public class ManagedConnectionTest extends TestCase {
producer.send(session.createTextMessage("test message."));
}
@Test(timeout = 60000)
public void testImplementsQueueAndTopicConnection() throws Exception {
QueueConnection qc = ((QueueConnectionFactory)connectionFactory).createQueueConnection();
assertNotNull(qc);
@ -194,10 +206,12 @@ public class ManagedConnectionTest extends TestCase {
assertNotNull(tc);
}
@Test(timeout = 60000)
public void testSelfEquality() {
assertEquality(managedConnection, managedConnection);
}
@Test(timeout = 60000)
public void testSamePropertiesButNotEqual() throws Exception {
ManagedConnectionProxy newConnection = (ManagedConnectionProxy)connectionFactory.createConnection();
assertNonEquality(managedConnection, newConnection.getManagedConnection());
@ -214,5 +228,4 @@ public class ManagedConnectionTest extends TestCase {
assertFalse("ActiveMQManagedConnection are equal", rightCon.equals(leftCon));
assertFalse("HashCodes are equal", leftCon.hashCode() == rightCon.hashCode());
}
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.ra;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.lang.reflect.Method;
import javax.jms.Message;
@ -30,20 +33,15 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import junit.framework.TestCase;
/**
* @author <a href="mailto:michael.gaffney@panacya.com">Michael Gaffney </a>
*/
@RunWith(JMock.class)
public class MessageEndpointProxyTest extends TestCase {
public class MessageEndpointProxyTest {
private MessageEndpoint mockEndpoint;
private EndpointAndListener mockEndpointAndListener;
private Message stubMessage;
private MessageEndpointProxy endpointProxy;
private Mockery context;
@Before
public void setUp() {
context = new Mockery();
@ -54,7 +52,7 @@ public class MessageEndpointProxyTest extends TestCase {
endpointProxy = new MessageEndpointProxy(mockEndpointAndListener);
}
@Test
@Test(timeout = 60000)
public void testInvalidConstruction() {
try {
new MessageEndpointProxy(mockEndpoint);
@ -64,7 +62,7 @@ public class MessageEndpointProxyTest extends TestCase {
}
}
@Test
@Test(timeout = 60000)
public void testSuccessfulCallSequence() throws Exception {
setupBeforeDeliverySuccessful();
setupOnMessageSuccessful();
@ -75,7 +73,7 @@ public class MessageEndpointProxyTest extends TestCase {
doAfterDeliveryExpectSuccess();
}
@Test
@Test(timeout = 60000)
public void testBeforeDeliveryFailure() throws Exception {
context.checking(new Expectations() {{
oneOf (mockEndpointAndListener).beforeDelivery(with(any(Method.class)));
@ -85,7 +83,7 @@ public class MessageEndpointProxyTest extends TestCase {
never (mockEndpointAndListener).onMessage(null);
never (mockEndpointAndListener).afterDelivery();
}});
setupExpectRelease();
try {
@ -96,19 +94,19 @@ public class MessageEndpointProxyTest extends TestCase {
}
doOnMessageExpectInvalidMessageEndpointException();
doAfterDeliveryExpectInvalidMessageEndpointException();
doFullyDeadCheck();
}
@Test
@Test(timeout = 60000)
public void testOnMessageFailure() throws Exception {
setupBeforeDeliverySuccessful();
context.checking(new Expectations() {{
oneOf (mockEndpointAndListener).onMessage(with(same(stubMessage)));
will(throwException(new RuntimeException()));
}});
setupAfterDeliverySuccessful();
doBeforeDeliveryExpectSuccess();
@ -122,11 +120,11 @@ public class MessageEndpointProxyTest extends TestCase {
}
@Test
@Test(timeout = 60000)
public void testAfterDeliveryFailure() throws Exception {
setupBeforeDeliverySuccessful();
setupOnMessageSuccessful();
context.checking(new Expectations() {{
oneOf (mockEndpointAndListener).afterDelivery(); will(throwException(new ResourceException()));
}});
@ -175,7 +173,7 @@ public class MessageEndpointProxyTest extends TestCase {
oneOf (mockEndpointAndListener).release();
}});
}
private void doBeforeDeliveryExpectSuccess() {
try {
endpointProxy.beforeDelivery(ActiveMQEndpointWorker.ON_MESSAGE_METHOD);

View File

@ -16,13 +16,6 @@
*/
package org.apache.activemq.ra;
import junit.framework.TestCase;
import org.apache.activemq.broker.SslBrokerService;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@ -30,41 +23,50 @@ import javax.jms.Session;
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class SSLMAnagedConnectionFactoryTest extends TestCase {
import org.apache.activemq.broker.SslBrokerService;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class SSLMAnagedConnectionFactoryTest {
private static final String KAHADB_DIRECTORY = "target/activemq-data/";
private static final String DEFAULT_HOST = "ssl://localhost:0";
private static final String DEFAULT_HOST = "ssl://0.0.0.0:61616";
private ConnectionManagerAdapter connectionManager = new ConnectionManagerAdapter();
private ActiveMQManagedConnectionFactory managedConnectionFactory;
private ConnectionFactory connectionFactory;
private ManagedConnectionProxy connection;
private ActiveMQManagedConnection managedConnection;
private SslBrokerService broker;
private TransportConnector connector;
private String connectionURI;
@Before
public void setUp() throws Exception {
createAndStartBroker();
/**
* @see junit.framework.TestCase#setUp()
*/
protected void setUp() throws Exception {
managedConnectionFactory = new ActiveMQManagedConnectionFactory();
managedConnectionFactory.setServerUrl(DEFAULT_HOST);
managedConnectionFactory.setServerUrl(connectionURI);
managedConnectionFactory.setTrustStore("server.keystore");
managedConnectionFactory.setTrustStorePassword("password");
managedConnectionFactory.setKeyStore("client.keystore");
managedConnectionFactory.setKeyStorePassword("password");
connectionFactory = (ConnectionFactory)managedConnectionFactory.createConnectionFactory(connectionManager);createAndStartBroker();
connectionFactory = (ConnectionFactory)managedConnectionFactory.createConnectionFactory(connectionManager);
}
@Override
protected void tearDown() throws Exception {
@After
public void tearDown() throws Exception {
if (broker != null) {
broker.stop();
}
}
@Test(timeout = 60000)
public void testSSLManagedConnection() throws Exception {
connection = (ManagedConnectionProxy)connectionFactory.createConnection();
managedConnection = connection.getManagedConnection();
@ -82,11 +84,15 @@ public class SSLMAnagedConnectionFactoryTest extends TestCase {
broker.setDeleteAllMessagesOnStartup(true);
broker.setUseJmx(false);
broker.setBrokerName("BROKER");
broker.setDataDirectory(KAHADB_DIRECTORY);
KeyManager[] km = SSLTest.getKeyManager();
TrustManager[] tm = SSLTest.getTrustManager();
connector = broker.addSslConnector(DEFAULT_HOST, km, tm, null);
TransportConnector connector = broker.addSslConnector(DEFAULT_HOST, km, tm, null);
broker.start();
broker.waitUntilStarted(); // for client side
broker.waitUntilStarted();
connectionURI = connector.getPublishableConnectString();
SslTransportFactory sslFactory = new SslTransportFactory();
SslContext ctx = new SslContext(km, tm, null);
SslContext.setCurrentSslContext(ctx);

View File

@ -16,17 +16,20 @@
*/
package org.apache.activemq.ra;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQSslConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.SslBrokerService;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.KeyStore;
import java.util.Timer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Message;
@ -38,7 +41,6 @@ import javax.jms.Session;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
@ -55,40 +57,43 @@ import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.security.KeyStore;
import java.util.Timer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class SSLTest extends TestCase {
public static final String KEYSTORE_TYPE = "jks";
public static final String PASSWORD = "password";
public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
import org.apache.activemq.ActiveMQSslConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.SslBrokerService;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
long txGenerator = System.currentTimeMillis();
public class SSLTest {
private static final String KEYSTORE_TYPE = "jks";
private static final String PASSWORD = "password";
private static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
private static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
private static final String KAHADB_DIRECTORY = "target/activemq-data/";
private static final String BIND_ADDRESS = "ssl://0.0.0.0:61616";
private SslBrokerService broker;
private long txGenerator = System.currentTimeMillis();
private SslBrokerService broker;
private TransportConnector connector;
@Override
protected void setUp() throws Exception {
@Before
public void setUp() throws Exception {
createAndStartBroker();
}
@Override
protected void tearDown() throws Exception {
@After
public void tearDown() throws Exception {
if (broker != null) {
broker.stop();
}
@ -99,11 +104,13 @@ public class SSLTest extends TestCase {
broker.setDeleteAllMessagesOnStartup(true);
broker.setUseJmx(false);
broker.setBrokerName("BROKER");
broker.setDataDirectory(KAHADB_DIRECTORY);
KeyManager[] km = getKeyManager();
TrustManager[] tm = getTrustManager();
connector = broker.addSslConnector(BIND_ADDRESS, km, tm, null);
broker.start();
broker.waitUntilStarted(); // for client side
SslTransportFactory sslFactory = new SslTransportFactory();
SslContext ctx = new SslContext(km, tm, null);
SslContext.setCurrentSslContext(ctx);
@ -111,40 +118,49 @@ public class SSLTest extends TestCase {
}
private static final class StubBootstrapContext implements BootstrapContext {
@Override
public WorkManager getWorkManager() {
return new WorkManager() {
@Override
public void doWork(Work work) throws WorkException {
new Thread(work).start();
}
@Override
public void doWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
new Thread(work).start();
}
@Override
public long startWork(Work work) throws WorkException {
new Thread(work).start();
return 0;
}
@Override
public long startWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
new Thread(work).start();
return 0;
}
@Override
public void scheduleWork(Work work) throws WorkException {
new Thread(work).start();
}
@Override
public void scheduleWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
new Thread(work).start();
}
};
}
@Override
public XATerminator getXATerminator() {
return null;
}
@Override
public Timer createTimer() throws UnavailableException {
return null;
}
@ -155,6 +171,7 @@ public class SSLTest extends TestCase {
public XAResource xaresource;
public Xid xid;
@Override
public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
try {
if (xid == null) {
@ -166,6 +183,7 @@ public class SSLTest extends TestCase {
}
}
@Override
public void afterDelivery() throws ResourceException {
try {
xaresource.end(xid, 0);
@ -176,15 +194,17 @@ public class SSLTest extends TestCase {
}
}
@Override
public void release() {
}
@Override
public void onMessage(Message message) {
messageCount++;
}
}
@Test(timeout = 60000)
public void testMessageDeliveryUsingSSLTruststoreOnly() throws Exception {
SSLContext context = SSLContext.getInstance("TLS");
context.init(getKeyManager(), getTrustManager(), null);
@ -208,12 +228,11 @@ public class SSLTest extends TestCase {
final CountDownLatch messageDelivered = new CountDownLatch(1);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
@Override
public void onMessage(Message message) {
super.onMessage(message);
messageDelivered.countDown();
}
;
};
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
@ -223,11 +242,13 @@ public class SSLTest extends TestCase {
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
@Override
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
@ -255,9 +276,9 @@ public class SSLTest extends TestCase {
// Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
adapter.stop();
}
@Test(timeout = 60000)
public void testMessageDeliveryUsingSSLTruststoreAndKeystore() throws Exception {
SSLContext context = SSLContext.getInstance("TLS");
context.init(getKeyManager(), getTrustManager(), null);
@ -285,12 +306,11 @@ public class SSLTest extends TestCase {
final CountDownLatch messageDelivered = new CountDownLatch(1);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
@Override
public void onMessage(Message message) {
super.onMessage(message);
messageDelivered.countDown();
}
;
};
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
@ -300,11 +320,13 @@ public class SSLTest extends TestCase {
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
@Override
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
@ -332,9 +354,9 @@ public class SSLTest extends TestCase {
// Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
adapter.stop();
}
}
@Test(timeout = 60000)
public void testMessageDeliveryUsingSSLTruststoreAndKeystoreOverrides() throws Exception {
SSLContext context = SSLContext.getInstance("TLS");
context.init(getKeyManager(), getTrustManager(), null);
@ -358,12 +380,11 @@ public class SSLTest extends TestCase {
final CountDownLatch messageDelivered = new CountDownLatch(1);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
@Override
public void onMessage(Message message) {
super.onMessage(message);
messageDelivered.countDown();
}
;
};
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
@ -377,11 +398,13 @@ public class SSLTest extends TestCase {
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
@Override
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
@ -409,10 +432,8 @@ public class SSLTest extends TestCase {
// Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
adapter.stop();
}
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
@ -421,19 +442,21 @@ public class SSLTest extends TestCase {
final byte[] bs = baos.toByteArray();
return new Xid() {
@Override
public int getFormatId() {
return 86;
}
@Override
public byte[] getGlobalTransactionId() {
return bs;
}
@Override
public byte[] getBranchQualifier() {
return bs;
}
};
}
public static TrustManager[] getTrustManager() throws Exception {
@ -457,7 +480,6 @@ public class SSLTest extends TestCase {
byte[] sslCert = loadClientCredential(SERVER_KEYSTORE);
if (sslCert != null && sslCert.length > 0) {
ByteArrayInputStream bin = new ByteArrayInputStream(sslCert);
ks.load(bin, PASSWORD.toCharArray());
@ -471,28 +493,30 @@ public class SSLTest extends TestCase {
if (fileName == null) {
return null;
}
FileInputStream in = new FileInputStream(fileName);
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buf = new byte[512];
int i = in.read(buf);
while (i > 0) {
out.write(buf, 0, i);
i = in.read(buf);
}
in.close();
return out.toByteArray();
}
private void makeSSLConnection(SSLContext context, String enabledSuites[], TransportConnector connector) throws Exception,
UnknownHostException, SocketException {
private void makeSSLConnection(SSLContext context, String enabledSuites[], TransportConnector connector) throws Exception {
SSLSocket sslSocket = (SSLSocket) context.getSocketFactory().createSocket("localhost", connector.getUri().getPort());
if (enabledSuites != null) {
sslSocket.setEnabledCipherSuites(enabledSuites);
}
sslSocket.setSoTimeout(5000);
SSLSession session = sslSocket.getSession();
sslSocket.setSoTimeout(5000);
sslSocket.getSession();
sslSocket.startHandshake();
}
}

View File

@ -16,11 +16,14 @@
*/
package org.apache.activemq.ra;
import static org.junit.Assert.assertTrue;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Session;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.endpoint.MessageEndpointFactory;
@ -28,9 +31,7 @@ import javax.resource.spi.work.ExecutionContext;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
@ -54,31 +55,30 @@ import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
@RunWith(JMock.class)
public class ServerSessionImplTest extends TestCase {
public class ServerSessionImplTest {
private static final Logger LOG = LoggerFactory.getLogger(ServerSessionImplTest.class);
private static final String BROKER_URL = "vm://localhost?broker.persistent=false";
private ServerSessionImpl serverSession;
private ServerSessionPoolImpl pool;
private WorkManager workManager;
private MessageEndpointProxy messageEndpoint;
private ActiveMQConnection con;
private ActiveMQSession session;
ActiveMQEndpointWorker endpointWorker;
private ActiveMQEndpointWorker endpointWorker;
private Mockery context;
@Before
public void setUp() throws Exception
{
super.setUp();
context = new Mockery() {{
setImposteriser(ClassImposteriser.INSTANCE);
}};
org.apache.activemq.ActiveMQConnectionFactory factory =
new org.apache.activemq.ActiveMQConnectionFactory(BROKER_URL);
public void setUp() throws Exception {
context = new Mockery() {
{
setImposteriser(ClassImposteriser.INSTANCE);
}
};
org.apache.activemq.ActiveMQConnectionFactory factory = new org.apache.activemq.ActiveMQConnectionFactory(BROKER_URL);
con = (ActiveMQConnection) factory.createConnection();
con.start();
session = (ActiveMQSession) con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -98,18 +98,14 @@ public class ServerSessionImplTest extends TestCase {
workManager = context.mock(WorkManager.class);
messageEndpoint = context.mock(MessageEndpointProxy.class);
serverSession = new ServerSessionImpl(
(ServerSessionPoolImpl) pool,
session,
(WorkManager) workManager,
messageEndpoint,
false,
10);
serverSession = new ServerSessionImpl(pool, session, workManager, messageEndpoint, false, 10);
con.close();
context.checking(new Expectations() {{
oneOf (pool).removeFromPool(with(same(serverSession)));
}});
context.checking(new Expectations() {
{
oneOf(pool).removeFromPool(with(same(serverSession)));
}
});
serverSession.run();
}
@ -126,53 +122,70 @@ public class ServerSessionImplTest extends TestCase {
workManager = context.mock(WorkManager.class);
final MessageActivationSpec messageActivationSpec = context.mock(MessageActivationSpec.class);
final BootstrapContext boostrapContext = context.mock(BootstrapContext.class);
context.checking(new Expectations() {{
allowing(boostrapContext).getWorkManager(); will (returnValue(workManager));
allowing(resourceAdapter).getBootstrapContext(); will (returnValue(boostrapContext));
allowing(messageEndpointFactory).isDeliveryTransacted(with (any(Method.class))); will(returnValue(Boolean.FALSE));
allowing(key).getMessageEndpointFactory(); will(returnValue(messageEndpointFactory));
allowing(key).getActivationSpec(); will (returnValue(messageActivationSpec));
allowing(messageActivationSpec).isUseJndi(); will (returnValue(Boolean.FALSE));
allowing(messageActivationSpec).getDestinationType(); will (returnValue("javax.jms.Queue"));
allowing(messageActivationSpec).getDestination(); will (returnValue("Queue"));
allowing(messageActivationSpec).getAcknowledgeModeForSession(); will (returnValue(1));
allowing(messageActivationSpec).getMaxSessionsIntValue(); will (returnValue(1));
allowing(messageActivationSpec).getEnableBatchBooleanValue(); will (returnValue(Boolean.FALSE));
allowing(messageActivationSpec).isUseRAManagedTransactionEnabled(); will (returnValue(Boolean.TRUE));
allowing(messageEndpointFactory).createEndpoint(with (any(XAResource.class))); will (returnValue(messageEndpoint));
context.checking(new Expectations() {
{
allowing(boostrapContext).getWorkManager();
will(returnValue(workManager));
allowing(resourceAdapter).getBootstrapContext();
will(returnValue(boostrapContext));
allowing(messageEndpointFactory).isDeliveryTransacted(with(any(Method.class)));
will(returnValue(Boolean.FALSE));
allowing(key).getMessageEndpointFactory();
will(returnValue(messageEndpointFactory));
allowing(key).getActivationSpec();
will(returnValue(messageActivationSpec));
allowing(messageActivationSpec).isUseJndi();
will(returnValue(Boolean.FALSE));
allowing(messageActivationSpec).getDestinationType();
will(returnValue("javax.jms.Queue"));
allowing(messageActivationSpec).getDestination();
will(returnValue("Queue"));
allowing(messageActivationSpec).getAcknowledgeModeForSession();
will(returnValue(1));
allowing(messageActivationSpec).getMaxSessionsIntValue();
will(returnValue(1));
allowing(messageActivationSpec).getEnableBatchBooleanValue();
will(returnValue(Boolean.FALSE));
allowing(messageActivationSpec).isUseRAManagedTransactionEnabled();
will(returnValue(Boolean.TRUE));
allowing(messageEndpointFactory).createEndpoint(with(any(XAResource.class)));
will(returnValue(messageEndpoint));
allowing(workManager).scheduleWork((Work) with(anything()), (long) with(any(long.class)), with(any(ExecutionContext.class)), with(any(WorkListener.class)));
will (new Action() {
@Override
public Object invoke(Invocation invocation) throws Throwable {
return null;
}
@Override
public void describeTo(Description description) {
}
});
allowing(messageEndpoint).beforeDelivery((Method) with(anything()));
allowing (messageEndpoint).onMessage(with (any(javax.jms.Message.class))); will(new Action(){
@Override
public Object invoke(Invocation invocation) throws Throwable {
messageCount.countDown();
if (messageCount.getCount() < maxMessages - 11) {
TimeUnit.MILLISECONDS.sleep(200);
allowing(workManager).scheduleWork((Work) with(anything()), with(any(long.class)), with(any(ExecutionContext.class)),
with(any(WorkListener.class)));
will(new Action() {
@Override
public Object invoke(Invocation invocation) throws Throwable {
return null;
}
return null;
}
@Override
public void describeTo(Description description) {
description.appendText("Keep message count");
}
});
allowing(messageEndpoint).afterDelivery();
allowing(messageEndpoint).release();
@Override
public void describeTo(Description description) {
}
});
}});
allowing(messageEndpoint).beforeDelivery((Method) with(anything()));
allowing(messageEndpoint).onMessage(with(any(javax.jms.Message.class)));
will(new Action() {
@Override
public Object invoke(Invocation invocation) throws Throwable {
messageCount.countDown();
if (messageCount.getCount() < maxMessages - 11) {
TimeUnit.MILLISECONDS.sleep(200);
}
return null;
}
@Override
public void describeTo(Description description) {
description.appendText("Keep message count");
}
});
allowing(messageEndpoint).afterDelivery();
allowing(messageEndpoint).release();
}
});
endpointWorker = new ActiveMQEndpointWorker(resourceAdapter, key);
endpointWorker.setConnection(con);
@ -183,8 +196,8 @@ public class ServerSessionImplTest extends TestCase {
// preload the session dispatch queue to keep the session active
ActiveMQSession session1 = (ActiveMQSession) serverSession1.getSession();
for (int i=0; i<maxMessages; i++) {
MessageDispatch messageDispatch = new MessageDispatch();
for (int i = 0; i < maxMessages; i++) {
MessageDispatch messageDispatch = new MessageDispatch();
ActiveMQMessage message = new ActiveMQTextMessage();
message.setMessageId(new MessageId("0:0:0:" + i));
message.getMessageId().setBrokerSequenceId(i);
@ -195,7 +208,7 @@ public class ServerSessionImplTest extends TestCase {
ExecutorService executorService = Executors.newCachedThreadPool();
final CountDownLatch runState = new CountDownLatch(1);
executorService.execute(new Runnable(){
executorService.execute(new Runnable() {
@Override
public void run() {
try {

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.ra;
import static org.junit.Assert.assertNotNull;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
@ -26,13 +28,11 @@ import javax.jms.TopicSubscriber;
import javax.resource.ResourceException;
import javax.transaction.xa.XAException;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Before;
import org.junit.Test;
/**
*
*/
public class UnsubscribeResubscribeTest extends TestCase {
public class UnsubscribeResubscribeTest {
private static final String DEFAULT_HOST = "vm://localhost?broker.persistent=false";
@ -42,17 +42,13 @@ public class UnsubscribeResubscribeTest extends TestCase {
private ManagedConnectionProxy connection;
private ActiveMQManagedConnection managedConnection;
/**
* @see junit.framework.TestCase#setUp()
*/
protected void setUp() throws Exception {
@Before
public void setUp() throws Exception {
managedConnectionFactory = new ActiveMQManagedConnectionFactory();
managedConnectionFactory.setServerUrl(DEFAULT_HOST);
managedConnectionFactory.setUserName(ActiveMQConnectionFactory.DEFAULT_USER);
managedConnectionFactory.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD);
managedConnectionFactory.setClientid("clientId");
}
private void getConnection() throws ResourceException, JMSException {
@ -61,8 +57,10 @@ public class UnsubscribeResubscribeTest extends TestCase {
managedConnection = connection.getManagedConnection();
}
@Test(timeout = 60000)
public void testUnsubscribeResubscribe() throws ResourceException, JMSException, XAException {
getConnection();
assertNotNull(managedConnection);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic");
TopicSubscriber sub = session.createDurableSubscriber(topic, "sub");
@ -73,5 +71,4 @@ public class UnsubscribeResubscribeTest extends TestCase {
session.unsubscribe("sub");
sub = session.createDurableSubscriber(topic, "sub");
}
}