[AMQ-9239] jakarta.jms - activemq-spring changes

This commit is contained in:
Matt Pavlovich 2023-03-30 11:37:38 -05:00
parent 5473ea58e5
commit af07efe27d
26 changed files with 1169 additions and 86 deletions

View File

@ -31,7 +31,7 @@
<properties>
<activemq.osgi.import>
javax.transaction*;resolution:=optional,
jakarta.transaction*;resolution:=optional,
org.apache.geronimo.transaction.manager*;resolution:=optional,
org.springframework*;resolution:=optional
</activemq.osgi.import>
@ -39,6 +39,15 @@
<dependencies>
<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</dependency>
<dependency>
<groupId>jakarta.transaction</groupId>
<artifactId>jakarta.transaction-api</artifactId>
<optional>true</optional>
</dependency>
<!-- =============================== -->
<!-- Required Dependencies -->
<!-- =============================== -->
@ -103,19 +112,54 @@
<groupId>${project.groupId}</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>org.apache.geronimo.components</groupId>
<artifactId>geronimo-connector</artifactId>
<classifier>jakarta</classifier>
<version>3.1.5</version>
<scope>test</scope>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jta_1.1_spec</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-j2ee-connector_1.6_spec</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.geronimo.components</groupId>
<artifactId>geronimo-transaction</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-validation_1.0_spec</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.geronimo.components</groupId>
<artifactId>geronimo-transaction</artifactId>
<classifier>jakarta</classifier>
<scope>test</scope>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-j2ee-connector_1.6_spec</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jta_1.1_spec</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jta_1.1_spec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-annotation_1.3_spec</artifactId>
<optional>true</optional>
<groupId>org.objectweb.howl</groupId>
<artifactId>howl</artifactId>
<version>1.0.1-1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@ -152,12 +196,14 @@
<artifactId>log4j-core</artifactId>
<scope>test</scope>
</dependency>
<!--
<dependency>
<groupId>org.jencks</groupId>
<artifactId>jencks</artifactId>
<version>2.2</version>
<scope>test</scope>
</dependency>
-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>

View File

@ -16,10 +16,10 @@
*/
package org.apache.activemq.pool;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.ConnectionFactory;
import javax.transaction.TransactionManager;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.jms.ConnectionFactory;
import jakarta.transaction.TransactionManager;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -16,7 +16,7 @@
*/
package org.apache.activemq.security;
import javax.annotation.PostConstruct;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.InitializingBean;

View File

@ -18,7 +18,7 @@ package org.apache.activemq.security;
import java.util.List;
import javax.annotation.PostConstruct;
import jakarta.annotation.PostConstruct;
import org.apache.activemq.filter.DestinationMapEntry;
import org.springframework.beans.factory.InitializingBean;

View File

@ -16,7 +16,7 @@
*/
package org.apache.activemq.spring;
import javax.annotation.PostConstruct;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.BeanNameAware;

View File

@ -16,7 +16,7 @@
*/
package org.apache.activemq.spring;
import javax.annotation.PostConstruct;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.BeanNameAware;

View File

@ -27,7 +27,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import javax.annotation.PostConstruct;
import jakarta.annotation.PostConstruct;
import javax.net.ssl.*;
import org.apache.activemq.broker.SslContext;

View File

@ -18,8 +18,8 @@ package org.apache.activemq.xbean;
import java.io.IOException;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;

View File

@ -83,6 +83,8 @@ http\://activemq.apache.org/schema/core/activemq-core-5.17.2.xsd=activemq.xsd
http\://activemq.apache.org/schema/core/activemq-core-5.17.3.xsd=activemq.xsd
http\://activemq.apache.org/schema/core/activemq-core-5.17.4.xsd=activemq.xsd
http\://activemq.apache.org/schema/core/activemq-core-5.18.0.xsd=activemq.xsd
http\://activemq.apache.org/schema/core/activemq-core-5.18.1.xsd=activemq.xsd
http\://activemq.apache.org/schema/core/activemq-core-5.19.0.xsd=activemq.xsd
http\://camel.apache.org/schema/osgi/camel-osgi.xsd=camel-osgi.xsd
http\://camel.apache.org/schema/spring/camel-spring.xsd=camel-spring.xsd

View File

@ -20,9 +20,9 @@ import org.apache.activemq.command.ActiveMQTextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import java.util.ArrayList;
import java.util.List;

