git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@663059 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-06-04 10:52:13 +00:00
parent 21568866ad
commit 0acf54586f
7 changed files with 109 additions and 12 deletions

View File

@ -118,6 +118,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// Connection state variables
private final ConnectionInfo info;
private ExceptionListener exceptionListener;
private ClientInternalExceptionListener clientInternalExceptionListener;
private boolean clientIDSet;
private boolean isConnectionInfoSentToBroker;
private boolean userSpecifiedClientID;
@ -404,7 +405,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* associated with it.
*
* @return the <CODE>ExceptionListener</CODE> for this connection, or
* null. if no <CODE>ExceptionListener</CODE> is associated with
* null, if no <CODE>ExceptionListener</CODE> is associated with
* this connection.
* @throws JMSException if the JMS provider fails to get the
* <CODE>ExceptionListener</CODE> for this connection.
@ -443,6 +444,32 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.exceptionListener = listener;
}
/**
* Gets the <code>ClientInternalExceptionListener</code> object for this connection.
* Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
* associated with it.
*
* @return the listener or <code>null</code> if no listener is registered with the connection.
*/
public ClientInternalExceptionListener getClientInternalExceptionListener()
{
return clientInternalExceptionListener;
}
/**
* Sets a client internal exception listener for this connection.
* The connection will notify the listener, if one has been registered, of exceptions thrown by container components
* (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
* It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
* describing the problem.
*
* @param listener the exception listener
*/
public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
{
this.clientInternalExceptionListener = listener;
}
/**
* Starts (or restarts) a connection's delivery of incoming messages. A call
* to <CODE>start</CODE> on a connection that has already been started is
@ -1672,7 +1699,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
});
} catch (Exception e) {
onAsyncException(e);
onClientInternalException(e);
}
}
@ -1686,6 +1713,30 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protocolVersion.set(info.getVersion());
}
/**
* Handles async client internal exceptions.
* A client internal exception is usually one that has been thrown
* by a container runtie component during asynchronous processing of a
* message that does not affect the connection itself.
* This method notifies the <code>ClientInternalExceptionListener</code> by invoking
* its <code>onException</code> method, if one has been registered with this connection.
*
* @param error the exception that the problem
*/
public void onClientInternalException(final Throwable error) {
if ( !closed.get() && !closing.get() ) {
if ( this.clientInternalExceptionListener != null ) {
asyncConnectionThread.execute(new Runnable() {
public void run() {
ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
}
});
} else {
LOG.debug("Async client internal exception occurred with no exception listener registered: "
+ error, error);
}
}
}
/**
* Used for handling async exceptions
*

View File

@ -139,7 +139,7 @@ public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQD
ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s;
session = (ActiveMQSession)queueSession.getNext();
} else {
connection.onAsyncException(new JMSException("Session pool provided an invalid session type: " + s.getClass()));
connection.onClientInternalException(new JMSException("Session pool provided an invalid session type: " + s.getClass()));
return;
}

View File

@ -1013,7 +1013,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
Thread.yield();
}
} catch (Exception e) {
session.connection.onAsyncException(e);
session.connection.onClientInternalException(e);
}
}
@ -1057,7 +1057,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
listener.onMessage(message);
afterMessageIsConsumed(md, false);
} catch (JMSException e) {
session.connection.onAsyncException(e);
session.connection.onClientInternalException(e);
}
return true;
}

View File

@ -186,7 +186,7 @@ public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
return answer;
}
} catch (JMSException e) {
this.session.connection.onAsyncException(e);
this.session.connection.onClientInternalException(e);
return null;
}

View File

@ -733,10 +733,15 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
try {
messageListener.onMessage(message);
} catch (Throwable e) {
// TODO: figure out proper way to handle error.
} catch (RuntimeException e) {
LOG.error("error dispatching message: ", e);
connection.onAsyncException(e);
// A problem while invoking the MessageListener does not
// in general indicate a problem with the connection to the broker, i.e.
// it will usually be sufficient to let the afterDelivery() method either
// commit or roll back in order to deal with the exception.
// However, we notify any registered client internal exception listener
// of the problem.
connection.onClientInternalException(e);
}
try {
@ -786,7 +791,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
asyncSendPacket(ack);
} catch (Throwable e) {
connection.onAsyncException(e);
connection.onClientInternalException(e);
}
if (deliveryListener != null) {
@ -1431,7 +1436,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
executor.execute(messageDispatch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
connection.onAsyncException(e);
connection.onClientInternalException(e);
}
}

View File

@ -72,7 +72,7 @@ public class AdvisoryConsumer implements ActiveMQDispatcher {
connection.asyncSendPacket(ack);
deliveredCounter = 0;
} catch (JMSException e) {
connection.onAsyncException(e);
connection.onClientInternalException(e);
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
/**
* An exception listener similar to the standard <code>javax.jms.ExceptionListener</code>
* which can be used by client code to be notified of exceptions thrown by container components
* (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
* <p>
* The <code>org.apache.activemq.ActiveMQConnection</code> that the listener has been registered with does
* this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code> describing
* the problem.
* </p>
*
* @author Kai Hudalla
* @see ActiveMQConnection#setClientInternalExceptionListener(org.apache.activemq.ClientInternalExceptionListener)
*/
public interface ClientInternalExceptionListener
{
/**
* Notifies a client of an exception while asynchronously processing a message.
*
* @param exception the exception describing the problem
*/
void onException(Throwable exception);
}