https://issues.apache.org/jira/browse/AMQ-5090 - improve failover transaction tracking

This commit is contained in:
Dejan Bosanac 2014-03-07 12:04:20 +01:00
parent 33b88d34a9
commit 8188f7f884
2 changed files with 13 additions and 8 deletions

View File

@ -537,7 +537,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
@Override @Override
public Response processPrepareTransaction(TransactionInfo info) throws Exception { public Response processPrepareTransaction(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) { if (trackTransactions && info != null && info.getTransactionId() != null) {
ConnectionId connectionId = info.getConnectionId(); ConnectionId connectionId = info.getConnectionId();
if (connectionId != null) { if (connectionId != null) {
ConnectionState cs = connectionStates.get(connectionId); ConnectionState cs = connectionStates.get(connectionId);
@ -555,7 +555,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
@Override @Override
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) { if (trackTransactions && info != null && info.getTransactionId() != null) {
ConnectionId connectionId = info.getConnectionId(); ConnectionId connectionId = info.getConnectionId();
if (connectionId != null) { if (connectionId != null) {
ConnectionState cs = connectionStates.get(connectionId); ConnectionState cs = connectionStates.get(connectionId);
@ -573,7 +573,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
@Override @Override
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) { if (trackTransactions && info != null && info.getTransactionId() != null) {
ConnectionId connectionId = info.getConnectionId(); ConnectionId connectionId = info.getConnectionId();
if (connectionId != null) { if (connectionId != null) {
ConnectionState cs = connectionStates.get(connectionId); ConnectionState cs = connectionStates.get(connectionId);
@ -591,7 +591,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
@Override @Override
public Response processRollbackTransaction(TransactionInfo info) throws Exception { public Response processRollbackTransaction(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) { if (trackTransactions && info != null && info.getTransactionId() != null) {
ConnectionId connectionId = info.getConnectionId(); ConnectionId connectionId = info.getConnectionId();
if (connectionId != null) { if (connectionId != null) {
ConnectionState cs = connectionStates.get(connectionId); ConnectionState cs = connectionStates.get(connectionId);
@ -609,7 +609,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
@Override @Override
public Response processEndTransaction(TransactionInfo info) throws Exception { public Response processEndTransaction(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) { if (trackTransactions && info != null && info.getTransactionId() != null) {
ConnectionId connectionId = info.getConnectionId(); ConnectionId connectionId = info.getConnectionId();
if (connectionId != null) { if (connectionId != null) {
ConnectionState cs = connectionStates.get(connectionId); ConnectionState cs = connectionStates.get(connectionId);

View File

@ -257,8 +257,8 @@ public class FailoverTransport implements CompositeTransport {
if (canReconnect()) { if (canReconnect()) {
reconnectOk = true; reconnectOk = true;
} }
LOG.warn("Transport (" + transport + ") failed, reason: " + e LOG.warn("Transport (" + transport + ") failed, reason: "
+ (reconnectOk ? "," : ", not") + " attempting to automatically reconnect"); + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e);
initialized = false; initialized = false;
failedConnectTransportURI = connectedTransportURI; failedConnectTransportURI = connectedTransportURI;
@ -635,11 +635,16 @@ public class FailoverTransport implements CompositeTransport {
break; break;
} }
Tracked tracked = null;
try {
tracked = stateTracker.track(command);
} catch (IOException ioe) {
LOG.debug("Cannot track the command " + command, ioe);
}
// If it was a request and it was not being tracked by // If it was a request and it was not being tracked by
// the state tracker, // the state tracker,
// then hold it in the requestMap so that we can replay // then hold it in the requestMap so that we can replay
// it later. // it later.
Tracked tracked = stateTracker.track(command);
synchronized (requestMap) { synchronized (requestMap) {
if (tracked != null && tracked.isWaitingForResponse()) { if (tracked != null && tracked.isWaitingForResponse()) {
requestMap.put(Integer.valueOf(command.getCommandId()), tracked); requestMap.put(Integer.valueOf(command.getCommandId()), tracked);