mirror of https://github.com/apache/activemq.git
Ensure that the connection check task is stopped once commands pass through the inactivity monitor to prevent the transport from being closed for no reason.
This commit is contained in:
parent
e996dbe7c2
commit
e47edd7a28
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.http;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.activemq.command.BrokerInfo;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.transport.InactivityMonitor;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Inactivity Monitor specialization for use with HTTP based transports.
|
||||
*/
|
||||
public class HttpInactivityMonitor extends InactivityMonitor {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HttpInactivityMonitor.class);
|
||||
|
||||
/**
|
||||
* @param next
|
||||
* The next Transport in the filter chain.
|
||||
*/
|
||||
public HttpInactivityMonitor(Transport next) {
|
||||
super(next, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCommand(Object command) {
|
||||
if (command.getClass() == ConnectionInfo.class || command.getClass() == BrokerInfo.class) {
|
||||
synchronized (this) {
|
||||
try {
|
||||
LOG.trace("Connection {} attempted on HTTP based transport: {}", command, this);
|
||||
processInboundWireFormatInfo(null);
|
||||
} catch (IOException e) {
|
||||
onException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
super.onCommand(command);
|
||||
}
|
||||
}
|
|
@ -23,7 +23,6 @@ import java.net.URISyntaxException;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.transport.InactivityMonitor;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportFactory;
|
||||
import org.apache.activemq.transport.TransportLoggerFactory;
|
||||
|
@ -60,7 +59,7 @@ public class HttpTransportFactory extends TransportFactory {
|
|||
if (wireFormat instanceof TextWireFormat) {
|
||||
return (TextWireFormat)wireFormat;
|
||||
}
|
||||
LOG.trace("Not created with a TextWireFormat: " + wireFormat);
|
||||
LOG.trace("Not created with a TextWireFormat: {}", wireFormat);
|
||||
return new XStreamWireFormat();
|
||||
}
|
||||
|
||||
|
@ -94,8 +93,8 @@ public class HttpTransportFactory extends TransportFactory {
|
|||
@SuppressWarnings("rawtypes")
|
||||
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
||||
transport = super.compositeConfigure(transport, format, options);
|
||||
HttpClientTransport httpTransport = (HttpClientTransport)transport.narrow(HttpClientTransport.class);
|
||||
if(httpTransport != null && httpTransport.isTrace() ) {
|
||||
HttpClientTransport httpTransport = transport.narrow(HttpClientTransport.class);
|
||||
if (httpTransport != null && httpTransport.isTrace()) {
|
||||
try {
|
||||
transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
|
||||
} catch (Throwable e) {
|
||||
|
@ -104,7 +103,7 @@ public class HttpTransportFactory extends TransportFactory {
|
|||
}
|
||||
boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
|
||||
if (useInactivityMonitor) {
|
||||
transport = new InactivityMonitor(transport, null /* ignore wire format as no negotiation over http */);
|
||||
transport = new HttpInactivityMonitor(transport);
|
||||
IntrospectionSupport.setProperties(transport, options);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.http;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ExceptionListener;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class HttpTransportConnectTimeoutTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HttpTransportConnectTimeoutTest.class);
|
||||
|
||||
private BrokerService broker;
|
||||
private ActiveMQConnectionFactory factory;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
broker = new BrokerService();
|
||||
TransportConnector connector = broker.addConnector(
|
||||
"http://localhost:0?trace=true&transport.connectAttemptTimeout=2000");
|
||||
broker.setPersistent(false);
|
||||
broker.setUseJmx(false);
|
||||
broker.start();
|
||||
|
||||
String connectionUri = connector.getPublishableConnectString();
|
||||
factory = new ActiveMQConnectionFactory(connectionUri + "?trace=true");
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
broker.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendReceiveAfterPause() throws Exception {
|
||||
final CountDownLatch failed = new CountDownLatch(1);
|
||||
|
||||
Connection connection = factory.createConnection();
|
||||
connection.start();
|
||||
connection.setExceptionListener(new ExceptionListener() {
|
||||
|
||||
@Override
|
||||
public void onException(JMSException exception) {
|
||||
LOG.info("Connection failed due to: {}", exception.getMessage());
|
||||
failed.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
assertFalse(failed.await(3, TimeUnit.SECONDS));
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createTemporaryQueue();
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
producer.send(session.createMessage());
|
||||
|
||||
assertNotNull(consumer.receive(5000));
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@
|
|||
log4j.rootLogger=INFO, out, stdout
|
||||
|
||||
log4j.logger.org.apache.activemq.transport.ws=DEBUG
|
||||
log4j.logger.org.apache.activemq.transport.http=DEBUG
|
||||
|
||||
#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
|
||||
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
|
||||
|
|
Loading…
Reference in New Issue