git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@664082 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-06-06 18:59:11 +00:00
parent 944145b191
commit 58e5b9acce
6 changed files with 158 additions and 97 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.ra;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ExceptionListener;
@ -61,24 +62,23 @@ public class ActiveMQEndpointWorker {
}
}
protected MessageResourceAdapter adapter;
protected ActiveMQEndpointActivationKey endpointActivationKey;
protected MessageEndpointFactory endpointFactory;
protected WorkManager workManager;
protected boolean transacted;
protected ActiveMQConnection connection;
protected final ActiveMQEndpointActivationKey endpointActivationKey;
protected final MessageEndpointFactory endpointFactory;
protected final WorkManager workManager;
protected final boolean transacted;
private final ActiveMQDestination dest;
private final Work connectWork;
private final AtomicBoolean connecting = new AtomicBoolean(false);
private final String shutdownMutex = "shutdownMutex";
private ActiveMQConnection connection;
private ConnectionConsumer consumer;
private ServerSessionPoolImpl serverSessionPool;
private ActiveMQDestination dest;
private boolean running;
private Work connectWork;
private long reconnectDelay = INITIAL_RECONNECT_DELAY;
public ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
protected ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
this.endpointActivationKey = key;
this.adapter = adapter;
this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
this.workManager = adapter.getBootstrapContext().getWorkManager();
try {
@ -88,42 +88,97 @@ public class ActiveMQEndpointWorker {
}
connectWork = new Work() {
long currentReconnectDelay = INITIAL_RECONNECT_DELAY;
public void release() {
//
}
public synchronized void run() {
if (!isRunning()) {
return;
}
if (connection != null) {
return;
currentReconnectDelay = INITIAL_RECONNECT_DELAY;
MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
if ( LOG.isInfoEnabled() ) {
LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]");
}
MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
while ( connecting.get() && running ) {
try {
connection = adapter.makeConnection(activationSpec);
connection.start();
connection.setExceptionListener(new ExceptionListener() {
public void onException(JMSException error) {
if (!serverSessionPool.isClosing()) {
reconnect(error);
// initiate reconnection only once, i.e. on initial exception
// and only if not already trying to connect
LOG.error("Connection to broker failed: " + error.getMessage(), error);
if ( connecting.compareAndSet(false, true) ) {
synchronized ( connectWork ) {
disconnect();
serverSessionPool.closeIdleSessions();
connect();
}
} else {
// connection attempt has already been initiated
LOG.info("Connection attempt already in progress, ignoring connection exception");
}
}
}
});
connection.start();
int prefetchSize = activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
if (activationSpec.isDurableSubscription()) {
consumer = connection.createDurableConnectionConsumer((Topic)dest, activationSpec.getSubscriptionName(), emptyToNull(activationSpec.getMessageSelector()), serverSessionPool,
activationSpec.getMaxMessagesPerSessionsIntValue(), activationSpec.getNoLocalBooleanValue());
consumer = connection.createDurableConnectionConsumer(
(Topic) dest,
activationSpec.getSubscriptionName(),
emptyToNull(activationSpec.getMessageSelector()),
serverSessionPool,
prefetchSize,
activationSpec.getNoLocalBooleanValue());
} else {
consumer = connection.createConnectionConsumer(dest, emptyToNull(activationSpec.getMessageSelector()), serverSessionPool, activationSpec.getMaxMessagesPerSessionsIntValue(),
consumer = connection.createConnectionConsumer(
dest,
emptyToNull(activationSpec.getMessageSelector()),
serverSessionPool,
prefetchSize,
activationSpec.getNoLocalBooleanValue());
}
if ( connecting.compareAndSet(true, false) ) {
if ( LOG.isInfoEnabled() ) {
LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]");
}
} else {
LOG.error("Could not release connection lock");
}
} catch (JMSException error) {
LOG.debug("Fail to to connect: " + error, error);
reconnect(error);
if ( LOG.isDebugEnabled() ) {
LOG.debug("Failed to connect: " + error.getMessage(), error);
}
disconnect();
pause(error);
}
}
}
private void pause(JMSException error) {
if (currentReconnectDelay == MAX_RECONNECT_DELAY) {
LOG.error("Failed to connect to broker [" + adapter.getInfo().getServerUrl() + "]: "
+ error.getMessage(), error);
LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
}
try {
synchronized ( shutdownMutex ) {
// shutdownMutex will be notified by stop() method in
// order to accelerate shutdown of endpoint
shutdownMutex.wait(currentReconnectDelay);
}
} catch ( InterruptedException e ) {
Thread.interrupted();
}
currentReconnectDelay *= 2;
if (currentReconnectDelay > MAX_RECONNECT_DELAY) {
currentReconnectDelay = MAX_RECONNECT_DELAY;
}
}
};
@ -139,25 +194,13 @@ public class ActiveMQEndpointWorker {
}
/**
* @param s
*/
public static void safeClose(Session s) {
try {
if (s != null) {
s.close();
}
} catch (JMSException e) {
//
}
}
/**
* @param c
*/
public static void safeClose(Connection c) {
try {
if (c != null) {
LOG.debug("Closing connection to broker");
c.close();
}
} catch (JMSException e) {
@ -171,6 +214,7 @@ public class ActiveMQEndpointWorker {
public static void safeClose(ConnectionConsumer cc) {
try {
if (cc != null) {
LOG.debug("Closing ConnectionConsumer");
cc.close();
}
} catch (JMSException e) {
@ -181,35 +225,44 @@ public class ActiveMQEndpointWorker {
/**
*
*/
public synchronized void start() throws WorkException, ResourceException {
if (running) {
public void start() throws ResourceException {
synchronized (connectWork) {
if (running)
return;
}
running = true;
LOG.debug("Starting");
if ( connecting.compareAndSet(false, true) ) {
LOG.info("Starting");
serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
connect();
LOG.debug("Started");
} else {
LOG.warn("Ignoring start command, EndpointWorker is already trying to connect");
}
}
}
/**
*
*/
public synchronized void stop() throws InterruptedException {
if (!running) {
public void stop() throws InterruptedException {
synchronized (shutdownMutex) {
if (!running)
return;
}
running = false;
LOG.info("Stopping");
// wake up pausing reconnect attempt
shutdownMutex.notifyAll();
serverSessionPool.close();
disconnect();
}
}
private boolean isRunning() {
return running;
}
private synchronized void connect() {
private void connect() {
synchronized ( connectWork ) {
if (!running) {
return;
}
@ -221,44 +274,18 @@ public class ActiveMQEndpointWorker {
LOG.error("Work Manager did not accept work: ", e);
}
}
}
/**
*
*/
private synchronized void disconnect() {
private void disconnect() {
synchronized ( connectWork ) {
safeClose(consumer);
consumer = null;
safeClose(connection);
connection = null;
}
private void reconnect(JMSException error) {
LOG.debug("Reconnect cause: ", error);
long reconnectDelay;
synchronized (this) {
reconnectDelay = this.reconnectDelay;
// Only log errors if the server is really down.. And not a temp
// failure.
if (reconnectDelay == MAX_RECONNECT_DELAY) {
LOG.error("Endpoint connection to JMS broker failed: " + error.getMessage());
LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
}
}
try {
disconnect();
Thread.sleep(reconnectDelay);
synchronized (this) {
// Use exponential rollback.
this.reconnectDelay *= 2;
if (this.reconnectDelay > MAX_RECONNECT_DELAY) {
this.reconnectDelay = MAX_RECONNECT_DELAY;
}
}
connect();
} catch (InterruptedException e) {
//
}
}
protected void registerThreadSession(Session session) {
@ -269,6 +296,16 @@ public class ActiveMQEndpointWorker {
THREAD_LOCAL.set(null);
}
protected ActiveMQConnection getConnection() {
// make sure we only return a working connection
// in particular make sure that we do not return null
// after the resource adapter got disconnected from
// the broker via the disconnect() method
synchronized ( connectWork ) {
return connection;
}
}
private String emptyToNull(String value) {
if (value == null || value.length() == 0) {
return null;

View File

@ -71,6 +71,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
this.bootstrapContext = bootstrapContext;
if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
brokerStartThread = new Thread("Starting ActiveMQ Broker") {
@Override
public void run () {
try {
synchronized( ActiveMQResourceAdapter.this ) {
@ -110,21 +111,21 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
* @param activationSpec
*/
public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
ActiveMQConnectionFactory connectionFactory = this.connectionFactory;
if (connectionFactory == null) {
connectionFactory = createConnectionFactory(getInfo());
ActiveMQConnectionFactory cf = getConnectionFactory();
if (cf == null) {
cf = createConnectionFactory(getInfo());
}
String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName());
String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword());
String clientId = activationSpec.getClientId();
if (clientId != null) {
connectionFactory.setClientID(clientId);
cf.setClientID(clientId);
} else {
if (activationSpec.isDurableSubscription()) {
log.warn("No clientID specified for durable subscription: " + activationSpec);
}
}
ActiveMQConnection physicalConnection = (ActiveMQConnection)connectionFactory.createConnection(userName, password);
ActiveMQConnection physicalConnection = (ActiveMQConnection) cf.createConnection(userName, password);
// have we configured a redelivery policy
RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
@ -318,8 +319,8 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
return connectionFactory;
}
public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
public void setConnectionFactory(ActiveMQConnectionFactory aConnectionFactory) {
this.connectionFactory = aConnectionFactory;
}

View File

@ -22,6 +22,8 @@ import javax.jms.Message;
import javax.jms.MessageListener;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @author <a href="mailto:michael.gaffney@panacya.com">Michael Gaffney </a>
@ -30,6 +32,7 @@ public class MessageEndpointProxy implements MessageListener, MessageEndpoint {
private static final MessageEndpointState ALIVE = new MessageEndpointAlive();
private static final MessageEndpointState DEAD = new MessageEndpointDead();
private static final Log LOG = LogFactory.getLog(MessageEndpointProxy.class);
private static int proxyCount;
private final int proxyID;
@ -52,18 +55,22 @@ public class MessageEndpointProxy implements MessageListener, MessageEndpoint {
}
public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
LOG.trace("Invoking MessageEndpoint.beforeDelivery()");
state.beforeDelivery(this, method);
}
public void onMessage(Message message) {
LOG.trace("Invoking MessageEndpoint.onMethod()");
state.onMessage(this, message);
}
public void afterDelivery() throws ResourceException {
LOG.trace("Invoking MessageEndpoint.afterDelivery()");
state.afterDelivery(this);
}
public void release() {
LOG.trace("Invoking MessageEndpoint.release()");
state.release(this);
}

View File

@ -30,7 +30,7 @@ import org.apache.activemq.ActiveMQConnection;
*
* @version $Revision$
*/
interface MessageResourceAdapter extends ResourceAdapter {
public interface MessageResourceAdapter extends ResourceAdapter {
/**
*/

View File

@ -104,6 +104,10 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
return session;
}
protected boolean isStale() {
return stale || !session.isRunning();
}
public MessageProducer getMessageProducer() throws JMSException {
if (messageProducer == null) {
messageProducer = getSession().createProducer(null);
@ -156,12 +160,12 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
*/
public void run() {
log.debug("Running");
currentBatchSize = 0;
while (true) {
log.debug("run loop start");
try {
if ( session.isRunning() ) {
InboundContextSupport.register(this);
currentBatchSize = 0;
if ( session.isRunning() ) {
session.run();
} else {
log.debug("JMS Session is no longer running (maybe due to loss of connection?), marking ServerSesison as stale");
@ -169,8 +173,11 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
}
} catch (Throwable e) {
stale = true;
if ( log.isInfoEnabled() ) {
log.info("Endpoint failed to process message. Reason: " + e.getMessage());
} else if ( log.isDebugEnabled() ) {
log.debug("Endpoint failed to process message.", e);
log.info("Endpoint failed to process message. Reason: " + e);
}
} finally {
InboundContextSupport.unregister(this);
log.debug("run loop end");
@ -246,6 +253,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
/**
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "ServerSessionImpl:" + serverSessionId;
}
@ -254,12 +262,12 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
try {
endpoint.release();
} catch (Throwable e) {
log.debug("Endpoint did not release properly: " + e, e);
log.debug("Endpoint did not release properly: " + e.getMessage(), e);
}
try {
session.close();
} catch (Throwable e) {
log.debug("Session did not close properly: " + e, e);
log.debug("Session did not close properly: " + e.getMessage(), e);
}
}

View File

@ -60,7 +60,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
private ServerSessionImpl createServerSessionImpl() throws JMSException {
MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.connection.createSession(activeMQAsfEndpointWorker.transacted, acknowledge);
final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.getConnection().createSession(activeMQAsfEndpointWorker.transacted, acknowledge);
MessageEndpoint endpoint;
try {
int batchSize = 0;
@ -188,13 +188,21 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
}
public void returnToPool(ServerSessionImpl ss) {
if (LOG.isDebugEnabled()) {
LOG.debug("Session returned to pool: " + ss);
}
sessionLock.lock();
try {
activeSessions.remove(ss);
try {
// make sure we only return non-stale sessions to the pool
if ( ss.isStale() ) {
if ( LOG.isDebugEnabled() ) {
LOG.debug("Discarding stale ServerSession to be returned to pool: " + ss);
}
ss.close();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("ServerSession returned to pool: " + ss);
}
idleSessions.add(ss);
}
} finally {
sessionLock.unlock();
}
@ -243,7 +251,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
} else if (s instanceof ActiveMQTopicSession) {
session = (ActiveMQSession) s;
} else {
activeMQAsfEndpointWorker.connection
activeMQAsfEndpointWorker.getConnection()
.onAsyncException(new JMSException(
"Session pool provided an invalid session type: "
+ s.getClass()));
@ -275,7 +283,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
}
private int closeIdleSessions() {
protected int closeIdleSessions() {
sessionLock.lock();
try {
for (ServerSessionImpl ss : idleSessions) {