mirror of https://github.com/apache/activemq.git
added fix for AMQ-1253 to log a warning if users forget to start the connection within a small timeout period (500ms by default)
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@541256 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e2b561e12b
commit
78cf4b1ac1
|
@ -140,6 +140,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
private int closeTimeout = 15000;
|
private int closeTimeout = 15000;
|
||||||
private boolean useSyncSend=false;
|
private boolean useSyncSend=false;
|
||||||
private boolean watchTopicAdvisories=true;
|
private boolean watchTopicAdvisories=true;
|
||||||
|
private long warnAboutUnstartedConnectionTimeout = 500L;
|
||||||
|
|
||||||
|
|
||||||
private final Transport transport;
|
private final Transport transport;
|
||||||
private final IdGenerator clientIdGenerator;
|
private final IdGenerator clientIdGenerator;
|
||||||
|
@ -176,6 +178,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
// Assume that protocol is the latest. Change to the actual protocol
|
// Assume that protocol is the latest. Change to the actual protocol
|
||||||
// version when a WireFormatInfo is received.
|
// version when a WireFormatInfo is received.
|
||||||
private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
|
private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
|
||||||
|
private long timeCreated;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct an <code>ActiveMQConnection</code>
|
* Construct an <code>ActiveMQConnection</code>
|
||||||
|
@ -207,6 +210,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
|
|
||||||
this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
|
this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
|
||||||
this.factoryStats.addConnection(this);
|
this.factoryStats.addConnection(this);
|
||||||
|
this.timeCreated = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1505,6 +1509,28 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
this.optimizeAcknowledge=optimizeAcknowledge;
|
this.optimizeAcknowledge=optimizeAcknowledge;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getWarnAboutUnstartedConnectionTimeout() {
|
||||||
|
return warnAboutUnstartedConnectionTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enables the timemout from a session creation to when a warning is generated
|
||||||
|
* if the connection is not properly started via {@link #start()}. It is a very
|
||||||
|
* common gotcha to forget to
|
||||||
|
* <a href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start the connection</a>
|
||||||
|
* so this option makes the default case to create a warning if the user forgets.
|
||||||
|
* To disable the warning just set the value to < 0 (say -1).
|
||||||
|
*/
|
||||||
|
public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
|
||||||
|
this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the time this connection was created
|
||||||
|
*/
|
||||||
|
public long getTimeCreated() {
|
||||||
|
return timeCreated;
|
||||||
|
}
|
||||||
|
|
||||||
private void waitForBrokerInfo() throws JMSException {
|
private void waitForBrokerInfo() throws JMSException {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.activemq.command.MessageDispatch;
|
||||||
import org.apache.activemq.thread.Task;
|
import org.apache.activemq.thread.Task;
|
||||||
import org.apache.activemq.thread.TaskRunner;
|
import org.apache.activemq.thread.TaskRunner;
|
||||||
import org.apache.activemq.util.JMSExceptionSupport;
|
import org.apache.activemq.util.JMSExceptionSupport;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A utility class used by the Session for dispatching messages asynchronously to consumers
|
* A utility class used by the Session for dispatching messages asynchronously to consumers
|
||||||
|
@ -36,11 +38,14 @@ import org.apache.activemq.util.JMSExceptionSupport;
|
||||||
* @see javax.jms.Session
|
* @see javax.jms.Session
|
||||||
*/
|
*/
|
||||||
public class ActiveMQSessionExecutor implements Task {
|
public class ActiveMQSessionExecutor implements Task {
|
||||||
|
private static final transient Log log = LogFactory.getLog(ActiveMQSessionExecutor.class);
|
||||||
|
|
||||||
private ActiveMQSession session;
|
private ActiveMQSession session;
|
||||||
private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
|
private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
|
||||||
private boolean dispatchedBySessionPool;
|
private boolean dispatchedBySessionPool;
|
||||||
private TaskRunner taskRunner;
|
private TaskRunner taskRunner;
|
||||||
|
private boolean startedOrWarnedThatNotStarted;
|
||||||
|
private long warnAboutUnstartedConnectionTime = 500L;
|
||||||
|
|
||||||
ActiveMQSessionExecutor(ActiveMQSession session) {
|
ActiveMQSessionExecutor(ActiveMQSession session) {
|
||||||
this.session = session;
|
this.session = session;
|
||||||
|
@ -53,6 +58,24 @@ public class ActiveMQSessionExecutor implements Task {
|
||||||
|
|
||||||
|
|
||||||
void execute(MessageDispatch message) throws InterruptedException {
|
void execute(MessageDispatch message) throws InterruptedException {
|
||||||
|
if (!startedOrWarnedThatNotStarted) {
|
||||||
|
|
||||||
|
ActiveMQConnection connection = session.connection;
|
||||||
|
long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
|
||||||
|
if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) {
|
||||||
|
startedOrWarnedThatNotStarted = true;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();
|
||||||
|
|
||||||
|
// lets only warn when a significant amount of time has passed just in case its normal operation
|
||||||
|
if (elapsedTime > aboutUnstartedConnectionTimeout) {
|
||||||
|
log.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection + " Received: " + message);
|
||||||
|
startedOrWarnedThatNotStarted = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool){
|
if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool){
|
||||||
dispatch(message);
|
dispatch(message);
|
||||||
}else {
|
}else {
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @version $Revision: 1.1 $
|
||||||
|
*/
|
||||||
|
public class CreateConsumerButDontStartConnectionWarningTest extends JmsQueueSendReceiveTest {
|
||||||
|
private static final transient Log log = LogFactory.getLog(CreateConsumerButDontStartConnectionWarningTest.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void startConnection() throws JMSException {
|
||||||
|
// don't start the connection
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void assertMessagesAreReceived() throws JMSException {
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
log.warn("Caught: " + e, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -68,11 +68,15 @@ public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport {
|
||||||
log.info("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass());
|
log.info("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass());
|
||||||
consumer = createConsumer();
|
consumer = createConsumer();
|
||||||
consumer.setMessageListener(this);
|
consumer.setMessageListener(this);
|
||||||
connection.start();
|
startConnection();
|
||||||
|
|
||||||
log.info("Created connection: " + connection);
|
log.info("Created connection: " + connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void startConnection() throws JMSException {
|
||||||
|
connection.start();
|
||||||
|
}
|
||||||
|
|
||||||
protected void tearDown() throws Exception {
|
protected void tearDown() throws Exception {
|
||||||
log.info("Dumping stats...");
|
log.info("Dumping stats...");
|
||||||
//TODO
|
//TODO
|
||||||
|
|
Loading…
Reference in New Issue