Created a ReconnectTest that showed problems with the synchronization used when a client reconnects to a server via failover before the server detects the client failure.
- InactivityMonitor : Better syncronization so that an inactivty exception is only raised once.
- Connection: Added serviceExceptionAsync() method and change all methods that are dispatching to use this instead of serviceException() to avoid possible deadlock that can occur during connection shutdown.
- MockTransport: finer grained sychonization to avoid deadlocks.
- PrefetchSubscription: it is possible it will get duplicate acks on a failover reconnect



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@471837 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-11-06 19:35:59 +00:00
parent 2cc9b26943
commit cafe4cbcc4
10 changed files with 552 additions and 206 deletions

View File

@ -71,6 +71,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
@ -93,23 +96,31 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
private boolean manageable;
protected final ConcurrentHashMap localConnectionStates = new ConcurrentHashMap();
protected final Map brokerConnectionStates;
protected final Map brokerConnectionStates;
private WireFormatInfo wireFormatInfo;
protected boolean disposed=false;
protected final AtomicBoolean disposed=new AtomicBoolean(false);
protected IOException transportException;
private CountDownLatch stopLatch = new CountDownLatch(1);
static class ConnectionState extends org.apache.activemq.state.ConnectionState {
private final ConnectionContext context;
AbstractConnection connection;
public ConnectionState(ConnectionInfo info, ConnectionContext context) {
public ConnectionState(ConnectionInfo info, ConnectionContext context, AbstractConnection connection) {
super(info);
this.context = context;
this.connection=connection;
}
public ConnectionContext getContext() {
return context;
}
public AbstractConnection getConnection() {
return connection;
}
}
@ -152,32 +163,39 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public void stop() throws Exception{
if(disposed)
return;
disposed=true;
if(disposed.compareAndSet(false, true)) {
if( taskRunner!=null )
taskRunner.shutdown();
if( taskRunner!=null )
taskRunner.shutdown();
//
// Remove all logical connection associated with this connection
// from the broker.
if(!broker.isStopped()){
ArrayList l=new ArrayList(localConnectionStates.keySet());
for(Iterator iter=l.iterator();iter.hasNext();){
ConnectionId connectionId=(ConnectionId) iter.next();
try{
processRemoveConnection(connectionId);
}catch(Throwable ignore){}
}
if(brokerInfo!=null){
broker.removeBroker(this,brokerInfo);
}
// Clear out the dispatch queue to release any memory that
// is being held on to.
dispatchQueue.clear();
//
// Remove all logical connection associated with this connection
// from the broker.
if(!broker.isStopped()){
ArrayList l=new ArrayList(localConnectionStates.keySet());
for(Iterator iter=l.iterator();iter.hasNext();){
ConnectionId connectionId=(ConnectionId) iter.next();
try{
log.debug("Cleaning up connection resources.");
processRemoveConnection(connectionId);
}catch(Throwable ignore){
ignore.printStackTrace();
}
}
if(brokerInfo!=null){
broker.removeBroker(this,brokerInfo);
}
}
stopLatch.countDown();
}
}
public void serviceTransportException(IOException e) {
if( !disposed ) {
if( !disposed.get() ) {
transportException = e;
if( transportLog.isDebugEnabled() )
transportLog.debug("Transport failed: "+e,e);
@ -185,6 +203,28 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
}
/**
* Calls the serviceException method in an async thread. Since
* handling a service exception closes a socket, we should not tie
* up broker threads since client sockets may hang or cause deadlocks.
*
* @param e
*/
public void serviceExceptionAsync(final IOException e) {
new Thread("Async Exception Handler") {
public void run() {
serviceException(e);
}
}.start();
}
/**
* Closes a clients connection due to a detected error.
*
* Errors are ignored if: the client is closing or broker is closing.
* Otherwise, the connection error transmitted to the client before stopping it's
* transport.
*/
public void serviceException(Throwable e) {
// are we a transport exception such as not being able to dispatch
// synchronously to a transport
@ -195,9 +235,9 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
// Handle the case where the broker is stopped
// But the client is still connected.
else if (e.getClass() == BrokerStoppedException.class ) {
if( !disposed ) {
if( !disposed.get() ) {
if( serviceLog.isDebugEnabled() )
serviceLog.debug("Broker has been stopped. Notifying client and closing his connection.");
serviceLog.debug("Broker has been stopped. Notifying client and closing his connection.");
ConnectionError ce = new ConnectionError();
ce.setException(e);
@ -205,21 +245,21 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
// Wait a little bit to try to get the output buffer to flush the exption notification to the client.
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
Thread.sleep(500);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
// Worst case is we just kill the connection before the notification gets to him.
// Worst case is we just kill the connection before the notification gets to him.
ServiceSupport.dispose(this);
}
}
else if( !disposed && !inServiceException ) {
else if( !disposed.get() && !inServiceException ) {
inServiceException = true;
try {
if( serviceLog.isDebugEnabled() )
serviceLog.debug("Async error occurred: "+e,e);
serviceLog.debug("Async error occurred: "+e,e);
ConnectionError ce = new ConnectionError();
ce.setException(e);
dispatchAsync(ce);
@ -238,8 +278,8 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
response = command.visit(this);
} catch ( Throwable e ) {
if( responseRequired ) {
if( serviceLog.isDebugEnabled() && e.getClass()!=BrokerStoppedException.class )
serviceLog.debug("Error occured while processing sync command: "+e,e);
if( serviceLog.isDebugEnabled() && e.getClass()!=BrokerStoppedException.class )
serviceLog.debug("Error occured while processing sync command: "+e,e);
response = new ExceptionResponse(e);
} else {
serviceException(e);
@ -312,7 +352,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
// Avoid replaying dup commands
if( cs.getTransactionState(info.getTransactionId())==null ) {
cs.addTransactionState(info.getTransactionId());
cs.addTransactionState(info.getTransactionId());
broker.beginTransaction(context, info.getTransactionId());
}
return null;
@ -371,7 +411,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context, info.getTransactionId(), false);
broker.commitTransaction(context, info.getTransactionId(), false);
return null;
}
@ -383,7 +423,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
cs.removeTransactionState(info.getTransactionId());
broker.rollbackTransaction(context, info.getTransactionId());
broker.rollbackTransaction(context, info.getTransactionId());
return null;
}
@ -418,21 +458,21 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
// then, finde the associated producer state so we can do some dup detection.
ProducerState producerState=null;
if( messageSend.getMessageId().getProducerId().equals( messageSend.getProducerId() ) ) {
SessionState ss = state.getSessionState(producerId.getParentId());
if( ss == null )
throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId());
producerState = ss.getProducerState(producerId);
SessionState ss = state.getSessionState(producerId.getParentId());
if( ss == null )
throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId());
producerState = ss.getProducerState(producerId);
}
if( producerState == null ) {
broker.send(context, messageSend);
} else {
// Avoid Dups.
long seq = messageSend.getMessageId().getProducerSequenceId();
if( seq > producerState.getLastSequenceId() ) {
producerState.setLastSequenceId(seq);
broker.send(context, messageSend);
}
// Avoid Dups.
long seq = messageSend.getMessageId().getProducerSequenceId();
if( seq > producerState.getLastSequenceId() ) {
producerState.setLastSequenceId(seq);
broker.send(context, messageSend);
}
}
return null;
@ -454,10 +494,10 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
public Response processBrokerInfo(BrokerInfo info) {
// We only expect to get one broker info command per connection
if( this.brokerInfo!=null ) {
log.warn("Unexpected extra broker info command received: "+info);
}
// We only expect to get one broker info command per connection
if( this.brokerInfo!=null ) {
log.warn("Unexpected extra broker info command received: "+info);
}
this.brokerInfo = info;
broker.addBroker(this, info);
@ -494,12 +534,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
// Avoid replaying dup commands
if( !ss.getProducerIds().contains(info.getProducerId()) ) {
broker.addProducer(cs.getContext(), info);
try {
ss.addProducer(info);
} catch (IllegalStateException e) {
broker.removeProducer(cs.getContext(), info);
}
broker.addProducer(cs.getContext(), info);
try {
ss.addProducer(info);
} catch (IllegalStateException e) {
broker.removeProducer(cs.getContext(), info);
}
}
return null;
}
@ -531,12 +571,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
// Avoid replaying dup commands
if( !ss.getConsumerIds().contains(info.getConsumerId()) ) {
broker.addConsumer(cs.getContext(), info);
try {
ss.addConsumer(info);
} catch (IllegalStateException e) {
broker.removeConsumer(cs.getContext(), info);
}
broker.addConsumer(cs.getContext(), info);
try {
ss.addConsumer(info);
} catch (IllegalStateException e) {
broker.removeConsumer(cs.getContext(), info);
}
}
return null;
@ -565,12 +605,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
// Avoid replaying dup commands
if( !cs.getSessionIds().contains(info.getSessionId()) ) {
broker.addSession(cs.getContext(), info);
try {
cs.addSession(info);
} catch (IllegalStateException e) {
broker.removeSession(cs.getContext(), info);
}
broker.addSession(cs.getContext(), info);
try {
cs.addSession(info);
} catch (IllegalStateException e) {
broker.removeSession(cs.getContext(), info);
}
}
return null;
}
@ -613,34 +653,51 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public Response processAddConnection(ConnectionInfo info) throws Exception {
ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId());
if( state == null ) {
// Setup the context.
ConnectionContext context = new ConnectionContext(info);
context.setConnection(this);
context.setBroker(broker);
context.setConnector(connector);
context.setTransactions(new ConcurrentHashMap());
context.setWireFormatInfo(wireFormatInfo);
this.manageable = info.isManageable();
context.incrementReference();
state = new ConnectionState(info, context);
brokerConnectionStates.put(info.getConnectionId(), state);
localConnectionStates.put(info.getConnectionId(), state);
broker.addConnection(context, info);
if (info.isManageable() && broker.isFaultTolerantConfiguration()){
//send ConnectionCommand
ConnectionControl command = new ConnectionControl();
command.setFaultTolerant(broker.isFaultTolerantConfiguration());
dispatchAsync(command);
}
} else {
// We are a concurrent connection... it must be client reconnect.
localConnectionStates.put(info.getConnectionId(), state);
state.getContext().incrementReference();
if( state !=null ) {
// ConnectionInfo replay?? Chances are that it's a client reconnecting,
// and we have not detected that that old connection died.. Kill the old connection
// to make sure our state is in sync with the client.
if( this != state.getConnection() ) {
log.debug("Killing previous stale connection: "+state.getConnection());
state.getConnection().stop();
if( !state.getConnection().stopLatch.await(15, TimeUnit.SECONDS) ) {
throw new Exception("Previous connection could not be clean up.");
}
}
}
log.debug("Setting up new connection: "+this);
// Setup the context.
String clientId = info.getClientId();
ConnectionContext context = new ConnectionContext();
context.setConnection(this);
context.setBroker(broker);
context.setConnector(connector);
context.setTransactions(new ConcurrentHashMap());
context.setClientId(clientId);
context.setUserName(info.getUserName());
context.setConnectionId(info.getConnectionId());
context.setWireFormatInfo(wireFormatInfo);
context.incrementReference();
this.manageable = info.isManageable();
state = new ConnectionState(info, context, this);
brokerConnectionStates.put(info.getConnectionId(), state);
localConnectionStates.put(info.getConnectionId(), state);
broker.addConnection(context, info);
if (info.isManageable() && broker.isFaultTolerantConfiguration()){
//send ConnectionCommand
ConnectionControl command = new ConnectionControl();
command.setFaultTolerant(broker.isFaultTolerantConfiguration());
dispatchAsync(command);
}
return null;
}
@ -680,11 +737,11 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
ConnectionState state = (ConnectionState) localConnectionStates.remove(id);
if( state != null ) {
// If we are the last reference, we should remove the state
// from the broker.
if( state.getContext().decrementReference() == 0 ){
brokerConnectionStates.remove(id);
}
// If we are the last reference, we should remove the state
// from the broker.
if( state.getContext().decrementReference() == 0 ){
brokerConnectionStates.remove(id);
}
}
return null;
}
@ -758,14 +815,14 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
* @return true if the Connection is connected
*/
public boolean isConnected() {
return !disposed;
return !disposed.get();
}
/**
* @return true if the Connection is active
*/
public boolean isActive() {
return !disposed;
return !disposed.get();
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.activemq.broker;
import java.io.IOException;
import org.apache.activemq.Service;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.command.Command;
@ -102,4 +104,6 @@ public interface Connection extends Service {
*/
public String getRemoteAddress();
public void serviceExceptionAsync(IOException e);
}

View File

@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* @version $Revision: 1.8 $
@ -46,6 +47,7 @@ public class TransportConnection extends AbstractConnection {
private boolean pendingStop;
private long timeStamp = 0;
private MasterBroker masterBroker; //used if this connection is used by a Slave
private AtomicBoolean stopped = new AtomicBoolean(false);
/**
* @param connector
@ -93,34 +95,41 @@ public class TransportConnection extends AbstractConnection {
}
}
public synchronized void stop() throws Exception {
public void stop() throws Exception {
// If we're in the middle of starting
// then go no further... for now.
pendingStop = true;
if (starting) {
log.debug("stop() called in the middle of start(). Delaying...");
return;
synchronized(this) {
pendingStop = true;
if (starting) {
log.debug("stop() called in the middle of start(). Delaying...");
return;
}
}
connector.onStopped(this);
try {
if (masterBroker != null) {
masterBroker.stop();
}
// If the transport has not failed yet,
// notify the peer that we are doing a normal shutdown.
if (transportException == null) {
transport.oneway(new ShutdownInfo());
}
}
catch (Exception ignore) {
//ignore.printStackTrace();
}
if( stopped.compareAndSet(false, true) ) {
transport.stop();
active = false;
super.stop();
log.debug("Stopping connection: "+transport.getRemoteAddress());
connector.onStopped(this);
try {
if (masterBroker != null){
masterBroker.stop();
}
// If the transport has not failed yet,
// notify the peer that we are doing a normal shutdown.
if( transportException == null ) {
transport.oneway(new ShutdownInfo());
}
} catch (Exception ignore) {
//ignore.printStackTrace();
}
transport.stop();
active = false;
super.stop();
log.debug("Stopped connection: "+transport.getRemoteAddress());
}
}
@ -269,7 +278,7 @@ public class TransportConnection extends AbstractConnection {
getStatistics().onCommand(command);
}
catch (IOException e) {
serviceException(e);
serviceExceptionAsync(e);
}
finally {
setMarkedCandidate(false);

View File

@ -242,8 +242,13 @@ public class ManagedRegionBroker extends RegionBroker {
topics.put(key,view);
}
}
registeredMBeans.add(key);
mbeanServer.registerMBean(view,key);
try {
mbeanServer.registerMBean(view,key);
registeredMBeans.add(key);
} catch (Throwable e) {
log.warn("Failed to register MBean: "+key);
log.debug("Failure reason: "+e,e);
}
}
protected void unregisterDestination(ObjectName key) throws Exception{
@ -252,7 +257,12 @@ public class ManagedRegionBroker extends RegionBroker {
temporaryQueues.remove(key);
temporaryTopics.remove(key);
if(registeredMBeans.remove(key)){
mbeanServer.unregisterMBean(key);
try {
mbeanServer.unregisterMBean(key);
} catch (Throwable e) {
log.warn("Failed to unregister MBean: "+key);
log.debug("Failure reason: "+e,e);
}
}
}
@ -279,7 +289,7 @@ public class ManagedRegionBroker extends RegionBroker {
registeredMBeans.remove(inactiveName);
mbeanServer.unregisterMBean(inactiveName);
}
}catch(Exception e){
}catch(Throwable e){
log.error("Unable to unregister inactive durable subscriber: "+subscriptionKey,e);
}
}else{
@ -287,8 +297,15 @@ public class ManagedRegionBroker extends RegionBroker {
}
}
}
registeredMBeans.add(key);
mbeanServer.registerMBean(view,key);
try {
mbeanServer.registerMBean(view,key);
registeredMBeans.add(key);
} catch (Throwable e) {
log.warn("Failed to register MBean: "+key);
log.debug("Failure reason: "+e,e);
}
}
protected void unregisterSubscription(ObjectName key) throws Exception{
@ -298,7 +315,12 @@ public class ManagedRegionBroker extends RegionBroker {
temporaryQueueSubscribers.remove(key);
temporaryTopicSubscribers.remove(key);
if(registeredMBeans.remove(key)){
mbeanServer.unregisterMBean(key);
try {
mbeanServer.unregisterMBean(key);
} catch (Throwable e) {
log.warn("Failed to unregister MBean: "+key);
log.debug("Failure reason: "+e,e);
}
}
DurableSubscriptionView view=(DurableSubscriptionView) durableTopicSubscribers.remove(key);
if(view!=null){
@ -346,8 +368,15 @@ public class ManagedRegionBroker extends RegionBroker {
+","+"Type=Subscription,"+"active=false,"+"name="
+JMXSupport.encodeObjectNamePart(key.toString())+"");
SubscriptionView view=new InactiveDurableSubscriptionView(this,key.getClientId(),info);
registeredMBeans.add(objectName);
mbeanServer.registerMBean(view,objectName);
try {
mbeanServer.registerMBean(view,objectName);
registeredMBeans.add(objectName);
} catch (Throwable e) {
log.warn("Failed to register MBean: "+key);
log.debug("Failure reason: "+e,e);
}
inactiveDurableTopicSubscribers.put(objectName,view);
subscriptionKeys.put(key,objectName);
}catch(Exception e){

View File

@ -46,16 +46,18 @@ public class ManagedTransportConnection extends TransportConnection {
private final MBeanServer server;
private final ObjectName connectorName;
private ConnectionViewMBean mbean;
private ObjectName name;
private String connectionId;
private ObjectName byClientIdName;
private ObjectName byAddressName;
public ManagedTransportConnection(TransportConnector connector, Transport transport, Broker broker, TaskRunnerFactory factory, MBeanServer server,
ObjectName connectorName, String connectionId) throws IOException {
ObjectName connectorName) throws IOException {
super(connector, transport, broker, factory);
this.server = server;
this.connectorName = connectorName;
this.mbean = new ConnectionView(this);
setConnectionId(connectionId);
byAddressName = createByAddressObjectName("address", transport.getRemoteAddress());
registerMBean(byAddressName);
}
public synchronized void stop() throws Exception {
@ -63,60 +65,61 @@ public class ManagedTransportConnection extends TransportConnection {
setPendingStop(true);
return;
}
unregisterMBean();
synchronized(this) {
unregisterMBean(byClientIdName);
unregisterMBean(byAddressName);
byClientIdName=null;
byAddressName=null;
}
super.stop();
}
public String getConnectionId() {
return connectionId;
}
/**
* Sets the connection ID of this connection. On startup this connection ID
* is set to an incrementing counter; once the client registers it is set to
* the clientID of the JMS client.
*/
public void setConnectionId(String connectionId) throws IOException {
this.connectionId = connectionId;
unregisterMBean();
name = createObjectName();
registerMBean();
}
public Response processAddConnection(ConnectionInfo info) throws Exception {
Response answer = super.processAddConnection(info);
String clientId = info.getClientId();
if (clientId != null) {
// lets update the MBean name
setConnectionId(clientId);
if(byClientIdName==null) {
byClientIdName = createByClientIdObjectName(clientId);
registerMBean(byClientIdName);
}
}
return answer;
}
// Implementation methods
// -------------------------------------------------------------------------
protected void registerMBean() throws IOException {
try {
server.registerMBean(mbean, name);
}
catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
protected void registerMBean(ObjectName name) {
if( name!=null ) {
try {
server.registerMBean(mbean, name);
} catch (Throwable e) {
log.warn("Failed to register MBean: "+name);
log.debug("Failure reason: "+e,e);
}
}
}
protected void unregisterMBean() {
protected void unregisterMBean(ObjectName name) {
if (name != null) {
try {
server.unregisterMBean(name);
}
catch (Throwable e) {
log.warn("Failed to unregister mbean: " + name);
log.debug("Failure reason: "+e,e);
}
}
}
protected ObjectName createObjectName() throws IOException {
protected ObjectName createByAddressObjectName(String type, String value) throws IOException {
// Build the object name for the destination
Hashtable map = connectorName.getKeyPropertyList();
try {
@ -125,11 +128,31 @@ public class ManagedTransportConnection extends TransportConnection {
"BrokerName="+JMXSupport.encodeObjectNamePart((String) map.get("BrokerName"))+","+
"Type=Connection,"+
"ConnectorName="+JMXSupport.encodeObjectNamePart((String) map.get("ConnectorName"))+","+
"Connection="+JMXSupport.encodeObjectNamePart(connectionId)
"ViewType="+JMXSupport.encodeObjectNamePart(type)+","+
"Name="+JMXSupport.encodeObjectNamePart(value)
);
}
catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
}
protected ObjectName createByClientIdObjectName(String value) throws IOException {
// Build the object name for the destination
Hashtable map = connectorName.getKeyPropertyList();
try {
return new ObjectName(
connectorName.getDomain()+":"+
"BrokerName="+JMXSupport.encodeObjectNamePart((String) map.get("BrokerName"))+","+
"Type=Connection,"+
"ConnectorName="+JMXSupport.encodeObjectNamePart((String) map.get("ConnectorName"))+","+
"Connection="+JMXSupport.encodeObjectNamePart(value)
);
}
catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
}
}

View File

@ -53,8 +53,7 @@ public class ManagedTransportConnector extends TransportConnector {
}
protected Connection createConnection(Transport transport) throws IOException {
String connectionId = "" + getNextConnectionId();
return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), mbeanServer, connectorName, connectionId);
return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), mbeanServer, connectorName);
}
protected static synchronized long getNextConnectionId() {

View File

@ -20,12 +20,12 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
@ -250,7 +250,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if( isSlaveBroker() ) {
throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack+") was not in the dispatch list: "+dispatched);
} else {
throw new JMSException("Invalid acknowledgment: "+ack);
log.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "+ack);
}
}
@ -415,7 +415,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
try{
dispatchMatched();
}catch(IOException e){
context.getConnection().serviceException(e);
context.getConnection().serviceExceptionAsync(e);
}
}
}

