git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1441212 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-31 21:50:04 +00:00
parent 75d4b7643f
commit 849baa6fbc
2 changed files with 150 additions and 0 deletions

View File

@ -224,6 +224,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// Configure a single threaded executor who's core thread can timeout if
// idle
executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
//Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
@ -318,6 +319,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @see Session#DUPS_OK_ACKNOWLEDGE
* @since 1.1
*/
@Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
@ -352,6 +354,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @throws JMSException if the JMS provider fails to return the client ID
* for this connection due to some internal error.
*/
@Override
public String getClientID() throws JMSException {
checkClosedOrFailed();
return this.info.getClientId();
@ -395,6 +398,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* a connection's client ID at the wrong time or when it has
* been administratively configured.
*/
@Override
public void setClientID(String newClientID) throws JMSException {
checkClosedOrFailed();
@ -428,6 +432,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* metadata for this connection.
* @see javax.jms.ConnectionMetaData
*/
@Override
public ConnectionMetaData getMetaData() throws JMSException {
checkClosedOrFailed();
return ActiveMQConnectionMetaData.INSTANCE;
@ -445,6 +450,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* <CODE>ExceptionListener</CODE> for this connection.
* @see javax.jms.Connection#setExceptionListener(ExceptionListener)
*/
@Override
public ExceptionListener getExceptionListener() throws JMSException {
checkClosedOrFailed();
return this.exceptionListener;
@ -473,6 +479,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @throws JMSException if the JMS provider fails to set the exception
* listener for this connection.
*/
@Override
public void setExceptionListener(ExceptionListener listener) throws JMSException {
checkClosedOrFailed();
this.exceptionListener = listener;
@ -511,6 +518,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* due to some internal error.
* @see javax.jms.Connection#stop()
*/
@Override
public void start() throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
@ -553,6 +561,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* due to some internal error.
* @see javax.jms.Connection#start()
*/
@Override
public void stop() throws JMSException {
checkClosedOrFailed();
if (started.compareAndSet(true, false)) {
@ -608,6 +617,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* release resources or to close a socket connection can
* cause this exception to be thrown.
*/
@Override
public void close() throws JMSException {
// Store the interrupted state and clear so that cleanup happens without
// leaking connection resources. Reset in finally to preserve state.
@ -744,6 +754,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @see javax.jms.ConnectionConsumer
* @since 1.1
*/
@Override
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
throws JMSException {
return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
@ -1031,6 +1042,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @return a lazily created destination source
* @throws JMSException
*/
@Override
public DestinationSource getDestinationSource() throws JMSException {
if (destinationSource == null) {
destinationSource = new DestinationSource(this);
@ -1105,6 +1117,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @see Session#CLIENT_ACKNOWLEDGE
* @see Session#DUPS_OK_ACKNOWLEDGE
*/
@Override
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
}
@ -1133,6 +1146,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* invalid.
* @see javax.jms.ConnectionConsumer
*/
@Override
public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
}
@ -1161,6 +1175,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* invalid.
* @see javax.jms.ConnectionConsumer
*/
@Override
public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
}
@ -1190,6 +1205,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @see javax.jms.ConnectionConsumer
* @since 1.1
*/
@Override
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
}
@ -1250,6 +1266,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @see Session#CLIENT_ACKNOWLEDGE
* @see Session#DUPS_OK_ACKNOWLEDGE
*/
@Override
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
}
@ -1432,6 +1449,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* @return statistics for this Connection
*/
@Override
public StatsImpl getStats() {
return stats;
}
@ -1590,6 +1608,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
started.set(false);
}
@Override
public void finalize() throws Throwable{
Scheduler s = this.scheduler;
if (s != null){
@ -1811,6 +1830,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* @param o - the command to consume
*/
@Override
public void onCommand(final Object o) {
final Command command = (Command)o;
if (!closed.get() && command != null) {
@ -1832,6 +1852,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
msg.setConnection(ActiveMQConnection.this);
msg.setMemoryUsage(null);
md.setMessage(msg);
}
dispatcher.dispatch(md);
@ -1862,6 +1883,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
@Override
public Response processConnectionError(final ConnectionError error) throws Exception {
executor.execute(new Runnable() {
@Override
public void run() {
onAsyncException(error.getException());
}
@ -1922,6 +1944,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
if ( !closed.get() && !closing.get() ) {
if ( this.clientInternalExceptionListener != null ) {
executor.execute(new Runnable() {
@Override
public void run() {
ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
}
@ -1948,6 +1971,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
final JMSException e = (JMSException)error;
executor.execute(new Runnable() {
@Override
public void run() {
ActiveMQConnection.this.exceptionListener.onException(e);
}
@ -1959,10 +1983,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
}
@Override
public void onException(final IOException error) {
onAsyncException(error);
if (!closing.get() && !closed.get()) {
executor.execute(new Runnable() {
@Override
public void run() {
transportFailed(error);
ServiceSupport.dispose(ActiveMQConnection.this.transport);
@ -1981,6 +2007,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
}
@Override
public void transportInterupted() {
this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
if (LOG.isDebugEnabled()) {
@ -2003,6 +2030,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
}
@Override
public void transportResumed() {
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
@ -2141,34 +2169,42 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.objectMessageSerializationDefered = objectMessageSerializationDefered;
}
@Override
public InputStream createInputStream(Destination dest) throws JMSException {
return createInputStream(dest, null);
}
@Override
public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
return createInputStream(dest, messageSelector, false);
}
@Override
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
return createInputStream(dest, messageSelector, noLocal, -1);
}
@Override
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
}
@Override
public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
return createInputStream(dest, null, false);
}
@Override
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
return createDurableInputStream(dest, name, messageSelector, false);
}
@Override
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
}
@Override
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
}
@ -2183,6 +2219,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* Creates a persistent output stream; individual messages will be written
* to disk/database by the broker
*/
@Override
public OutputStream createOutputStream(Destination dest) throws JMSException {
return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
}
@ -2207,6 +2244,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* {@link javax.jms.Message#setObjectProperty(String, Object)}
* method
*/
@Override
public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
@ -2232,6 +2270,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* specified.
* @since 1.1
*/
@Override
public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
checkClosedOrFailed();
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Assert;
public class AMQ4116Test extends EmbeddedBrokerTestSupport {
private final String tcpAddr = "tcp://localhost:0";
private String connectionUri;
/**
* In this test, a message is produced and consumed from the test queue.
* Memory usage on the test queue should be reset to 0. The memory that was
* consumed is then sent to a second queue. Memory usage on the original
* test queue should remain 0, but actually increased when the second
* enqueue occurs.
*/
public void testVMTransport() throws Exception {
runTest(connectionFactory);
}
/**
* This is an analog to the previous test, but occurs over TCP and passes.
*/
public void testTCPTransport() throws Exception {
runTest(new ActiveMQConnectionFactory(connectionUri));
}
private void runTest(ConnectionFactory connFactory) throws Exception {
// Verify that test queue is empty and not using any memory.
Destination physicalDestination = broker.getDestination(destination);
Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage());
// Enqueue a single message and verify that the test queue is using
// memory.
Connection conn = connFactory.createConnection();
conn.start();
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(destination);
producer.send(new ActiveMQMessage());
// Commit, which ensures message is in queue and memory usage updated.
session.commit();
Assert.assertTrue(physicalDestination.getMemoryUsage().getUsage() > 0);
// Consume the message and verify that the test queue is no longer using
// any memory.
MessageConsumer consumer = session.createConsumer(destination);
Message received = consumer.receive();
Assert.assertNotNull(received);
// Commit, which ensures message is removed from queue and memory usage
// updated.
session.commit();
Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage());
// Resend the message to a different queue and verify that the original
// test queue is still not using any memory.
ActiveMQQueue secondDestination = new ActiveMQQueue(AMQ4116Test.class + ".second");
MessageProducer secondPproducer = session.createProducer(secondDestination);
secondPproducer.send(received);
// Commit, which ensures message is in queue and memory usage updated.
// NOTE: This assertion fails due to bug.
session.commit();
Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage());
conn.stop();
}
/**
* Create an embedded broker that has both TCP and VM connectors.
*/
@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
connectionUri = broker.addConnector(tcpAddr).getPublishableConnectString();
return broker;
}
}