This commit is contained in:
Justin Bertram 2020-11-18 15:04:52 -06:00
commit bfca1c59de
7 changed files with 187 additions and 5 deletions

View File

@ -39,7 +39,6 @@ import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.remoting.impl.netty.ConnectionCreator;
import org.apache.activemq.artemis.core.remoting.impl.netty.HttpAcceptorHandler;
import org.apache.activemq.artemis.core.remoting.impl.netty.HttpKeepAliveRunnable;
@ -47,6 +46,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
@ -85,6 +85,10 @@ public class ProtocolHandler {
return new ProtocolDecoder(true, false);
}
public HttpKeepAliveRunnable getHttpKeepAliveRunnable() {
return httpKeepAliveRunnable;
}
public void close() {
if (httpKeepAliveRunnable != null) {
httpKeepAliveRunnable.close();
@ -97,6 +101,8 @@ public class ProtocolHandler {
class ProtocolDecoder extends ByteToMessageDecoder {
private static final String HTTP_HANDLER = "http-handler";
private final boolean http;
private final boolean httpEnabled;
@ -141,7 +147,7 @@ public class ProtocolHandler {
ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH, TransportConstants.DEFAULT_STOMP_MAX_FRAME_PAYLOAD_LENGTH, nettyAcceptor.getConfiguration())));
ctx.pipeline().addLast(new ProtocolDecoder(false, false));
ctx.pipeline().remove(this);
ctx.pipeline().remove("http-handler");
ctx.pipeline().remove(HTTP_HANDLER);
ctx.fireChannelRead(msg);
} else if (upgrade != null && upgrade.equalsIgnoreCase(NettyConnector.ACTIVEMQ_REMOTING)) { // HORNETQ-1391
// Send the response and close the connection if necessary.
@ -246,7 +252,7 @@ public class ProtocolHandler {
}
long httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, TransportConstants.DEFAULT_HTTP_RESPONSE_TIME, nettyAcceptor.getConfiguration());
HttpAcceptorHandler httpHandler = new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime, ctx.channel());
ctx.pipeline().addLast("http-handler", httpHandler);
ctx.pipeline().addLast(HTTP_HANDLER, httpHandler);
p.addLast(new ProtocolDecoder(false, true));
p.remove(this);
}

View File

@ -70,6 +70,12 @@ public class HttpAcceptorHandler extends ChannelDuplexHandler {
channel = null;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
httpKeepAliveTask.unregisterKeepAliveHandler(this);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
FullHttpRequest request = (FullHttpRequest) msg;

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.remoting.impl.netty;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
@ -43,6 +44,10 @@ public class HttpKeepAliveRunnable implements Runnable {
}
}
public List<HttpAcceptorHandler> getHandlers() {
return Collections.unmodifiableList(handlers);
}
public synchronized void registerKeepAliveHandler(final HttpAcceptorHandler httpAcceptorHandler) {
handlers.add(httpAcceptorHandler);
}

View File

@ -491,6 +491,11 @@ public class NettyAcceptor extends AbstractAcceptor {
return connections;
}
// Only for testing purposes
public ProtocolHandler getProtocolHandler() {
return protocolHandler;
}
// only for testing purposes
public void setKeyStorePath(String keyStorePath) {
this.keyStorePath = keyStorePath;

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.remoting.impl.netty;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import static org.mockito.Mockito.spy;
/**
* HttpAcceptorHandlerTest
*/
@RunWith(MockitoJUnitRunner.class)
public class HttpAcceptorHandlerTest {
private static final String HTTP_HANDLER = "http-handler";
private HttpKeepAliveRunnable spy;
@Before
public void setUp() throws Exception {
spy = spy(new HttpKeepAliveRunnable());
}
@Test
public void testUnregisterIsCalledTwiceWhenChannelIsInactive() {
EmbeddedChannel channel = new EmbeddedChannel();
HttpAcceptorHandler httpHandler = new HttpAcceptorHandler(spy, 1000, channel);
channel.pipeline().addLast(HTTP_HANDLER, httpHandler);
channel.close();
Mockito.verify(spy, Mockito.times(2)).unregisterKeepAliveHandler(httpHandler);
}
@Test
public void testUnregisterIsCalledWhenHandlerIsRemovedFromPipeline() {
EmbeddedChannel channel = new EmbeddedChannel();
HttpAcceptorHandler httpHandler = new HttpAcceptorHandler(spy, 1000, channel);
channel.pipeline().addLast(HTTP_HANDLER, httpHandler);
channel.pipeline().remove(HTTP_HANDLER);
Mockito.verify(spy).unregisterKeepAliveHandler(httpHandler);
}
}

View File

@ -67,6 +67,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
protected static final Symbol GLOBAL = Symbol.getSymbol("global");
protected static final String BROKER_NAME = "localhost";
protected static final String NETTY_ACCEPTOR = "netty-acceptor";
protected String noprivUser = "noprivs";
protected String noprivPass = "noprivs";
@ -211,7 +212,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
params.put(TransportConstants.PROTOCOLS_PROP_NAME, getConfiguredProtocols());
HashMap<String, Object> amqpParams = new HashMap<>();
configureAMQPAcceptorParameters(amqpParams);
TransportConfiguration tc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "netty-acceptor", amqpParams);
TransportConfiguration tc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, NETTY_ACCEPTOR, amqpParams);
configureAMQPAcceptorParameters(tc);
return tc;
}
@ -233,7 +234,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
server.getConfiguration().getAddressesSettings().put("#", addressSettings);
Set<TransportConfiguration> acceptors = server.getConfiguration().getAcceptorConfigurations();
for (TransportConfiguration tc : acceptors) {
if (tc.getName().equals("netty-acceptor")) {
if (tc.getName().equals(NETTY_ACCEPTOR)) {
tc.getExtraParams().put("anycastPrefix", "anycast://");
tc.getExtraParams().put("multicastPrefix", "multicast://");
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Test;
/**
* Test connections can be established to remote peers via WebSockets
*/
public class WebSocketConnectionTest extends JMSClientTestSupport {
@Override
public boolean isUseWebSockets() {
return true;
}
@Test
public void testSingleKeepAliveIsReleasedWhenWebSocketUpgradeHappens() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
produceAndConsumeInNewConnection(factory);
assertKeepAliveCounterIsZero();
}
@Test
public void testMultipleKeepAliveAreReleasedWhenWebSocketUpgradeHappens() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
produceAndConsumeInNewConnection(factory);
produceAndConsumeInNewConnection(factory);
produceAndConsumeInNewConnection(factory);
produceAndConsumeInNewConnection(factory);
produceAndConsumeInNewConnection(factory);
assertKeepAliveCounterIsZero();
}
private void produceAndConsumeInNewConnection(JmsConnectionFactory factory) throws JMSException {
JmsConnection connection = (JmsConnection) factory.createConnection();
try {
Session session = connection.createSession();
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createMessage());
producer.close();
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
assertNotNull(message);
} finally {
connection.close();
}
}
private void assertKeepAliveCounterIsZero() {
NettyAcceptor nettyAcceptor = (NettyAcceptor) server.getRemotingService().getAcceptor(NETTY_ACCEPTOR);
int httpAcceptorHandlerCount = nettyAcceptor.getProtocolHandler().getHttpKeepAliveRunnable().getHandlers().size();
Assert.assertEquals(0, httpAcceptorHandlerCount);
}
}