View File

@ -38,7 +38,7 @@ public class InactivityMonitor extends TransportFilter {
private WireFormatInfo localWireFormatInfo;
private WireFormatInfo remoteWireFormatInfo;
private boolean monitorStarted=false;
private final AtomicBoolean monitorStarted= new AtomicBoolean(false);
private final AtomicBoolean commandSent=new AtomicBoolean(false);
private final AtomicBoolean inSend=new AtomicBoolean(false);
@ -145,13 +145,15 @@ public class InactivityMonitor extends TransportFilter {
}
public void onException(IOException error) {
stopMonitorThreads();
transportListener.onException(error);
if( monitorStarted.get() ) {
stopMonitorThreads();
getTransportListener().onException(error);
}
}
synchronized private void startMonitorThreads() throws IOException {
if( monitorStarted )
if( monitorStarted.get() )
return;
if( localWireFormatInfo == null )
return;
@ -160,9 +162,9 @@ public class InactivityMonitor extends TransportFilter {
long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
if( l > 0 ) {
monitorStarted.set(true);
Scheduler.executePeriodically(writeChecker, l/2);
Scheduler.executePeriodically(readChecker, l);
monitorStarted=true;
}
}
@ -170,10 +172,9 @@ public class InactivityMonitor extends TransportFilter {
*
*/
synchronized private void stopMonitorThreads() {
if( monitorStarted ) {
if( monitorStarted.compareAndSet(true, false) ) {
Scheduler.cancel(readChecker);
Scheduler.cancel(writeChecker);
monitorStarted=false;
}
}

View File

@ -44,9 +44,9 @@ public class MockTransport extends DefaultTransportListener implements Transport
synchronized public void setTransportListener(TransportListener channelListener) {
this.transportListener = channelListener;
if (channelListener == null)
next.setTransportListener(null);
getNext().setTransportListener(null);
else
next.setTransportListener(this);
getNext().setTransportListener(this);
}
@ -55,26 +55,26 @@ public class MockTransport extends DefaultTransportListener implements Transport
* @throws IOException if the next channel has not been set.
*/
public void start() throws Exception {
if( next == null )
if( getNext() == null )
throw new IOException("The next channel has not been set.");
if( transportListener == null )
throw new IOException("The command listener has not been set.");
next.start();
getNext().start();
}
/**
* @see org.apache.activemq.Service#stop()
*/
public void stop() throws Exception {
next.stop();
getNext().stop();
}
synchronized public void onCommand(Object command) {
transportListener.onCommand(command);
public void onCommand(Object command) {
getTransportListener().onCommand(command);
}
/**
* @return Returns the next.
* @return Returns the getNext().
*/
synchronized public Transport getNext() {
return next;
@ -87,49 +87,49 @@ public class MockTransport extends DefaultTransportListener implements Transport
return transportListener;
}
synchronized public String toString() {
return next.toString();
public String toString() {
return getNext().toString();
}
synchronized public void oneway(Object command) throws IOException {
next.oneway(command);
public void oneway(Object command) throws IOException {
getNext().oneway(command);
}
synchronized public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
return next.asyncRequest(command, null);
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
return getNext().asyncRequest(command, null);
}
synchronized public Object request(Object command) throws IOException {
return next.request(command);
public Object request(Object command) throws IOException {
return getNext().request(command);
}
public Object request(Object command,int timeout) throws IOException {
return next.request(command, timeout);
return getNext().request(command, timeout);
}
synchronized public void onException(IOException error) {
transportListener.onException(error);
public void onException(IOException error) {
getTransportListener().onException(error);
}
synchronized public Object narrow(Class target) {
public Object narrow(Class target) {
if( target.isAssignableFrom(getClass()) ) {
return this;
}
return next.narrow(target);
return getNext().narrow(target);
}
synchronized public void setNext(Transport next) {
this.next = next;
}
synchronized public void install(TransportFilter filter) {
public void install(TransportFilter filter) {
filter.setTransportListener(this);
getNext().setTransportListener(filter);
setNext(filter);
}
public String getRemoteAddress() {
return next.getRemoteAddress();
return getNext().getRemoteAddress();
}
}

View File

@ -0,0 +1,224 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.failover;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.mock.MockTransport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
*
* @version $Revision: 1.1 $
*/
public class ReconnectTest extends TestCase {
protected static final Log log = LogFactory.getLog(ReconnectTest.class);
public static final int MESSAGES_PER_ITTERATION = 10;
public static final int WORKER_COUNT = 10;
private BrokerService bs;
private URI tcpUri;
private AtomicInteger interruptedCount = new AtomicInteger();
private Worker[] workers;
class Worker implements Runnable, ExceptionListener {
private ActiveMQConnection connection;
private AtomicBoolean stop=new AtomicBoolean(false);
public AtomicInteger iterations = new AtomicInteger();
public CountDownLatch stopped = new CountDownLatch(1);
private Throwable error;
public Worker() throws URISyntaxException, JMSException {
URI uri = new URI("failover://(mock://("+tcpUri+"))");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
connection = (ActiveMQConnection)factory.createConnection();
connection.setExceptionListener(this);
connection.addTransportListener(new TransportListener() {
public void onCommand(Object command) {
}
public void onException(IOException error) {
setError(error);
}
public void transportInterupted() {
interruptedCount.incrementAndGet();
}
public void transportResumed() {
}});
connection.start();
}
public void failConnection() {
MockTransport mockTransport = (MockTransport)connection.getTransportChannel().narrow(MockTransport.class);
mockTransport.onException(new IOException("Simulated error"));
}
public void start() {
new Thread(this).start();
}
public void stop() {
stop.set(true);
try {
if( !stopped.await(5, TimeUnit.SECONDS) ) {
connection.close();
stopped.await();
} else {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void run() {
try {
ActiveMQQueue queue = new ActiveMQQueue("FOO");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
while( !stop.get() ) {
for( int i=0; i < MESSAGES_PER_ITTERATION; i++) {
producer.send(session.createTextMessage("TEST:"+i));
}
for( int i=0; i < MESSAGES_PER_ITTERATION; i++) {
consumer.receive();
}
iterations.incrementAndGet();
}
session.close();
} catch (JMSException e) {
setError(e);
} finally {
stopped.countDown();
}
}
public void onException(JMSException error) {
setError(error);
stop();
}
public synchronized Throwable getError() {
return error;
}
public synchronized void setError(Throwable error) {
this.error = error;
}
public synchronized void assertNoErrors() {
if( error !=null ) {
error.printStackTrace();
fail("Got Exception: "+error);
}
}
}
public void testReconnects() throws Exception {
for( int k=1; k < 5; k++ ) {
System.out.println("Test run: "+k);
// Wait for at least one iteration to occur...
for (int i=0; i < WORKER_COUNT; i++) {
for( int j=0; workers[i].iterations.get() == 0 && j < 5; j++ ) {
workers[i].assertNoErrors();
System.out.println("Waiting for worker "+i+" to finish an iteration.");
Thread.sleep(1000);
}
assertTrue("Worker "+i+" never completed an interation.", workers[i].iterations.get()!=0);
workers[i].assertNoErrors();
}
System.out.println("Simulating transport error to cause reconnect.");
// Simulate a transport failure.
for (int i=0; i < WORKER_COUNT; i++) {
workers[i].failConnection();
}
// Wait for the connections to get interrupted...
while ( interruptedCount.get() < WORKER_COUNT ) {
System.out.println("Waiting for connections to get interrupted.. at: "+interruptedCount.get());
Thread.sleep(1000);
}
// let things stablize..
System.out.println("Pausing before starting next iterations...");
Thread.sleep(1000);
// Reset the counters..
interruptedCount.set(0);
for (int i=0; i < WORKER_COUNT; i++) {
workers[i].iterations.set(0);
}
}
}
protected void setUp() throws Exception {
bs = new BrokerService();
bs.setPersistent(false);
bs.setUseJmx(true);
TransportConnector connector = bs.addConnector("tcp://localhost:0");
bs.start();
tcpUri = connector.getConnectUri();
workers = new Worker[WORKER_COUNT];
for (int i=0; i < WORKER_COUNT; i++) {
workers[i] = new Worker();
workers[i].start();
}
}
protected void tearDown() throws Exception {
for (int i=0; i < WORKER_COUNT; i++) {
workers[i].stop();
}
new ServiceStopper().stop(bs);
}
}