Hiram R. Chirino 2006-09-14 17:58:49 +00:00
parent 295850da01
commit 220ad62a50
9 changed files with 360 additions and 52 deletions

View File

@ -62,6 +62,7 @@ import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -308,7 +309,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( cs!=null ) {
context = cs.getContext();
}
broker.beginTransaction(context, info.getTransactionId());
// Avoid replaying dup commands
if( cs.getTransactionState(info.getTransactionId())==null ) {
cs.addTransactionState(info.getTransactionId());
broker.beginTransaction(context, info.getTransactionId());
}
return null;
}
@ -325,9 +331,22 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( cs!=null ) {
context = cs.getContext();
}
int result = broker.prepareTransaction(context, info.getTransactionId());
IntegerResponse response = new IntegerResponse(result);
return response;
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if( transactionState == null )
throw new IllegalStateException("Cannot prepare a transaction that had not been started: "+info.getTransactionId());
// Avoid dups.
if( !transactionState.isPrepared() ) {
transactionState.setPrepared(true);
int result = broker.prepareTransaction(context, info.getTransactionId());
transactionState.setPreparedResult(result);
IntegerResponse response = new IntegerResponse(result);
return response;
} else {
IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
return response;
}
}
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
@ -336,8 +355,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( cs!=null ) {
context = cs.getContext();
}
cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context, info.getTransactionId(), true);
return null;
}
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
@ -346,7 +369,9 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( cs!=null ) {
context = cs.getContext();
}
broker.commitTransaction(context, info.getTransactionId(), false);
cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context, info.getTransactionId(), false);
return null;
}
@ -356,7 +381,9 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( cs!=null ) {
context = cs.getContext();
}
broker.rollbackTransaction(context, info.getTransactionId());
cs.removeTransactionState(info.getTransactionId());
broker.rollbackTransaction(context, info.getTransactionId());
return null;
}
@ -382,10 +409,32 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
public Response processMessage(Message messageSend) throws Exception {
ProducerId producerId = messageSend.getProducerId();
ConnectionState state = lookupConnectionState(producerId);
ConnectionContext context = state.getContext();
broker.send(context, messageSend);
// If the message originates from this client connection,
// then, finde the associated producer state so we can do some dup detection.
ProducerState producerState=null;
if( messageSend.getMessageId().getProducerId().equals( messageSend.getProducerId() ) ) {
SessionState ss = state.getSessionState(producerId.getParentId());
if( ss == null )
throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId());
producerState = ss.getProducerState(producerId);
}
if( producerState == null ) {
broker.send(context, messageSend);
} else {
// Avoid Dups.
long seq = messageSend.getMessageId().getProducerSequenceId();
if( seq > producerState.getLastSequenceId() ) {
producerState.setLastSequenceId(seq);
broker.send(context, messageSend);
}
}
return null;
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
@ -37,6 +38,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
public class ConnectionState {
final ConnectionInfo info;
private final ConcurrentHashMap transactions = new ConcurrentHashMap();
private final ConcurrentHashMap sessions = new ConcurrentHashMap();
private final List tempDestinations = Collections.synchronizedList(new ArrayList());
private final AtomicBoolean shutdown = new AtomicBoolean(false);
@ -64,6 +66,20 @@ public class ConnectionState {
}
}
}
public void addTransactionState(TransactionId id) {
checkShutdown();
transactions.put(id, new TransactionState(id));
}
public TransactionState getTransactionState(TransactionId id) {
return (TransactionState)transactions.get(id);
}
public Collection getTransactionStates() {
return transactions.values();
}
public TransactionState removeTransactionState(TransactionId id) {
return (TransactionState) transactions.remove(id);
}
public void addSession(SessionInfo info) {
checkShutdown();

View File

@ -55,21 +55,39 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
*/
public class ConnectionStateTracker implements CommandVisitor {
private final static Response TRACKED_RESPONSE_MARKER = new Response();
private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
boolean trackTransactions = false;
boolean trackMessages = false;
boolean trackAcks = false;
private boolean trackTransactions = false;
private boolean restoreSessions=true;
boolean restoreConsumers=true;
private boolean restoreConsumers=true;
private boolean restoreProducers=true;
private boolean restoreTransaction=true;
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
public boolean track(Command command) throws IOException {
private class RemoveTransactionAction implements Runnable {
private final TransactionInfo info;
public RemoveTransactionAction(TransactionInfo info) {
this.info = info;
}
public void run() {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
cs.removeTransactionState(info.getTransactionId());
}
}
/**
*
*
* @param command
* @return null if the command is not state tracked.
* @throws IOException
*/
public Tracked track(Command command) throws IOException {
try {
return command.visit(this)!=null;
return (Tracked) command.visit(this);
} catch (IOException e) {
throw e;
} catch (Throwable e) {
@ -86,10 +104,23 @@ public class ConnectionStateTracker implements CommandVisitor {
if( restoreSessions )
restoreSessions(transport, connectionState);
if( restoreTransaction )
restoreTransactions(transport, connectionState);
}
}
/**
private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) {
TransactionState transactionState = (TransactionState) iter.next();
for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) {
Command command = (Command) iterator.next();
transport.oneway(command);
}
}
}
/**
* @param transport
* @param connectionState
* @throws IOException
@ -227,26 +258,103 @@ public class ConnectionStateTracker implements CommandVisitor {
return null;
}
public Response processMessage(Message send) throws Exception {
return null;
}
if( trackTransactions && send.getTransactionId() != null ) {
ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
transactionState.addCommand(send);
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public Response processMessageAck(MessageAck ack) throws Exception {
return null;
if( trackTransactions && ack.getTransactionId() != null ) {
ConnectionId connectionId = ack.getConsumerId().getParentId().getParentId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(ack.getTransactionId());
transactionState.addCommand(ack);
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public Response processBeginTransaction(TransactionInfo info) throws Exception {
return null;
}
if( trackTransactions ) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
cs.addTransactionState(info.getTransactionId());
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
return null;
if( trackTransactions ) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
transactionState.addCommand(info);
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
return null;
}
if( trackTransactions ) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if( transactionState !=null ) {
transactionState.addCommand(info);
return new Tracked(new RemoveTransactionAction(info));
}
}
return null;
}
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
return null;
if( trackTransactions ) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if( transactionState !=null ) {
transactionState.addCommand(info);
return new Tracked(new RemoveTransactionAction(info));
}
}
return null;
}
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
if( trackTransactions ) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if( transactionState !=null ) {
transactionState.addCommand(info);
return new Tracked(new RemoveTransactionAction(info));
}
}
return null;
}
public Response processEndTransaction(TransactionInfo info) throws Exception {
if( trackTransactions ) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
transactionState.addCommand(info);
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public Response processRecoverTransactions(TransactionInfo info) {
return null;
}
public Response processForgetTransaction(TransactionInfo info) throws Exception {
return null;
}
public Response processWireFormat(WireFormatInfo info) throws Exception {
return null;
}
@ -260,18 +368,6 @@ public class ConnectionStateTracker implements CommandVisitor {
return null;
}
public Response processRecoverTransactions(TransactionInfo info) {
return null;
}
public Response processForgetTransaction(TransactionInfo info) throws Exception {
return null;
}
public Response processEndTransaction(TransactionInfo info) throws Exception {
return null;
}
public Response processFlush(FlushCommand command) throws Exception {
return null;
}
@ -307,5 +403,21 @@ public class ConnectionStateTracker implements CommandVisitor {
public void setRestoreSessions(boolean restoreSessions) {
this.restoreSessions = restoreSessions;
}
public boolean isTrackTransactions() {
return trackTransactions;
}
public void setTrackTransactions(boolean trackTransactions) {
this.trackTransactions = trackTransactions;
}
public boolean isRestoreTransaction() {
return restoreTransaction;
}
public void setRestoreTransaction(boolean restoreTransaction) {
this.restoreTransaction = restoreTransaction;
}
}

View File

@ -21,7 +21,8 @@ package org.apache.activemq.state;
import org.apache.activemq.command.ProducerInfo;
public class ProducerState {
final ProducerInfo info;
final ProducerInfo info;
private long lastSequenceId=-1;
public ProducerState(ProducerInfo info) {
this.info = info;
@ -31,5 +32,11 @@ public class ProducerState {
}
public ProducerInfo getInfo() {
return info;
}
}
public void setLastSequenceId(long lastSequenceId) {
this.lastSequenceId = lastSequenceId;
}
public long getLastSequenceId() {
return lastSequenceId;
}
}

View File

@ -69,11 +69,13 @@ public class SessionState {
}
public Set getProducerIds() {
return producers.keySet();
}
}
public Collection getProducerStates() {
return producers.values();
}
public ProducerState getProducerState(ProducerId producerId) {
return (ProducerState) producers.get(producerId);
}
public Collection getConsumerStates() {
return consumers.values();

View File

@ -0,0 +1,27 @@
/**
*
*/
package org.apache.activemq.state;
import org.apache.activemq.command.Response;
public class Tracked extends Response {
private Runnable runnable;
public Tracked(Runnable runnable) {
this.runnable = runnable;
}
public void onResponses() {
if( runnable != null ) {
runnable.run();
runnable=null;
}
}
public boolean isWaitingForResponse() {
return runnable!=null;
}
}

View File

@ -0,0 +1,79 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.state;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.TransactionId;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
public class TransactionState {
final TransactionId id;
public final ArrayList commands = new ArrayList();
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private boolean prepared;
private int preparedResult;
public TransactionState(TransactionId id) {
this.id = id;
}
public String toString() {
return id.toString();
}
public void addCommand(Command operation) {
checkShutdown();
commands.add(operation);
}
public List getCommands() {
return commands;
}
private void checkShutdown() {
if( shutdown.get() )
throw new IllegalStateException("Disposed");
}
public void shutdown() {
shutdown.set(false);
}
public TransactionId getId() {
return id;
}
public void setPrepared(boolean prepared) {
this.prepared = prepared;
}
public boolean isPrepared() {
return prepared;
}
public void setPreparedResult(int preparedResult) {
this.preparedResult = preparedResult;
}
public int getPreparedResult() {
return preparedResult;
}
}

View File

@ -28,6 +28,7 @@ import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker;
import org.apache.activemq.state.Tracked;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
@ -89,7 +90,10 @@ public class FailoverTransport implements CompositeTransport {
return;
}
if (command.isResponse()) {
requestMap.remove(new Integer(((Response) command).getCorrelationId()));
Object object = requestMap.remove(new Integer(((Response) command).getCorrelationId()));
if( object!=null && object.getClass() == Tracked.class ) {
((Tracked)object).onResponses();
}
}
if (!initialized){
if (command.isBrokerInfo()){
@ -136,6 +140,8 @@ public class FailoverTransport implements CompositeTransport {
public FailoverTransport() throws InterruptedIOException {
stateTracker.setTrackTransactions(true);
// Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
@ -372,7 +378,10 @@ public class FailoverTransport implements CompositeTransport {
// the state tracker,
// then hold it in the requestMap so that we can replay
// it later.
if (!stateTracker.track(command) && command.isResponseRequired()) {
Tracked tracked = stateTracker.track(command);
if( tracked!=null && tracked.isWaitingForResponse() ) {
requestMap.put(new Integer(command.getCommandId()), tracked);
} else if ( tracked==null && command.isResponseRequired()) {
requestMap.put(new Integer(command.getCommandId()), command);
}
@ -380,13 +389,20 @@ public class FailoverTransport implements CompositeTransport {
try {
connectedTransport.oneway(command);
} catch (IOException e) {
// If there is an IOException in the send, remove the command from the requestMap
if (!stateTracker.track(command) && command.isResponseRequired()) {
requestMap.remove(new Integer(command.getCommandId()), command);
}
// Rethrow the exception so it will handled by the outer catch
throw e;
// If the command was not tracked.. we will retry in this method
if( tracked==null ) {
// since we will retry in this method.. take it out of the request
// map so that it is not sent 2 times on recovery
if( command.isResponseRequired() ) {
requestMap.remove(new Integer(command.getCommandId()));
}
// Rethrow the exception so it will handled by the outer catch
throw e;
}
}
return;

View File

@ -340,7 +340,7 @@ public class FanoutTransport implements CompositeTransport {
// then hold it in the requestMap so that we can replay
// it later.
boolean fanout = isFanoutCommand(command);
if (!stateTracker.track(command) && command.isResponseRequired() ) {
if (stateTracker.track(command)==null && command.isResponseRequired() ) {
int size = fanout ? minAckCount : 1;
requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
}