View File

@ -28,8 +28,8 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import javax.jms.*;
import jakarta.annotation.Resource;
import jakarta.jms.*;
@RunWith(SpringJUnit4ClassRunner.class)

View File

@ -35,10 +35,10 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import jakarta.annotation.Resource;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.Session;
import java.util.Arrays;
@RunWith(SpringJUnit4ClassRunner.class)
@ -112,14 +112,13 @@ public class ParallelXATransactionTest {
}
});
Thread.sleep((long) (Math.random() * SLEEP));
LOG.info("P1: Send msg to " + queue + "," + AUDIT);
LOG.info("P1: Send msg to " + queue + "," + AUDIT + " thread:" + Thread.currentThread().getName());
}
} catch (Exception e) {
Assert.fail("Exception occurred " + e);
e.printStackTrace();
Assert.fail("Exception occurred in thread: " + Thread.currentThread().getName() + " exception:" + e);
}
}
});
} catch (TransactionException e) {
@ -137,8 +136,9 @@ public class ParallelXATransactionTest {
ProducerThread t1 = new ProducerThread(jmsTemplate, txManager);
t1.setName("XaTX");
ProducerThread t2 = new ProducerThread(jmsTemplate2, txManager2);
t2.setName("LocalTX");
t1.start();
t2.start();

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.spring.geronimo;
import jakarta.resource.ResourceException;
import jakarta.resource.spi.ConnectionManager;
import jakarta.resource.spi.ManagedConnectionFactory;
import org.springframework.beans.factory.FactoryBean;
/**
* @org.apache.xbean.XBean element="connectionFactory"
*/
public class ConnectionFactoryFactoryBean implements FactoryBean {
private ManagedConnectionFactory managedConnectionFactory;
private ConnectionManager connectionManager;
private Object connectionFactory;
public ManagedConnectionFactory getManagedConnectionFactory() {
return managedConnectionFactory;
}
public void setManagedConnectionFactory(ManagedConnectionFactory managedConnectionFactory) {
this.managedConnectionFactory = managedConnectionFactory;
}
public ConnectionManager getConnectionManager() {
return connectionManager;
}
public void setConnectionManager(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
public Object getObject() throws Exception {
return getConnectionFactory();
}
public Class<?> getObjectType() {
try {
Object connectionFactory = getConnectionFactory();
if (connectionFactory != null) {
return connectionFactory.getClass();
}
} catch (ResourceException e) {
}
return null;
}
public boolean isSingleton() {
return true;
}
public Object getConnectionFactory() throws ResourceException {
// we must initialize the connection factory outside of the getObject method since spring needs the
// connetion factory type for autowiring before we have created the bean
if (connectionFactory == null) {
// START SNIPPET: cf
if (managedConnectionFactory == null) {
throw new NullPointerException("managedConnectionFactory is null");
}
if (connectionManager != null) {
connectionFactory = managedConnectionFactory.createConnectionFactory(connectionManager);
} else {
connectionFactory = managedConnectionFactory.createConnectionFactory();
}
// END SNIPPET: cf
}
return connectionFactory;
}
}

View File

@ -0,0 +1,365 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.spring.geronimo;
import jakarta.resource.ResourceException;
import jakarta.resource.spi.ConnectionManager;
import jakarta.resource.spi.ConnectionRequestInfo;
import jakarta.resource.spi.ManagedConnection;
import jakarta.resource.spi.ManagedConnectionFactory;
import jakarta.transaction.TransactionManager;
import java.io.PrintWriter;
import java.util.Set;
import javax.security.auth.Subject;
import org.apache.geronimo.connector.outbound.GenericConnectionManager;
import org.apache.geronimo.connector.outbound.SubjectSource;
import org.apache.geronimo.connector.outbound.connectionmanagerconfig.NoTransactions;
import org.apache.geronimo.connector.outbound.connectionmanagerconfig.PoolingSupport;
import org.apache.geronimo.connector.outbound.connectionmanagerconfig.TransactionSupport;
import org.apache.geronimo.connector.outbound.connectionmanagerconfig.XATransactions;
import org.apache.geronimo.connector.outbound.connectionmanagerconfig.LocalTransactions;
import org.apache.geronimo.connector.outbound.connectionmanagerconfig.NoPool;
import org.apache.geronimo.connector.outbound.connectionmanagerconfig.SinglePool;
import org.apache.geronimo.connector.outbound.connectionmanagerconfig.PartitionedPool;
import org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTracker;
import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.FatalBeanException;
/**
* This FactoryBean creates a local JCA connection factory outside
* a J2EE application server.
* <p/>
* The connection manager will be then injected in the
* LocalConnectionFactoryBean, class of the JCA support of Spring.
*
* @author Thierry Templier
* @see org.apache.geronimo.spring.factory.ConnectionFactoryFactoryBean#setConnectionManager(ConnectionManager)
* @org.apache.xbean.XBean element="connectionManager"
*/
public class ConnectionManagerFactoryBean implements FactoryBean, InitializingBean, DisposableBean {
private GenericConnectionManager connectionManager;
private RecoverableTransactionManager transactionManager;
private TransactionSupport transactionSupport;
private String transaction;
private SubjectSource subjectSource;
private ConnectionTracker connectionTracker;
private PoolingSupport poolingSupport;
private boolean pooling = true;
private String partitionStrategy; //: none, by-subject, by-connector-properties
private int poolMaxSize = 10;
private int poolMinSize = 0;
private boolean allConnectionsEqual = true;
private int connectionMaxWaitMilliseconds = 5000;
private int connectionMaxIdleMinutes = 15;
public Object getObject() throws Exception {
if (connectionManager == null) {
if (transactionManager == null) {
throw new NullPointerException("transactionManager is null");
}
// Instanciate the Geronimo Connection Manager
this.connectionManager = new GenericConnectionManager(
transactionSupport,
poolingSupport,
subjectSource,
connectionTracker,
transactionManager,
new ManagedConnectionFactory() {
@Override
public void setLogWriter(PrintWriter out) throws ResourceException {
// TODO Auto-generated method stub
}
@Override
public ManagedConnection matchManagedConnections(Set connectionSet, Subject subject,
ConnectionRequestInfo cxRequestInfo) throws ResourceException {
// TODO Auto-generated method stub
return null;
}
@Override
public PrintWriter getLogWriter() throws ResourceException {
// TODO Auto-generated method stub
return null;
}
@Override
public ManagedConnection createManagedConnection(Subject subject, ConnectionRequestInfo cxRequestInfo)
throws ResourceException {
// TODO Auto-generated method stub
return null;
}
@Override
public Object createConnectionFactory(ConnectionManager cxManager) throws ResourceException {
// TODO Auto-generated method stub
return null;
}
@Override
public Object createConnectionFactory() throws ResourceException {
// TODO Auto-generated method stub
return null;
}
}, // ManagedConnectionFactory
getClass().getName(),
getClass().getClassLoader());
connectionManager.doStart();
}
return connectionManager;
}
public void destroy() throws Exception {
if (connectionManager != null) {
connectionManager.doStop();
connectionManager = null;
}
}
public Class<?> getObjectType() {
return ConnectionManager.class;
}
public boolean isSingleton() {
return true;
}
public PoolingSupport getPoolingSupport() {
return poolingSupport;
}
/**
* Set the pooling support for the Geronimo Connection Manager.
* Geronimo provides two kinds of pool: single and partitioned.
*
* @see org.apache.geronimo.connector.outbound.connectionmanagerconfig.SinglePool
* @see org.apache.geronimo.connector.outbound.connectionmanagerconfig.PartitionedPool
*/
public void setPoolingSupport(PoolingSupport support) {
poolingSupport = support;
}
public RecoverableTransactionManager getTransactionManager() {
return transactionManager;
}
/**
* Set the transaction manager for the Geronimo Connection Manager.
*/
public void setTransactionManager(RecoverableTransactionManager manager) {
transactionManager = manager;
}
public String getTransaction() {
return transaction;
}
public void setTransaction(String transaction) {
this.transaction = transaction;
}
public TransactionSupport getTransactionSupport() {
return transactionSupport;
}
/**
* Set the transaction support for the Geronimo Connection Manager.
* Geronimo provides in this case three kinds of support like the
* JCA specification: no transaction, local transactions, XA transactions.
*
* @see NoTransactions
* @see org.apache.geronimo.connector.outbound.connectionmanagerconfig.LocalTransactions
* @see org.apache.geronimo.connector.outbound.connectionmanagerconfig.XATransactions
*/
public void setTransactionSupport(TransactionSupport support) {
transactionSupport = support;
}
public ConnectionTracker getConnectionTracker() {
return connectionTracker;
}
/**
* Set the connection tracker for the Geronimo Connection Manager.
*/
public void setConnectionTracker(ConnectionTracker tracker) {
connectionTracker = tracker;
}
/**
* Enables/disables container managed security
*/
public void setContainerManagedSecurity(boolean containerManagedSecurity) {
// TODO: warn for deprecated method
}
public SubjectSource getSubjectSource() {
return subjectSource;
}
public void setSubjectSource(SubjectSource subjectSource) {
this.subjectSource = subjectSource;
}
public boolean isPooling() {
return pooling;
}
public void setPooling(boolean pooling) {
this.pooling = pooling;
}
public String getPartitionStrategy() {
return partitionStrategy;
}
public void setPartitionStrategy(String partitionStrategy) {
this.partitionStrategy = partitionStrategy;
}
public int getPoolMaxSize() {
return poolMaxSize;
}
public void setPoolMaxSize(int poolMaxSize) {
this.poolMaxSize = poolMaxSize;
}
public int getPoolMinSize() {
return poolMinSize;
}
public void setPoolMinSize(int poolMinSize) {
this.poolMinSize = poolMinSize;
}
public boolean isAllConnectionsEqual() {
return allConnectionsEqual;
}
public void setAllConnectionsEqual(boolean allConnectionsEqual) {
this.allConnectionsEqual = allConnectionsEqual;
}
public int getConnectionMaxWaitMilliseconds() {
return connectionMaxWaitMilliseconds;
}
public void setConnectionMaxWaitMilliseconds(int connectionMaxWaitMilliseconds) {
this.connectionMaxWaitMilliseconds = connectionMaxWaitMilliseconds;
}
public int getConnectionMaxIdleMinutes() {
return connectionMaxIdleMinutes;
}
public void setConnectionMaxIdleMinutes(int connectionMaxIdleMinutes) {
this.connectionMaxIdleMinutes = connectionMaxIdleMinutes;
}
/**
* This method checks all the needed parameters to construct
* the Geronimo connection manager which is implemented by the
* GenericConnectionManager class.
* If the transaction support property is not set, the method
* configures the connection manager with the no transaction value.
* If the pooling support property is not set, the method
* configures the connection manager with the no pool value.
* If the realm bridge is not set, the method configure
* the connection manager with an identity realm bridge.
*
* @see GenericConnectionManager
*/
public void afterPropertiesSet() throws Exception {
// Apply the default value for property if necessary
if (this.transactionSupport == null) {
// No transaction
this.transactionSupport = createTransactionSupport(transaction);
}
if (this.poolingSupport == null) {
// No pool
if (!pooling) {
poolingSupport = new NoPool();
} else {
if (partitionStrategy == null || "none".equalsIgnoreCase(partitionStrategy)) {
// unpartitioned pool
poolingSupport = new SinglePool(poolMaxSize,
poolMinSize,
connectionMaxWaitMilliseconds,
connectionMaxIdleMinutes,
allConnectionsEqual,
!allConnectionsEqual,
false);
} else if ("by-connector-properties".equalsIgnoreCase(partitionStrategy)) {
// partition by contector properties such as username and password on a jdbc connection
poolingSupport = new PartitionedPool(poolMaxSize,
poolMinSize,
connectionMaxWaitMilliseconds,
connectionMaxIdleMinutes,
allConnectionsEqual,
!allConnectionsEqual,
false,
true,
false);
} else if ("by-subject".equalsIgnoreCase(partitionStrategy)) {
// partition by caller subject
poolingSupport = new PartitionedPool(poolMaxSize,
poolMinSize,
connectionMaxWaitMilliseconds,
connectionMaxIdleMinutes,
allConnectionsEqual,
!allConnectionsEqual,
false,
false,
true);
} else {
throw new FatalBeanException("Unknown partition strategy " + partitionStrategy);
}
}
}
}
public static TransactionSupport createTransactionSupport(String transaction) {
if (transaction == null || "local".equalsIgnoreCase(transaction)) {
return LocalTransactions.INSTANCE;
} else if ("none".equalsIgnoreCase(transaction)) {
return NoTransactions.INSTANCE;
} else if ("xa".equalsIgnoreCase(transaction)) {
return new XATransactions(true, false);
} else {
throw new FatalBeanException("Unknown transaction type " + transaction);
}
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.spring.geronimo;
import org.springframework.beans.factory.FactoryBean;
import org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTrackingCoordinator;
import org.apache.geronimo.connector.outbound.connectiontracking.GeronimoTransactionListener;
import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
/**
* @org.apache.xbean.XBean element="connectionTracker"
*/
public class ConnectionTrackerFactoryBean implements FactoryBean {
private ConnectionTrackingCoordinator coordinator;
private GeronimoTransactionManager geronimoTransactionManager;
public Object getObject() throws Exception {
if (coordinator == null) {
coordinator = new ConnectionTrackingCoordinator();
if (geronimoTransactionManager != null) {
GeronimoTransactionListener transactionListener = new GeronimoTransactionListener(coordinator);
geronimoTransactionManager.addTransactionAssociationListener(transactionListener);
}
}
return coordinator;
}
public Class<?> getObjectType() {
return ConnectionTrackingCoordinator.class;
}
public boolean isSingleton() {
return true;
}
public GeronimoTransactionManager getGeronimoTransactionManager() {
return geronimoTransactionManager;
}
public void setGeronimoTransactionManager(GeronimoTransactionManager geronimoTransactionManager) {
this.geronimoTransactionManager = geronimoTransactionManager;
}
}

View File

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.spring.geronimo;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import jakarta.transaction.Status;
import jakarta.transaction.SystemException;
import jakarta.transaction.Transaction;
import javax.transaction.xa.XAException;
import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
import org.apache.geronimo.transaction.manager.TransactionLog;
import org.apache.geronimo.transaction.manager.TransactionManagerMonitor;
import org.apache.geronimo.transaction.manager.XidFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.jta.JtaTransactionManager;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
/**
* @version $Revision$ $Date$
*/
public class GeronimoPlatformTransactionManager extends GeronimoTransactionManager implements PlatformTransactionManager {
private final PlatformTransactionManager platformTransactionManager;
private final Map<Transaction, SuspendedResourcesHolder> suspendedResources = new ConcurrentHashMap<Transaction, SuspendedResourcesHolder>();
public GeronimoPlatformTransactionManager() throws XAException {
platformTransactionManager = new JtaTransactionManager(this, this);
registerTransactionAssociationListener();
}
public GeronimoPlatformTransactionManager(int defaultTransactionTimeoutSeconds) throws XAException {
super(defaultTransactionTimeoutSeconds);
platformTransactionManager = new JtaTransactionManager(this, this);
registerTransactionAssociationListener();
}
public GeronimoPlatformTransactionManager(int defaultTransactionTimeoutSeconds, TransactionLog transactionLog) throws XAException {
super(defaultTransactionTimeoutSeconds, transactionLog);
platformTransactionManager = new JtaTransactionManager(this, this);
registerTransactionAssociationListener();
}
public GeronimoPlatformTransactionManager(int defaultTransactionTimeoutSeconds, XidFactory xidFactory, TransactionLog transactionLog) throws XAException {
super(defaultTransactionTimeoutSeconds, xidFactory, transactionLog);
platformTransactionManager = new JtaTransactionManager(this, this);
registerTransactionAssociationListener();
}
public TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
return platformTransactionManager.getTransaction(definition);
}
public void commit(TransactionStatus status) throws TransactionException {
platformTransactionManager.commit(status);
}
public void rollback(TransactionStatus status) throws TransactionException {
platformTransactionManager.rollback(status);
}
protected void registerTransactionAssociationListener() {
addTransactionAssociationListener(new TransactionManagerMonitor() {
public void threadAssociated(Transaction transaction) {
try {
if (transaction.getStatus() == Status.STATUS_ACTIVE) {
SuspendedResourcesHolder holder = suspendedResources.remove(transaction);
if (holder != null && holder.getSuspendedSynchronizations() != null) {
TransactionSynchronizationManager.setActualTransactionActive(true);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(holder.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(holder.getName());
TransactionSynchronizationManager.initSynchronization();
for (Iterator<?> it = holder.getSuspendedSynchronizations().iterator(); it.hasNext();) {
TransactionSynchronization synchronization = (TransactionSynchronization) it.next();
synchronization.resume();
TransactionSynchronizationManager.registerSynchronization(synchronization);
}
}
}
} catch (SystemException e) {
return;
}
}
public void threadUnassociated(Transaction transaction) {
try {
if (transaction.getStatus() == Status.STATUS_ACTIVE) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<?> suspendedSynchronizations = TransactionSynchronizationManager.getSynchronizations();
for (Iterator<?> it = suspendedSynchronizations.iterator(); it.hasNext();) {
((TransactionSynchronization) it.next()).suspend();
}
TransactionSynchronizationManager.clearSynchronization();
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
TransactionSynchronizationManager.setActualTransactionActive(false);
SuspendedResourcesHolder holder = new SuspendedResourcesHolder(null, suspendedSynchronizations, name, readOnly);
suspendedResources.put(transaction, holder);
}
}
} catch (SystemException e) {
return;
}
}
});
}
/**
* Holder for suspended resources.
* Used internally by <code>suspend</code> and <code>resume</code>.
*/
private static class SuspendedResourcesHolder {
private final Object suspendedResources;
private final List<?> suspendedSynchronizations;
private final String name;
private final boolean readOnly;
public SuspendedResourcesHolder(
Object suspendedResources, List<?> suspendedSynchronizations, String name, boolean readOnly) {
this.suspendedResources = suspendedResources;
this.suspendedSynchronizations = suspendedSynchronizations;
this.name = name;
this.readOnly = readOnly;
}
public Object getSuspendedResources() {
return suspendedResources;
}
public List<?> getSuspendedSynchronizations() {
return suspendedSynchronizations;
}
public String getName() {
return name;
}
public boolean isReadOnly() {
return readOnly;
}
}
}

View File

@ -0,0 +1,219 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.spring.geronimo;
import java.io.File;
import org.apache.geronimo.transaction.log.HOWLLog;
import org.apache.geronimo.transaction.manager.TransactionLog;
import org.apache.geronimo.transaction.manager.XidFactory;
import org.apache.geronimo.transaction.manager.XidFactoryImpl;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
/**
* @version $Revision$ $Date$
*/
public class HowlLogFactoryBean implements FactoryBean, DisposableBean {
private HOWLLog howlLog;
private String logFileDir;
private XidFactory xidFactory;
private String bufferClassName = "org.objectweb.howl.log.BlockLogBuffer";
private int bufferSizeKBytes = 32;
private boolean checksumEnabled = true;
private boolean adler32Checksum = true;
private int flushSleepTimeMilliseconds = 50;
private String logFileExt = "log";
private String logFileName = "transaction";
private int maxBlocksPerFile = -1;
private int maxLogFiles = 2;
private int maxBuffers = 0;
private int minBuffers = 4;
private int threadsWaitingForceThreshold = -1;
private File serverBaseDir;
public HowlLogFactoryBean() {
String serverDir = System.getProperty("jencks.server.dir", System.getProperty("basedir", System.getProperty("user.dir")));
serverBaseDir = new File(serverDir);
}
public Object getObject() throws Exception {
if (howlLog == null) {
howlLog = new HOWLLog(bufferClassName,
bufferSizeKBytes,
checksumEnabled,
adler32Checksum,
flushSleepTimeMilliseconds,
logFileDir,
logFileExt,
logFileName,
maxBlocksPerFile,
maxBuffers,
maxLogFiles,
minBuffers,
threadsWaitingForceThreshold,
xidFactory != null ? xidFactory : createXidFactory(),
serverBaseDir);
howlLog.doStart();
}
return howlLog;
}
public void destroy() throws Exception {
if (howlLog != null) {
howlLog.doStop();
howlLog = null;
}
}
public Class<?> getObjectType() {
return TransactionLog.class;
}
public boolean isSingleton() {
return true;
}
public String getBufferClassName() {
return bufferClassName;
}
public void setBufferClassName(String bufferClassName) {
this.bufferClassName = bufferClassName;
}
public int getBufferSizeKBytes() {
return bufferSizeKBytes;
}
public void setBufferSizeKBytes(int bufferSizeKBytes) {
this.bufferSizeKBytes = bufferSizeKBytes;
}
public boolean isChecksumEnabled() {
return checksumEnabled;
}
public void setChecksumEnabled(boolean checksumEnabled) {
this.checksumEnabled = checksumEnabled;
}
public boolean isAdler32Checksum() {
return adler32Checksum;
}
public void setAdler32Checksum(boolean adler32Checksum) {
this.adler32Checksum = adler32Checksum;
}
public int getFlushSleepTimeMilliseconds() {
return flushSleepTimeMilliseconds;
}
public void setFlushSleepTimeMilliseconds(int flushSleepTimeMilliseconds) {
this.flushSleepTimeMilliseconds = flushSleepTimeMilliseconds;
}
public String getLogFileDir() {
return logFileDir;
}
public void setLogFileDir(String logFileDir) {
this.logFileDir = logFileDir;
}
public String getLogFileExt() {
return logFileExt;
}
public void setLogFileExt(String logFileExt) {
this.logFileExt = logFileExt;
}
public String getLogFileName() {
return logFileName;
}
public void setLogFileName(String logFileName) {
this.logFileName = logFileName;
}
public int getMaxBlocksPerFile() {
return maxBlocksPerFile;
}
public void setMaxBlocksPerFile(int maxBlocksPerFile) {
this.maxBlocksPerFile = maxBlocksPerFile;
}
public int getMaxBuffers() {
return maxBuffers;
}
public void setMaxBuffers(int maxBuffers) {
this.maxBuffers = maxBuffers;
}
public int getMaxLogFiles() {
return maxLogFiles;
}
public void setMaxLogFiles(int maxLogFiles) {
this.maxLogFiles = maxLogFiles;
}
public int getMinBuffers() {
return minBuffers;
}
public void setMinBuffers(int minBuffers) {
this.minBuffers = minBuffers;
}
public int getThreadsWaitingForceThreshold() {
return threadsWaitingForceThreshold;
}
public void setThreadsWaitingForceThreshold(int threadsWaitingForceThreshold) {
this.threadsWaitingForceThreshold = threadsWaitingForceThreshold;
}
public XidFactory getXidFactory() {
return xidFactory;
}
public void setXidFactory(XidFactory xidFactory) {
this.xidFactory = xidFactory;
}
public File getServerBaseDir() {
return serverBaseDir;
}
public void setServerBaseDir(File serverBaseDir) {
this.serverBaseDir = serverBaseDir;
}
public static XidFactory createXidFactory() {
XidFactory xidFactory;
xidFactory = new XidFactoryImpl();
return xidFactory;
}
}

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.spring.geronimo;
import java.util.Collection;
import org.apache.geronimo.transaction.log.UnrecoverableLog;
import org.apache.geronimo.transaction.log.HOWLLog;
import org.apache.geronimo.transaction.manager.TransactionLog;
import org.apache.geronimo.transaction.manager.XidFactory;
import org.apache.geronimo.transaction.manager.XidFactoryImpl;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.DisposableBean;
/**
* This FactoryBean creates and configures the Geronimo implementation
* of the TransactionManager interface.
*
* @author Thierry Templier
* @see UnrecoverableLog
* @see org.apache.geronimo.transaction.log.HOWLLog
* @org.apache.xbean.XBean element="transactionManager"
*/
public class TransactionManagerFactoryBean implements FactoryBean, InitializingBean, DisposableBean {
private GeronimoPlatformTransactionManager transactionManager;
private int defaultTransactionTimeoutSeconds = 600;
private XidFactory xidFactory;
private TransactionLog transactionLog;
private String transactionLogDir;
private boolean createdTransactionLog;
public Object getObject() throws Exception {
if (transactionManager == null) {
this.transactionManager = new GeronimoPlatformTransactionManager(
defaultTransactionTimeoutSeconds,
xidFactory,
transactionLog);
}
return transactionManager;
}
public void destroy() throws Exception {
if (createdTransactionLog && transactionLog instanceof HOWLLog) {
((HOWLLog)transactionLog).doStop();
}
}
public Class<?> getObjectType() {
return GeronimoPlatformTransactionManager.class;
}
public boolean isSingleton() {
return true;
}
/**
* Set the default transaction timeout in second.
*/
public void setDefaultTransactionTimeoutSeconds(int timeout) {
defaultTransactionTimeoutSeconds = timeout;
}
/**
* Set the transaction log for the transaction context manager.
*/
public void setTransactionLog(TransactionLog log) {
transactionLog = log;
}
public String getTransactionLogDir() {
return transactionLogDir;
}
public void setTransactionLogDir(String transactionLogDir) {
this.transactionLogDir = transactionLogDir;
}
public XidFactory getXidFactory() {
return xidFactory;
}
public void setXidFactory(XidFactory xidFactory) {
this.xidFactory = xidFactory;
}
/**
* Set the resource managers
*/
public void setResourceManagers(Collection<?> resourceManagers) {
// TODO: warn about deprecated method
}
public void afterPropertiesSet() throws Exception {
if (transactionLog == null) {
transactionLog = createTransactionLog(xidFactory, transactionLogDir);
createdTransactionLog = true;
}
if (xidFactory == null) {
xidFactory = new XidFactoryImpl();
}
}
public static TransactionLog createTransactionLog(XidFactory xidFactory, String logDir) throws Exception {
if (logDir == null) {
return new UnrecoverableLog();
} else {
HowlLogFactoryBean howlLogFactoryBean = new HowlLogFactoryBean();
howlLogFactoryBean.setXidFactory(xidFactory);
howlLogFactoryBean.setLogFileDir(logDir);
return (TransactionLog) howlLogFactoryBean.getObject();
}
}
}

View File

@ -26,13 +26,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import jakarta.jms.BytesMessage;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.DeliveryMode;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
@ -97,6 +97,7 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND);
container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(500), QUEUE1_NAME);
container1.afterPropertiesSet();
container1.start();
Thread.sleep(2000);
@ -148,6 +149,7 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
container1.afterPropertiesSet();
container1.start();
final ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < MAX_PRODUCERS; i++) {
@ -198,8 +200,11 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
container1.afterPropertiesSet();
container1.start();
container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000), QUEUE2_NAME);
container2.afterPropertiesSet();
container2.start();
final ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < MAX_PRODUCERS; i++) {
@ -244,7 +249,7 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
final PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");
// entry.setQueue(QUEUE1_NAME);
entry.setMemoryLimit(1000);
entry.setMemoryLimit(1_000);
policyEntries.add(entry);
final PolicyMap policyMap = new PolicyMap();

View File

@ -26,13 +26,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import jakarta.jms.BytesMessage;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.DeliveryMode;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;

View File

@ -26,13 +26,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import jakarta.jms.BytesMessage;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.DeliveryMode;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;

View File

@ -19,15 +19,15 @@ package org.apache.bugs;
import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import junit.framework.TestCase;

View File

@ -22,11 +22,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import junit.framework.TestCase;

View File

@ -26,11 +26,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerRegistry;

View File

@ -20,9 +20,9 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<amq:broker brokerName="test" useJmx="true" persistent="false">

View File

@ -20,9 +20,9 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- broker -->
@ -44,7 +44,6 @@
<property name="connectionFactory" ref="connectionFactory2"/>
</bean>
<bean id="connectionFactory2" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
<property name="userName" value="smx"/>
@ -57,21 +56,21 @@
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<bean id="transactionManager" class="org.jencks.factory.TransactionManagerFactoryBean">
<bean id="transactionManager" class="org.apache.activemq.spring.geronimo.TransactionManagerFactoryBean">
<property name="defaultTransactionTimeoutSeconds" value="300"/>
</bean>
<bean id="connectionFactory" class="org.jencks.factory.ConnectionFactoryFactoryBean">
<bean id="connectionFactory" class="org.apache.activemq.spring.geronimo.ConnectionFactoryFactoryBean">
<property name="connectionManager" ref="jmsConnectionManager"/>
<property name="managedConnectionFactory" ref="jmsManagedConnectionFactory"/>
</bean>
<bean id="jmsConnectionManager" class="org.jencks.factory.ConnectionManagerFactoryBean">
<bean id="jmsConnectionManager" class="org.apache.activemq.spring.geronimo.ConnectionManagerFactoryBean">
<property name="transaction" value="xa"/>
<property name="transactionManager" ref="transactionManager"/>
<property name="poolMaxSize" value="20"/>
<property name="connectionTracker">
<bean class="org.jencks.factory.ConnectionTrackerFactoryBean">
<bean class="org.apache.activemq.spring.geronimo.ConnectionTrackerFactoryBean">
<property name="geronimoTransactionManager" ref="transactionManager"/>
</bean>
</property>
@ -87,4 +86,4 @@
</property>
</bean>
</beans>
</beans>