diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java index 69a0c832ee..37205dc813 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java @@ -179,10 +179,35 @@ public abstract class AbstractConnection implements Service, Connection, Task, C if (e instanceof IOException) { serviceTransportException((IOException) e); } + + // Handle the case where the broker is stopped + // But the client is still connected. + else if (e.getClass() == BrokerStoppedException.class ) { + if( !disposed ) { + if( serviceLog.isDebugEnabled() ) + serviceLog.debug("Broker has been stopped. Notifying client and closing his connection."); + + ConnectionError ce = new ConnectionError(); + ce.setException(e); + dispatchSync(ce); + + // Wait a little bit to try to get the output buffer to flush the exption notification to the client. + try { + Thread.sleep(500); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + + // Worst case is we just kill the connection before the notification gets to him. + ServiceSupport.dispose(this); + } + } + else if( !disposed && !inServiceException ) { inServiceException = true; try { - serviceLog.info("Async error occurred: "+e,e); + if( serviceLog.isDebugEnabled() ) + serviceLog.debug("Async error occurred: "+e,e); ConnectionError ce = new ConnectionError(); ce.setException(e); dispatchAsync(ce); @@ -201,7 +226,8 @@ public abstract class AbstractConnection implements Service, Connection, Task, C response = command.visit(this); } catch ( Throwable e ) { if( responseRequired ) { - serviceLog.info("Sync error occurred: "+e,e); + if( serviceLog.isDebugEnabled() && e.getClass()!=BrokerStoppedException.class ) + serviceLog.debug("Error occured while processing sync command: "+e,e); response = new ExceptionResponse(e); } else { serviceException(e); @@ -559,6 +585,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C processDispatch(message); } + public void dispatchAsync(Command message) { if( taskRunner==null ) { dispatchSync( message ); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerStoppedException.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerStoppedException.java new file mode 100644 index 0000000000..cf83298b87 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerStoppedException.java @@ -0,0 +1,44 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +/** + * This exception is thrown by the broker when you try to use it after it has been stopped. + * + * @author chirino + */ +public class BrokerStoppedException extends IllegalStateException { + + private static final long serialVersionUID = -3435230276850902220L; + + public BrokerStoppedException() { + super(); + } + + public BrokerStoppedException(String message, Throwable cause) { + super(message, cause); + } + + public BrokerStoppedException(String s) { + super(s); + } + + public BrokerStoppedException(Throwable cause) { + super(cause); + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index 1300d1f06d..a48aaaaa80 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker; import java.util.Collections; import java.util.Map; import java.util.Set; + import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; @@ -37,7 +38,7 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; /** - * Implementation of the broker where all it's methods throw an IllegalStateException. + * Implementation of the broker where all it's methods throw an BrokerStoppedException. * * @version $Revision$ */ @@ -61,132 +62,132 @@ public class ErrorBroker implements Broker { } public BrokerId getBrokerId() { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public String getBrokerName() { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void addSession(ConnectionContext context, SessionInfo info) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public Connection[] getClients() throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public ActiveMQDestination[] getDestinations() throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void send(ConnectionContext context, Message message) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void gc() { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void start() throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void stop() throws Exception { - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void addBroker(Connection connection,BrokerInfo info){ - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void removeBroker(Connection connection,BrokerInfo info){ - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public BrokerInfo[] getPeerBrokerInfos(){ - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void processDispatch(MessageDispatch messageDispatch){ - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception{ - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public boolean isSlaveBroker(){ - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public boolean isStopped(){ @@ -194,21 +195,21 @@ public class ErrorBroker implements Broker { } public Set getDurableDestinations(){ - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); } public boolean isFaultTolerantConfiguration(){ - throw new IllegalStateException(this.message); + throw new BrokerStoppedException(this.message); }