avoid ugly npe post endpoint disassociate and add additional trace to recovery

This commit is contained in:
gtully 2014-02-17 14:36:40 +00:00
parent ec249f4dac
commit f42d56c1f8
3 changed files with 20 additions and 11 deletions

View File

@ -294,18 +294,18 @@ public class ActiveMQEndpointWorker {
private void connect() { private void connect() {
synchronized ( connectWork ) { synchronized ( connectWork ) {
if (!running) { if (!running) {
return; return;
} }
try { try {
workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null); workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null);
} catch (WorkException e) { } catch (WorkException e) {
running = false; running = false;
LOG.error("Work Manager did not accept work: ", e); LOG.error("Work Manager did not accept work: ", e);
}
} }
} }
}
/** /**
* *

View File

@ -127,7 +127,10 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction {
} }
public Xid[] recover(int arg0) throws XAException { public Xid[] recover(int arg0) throws XAException {
return transactionContext.recover(arg0); Xid[] answer = null;
answer = transactionContext.recover(arg0);
LOG.trace("{} recover({}) = {}", new Object[]{this, arg0, answer});
return answer;
} }
public void rollback(Xid arg0) throws XAException { public void rollback(Xid arg0) throws XAException {

View File

@ -30,6 +30,7 @@ import javax.jms.Session;
import javax.resource.spi.UnavailableException; import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpoint; import javax.resource.spi.endpoint.MessageEndpoint;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQQueueSession; import org.apache.activemq.ActiveMQQueueSession;
import org.apache.activemq.ActiveMQSession; import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicSession; import org.apache.activemq.ActiveMQTopicSession;
@ -60,7 +61,12 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
private ServerSessionImpl createServerSessionImpl() throws JMSException { private ServerSessionImpl createServerSessionImpl() throws JMSException {
MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec(); MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession(); int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.getConnection().createSession(activeMQAsfEndpointWorker.transacted, acknowledge); final ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection();
if (connection == null) {
// redispatch of pending prefetched messages after disconnect can have a null connection
return null;
}
final ActiveMQSession session = (ActiveMQSession)connection.createSession(activeMQAsfEndpointWorker.transacted, acknowledge);
MessageEndpoint endpoint; MessageEndpoint endpoint;
try { try {
int batchSize = 0; int batchSize = 0;