mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@418119 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2026e6e831
commit
35fb3d1c84
|
@ -179,10 +179,35 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
|||
if (e instanceof IOException) {
|
||||
serviceTransportException((IOException) e);
|
||||
}
|
||||
|
||||
// Handle the case where the broker is stopped
|
||||
// But the client is still connected.
|
||||
else if (e.getClass() == BrokerStoppedException.class ) {
|
||||
if( !disposed ) {
|
||||
if( serviceLog.isDebugEnabled() )
|
||||
serviceLog.debug("Broker has been stopped. Notifying client and closing his connection.");
|
||||
|
||||
ConnectionError ce = new ConnectionError();
|
||||
ce.setException(e);
|
||||
dispatchSync(ce);
|
||||
|
||||
// Wait a little bit to try to get the output buffer to flush the exption notification to the client.
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
// Worst case is we just kill the connection before the notification gets to him.
|
||||
ServiceSupport.dispose(this);
|
||||
}
|
||||
}
|
||||
|
||||
else if( !disposed && !inServiceException ) {
|
||||
inServiceException = true;
|
||||
try {
|
||||
serviceLog.info("Async error occurred: "+e,e);
|
||||
if( serviceLog.isDebugEnabled() )
|
||||
serviceLog.debug("Async error occurred: "+e,e);
|
||||
ConnectionError ce = new ConnectionError();
|
||||
ce.setException(e);
|
||||
dispatchAsync(ce);
|
||||
|
@ -201,7 +226,8 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
|||
response = command.visit(this);
|
||||
} catch ( Throwable e ) {
|
||||
if( responseRequired ) {
|
||||
serviceLog.info("Sync error occurred: "+e,e);
|
||||
if( serviceLog.isDebugEnabled() && e.getClass()!=BrokerStoppedException.class )
|
||||
serviceLog.debug("Error occured while processing sync command: "+e,e);
|
||||
response = new ExceptionResponse(e);
|
||||
} else {
|
||||
serviceException(e);
|
||||
|
@ -559,6 +585,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
|||
processDispatch(message);
|
||||
}
|
||||
|
||||
|
||||
public void dispatchAsync(Command message) {
|
||||
if( taskRunner==null ) {
|
||||
dispatchSync( message );
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker;
|
||||
|
||||
/**
|
||||
* This exception is thrown by the broker when you try to use it after it has been stopped.
|
||||
*
|
||||
* @author chirino
|
||||
*/
|
||||
public class BrokerStoppedException extends IllegalStateException {
|
||||
|
||||
private static final long serialVersionUID = -3435230276850902220L;
|
||||
|
||||
public BrokerStoppedException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public BrokerStoppedException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public BrokerStoppedException(String s) {
|
||||
super(s);
|
||||
}
|
||||
|
||||
public BrokerStoppedException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
}
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.broker;
|
|||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.Subscription;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
|
@ -37,7 +38,7 @@ import org.apache.activemq.command.SessionInfo;
|
|||
import org.apache.activemq.command.TransactionId;
|
||||
|
||||
/**
|
||||
* Implementation of the broker where all it's methods throw an IllegalStateException.
|
||||
* Implementation of the broker where all it's methods throw an BrokerStoppedException.
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
|
@ -61,132 +62,132 @@ public class ErrorBroker implements Broker {
|
|||
}
|
||||
|
||||
public BrokerId getBrokerId() {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public String getBrokerName() {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public Connection[] getClients() throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public ActiveMQDestination[] getDestinations() throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void send(ConnectionContext context, Message message) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void gc() {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void addBroker(Connection connection,BrokerInfo info){
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
|
||||
}
|
||||
|
||||
public void removeBroker(Connection connection,BrokerInfo info){
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public BrokerInfo[] getPeerBrokerInfos(){
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void processDispatch(MessageDispatch messageDispatch){
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception{
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public boolean isSlaveBroker(){
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public boolean isStopped(){
|
||||
|
@ -194,21 +195,21 @@ public class ErrorBroker implements Broker {
|
|||
}
|
||||
|
||||
public Set getDurableDestinations(){
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
|
||||
}
|
||||
|
||||
public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
|
||||
}
|
||||
|
||||
public boolean isFaultTolerantConfiguration(){
|
||||
throw new IllegalStateException(this.message);
|
||||
throw new BrokerStoppedException(this.message);
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue