From 220ad62a5057fec622eb073fb97e0c2a76b2e586 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Thu, 14 Sep 2006 17:58:49 +0000 Subject: [PATCH] https://issues.apache.org/activemq/browse/AMQ-915 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@443430 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/AbstractConnection.java | 63 ++++++- .../activemq/state/ConnectionState.java | 16 ++ .../state/ConnectionStateTracker.java | 174 ++++++++++++++---- .../apache/activemq/state/ProducerState.java | 11 +- .../apache/activemq/state/SessionState.java | 6 +- .../org/apache/activemq/state/Tracked.java | 27 +++ .../activemq/state/TransactionState.java | 79 ++++++++ .../transport/failover/FailoverTransport.java | 34 +++- .../transport/fanout/FanoutTransport.java | 2 +- 9 files changed, 360 insertions(+), 52 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/state/Tracked.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java index 5ab20b9ce1..72cd37e24b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java @@ -62,6 +62,7 @@ import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.state.ConsumerState; import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.SessionState; +import org.apache.activemq.state.TransactionState; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; @@ -308,7 +309,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C if( cs!=null ) { context = cs.getContext(); } - broker.beginTransaction(context, info.getTransactionId()); + + // Avoid replaying dup commands + if( cs.getTransactionState(info.getTransactionId())==null ) { + cs.addTransactionState(info.getTransactionId()); + broker.beginTransaction(context, info.getTransactionId()); + } return null; } @@ -325,9 +331,22 @@ public abstract class AbstractConnection implements Service, Connection, Task, C if( cs!=null ) { context = cs.getContext(); } - int result = broker.prepareTransaction(context, info.getTransactionId()); - IntegerResponse response = new IntegerResponse(result); - return response; + + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + if( transactionState == null ) + throw new IllegalStateException("Cannot prepare a transaction that had not been started: "+info.getTransactionId()); + + // Avoid dups. + if( !transactionState.isPrepared() ) { + transactionState.setPrepared(true); + int result = broker.prepareTransaction(context, info.getTransactionId()); + transactionState.setPreparedResult(result); + IntegerResponse response = new IntegerResponse(result); + return response; + } else { + IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult()); + return response; + } } public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { @@ -336,8 +355,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C if( cs!=null ) { context = cs.getContext(); } + + cs.removeTransactionState(info.getTransactionId()); broker.commitTransaction(context, info.getTransactionId(), true); + return null; + } public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { @@ -346,7 +369,9 @@ public abstract class AbstractConnection implements Service, Connection, Task, C if( cs!=null ) { context = cs.getContext(); } - broker.commitTransaction(context, info.getTransactionId(), false); + + cs.removeTransactionState(info.getTransactionId()); + broker.commitTransaction(context, info.getTransactionId(), false); return null; } @@ -356,7 +381,9 @@ public abstract class AbstractConnection implements Service, Connection, Task, C if( cs!=null ) { context = cs.getContext(); } - broker.rollbackTransaction(context, info.getTransactionId()); + + cs.removeTransactionState(info.getTransactionId()); + broker.rollbackTransaction(context, info.getTransactionId()); return null; } @@ -382,10 +409,32 @@ public abstract class AbstractConnection implements Service, Connection, Task, C public Response processMessage(Message messageSend) throws Exception { + ProducerId producerId = messageSend.getProducerId(); ConnectionState state = lookupConnectionState(producerId); ConnectionContext context = state.getContext(); - broker.send(context, messageSend); + + // If the message originates from this client connection, + // then, finde the associated producer state so we can do some dup detection. + ProducerState producerState=null; + if( messageSend.getMessageId().getProducerId().equals( messageSend.getProducerId() ) ) { + SessionState ss = state.getSessionState(producerId.getParentId()); + if( ss == null ) + throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId()); + producerState = ss.getProducerState(producerId); + } + + if( producerState == null ) { + broker.send(context, messageSend); + } else { + // Avoid Dups. + long seq = messageSend.getMessageId().getProducerSequenceId(); + if( seq > producerState.getLastSequenceId() ) { + producerState.setLastSequenceId(seq); + broker.send(context, messageSend); + } + } + return null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java index 8473011367..10cdbfd8a5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java @@ -30,6 +30,7 @@ import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.TransactionId; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; @@ -37,6 +38,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; public class ConnectionState { final ConnectionInfo info; + private final ConcurrentHashMap transactions = new ConcurrentHashMap(); private final ConcurrentHashMap sessions = new ConcurrentHashMap(); private final List tempDestinations = Collections.synchronizedList(new ArrayList()); private final AtomicBoolean shutdown = new AtomicBoolean(false); @@ -64,6 +66,20 @@ public class ConnectionState { } } } + + public void addTransactionState(TransactionId id) { + checkShutdown(); + transactions.put(id, new TransactionState(id)); + } + public TransactionState getTransactionState(TransactionId id) { + return (TransactionState)transactions.get(id); + } + public Collection getTransactionStates() { + return transactions.values(); + } + public TransactionState removeTransactionState(TransactionId id) { + return (TransactionState) transactions.remove(id); + } public void addSession(SessionInfo info) { checkShutdown(); diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java index ebf1a9549c..367470cc7d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java @@ -55,21 +55,39 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; */ public class ConnectionStateTracker implements CommandVisitor { - private final static Response TRACKED_RESPONSE_MARKER = new Response(); + private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); - boolean trackTransactions = false; - boolean trackMessages = false; - boolean trackAcks = false; + private boolean trackTransactions = false; private boolean restoreSessions=true; - boolean restoreConsumers=true; + private boolean restoreConsumers=true; private boolean restoreProducers=true; + private boolean restoreTransaction=true; protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap(); - - public boolean track(Command command) throws IOException { + + private class RemoveTransactionAction implements Runnable { + private final TransactionInfo info; + public RemoveTransactionAction(TransactionInfo info) { + this.info = info; + } + public void run() { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + cs.removeTransactionState(info.getTransactionId()); + } + } + + /** + * + * + * @param command + * @return null if the command is not state tracked. + * @throws IOException + */ + public Tracked track(Command command) throws IOException { try { - return command.visit(this)!=null; + return (Tracked) command.visit(this); } catch (IOException e) { throw e; } catch (Throwable e) { @@ -86,10 +104,23 @@ public class ConnectionStateTracker implements CommandVisitor { if( restoreSessions ) restoreSessions(transport, connectionState); + + if( restoreTransaction ) + restoreTransactions(transport, connectionState); } } - /** + private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException { + for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) { + TransactionState transactionState = (TransactionState) iter.next(); + for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) { + Command command = (Command) iterator.next(); + transport.oneway(command); + } + } + } + + /** * @param transport * @param connectionState * @throws IOException @@ -227,26 +258,103 @@ public class ConnectionStateTracker implements CommandVisitor { return null; } public Response processMessage(Message send) throws Exception { - return null; - } + if( trackTransactions && send.getTransactionId() != null ) { + ConnectionId connectionId = send.getProducerId().getParentId().getParentId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(send.getTransactionId()); + transactionState.addCommand(send); + return TRACKED_RESPONSE_MARKER; + } + return null; + } public Response processMessageAck(MessageAck ack) throws Exception { - return null; + if( trackTransactions && ack.getTransactionId() != null ) { + ConnectionId connectionId = ack.getConsumerId().getParentId().getParentId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(ack.getTransactionId()); + transactionState.addCommand(ack); + return TRACKED_RESPONSE_MARKER; + } + return null; } + public Response processBeginTransaction(TransactionInfo info) throws Exception { - return null; - } + if( trackTransactions ) { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + cs.addTransactionState(info.getTransactionId()); + return TRACKED_RESPONSE_MARKER; + } + return null; + } public Response processPrepareTransaction(TransactionInfo info) throws Exception { - return null; + if( trackTransactions ) { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + transactionState.addCommand(info); + return TRACKED_RESPONSE_MARKER; + } + return null; } + public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { - return null; - } + if( trackTransactions ) { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + if( transactionState !=null ) { + transactionState.addCommand(info); + return new Tracked(new RemoveTransactionAction(info)); + } + } + return null; + } public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { - return null; + if( trackTransactions ) { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + if( transactionState !=null ) { + transactionState.addCommand(info); + return new Tracked(new RemoveTransactionAction(info)); + } + } + return null; } + public Response processRollbackTransaction(TransactionInfo info) throws Exception { + if( trackTransactions ) { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + if( transactionState !=null ) { + transactionState.addCommand(info); + return new Tracked(new RemoveTransactionAction(info)); + } + } + return null; + } + + public Response processEndTransaction(TransactionInfo info) throws Exception { + if( trackTransactions ) { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + transactionState.addCommand(info); + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public Response processRecoverTransactions(TransactionInfo info) { return null; } + public Response processForgetTransaction(TransactionInfo info) throws Exception { + return null; + } + + public Response processWireFormat(WireFormatInfo info) throws Exception { return null; } @@ -260,18 +368,6 @@ public class ConnectionStateTracker implements CommandVisitor { return null; } - public Response processRecoverTransactions(TransactionInfo info) { - return null; - } - - public Response processForgetTransaction(TransactionInfo info) throws Exception { - return null; - } - - public Response processEndTransaction(TransactionInfo info) throws Exception { - return null; - } - public Response processFlush(FlushCommand command) throws Exception { return null; } @@ -307,5 +403,21 @@ public class ConnectionStateTracker implements CommandVisitor { public void setRestoreSessions(boolean restoreSessions) { this.restoreSessions = restoreSessions; } - + + public boolean isTrackTransactions() { + return trackTransactions; + } + + public void setTrackTransactions(boolean trackTransactions) { + this.trackTransactions = trackTransactions; + } + + public boolean isRestoreTransaction() { + return restoreTransaction; + } + + public void setRestoreTransaction(boolean restoreTransaction) { + this.restoreTransaction = restoreTransaction; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java b/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java index 56dedd5e0c..b9d85b8892 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java @@ -21,7 +21,8 @@ package org.apache.activemq.state; import org.apache.activemq.command.ProducerInfo; public class ProducerState { - final ProducerInfo info; + final ProducerInfo info; + private long lastSequenceId=-1; public ProducerState(ProducerInfo info) { this.info = info; @@ -31,5 +32,11 @@ public class ProducerState { } public ProducerInfo getInfo() { return info; - } + } + public void setLastSequenceId(long lastSequenceId) { + this.lastSequenceId = lastSequenceId; + } + public long getLastSequenceId() { + return lastSequenceId; + } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java b/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java index 093bf91b7a..38e81dccab 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java @@ -69,11 +69,13 @@ public class SessionState { } public Set getProducerIds() { return producers.keySet(); - } - + } public Collection getProducerStates() { return producers.values(); } + public ProducerState getProducerState(ProducerId producerId) { + return (ProducerState) producers.get(producerId); + } public Collection getConsumerStates() { return consumers.values(); diff --git a/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java b/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java new file mode 100644 index 0000000000..9bc76da08b --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java @@ -0,0 +1,27 @@ +/** + * + */ +package org.apache.activemq.state; + +import org.apache.activemq.command.Response; + +public class Tracked extends Response { + + private Runnable runnable; + + public Tracked(Runnable runnable) { + this.runnable = runnable; + } + + public void onResponses() { + if( runnable != null ) { + runnable.run(); + runnable=null; + } + } + + public boolean isWaitingForResponse() { + return runnable!=null; + } + +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java b/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java new file mode 100644 index 0000000000..ff2d8b14aa --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java @@ -0,0 +1,79 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.state; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.command.Command; +import org.apache.activemq.command.TransactionId; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; + +public class TransactionState { + final TransactionId id; + + public final ArrayList commands = new ArrayList(); + private final AtomicBoolean shutdown = new AtomicBoolean(false); + + private boolean prepared; + + private int preparedResult; + + public TransactionState(TransactionId id) { + this.id = id; + } + public String toString() { + return id.toString(); + } + + public void addCommand(Command operation) { + checkShutdown(); + commands.add(operation); + } + + public List getCommands() { + return commands; + } + + private void checkShutdown() { + if( shutdown.get() ) + throw new IllegalStateException("Disposed"); + } + + public void shutdown() { + shutdown.set(false); + } + public TransactionId getId() { + return id; + } + + public void setPrepared(boolean prepared) { + this.prepared = prepared; + } + public boolean isPrepared() { + return prepared; + } + public void setPreparedResult(int preparedResult) { + this.preparedResult = preparedResult; + } + public int getPreparedResult() { + return preparedResult; + } + +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 58cae92a02..221d6f143b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -28,6 +28,7 @@ import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; import org.apache.activemq.command.Response; import org.apache.activemq.state.ConnectionStateTracker; +import org.apache.activemq.state.Tracked; import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; @@ -89,7 +90,10 @@ public class FailoverTransport implements CompositeTransport { return; } if (command.isResponse()) { - requestMap.remove(new Integer(((Response) command).getCorrelationId())); + Object object = requestMap.remove(new Integer(((Response) command).getCorrelationId())); + if( object!=null && object.getClass() == Tracked.class ) { + ((Tracked)object).onResponses(); + } } if (!initialized){ if (command.isBrokerInfo()){ @@ -136,6 +140,8 @@ public class FailoverTransport implements CompositeTransport { public FailoverTransport() throws InterruptedIOException { + stateTracker.setTrackTransactions(true); + // Setup a task that is used to reconnect the a connection async. reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { @@ -372,7 +378,10 @@ public class FailoverTransport implements CompositeTransport { // the state tracker, // then hold it in the requestMap so that we can replay // it later. - if (!stateTracker.track(command) && command.isResponseRequired()) { + Tracked tracked = stateTracker.track(command); + if( tracked!=null && tracked.isWaitingForResponse() ) { + requestMap.put(new Integer(command.getCommandId()), tracked); + } else if ( tracked==null && command.isResponseRequired()) { requestMap.put(new Integer(command.getCommandId()), command); } @@ -380,13 +389,20 @@ public class FailoverTransport implements CompositeTransport { try { connectedTransport.oneway(command); } catch (IOException e) { - // If there is an IOException in the send, remove the command from the requestMap - if (!stateTracker.track(command) && command.isResponseRequired()) { - requestMap.remove(new Integer(command.getCommandId()), command); - } - - // Rethrow the exception so it will handled by the outer catch - throw e; + + // If the command was not tracked.. we will retry in this method + if( tracked==null ) { + + // since we will retry in this method.. take it out of the request + // map so that it is not sent 2 times on recovery + if( command.isResponseRequired() ) { + requestMap.remove(new Integer(command.getCommandId())); + } + + // Rethrow the exception so it will handled by the outer catch + throw e; + } + } return; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java index 212138b7aa..b423666f72 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java @@ -340,7 +340,7 @@ public class FanoutTransport implements CompositeTransport { // then hold it in the requestMap so that we can replay // it later. boolean fanout = isFanoutCommand(command); - if (!stateTracker.track(command) && command.isResponseRequired() ) { + if (stateTracker.track(command)==null && command.isResponseRequired() ) { int size = fanout ? minAckCount : 1; requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size)); }