resolve https://issues.apache.org/activemq/browse/AMQ-1191 by introducing driver specific locker so that impl for sql_server can diverge from those of oracle etc. Thus the patch is applied to a transact sql specific impl - the impls are defined in a resoruce file in the same way as the adapters. The default locker now remains unchanged. Also updated to the latest stable jmock 2.5.1

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@798602 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-07-28 16:44:47 +00:00
parent a8d3908586
commit 8274f09dd4
12 changed files with 367 additions and 332 deletions

View File

@ -241,6 +241,13 @@
<version>1.2.24</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jmock</groupId>
<artifactId>jmock</artifactId>
<version>${jmock-version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<reporting>

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.store.jdbc;
import java.io.IOException;
import org.apache.activemq.Service;
/**
@ -25,10 +27,23 @@ import org.apache.activemq.Service;
*/
public interface DatabaseLocker extends Service {
/**
* allow the injection of a jdbc persistence adapter
* @param adapter the persistence adapter to use
* @throws IOException
*/
void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException;
/**
* Used by a timer to keep alive the lock.
* If the method returns false the broker should be terminated
*/
boolean keepAlive();
/**
* set the delay interval in milliseconds between lock acquire attempts
* @param lockAcquireSleepInterval the sleep interval in miliseconds
*/
void setLockAcquireSleepInterval(long lockAcquireSleepInterval);
}

View File

@ -19,8 +19,6 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import javax.sql.DataSource;
@ -38,21 +36,24 @@ import org.apache.commons.logging.LogFactory;
public class DefaultDatabaseLocker implements DatabaseLocker {
public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 1000;
private static final Log LOG = LogFactory.getLog(DefaultDatabaseLocker.class);
private final DataSource dataSource;
private final Statements statements;
private long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
protected DataSource dataSource;
protected Statements statements;
protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
private Connection connection;
private boolean stopping;
private Handler<Exception> exceptionHandler;
protected Connection connection;
protected boolean stopping;
protected Handler<Exception> exceptionHandler;
public DefaultDatabaseLocker(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
this(persistenceAdapter.getLockDataSource(), persistenceAdapter.getStatements());
public DefaultDatabaseLocker() {
}
public DefaultDatabaseLocker(DataSource dataSource, Statements statements) {
this.dataSource = dataSource;
this.statements = statements;
public DefaultDatabaseLocker(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
setPersistenceAdapter(persistenceAdapter);
}
public void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException {
this.dataSource = adapter.getLockDataSource();
this.statements = adapter.getStatements();
}
public void start() throws Exception {
@ -66,13 +67,7 @@ public class DefaultDatabaseLocker implements DatabaseLocker {
connection.setAutoCommit(false);
String sql = statements.getLockCreateStatement();
statement = connection.prepareStatement(sql);
if (statement.getMetaData() != null) {
ResultSet rs = statement.executeQuery();
// if not already locked the statement below blocks until lock acquired
rs.next();
} else {
statement.execute();
}
statement.execute();
break;
} catch (Exception e) {
if (stopping) {

View File

@ -64,8 +64,10 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
BrokerServiceAware {
private static final Log LOG = LogFactory.getLog(JDBCPersistenceAdapter.class);
private static FactoryFinder factoryFinder = new FactoryFinder(
private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
"META-INF/services/org/apache/activemq/store/jdbc/");
private static FactoryFinder lockFactoryFinder = new FactoryFinder(
"META-INF/services/org/apache/activemq/store/jdbc/lock/");
private WireFormat wireFormat = new OpenWireFormat();
private BrokerService brokerService;
@ -285,11 +287,21 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
public DatabaseLocker getDatabaseLocker() throws IOException {
if (databaseLocker == null && isUseDatabaseLock()) {
databaseLocker = createDatabaseLocker();
setDatabaseLocker(loadDataBaseLocker());
}
return databaseLocker;
}
/**
* Sets the database locker strategy to use to lock the database on startup
* @throws IOException
*/
public void setDatabaseLocker(DatabaseLocker locker) throws IOException {
databaseLocker = locker;
databaseLocker.setPersistenceAdapter(this);
databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
}
public DataSource getLockDataSource() throws IOException {
if (lockDataSource == null) {
lockDataSource = getDataSource();
@ -308,13 +320,6 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
this.lockDataSource = dataSource;
}
/**
* Sets the database locker strategy to use to lock the database on startup
*/
public void setDatabaseLocker(DatabaseLocker databaseLocker) {
this.databaseLocker = databaseLocker;
}
public BrokerService getBrokerService() {
return brokerService;
}
@ -327,37 +332,39 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
* @throws IOException
*/
protected JDBCAdapter createAdapter() throws IOException {
JDBCAdapter adapter = null;
adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
// Use the default JDBC adapter if the
// Database type is not recognized.
if (adapter == null) {
adapter = new DefaultJDBCAdapter();
LOG.debug("Using default JDBC Adapter: " + adapter);
}
return adapter;
}
private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
Object adapter = null;
TransactionContext c = getTransactionContext();
try {
try {
// Make the filename file system safe.
String dirverName = c.getConnection().getMetaData().getDriverName();
dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
try {
adapter = (DefaultJDBCAdapter)factoryFinder.newInstance(dirverName);
LOG.info("Database driver recognized: [" + dirverName + "]");
adapter = finder.newInstance(dirverName);
LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
} catch (Throwable e) {
LOG.warn("Database driver NOT recognized: [" + dirverName
+ "]. Will use default JDBC implementation.");
LOG.warn("Database " + kind + " driver override not found for : [" + dirverName
+ "]. Will use default implementation.");
}
} catch (SQLException e) {
LOG
.warn("JDBC error occurred while trying to detect database type. Will use default JDBC implementation: "
LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
+ e.getMessage());
JDBCPersistenceAdapter.log("Failure Details: ", e);
}
// Use the default JDBC adapter if the
// Database type is not recognized.
if (adapter == null) {
adapter = new DefaultJDBCAdapter();
}
} finally {
c.close();
}
@ -520,9 +527,12 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
}
}
protected DatabaseLocker createDatabaseLocker() throws IOException {
DefaultDatabaseLocker locker = new DefaultDatabaseLocker(this);
locker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
protected DatabaseLocker loadDataBaseLocker() throws IOException {
DatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");
if (locker == null) {
locker = new DefaultDatabaseLocker();
LOG.debug("Using default JDBC Locker: " + locker);
}
return locker;
}
@ -530,7 +540,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
}
public String toString() {
return "JDBCPersistenceAdaptor(" + super.toString() + ")";
return "JDBCPersistenceAdapter(" + super.toString() + ")";
}
public void setDirectory(File dir) {

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc.adapter;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.activemq.store.jdbc.DefaultDatabaseLocker;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Represents an exclusive lock on a database to avoid multiple brokers running
* against the same logical database.
*
* @version $Revision$
*/
public class TransactDatabaseLocker extends DefaultDatabaseLocker {
private static final Log LOG = LogFactory.getLog(TransactDatabaseLocker.class);
public TransactDatabaseLocker() {
}
public TransactDatabaseLocker(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
setPersistenceAdapter(persistenceAdapter);
}
@Override
public void start() throws Exception {
stopping = false;
LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
PreparedStatement statement = null;
while (true) {
try {
connection = dataSource.getConnection();
connection.setAutoCommit(false);
String sql = statements.getLockCreateStatement();
statement = connection.prepareStatement(sql);
if (statement.getMetaData() != null) {
ResultSet rs = statement.executeQuery();
// if not already locked the statement below blocks until lock acquired
rs.next();
} else {
statement.execute();
}
break;
} catch (Exception e) {
if (stopping) {
throw new Exception("Cannot start broker as being asked to shut down. Interrupted attempt to acquire lock: " + e, e);
}
if (exceptionHandler != null) {
try {
exceptionHandler.handle(e);
} catch (Throwable handlerException) {
LOG.error("The exception handler " + exceptionHandler.getClass().getCanonicalName() + " threw this exception: " + handlerException
+ " while trying to handle this excpetion: " + e, handlerException);
}
} else {
LOG.error("Failed to acquire lock: " + e, e);
}
} finally {
if (null != statement) {
try {
statement.close();
} catch (SQLException e1) {
LOG.warn("Caught while closing statement: " + e1, e1);
}
statement = null;
}
}
LOG.debug("Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again to get the lock...");
try {
Thread.sleep(lockAcquireSleepInterval);
} catch (InterruptedException ie) {
LOG.warn("Master lock retry sleep interrupted", ie);
}
}
LOG.info("Becoming the master on dataSource: " + dataSource);
}
}

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.store.jdbc.adapter.TransactDatabaseLocker

View File

@ -17,8 +17,13 @@
package org.apache.activemq.config;
import java.io.File;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.util.List;
import javax.sql.DataSource;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
@ -33,7 +38,9 @@ import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.TimedSubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.DefaultDatabaseLocker;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.adapter.TransactDatabaseLocker;
import org.apache.activemq.store.journal.JournalPersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.transport.tcp.TcpTransportServer;
@ -43,6 +50,8 @@ import org.apache.activemq.xbean.BrokerFactoryBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
@ -146,6 +155,56 @@ public class ConfigTest extends TestCase {
}
}
public void testJdbcLockConfigOverride() throws Exception {
JDBCPersistenceAdapter adapter = new JDBCPersistenceAdapter();
Mockery context = new Mockery();
final DataSource dataSource = context.mock(DataSource.class);
final Connection connection = context.mock(Connection.class);
final DatabaseMetaData metadata = context.mock(DatabaseMetaData.class);
final ResultSet result = context.mock(ResultSet.class);
adapter.setDataSource(dataSource);
adapter.setCreateTablesOnStartup(false);
context.checking(new Expectations() {{
allowing (dataSource).getConnection(); will (returnValue(connection));
allowing (connection).getMetaData(); will (returnValue(metadata));
allowing (connection);
allowing (metadata).getDriverName(); will (returnValue("Microsoft_SQL_Server_2005_jdbc_driver"));
allowing (result).next(); will (returnValue(true));
}});
adapter.start();
assertTrue("has the locker override", adapter.getDatabaseLocker() instanceof TransactDatabaseLocker);
adapter.stop();
}
public void testJdbcLockConfigDefault() throws Exception {
JDBCPersistenceAdapter adapter = new JDBCPersistenceAdapter();
Mockery context = new Mockery();
final DataSource dataSource = context.mock(DataSource.class);
final Connection connection = context.mock(Connection.class);
final DatabaseMetaData metadata = context.mock(DatabaseMetaData.class);
final ResultSet result = context.mock(ResultSet.class);
adapter.setDataSource(dataSource);
adapter.setCreateTablesOnStartup(false);
context.checking(new Expectations() {{
allowing (dataSource).getConnection(); will (returnValue(connection));
allowing (connection).getMetaData(); will (returnValue(metadata));
allowing (connection);
allowing (metadata).getDriverName(); will (returnValue("Some_Unknown_driver"));
allowing (result).next(); will (returnValue(true));
}});
adapter.start();
assertEquals("has the default locker", adapter.getDatabaseLocker().getClass(), DefaultDatabaseLocker.class);
adapter.stop();
}
/*
* This tests configuring the different broker properties using
* xbeans-spring

View File

@ -51,12 +51,6 @@
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
</dependency>
<!--
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jmdns_1.0</artifactId>
</dependency>
-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
@ -65,23 +59,15 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>jmock</groupId>
<artifactId>jmock</artifactId>
<version>1.2.0</version>
<groupId>org.jmock</groupId>
<artifactId>jmock-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jmock</groupId>
<artifactId>jmock-cglib</artifactId>
<version>1.2.0</version>
<groupId>org.jmock</groupId>
<artifactId>jmock-legacy</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-full</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -1,104 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.ra;
import javax.jms.Connection;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import org.jmock.Mock;
import org.jmock.cglib.MockObjectTestCase;
/**
* @author <a href="mailto:michael.gaffney@panacya.com">Michael Gaffney </a>
*/
public class ActiveMQAsfEndpointWorkerTest extends MockObjectTestCase {
private Mock mockResourceAdapter;
private Mock mockActivationKey;
private Mock mockEndpointFactory;
private Mock mockBootstrapContext;
private ActiveMQActivationSpec stubActivationSpec;
// private Mock mockConnection;
public ActiveMQAsfEndpointWorkerTest(String name) {
setName(name);
}
public void testTopicSubscriberDurableNoDups() throws Exception {
// Constraint[] args = {isA(Topic.class),
// eq(stubActivationSpec.getSubscriptionId()),
// NULL,
// ANYTHING,
// ANYTHING};
// mockConnection.expects(once()).method("createDurableConnectionConsumer").with(args)
// .will(returnValue(null));
// worker.start();
// verifyMocks();
}
protected void setUp() throws Exception {
setupStubs();
setupMocks();
setupEndpointWorker();
}
private void setupStubs() {
stubActivationSpec = new ActiveMQActivationSpec();
stubActivationSpec.setDestination("some.topic");
stubActivationSpec.setDestinationType("javax.jms.Topic");
stubActivationSpec.setSubscriptionDurability(ActiveMQActivationSpec.DURABLE_SUBSCRIPTION);
stubActivationSpec.setClientId("foo");
stubActivationSpec.setSubscriptionName("bar");
}
private void setupMocks() {
mockResourceAdapter = mock(ActiveMQResourceAdapter.class);
mockActivationKey = mock(ActiveMQEndpointActivationKey.class);
mockEndpointFactory = mock(MessageEndpointFactory.class);
mockBootstrapContext = mock(BootstrapContext.class);
// mockConnection = mock(Connection.class);
mockActivationKey.expects(atLeastOnce()).method("getMessageEndpointFactory").will(returnValue((MessageEndpointFactory)mockEndpointFactory.proxy()));
mockActivationKey.expects(atLeastOnce()).method("getActivationSpec").will(returnValue(stubActivationSpec));
mockResourceAdapter.expects(atLeastOnce()).method("getBootstrapContext").will(returnValue((BootstrapContext)mockBootstrapContext.proxy()));
mockBootstrapContext.expects(atLeastOnce()).method("getWorkManager").will(returnValue(null));
final boolean isTransactedResult = true;
setupIsTransacted(isTransactedResult);
}
private void setupIsTransacted(final boolean transactedResult) {
mockEndpointFactory.expects(atLeastOnce()).method("isDeliveryTransacted").with(ANYTHING).will(returnValue(transactedResult));
}
private void setupEndpointWorker() throws Exception {
new ActiveMQEndpointWorker((ActiveMQResourceAdapter)mockResourceAdapter.proxy(), (ActiveMQEndpointActivationKey)mockActivationKey.proxy());
}
// private void verifyMocks() {
// mockResourceAdapter.verify();
// mockActivationKey.verify();
// mockEndpointFactory.verify();
// mockBootstrapContext.verify();
// mockConnection.verify();
// }
}

View File

@ -23,39 +23,49 @@ import javax.jms.MessageListener;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import org.jmock.Mock;
import org.jmock.MockObjectTestCase;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.jmock.integration.junit4.JMock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import junit.framework.TestCase;
/**
* @author <a href="mailto:michael.gaffney@panacya.com">Michael Gaffney </a>
*/
public class MessageEndpointProxyTest extends MockObjectTestCase {
@RunWith(JMock.class)
public class MessageEndpointProxyTest extends TestCase {
private Mock mockEndpoint;
private Mock stubMessage;
private MessageEndpoint mockEndpoint;
private EndpointAndListener mockEndpointAndListener;
private Message stubMessage;
private MessageEndpointProxy endpointProxy;
private Mockery context;
public MessageEndpointProxyTest(String name) {
super(name);
}
protected void setUp() {
mockEndpoint = new Mock(EndpointAndListener.class);
stubMessage = new Mock(Message.class);
endpointProxy = new MessageEndpointProxy((MessageEndpoint) mockEndpoint.proxy());
@Before
public void setUp() {
context = new Mockery();
mockEndpoint = context.mock(MessageEndpoint.class);
context.mock(MessageListener.class);
mockEndpointAndListener = context.mock(EndpointAndListener.class);
stubMessage = context.mock(Message.class);
endpointProxy = new MessageEndpointProxy(mockEndpointAndListener);
}
@Test
public void testInvalidConstruction() {
Mock mockEndpoint = new Mock(MessageEndpoint.class);
try {
new MessageEndpointProxy((MessageEndpoint) mockEndpoint.proxy());
new MessageEndpointProxy(mockEndpoint);
fail("An exception should have been thrown");
} catch (IllegalArgumentException e) {
assertTrue(true);
}
}
public void testSuccessfulCallSequence() {
@Test
public void testSuccessfulCallSequence() throws Exception {
setupBeforeDeliverySuccessful();
setupOnMessageSuccessful();
setupAfterDeliverySuccessful();
@ -65,11 +75,17 @@ public class MessageEndpointProxyTest extends MockObjectTestCase {
doAfterDeliveryExpectSuccess();
}
public void testBeforeDeliveryFailure() {
mockEndpoint.expects(once()).method("beforeDelivery").with(isA(Method.class))
.will(throwException(new ResourceException()));
mockEndpoint.expects(never()).method("onMessage");
mockEndpoint.expects(never()).method("afterDelivery");
@Test
public void testBeforeDeliveryFailure() throws Exception {
context.checking(new Expectations() {{
oneOf (mockEndpointAndListener).beforeDelivery(with(any(Method.class)));
will(throwException(new ResourceException()));
}});
context.checking(new Expectations() {{
never (mockEndpointAndListener).onMessage(null);
never (mockEndpointAndListener).afterDelivery();
}});
setupExpectRelease();
try {
@ -84,15 +100,20 @@ public class MessageEndpointProxyTest extends MockObjectTestCase {
doFullyDeadCheck();
}
public void testOnMessageFailure() {
@Test
public void testOnMessageFailure() throws Exception {
setupBeforeDeliverySuccessful();
mockEndpoint.expects(once()).method("onMessage").with(same(stubMessage.proxy()))
.will(throwException(new RuntimeException()));
context.checking(new Expectations() {{
oneOf (mockEndpointAndListener).onMessage(with(same(stubMessage)));
will(throwException(new RuntimeException()));
}});
setupAfterDeliverySuccessful();
doBeforeDeliveryExpectSuccess();
try {
endpointProxy.onMessage((Message) stubMessage.proxy());
endpointProxy.onMessage(stubMessage);
fail("An exception should have been thrown");
} catch (Exception e) {
assertTrue(true);
@ -101,11 +122,15 @@ public class MessageEndpointProxyTest extends MockObjectTestCase {
}
public void testAfterDeliveryFailure() {
@Test
public void testAfterDeliveryFailure() throws Exception {
setupBeforeDeliverySuccessful();
setupOnMessageSuccessful();
mockEndpoint.expects(once()).method("afterDelivery")
.will(throwException(new ResourceException()));
context.checking(new Expectations() {{
oneOf (mockEndpointAndListener).afterDelivery(); will(throwException(new ResourceException()));
}});
setupExpectRelease();
doBeforeDeliveryExpectSuccess();
@ -127,20 +152,28 @@ public class MessageEndpointProxyTest extends MockObjectTestCase {
doReleaseExpectInvalidMessageEndpointException();
}
private void setupAfterDeliverySuccessful() {
mockEndpoint.expects(once()).method("afterDelivery");
private void setupAfterDeliverySuccessful() throws Exception {
context.checking(new Expectations() {{
oneOf (mockEndpointAndListener).afterDelivery();
}});
}
private void setupOnMessageSuccessful() {
mockEndpoint.expects(once()).method("onMessage").with(same(stubMessage.proxy()));
context.checking(new Expectations() {{
oneOf (mockEndpointAndListener).onMessage(with(stubMessage));
}});
}
private void setupBeforeDeliverySuccessful() {
mockEndpoint.expects(once()).method("beforeDelivery").with(isA(Method.class));
private void setupBeforeDeliverySuccessful() throws Exception {
context.checking(new Expectations() {{
oneOf (mockEndpointAndListener).beforeDelivery(with(any(Method.class)));
}});
}
private void setupExpectRelease() {
mockEndpoint.expects(once()).method("release");
context.checking(new Expectations() {{
oneOf (mockEndpointAndListener).release();
}});
}
private void doBeforeDeliveryExpectSuccess() {
@ -153,7 +186,7 @@ public class MessageEndpointProxyTest extends MockObjectTestCase {
private void doOnMessageExpectSuccess() {
try {
endpointProxy.onMessage((Message) stubMessage.proxy());
endpointProxy.onMessage(stubMessage);
} catch (Exception e) {
fail("No exception should have been thrown");
}
@ -180,7 +213,7 @@ public class MessageEndpointProxyTest extends MockObjectTestCase {
private void doOnMessageExpectInvalidMessageEndpointException() {
try {
endpointProxy.onMessage((Message) stubMessage.proxy());
endpointProxy.onMessage(stubMessage);
fail("An InvalidMessageEndpointException should have been thrown");
} catch (InvalidMessageEndpointException e) {
assertTrue(true);

View File

@ -16,147 +16,66 @@
*/
package org.apache.activemq.ra;
import java.lang.reflect.Method;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.work.WorkManager;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.jmock.Mock;
import org.jmock.cglib.MockObjectTestCase;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.jmock.integration.junit4.JMock;
import org.jmock.lib.legacy.ClassImposteriser;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
* @version $Revision: 1.1.1.1 $
*/
public class ServerSessionImplTest extends MockObjectTestCase {
@RunWith(JMock.class)
public class ServerSessionImplTest extends TestCase {
private static final String BROKER_URL = "vm://localhost";
private ServerSessionImpl serverSession;
private Mock pool;
private Mock workManager;
private ServerSessionPoolImpl pool;
private WorkManager workManager;
private MessageEndpoint messageEndpoint;
private ActiveMQConnection con;
private ActiveMQSession session;
private Mockery context;
@Override
protected void setUp() throws Exception
@Before
public void setUp() throws Exception
{
super.setUp();
context = new Mockery() {{
setImposteriser(ClassImposteriser.INSTANCE);
}};
org.apache.activemq.ActiveMQConnectionFactory factory =
new org.apache.activemq.ActiveMQConnectionFactory(BROKER_URL);
con = (ActiveMQConnection) factory.createConnection();
session = (ActiveMQSession) con.createSession(false, Session.AUTO_ACKNOWLEDGE);
pool = mock(ServerSessionPoolImpl.class, new Class[]{ActiveMQEndpointWorker.class, int.class}, new Object[]{null, 10});
workManager = mock(WorkManager.class);
messageEndpoint = new MockMessageEndpoint();
pool = context.mock(ServerSessionPoolImpl.class);
workManager = context.mock(WorkManager.class);
serverSession = new ServerSessionImpl(
(ServerSessionPoolImpl) pool.proxy(),
(ServerSessionPoolImpl) pool,
session,
(WorkManager) workManager.proxy(),
(WorkManager) workManager,
messageEndpoint,
false,
10);
}
private class MockMessageEndpoint implements MessageEndpoint, MessageListener {
public void afterDelivery() throws ResourceException
{
throw new UnsupportedOperationException("Not supported yet.");
}
public void beforeDelivery(Method arg0) throws NoSuchMethodException, ResourceException
{
throw new UnsupportedOperationException("Not supported yet.");
}
public void release()
{
throw new UnsupportedOperationException("Not supported yet.");
}
public void onMessage(Message msg)
{
throw new UnsupportedOperationException("Not supported yet.");
}
}
/**
* Need to re-work this test case, it broke since the amq4 internals changed and
* mocks were being using against the internals.
*
*/
public void testDummy() {
}
/*
public void testBatch() throws Exception {
DummyActiveMQConnection connection = new DummyActiveMQConnection(new ActiveMQConnectionFactory(),
null,
null,
getMockTransportChannel());
ServerSessionPoolImpl pool = new ServerSessionPoolImpl(null, 1);
DummyActiveMQSession session = new DummyActiveMQSession(connection);
MemoryBoundedQueue queue = connection.getMemoryBoundedQueue("Session(" + session.getSessionId() + ")");
queue.enqueue(new ActiveMQTextMessage());
queue.enqueue(new ActiveMQTextMessage());
queue.enqueue(new ActiveMQTextMessage());
DummyMessageEndpoint endpoint = new DummyMessageEndpoint();
ServerSessionImpl serverSession = new ServerSessionImpl(pool, session, null, endpoint, true, 2);
serverSession.run();
assertEquals(2, endpoint.messagesPerBatch.size());
assertEquals(new Integer(2), endpoint.messagesPerBatch.get(0));
assertEquals(new Integer(1), endpoint.messagesPerBatch.get(1));
}
private class DummyMessageEndpoint implements MessageEndpoint, MessageListener {
protected List messagesPerBatch = new ArrayList();
protected int nbMessages = -1000;
public void beforeDelivery(Method arg0) throws NoSuchMethodException, ResourceException {
nbMessages = 0;
}
public void afterDelivery() throws ResourceException {
messagesPerBatch.add(new Integer(nbMessages));
nbMessages = -1000;
}
public void release() {
}
public void onMessage(Message arg0) {
nbMessages ++;
}
}
private class DummyActiveMQSession extends ActiveMQSession {
protected DummyActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
super(connection, sessionId, acknowledgeMode, asyncDispatch);
}
}
private class DummyActiveMQConnection extends ActiveMQConnection {
protected DummyActiveMQConnection(Transport transport, String userName, String password, JMSStatsImpl factoryStats) throws IOException {
super(transport, userName, password, factoryStats);
}
}
private TransportChannel getMockTransportChannel() {
Mock tc = new Mock(TransportChannel.class);
tc.expects(once()).method("setPacketListener");
tc.expects(once()).method("setExceptionListener");
tc.expects(once()).method("addTransportStatusEventListener");
tc.expects(atLeastOnce()).method("asyncSend");
tc.expects(atLeastOnce()).method("send");
return (TransportChannel) tc.proxy();
}
*/
@Test
public void testRunDetectsStoppedSession() throws Exception {
con.close();
pool.expects(once()).method("removeFromPool").with(eq(serverSession));
context.checking(new Expectations() {{
oneOf (pool).removeFromPool(with(same(serverSession)));
}});
serverSession.run();
pool.verify();
}
}
}

16
pom.xml
View File

@ -62,7 +62,7 @@
<hsqldb-version>1.7.2.2</hsqldb-version>
<jdom-version>1.0</jdom-version>
<jetty-version>6.1.9</jetty-version>
<jmock-version>1.0.1</jmock-version>
<jmock-version>2.5.1</jmock-version>
<junit-version>4.4</junit-version>
<jxta-version>2.0</jxta-version>
<log4j-version>1.2.14</log4j-version>
@ -789,23 +789,17 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>jmock</groupId>
<artifactId>jmock</artifactId>
<groupId>org.jmock</groupId>
<artifactId>jmock-junit4</artifactId>
<version>${jmock-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jmock</groupId>
<artifactId>jmock-cglib</artifactId>
<groupId>org.jmock</groupId>
<artifactId>jmock-legacy</artifactId>
<version>${jmock-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-full</artifactId>
<version>${cglib-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>