mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5534 - generic jms pool reconnection
This commit is contained in:
parent
189a75afdc
commit
b53d8ea295
|
@ -21,16 +21,14 @@ import java.util.List;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.*;
|
||||||
import javax.jms.IllegalStateException;
|
import javax.jms.IllegalStateException;
|
||||||
import javax.jms.JMSException;
|
|
||||||
import javax.jms.Session;
|
|
||||||
import javax.jms.TemporaryQueue;
|
|
||||||
import javax.jms.TemporaryTopic;
|
|
||||||
|
|
||||||
import org.apache.commons.pool.KeyedPoolableObjectFactory;
|
import org.apache.commons.pool.KeyedPoolableObjectFactory;
|
||||||
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
|
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
|
||||||
import org.apache.commons.pool.impl.GenericObjectPool;
|
import org.apache.commons.pool.impl.GenericObjectPool;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Holds a real JMS connection along with the session pools associated with it.
|
* Holds a real JMS connection along with the session pools associated with it.
|
||||||
|
@ -40,7 +38,8 @@ import org.apache.commons.pool.impl.GenericObjectPool;
|
||||||
* that the temporary destinations of the managed Connection are purged when all references
|
* that the temporary destinations of the managed Connection are purged when all references
|
||||||
* to this ConnectionPool are released.
|
* to this ConnectionPool are released.
|
||||||
*/
|
*/
|
||||||
public class ConnectionPool {
|
public class ConnectionPool implements ExceptionListener {
|
||||||
|
private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);
|
||||||
|
|
||||||
protected Connection connection;
|
protected Connection connection;
|
||||||
private int referenceCount;
|
private int referenceCount;
|
||||||
|
@ -54,6 +53,8 @@ public class ConnectionPool {
|
||||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||||
private final GenericKeyedObjectPool<SessionKey, Session> sessionPool;
|
private final GenericKeyedObjectPool<SessionKey, Session> sessionPool;
|
||||||
private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
|
private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
|
||||||
|
private boolean reconnectOnException;
|
||||||
|
private ExceptionListener parentExceptionListener;
|
||||||
|
|
||||||
public ConnectionPool(Connection connection) {
|
public ConnectionPool(Connection connection) {
|
||||||
|
|
||||||
|
@ -334,6 +335,46 @@ public class ConnectionPool {
|
||||||
this.sessionPool.setMaxWait(blockIfSessionPoolIsFullTimeout);
|
this.sessionPool.setMaxWait(blockIfSessionPoolIsFullTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the underlying connection will be renewed on JMSException, false otherwise
|
||||||
|
*/
|
||||||
|
public boolean isReconnectOnException() {
|
||||||
|
return reconnectOnException;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Controls weather the underlying connection should be reset (and renewed) on JMSException
|
||||||
|
*
|
||||||
|
* @param reconnectOnException
|
||||||
|
* Boolean value that configures whether reconnect on exception should happen
|
||||||
|
*/
|
||||||
|
public void setReconnectOnException(boolean reconnectOnException) {
|
||||||
|
this.reconnectOnException = reconnectOnException;
|
||||||
|
try {
|
||||||
|
if (isReconnectOnException()) {
|
||||||
|
if (connection.getExceptionListener() != null) {
|
||||||
|
parentExceptionListener = connection.getExceptionListener();
|
||||||
|
}
|
||||||
|
connection.setExceptionListener(this);
|
||||||
|
} else {
|
||||||
|
if (parentExceptionListener != null) {
|
||||||
|
connection.setExceptionListener(parentExceptionListener);
|
||||||
|
}
|
||||||
|
parentExceptionListener = null;
|
||||||
|
}
|
||||||
|
} catch (JMSException jmse) {
|
||||||
|
LOG.warn("Cannot set reconnect exception listener", jmse);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onException(JMSException exception) {
|
||||||
|
close();
|
||||||
|
if (parentExceptionListener != null) {
|
||||||
|
parentExceptionListener.onException(exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "ConnectionPool[" + connection + "]";
|
return "ConnectionPool[" + connection + "]";
|
||||||
|
|
|
@ -77,6 +77,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
|
||||||
private long expiryTimeout = 0l;
|
private long expiryTimeout = 0l;
|
||||||
private boolean createConnectionOnStartup = true;
|
private boolean createConnectionOnStartup = true;
|
||||||
private boolean useAnonymousProducers = true;
|
private boolean useAnonymousProducers = true;
|
||||||
|
private boolean reconnectOnException = true;
|
||||||
|
|
||||||
// Temporary value used to always fetch the result of makeObject.
|
// Temporary value used to always fetch the result of makeObject.
|
||||||
private final AtomicReference<ConnectionPool> mostRecentlyCreated = new AtomicReference<ConnectionPool>(null);
|
private final AtomicReference<ConnectionPool> mostRecentlyCreated = new AtomicReference<ConnectionPool>(null);
|
||||||
|
@ -115,6 +116,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
|
||||||
connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout());
|
connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout());
|
||||||
}
|
}
|
||||||
connection.setUseAnonymousProducers(isUseAnonymousProducers());
|
connection.setUseAnonymousProducers(isUseAnonymousProducers());
|
||||||
|
connection.setReconnectOnException(isReconnectOnException());
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Created new connection: {}", connection);
|
LOG.trace("Created new connection: {}", connection);
|
||||||
|
@ -559,6 +561,23 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
|
||||||
this.blockIfSessionPoolIsFullTimeout = blockIfSessionPoolIsFullTimeout;
|
this.blockIfSessionPoolIsFullTimeout = blockIfSessionPoolIsFullTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the underlying connection will be renewed on JMSException, false otherwise
|
||||||
|
*/
|
||||||
|
public boolean isReconnectOnException() {
|
||||||
|
return reconnectOnException;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Controls weather the underlying connection should be reset (and renewed) on JMSException
|
||||||
|
*
|
||||||
|
* @param reconnectOnException
|
||||||
|
* Boolean value that configures whether reconnect on exception should happen
|
||||||
|
*/
|
||||||
|
public void setReconnectOnException(boolean reconnectOnException) {
|
||||||
|
this.reconnectOnException = reconnectOnException;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called by any superclass that implements a JNDIReferencable or similar that needs to collect
|
* Called by any superclass that implements a JNDIReferencable or similar that needs to collect
|
||||||
* the properties of this class for storage etc.
|
* the properties of this class for storage etc.
|
||||||
|
@ -577,5 +596,6 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
|
||||||
props.setProperty("createConnectionOnStartup", Boolean.toString(isCreateConnectionOnStartup()));
|
props.setProperty("createConnectionOnStartup", Boolean.toString(isCreateConnectionOnStartup()));
|
||||||
props.setProperty("useAnonymousProducers", Boolean.toString(isUseAnonymousProducers()));
|
props.setProperty("useAnonymousProducers", Boolean.toString(isUseAnonymousProducers()));
|
||||||
props.setProperty("blockIfSessionPoolIsFullTimeout", Long.toString(getBlockIfSessionPoolIsFullTimeout()));
|
props.setProperty("blockIfSessionPoolIsFullTimeout", Long.toString(getBlockIfSessionPoolIsFullTimeout()));
|
||||||
|
props.setProperty("reconnectOnException", Boolean.toString(isReconnectOnException()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue