Failover transport does not replay all the transaction operations on failover.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@443271 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Adrian T. Co 2006-09-14 07:16:11 +00:00
parent a86a587788
commit 04c3b07a96
7 changed files with 249 additions and 46 deletions

View File

@ -62,6 +62,7 @@ import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -308,7 +309,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( cs!=null ) {
context = cs.getContext();
}
broker.beginTransaction(context, info.getTransactionId());
// Avoid replaying dup commands
if( cs.getTransactionState(info.getTransactionId())==null ) {
cs.addTransactionState(info.getTransactionId());
broker.beginTransaction(context, info.getTransactionId());
}
return null;
}
@ -325,9 +331,22 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( cs!=null ) {
context = cs.getContext();
}
int result = broker.prepareTransaction(context, info.getTransactionId());
IntegerResponse response = new IntegerResponse(result);
return response;
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if( transactionState == null )
throw new IllegalStateException("Cannot prepare a transaction that had not been started: "+info.getTransactionId());
// Avoid dups.
if( !transactionState.isPrepared() ) {
transactionState.setPrepared(true);
int result = broker.prepareTransaction(context, info.getTransactionId());
transactionState.setPreparedResult(result);
IntegerResponse response = new IntegerResponse(result);
return response;
} else {
IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
return response;
}
}
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
@ -336,8 +355,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( cs!=null ) {
context = cs.getContext();
}
cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context, info.getTransactionId(), true);
return null;
}
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
@ -346,6 +369,8 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( cs!=null ) {
context = cs.getContext();
}
cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context, info.getTransactionId(), false);
return null;
}
@ -356,6 +381,8 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( cs!=null ) {
context = cs.getContext();
}
cs.removeTransactionState(info.getTransactionId());
broker.rollbackTransaction(context, info.getTransactionId());
return null;
}
@ -382,10 +409,32 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
public Response processMessage(Message messageSend) throws Exception {
ProducerId producerId = messageSend.getProducerId();
ConnectionState state = lookupConnectionState(producerId);
ConnectionContext context = state.getContext();
broker.send(context, messageSend);
// If the message originates from this client connection,
// then, finde the associated producer state so we can do some dup detection.
ProducerState producerState=null;
if( messageSend.getMessageId().getProducerId().equals( messageSend.getProducerId() ) ) {
SessionState ss = state.getSessionState(producerId.getParentId());
if( ss == null )
throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId());
producerState = ss.getProducerState(producerId);
}
if( producerState == null ) {
broker.send(context, messageSend);
} else {
// Avoid Dups.
long seq = messageSend.getMessageId().getProducerSequenceId();
if( seq > producerState.getLastSequenceId() ) {
producerState.setLastSequenceId(seq);
broker.send(context, messageSend);
}
}
return null;
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
@ -37,6 +38,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
public class ConnectionState {
final ConnectionInfo info;
private final ConcurrentHashMap transactions = new ConcurrentHashMap();
private final ConcurrentHashMap sessions = new ConcurrentHashMap();
private final List tempDestinations = Collections.synchronizedList(new ArrayList());
private final AtomicBoolean shutdown = new AtomicBoolean(false);
@ -65,6 +67,20 @@ public class ConnectionState {
}
}
public void addTransactionState(TransactionId id) {
checkShutdown();
transactions.put(id, new TransactionState(id));
}
public TransactionState getTransactionState(TransactionId id) {
return (TransactionState)transactions.get(id);
}
public Collection getTransactionStates() {
return transactions.values();
}
public TransactionState removeTransactionState(TransactionId id) {
return (TransactionState) transactions.remove(id);
}
public void addSession(SessionInfo info) {
checkShutdown();
sessions.put(info.getSessionId(), new SessionState(info));

View File

@ -55,21 +55,41 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
*/
public class ConnectionStateTracker implements CommandVisitor {
private final static Response TRACKED_RESPONSE_MARKER = new Response();
private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
boolean trackTransactions = false;
boolean trackMessages = false;
boolean trackAcks = false;
private boolean trackTransactions = false;
private boolean restoreSessions=true;
boolean restoreConsumers=true;
private boolean restoreConsumers=true;
private boolean restoreProducers=true;
private boolean restoreTransaction=true;
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
public boolean track(Command command) throws IOException {
private class RemoveTransactionAction implements Runnable {
private final TransactionInfo info;
public RemoveTransactionAction(TransactionInfo info) {
this.info = info;
}
public void run() {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
if( cs != null ) {
cs.removeTransactionState(info.getTransactionId());
}
}
}
/**
*
*
* @param command
* @return null if the command is not state tracked.
* @throws IOException
*/
public Tracked track(Command command) throws IOException {
try {
return command.visit(this)!=null;
return (Tracked) command.visit(this);
} catch (IOException e) {
throw e;
} catch (Throwable e) {
@ -86,9 +106,22 @@ public class ConnectionStateTracker implements CommandVisitor {
if( restoreSessions )
restoreSessions(transport, connectionState);
if( restoreTransaction )
restoreTransactions(transport, connectionState);
}
}
private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) {
TransactionState transactionState = (TransactionState) iter.next();
for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) {
Command command = (Command) iterator.next();
transport.oneway(command);
}
}
}
/**
* @param transport
* @param connectionState
@ -227,26 +260,103 @@ public class ConnectionStateTracker implements CommandVisitor {
return null;
}
public Response processMessage(Message send) throws Exception {
return null;
if( trackTransactions && send.getTransactionId() != null ) {
ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
transactionState.addCommand(send);
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public Response processMessageAck(MessageAck ack) throws Exception {
return null;
if( trackTransactions && ack.getTransactionId() != null ) {
ConnectionId connectionId = ack.getConsumerId().getParentId().getParentId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(ack.getTransactionId());
transactionState.addCommand(ack);
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public Response processBeginTransaction(TransactionInfo info) throws Exception {
return null;
if( trackTransactions ) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
cs.addTransactionState(info.getTransactionId());
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
return null;
if( trackTransactions ) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
transactionState.addCommand(info);
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
return null;
if( trackTransactions ) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if( transactionState !=null ) {
transactionState.addCommand(info);
return new Tracked(new RemoveTransactionAction(info));
}
}
return null;
}
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
return null;
if( trackTransactions ) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if( transactionState !=null ) {
transactionState.addCommand(info);
return new Tracked(new RemoveTransactionAction(info));
}
}
return null;
}
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
if( trackTransactions ) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if( transactionState !=null ) {
transactionState.addCommand(info);
return new Tracked(new RemoveTransactionAction(info));
}
}
return null;
}
public Response processEndTransaction(TransactionInfo info) throws Exception {
if( trackTransactions ) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
transactionState.addCommand(info);
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public Response processRecoverTransactions(TransactionInfo info) {
return null;
}
public Response processForgetTransaction(TransactionInfo info) throws Exception {
return null;
}
public Response processWireFormat(WireFormatInfo info) throws Exception {
return null;
}
@ -260,18 +370,6 @@ public class ConnectionStateTracker implements CommandVisitor {
return null;
}
public Response processRecoverTransactions(TransactionInfo info) {
return null;
}
public Response processForgetTransaction(TransactionInfo info) throws Exception {
return null;
}
public Response processEndTransaction(TransactionInfo info) throws Exception {
return null;
}
public Response processFlush(FlushCommand command) throws Exception {
return null;
}
@ -308,4 +406,19 @@ public class ConnectionStateTracker implements CommandVisitor {
this.restoreSessions = restoreSessions;
}
public boolean isTrackTransactions() {
return trackTransactions;
}
public void setTrackTransactions(boolean trackTransactions) {
this.trackTransactions = trackTransactions;
}
public boolean isRestoreTransaction() {
return restoreTransaction;
}
public void setRestoreTransaction(boolean restoreTransaction) {
this.restoreTransaction = restoreTransaction;
}
}

View File

@ -22,6 +22,7 @@ import org.apache.activemq.command.ProducerInfo;
public class ProducerState {
final ProducerInfo info;
private long lastSequenceId=-1;
public ProducerState(ProducerInfo info) {
this.info = info;
@ -32,4 +33,10 @@ public class ProducerState {
public ProducerInfo getInfo() {
return info;
}
public void setLastSequenceId(long lastSequenceId) {
this.lastSequenceId = lastSequenceId;
}
public long getLastSequenceId() {
return lastSequenceId;
}
}

View File

@ -74,6 +74,9 @@ public class SessionState {
public Collection getProducerStates() {
return producers.values();
}
public ProducerState getProducerState(ProducerId producerId) {
return (ProducerState) producers.get(producerId);
}
public Collection getConsumerStates() {
return consumers.values();

View File

@ -28,6 +28,7 @@ import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker;
import org.apache.activemq.state.Tracked;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
@ -89,7 +90,10 @@ public class FailoverTransport implements CompositeTransport {
return;
}
if (command.isResponse()) {
requestMap.remove(new Integer(((Response) command).getCorrelationId()));
Object object = requestMap.remove(new Integer(((Response) command).getCorrelationId()));
if( object!=null && object.getClass() == Tracked.class ) {
((Tracked)object).onResponses();
}
}
if (!initialized){
if (command.isBrokerInfo()){
@ -136,6 +140,8 @@ public class FailoverTransport implements CompositeTransport {
public FailoverTransport() throws InterruptedIOException {
stateTracker.setTrackTransactions(true);
// Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
@ -372,7 +378,10 @@ public class FailoverTransport implements CompositeTransport {
// the state tracker,
// then hold it in the requestMap so that we can replay
// it later.
if (!stateTracker.track(command) && command.isResponseRequired()) {
Tracked tracked = stateTracker.track(command);
if( tracked!=null && tracked.isWaitingForResponse() ) {
requestMap.put(new Integer(command.getCommandId()), tracked);
} else if ( tracked==null && command.isResponseRequired()) {
requestMap.put(new Integer(command.getCommandId()), command);
}
@ -380,13 +389,19 @@ public class FailoverTransport implements CompositeTransport {
try {
connectedTransport.oneway(command);
} catch (IOException e) {
// If there is an IOException in the send, remove the command from the requestMap
if (!stateTracker.track(command) && command.isResponseRequired()) {
requestMap.remove(new Integer(command.getCommandId()), command);
}
// Rethrow the exception so it will handled by the outer catch
throw e;
// If the command was not tracked.. we will retry in this method
if( tracked==null ) {
// since we will retry in this method.. take it out of the request
// map so that it is not sent 2 times on recovery
if( command.isResponseRequired() ) {
requestMap.remove(new Integer(command.getCommandId()));
}
// Rethrow the exception so it will handled by the outer catch
throw e;
}
}
return;

View File

@ -340,7 +340,7 @@ public class FanoutTransport implements CompositeTransport {
// then hold it in the requestMap so that we can replay
// it later.
boolean fanout = isFanoutCommand(command);
if (!stateTracker.track(command) && command.isResponseRequired() ) {
if (stateTracker.track(command)==null && command.isResponseRequired() ) {
int size = fanout ? minAckCount : 1;
requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
}