diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index c9c6d397f8..99d5bb64a3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -140,7 +140,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private int closeTimeout = 15000; private boolean useSyncSend=false; private boolean watchTopicAdvisories=true; - + private long warnAboutUnstartedConnectionTimeout = 500L; + + private final Transport transport; private final IdGenerator clientIdGenerator; private final JMSStatsImpl factoryStats; @@ -176,6 +178,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon // Assume that protocol is the latest. Change to the actual protocol // version when a WireFormatInfo is received. private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION); + private long timeCreated; /** * Construct an ActiveMQConnection @@ -207,6 +210,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection); this.factoryStats.addConnection(this); + this.timeCreated = System.currentTimeMillis(); } @@ -1505,6 +1509,28 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.optimizeAcknowledge=optimizeAcknowledge; } + public long getWarnAboutUnstartedConnectionTimeout() { + return warnAboutUnstartedConnectionTimeout; + } + + /** + * Enables the timemout from a session creation to when a warning is generated + * if the connection is not properly started via {@link #start()}. It is a very + * common gotcha to forget to + * start the connection + * so this option makes the default case to create a warning if the user forgets. + * To disable the warning just set the value to < 0 (say -1). + */ + public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { + this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; + } + + /** + * Returns the time this connection was created + */ + public long getTimeCreated() { + return timeCreated; + } private void waitForBrokerInfo() throws JMSException { try { diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java index 7f298b1f34..8e35845d77 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java @@ -28,6 +28,8 @@ import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * A utility class used by the Session for dispatching messages asynchronously to consumers @@ -36,11 +38,14 @@ import org.apache.activemq.util.JMSExceptionSupport; * @see javax.jms.Session */ public class ActiveMQSessionExecutor implements Task { - + private static final transient Log log = LogFactory.getLog(ActiveMQSessionExecutor.class); + private ActiveMQSession session; private MessageDispatchChannel messageQueue = new MessageDispatchChannel(); private boolean dispatchedBySessionPool; private TaskRunner taskRunner; + private boolean startedOrWarnedThatNotStarted; + private long warnAboutUnstartedConnectionTime = 500L; ActiveMQSessionExecutor(ActiveMQSession session) { this.session = session; @@ -53,6 +58,24 @@ public class ActiveMQSessionExecutor implements Task { void execute(MessageDispatch message) throws InterruptedException { + if (!startedOrWarnedThatNotStarted) { + + ActiveMQConnection connection = session.connection; + long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout(); + if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) { + startedOrWarnedThatNotStarted = true; + } + else { + long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated(); + + // lets only warn when a significant amount of time has passed just in case its normal operation + if (elapsedTime > aboutUnstartedConnectionTimeout) { + log.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection + " Received: " + message); + startedOrWarnedThatNotStarted = true; + } + } + } + if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool){ dispatch(message); }else { diff --git a/activemq-core/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java b/activemq-core/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java new file mode 100644 index 0000000000..8ff83cb913 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java @@ -0,0 +1,45 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.JMSException; + +/** + * @version $Revision: 1.1 $ + */ +public class CreateConsumerButDontStartConnectionWarningTest extends JmsQueueSendReceiveTest { + private static final transient Log log = LogFactory.getLog(CreateConsumerButDontStartConnectionWarningTest.class); + + @Override + protected void startConnection() throws JMSException { + // don't start the connection + } + + @Override + protected void assertMessagesAreReceived() throws JMSException { + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + log.warn("Caught: " + e, e); + } + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java b/activemq-core/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java index f1320f6d51..a156a984f2 100755 --- a/activemq-core/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java @@ -68,11 +68,15 @@ public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport { log.info("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass()); consumer = createConsumer(); consumer.setMessageListener(this); - connection.start(); + startConnection(); log.info("Created connection: " + connection); } + protected void startConnection() throws JMSException { + connection.start(); + } + protected void tearDown() throws Exception { log.info("Dumping stats..."); //TODO