This commit is contained in:
Howard Gao 2020-02-13 11:10:36 +08:00
commit 05b9bf6a8f
4 changed files with 39 additions and 14 deletions

View File

@ -290,6 +290,8 @@ public class TransportConstants {
public static final String QUIET_PERIOD = "quietPeriod";
public static final String DISABLE_STOMP_SERVER_HEADER = "disableStompServerHeader";
/** We let this to be defined as a System Variable, as we need a different timeout over our testsuite.
* When running on a real server, this is the default we want.
* When running on a test suite, we need it to be 0, You should see a property on the main pom.xml.
@ -371,6 +373,7 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.TRUST_MANAGER_FACTORY_PLUGIN_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.SHUTDOWN_TIMEOUT);
allowableAcceptorKeys.add(TransportConstants.QUIET_PERIOD);
allowableAcceptorKeys.add(TransportConstants.DISABLE_STOMP_SERVER_HEADER);
ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);

View File

@ -85,7 +85,10 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
// server
response.addHeader(Stomp.Headers.Connected.SERVER, connection.getActiveMQServerName());
Object disableServerHeader = connection.getAcceptorUsed().getConfiguration().get(TransportConstants.DISABLE_STOMP_SERVER_HEADER);
if (disableServerHeader == null || !Boolean.parseBoolean(disableServerHeader.toString())) {
response.addHeader(Stomp.Headers.Connected.SERVER, connection.getActiveMQServerName());
}
if (requestID != null) {
response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);

View File

@ -31,10 +31,8 @@ import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -42,12 +40,9 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManagerFactory;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -176,21 +171,17 @@ public abstract class StompTestBase extends ActiveMQTestBase {
* @throws Exception
*/
protected ActiveMQServer createServer() throws Exception {
Map<String, Object> params = new HashMap<>();
params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "," + MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME);
params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
String stompAcceptorURI = "tcp://" + TransportConstants.DEFAULT_HOST + ":" + TransportConstants.DEFAULT_STOMP_PORT + "?" + TransportConstants.STOMP_CONSUMERS_CREDIT + "=-1";
if (isEnableStompMessageId()) {
params.put(TransportConstants.STOMP_ENABLE_MESSAGE_ID, true);
stompAcceptorURI += ";" + TransportConstants.STOMP_ENABLE_MESSAGE_ID + "=true";
}
if (getStompMinLargeMessageSize() != null) {
params.put(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, 2048);
stompAcceptorURI += ";" + TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE + "=2048";
}
TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
Configuration config = createBasicConfig().setSecurityEnabled(isSecurityEnabled())
.setPersistenceEnabled(isPersistenceEnabled())
.addAcceptorConfiguration(stompTransport)
.addAcceptorConfiguration("stomp", stompAcceptorURI)
.addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()))
.setConnectionTtlCheckInterval(500)
.addQueueConfiguration(new CoreQueueConfiguration().setAddress(getQueueName()).setName(getQueueName()).setRoutingType(RoutingType.ANYCAST))

View File

@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
@ -237,6 +238,33 @@ public class StompV11Test extends StompTestBase {
}
@Test
public void testServerFrame() throws Exception {
{ // the default case
ClientStompFrame frame = conn.connect(defUser, defPass);
conn.disconnect();
assertTrue(frame.getHeader(Stomp.Headers.Connected.SERVER) != null);
server.getRemotingService().destroyAcceptor("stomp");
}
{ // explicitly set disableStompServerHeader=false
server.getRemotingService().createAcceptor("stomp", "tcp://" + TransportConstants.DEFAULT_HOST + ":" + TransportConstants.DEFAULT_STOMP_PORT + "?" + TransportConstants.DISABLE_STOMP_SERVER_HEADER + "=false").start();
conn = StompClientConnectionFactory.createClientConnection(uri);
ClientStompFrame frame = conn.connect(defUser, defPass);
conn.disconnect();
assertTrue(frame.getHeader(Stomp.Headers.Connected.SERVER) != null);
server.getRemotingService().destroyAcceptor("stomp");
}
{ // explicitly set disableStompServerHeader=true
server.getRemotingService().createAcceptor("stomp", "tcp://" + TransportConstants.DEFAULT_HOST + ":" + TransportConstants.DEFAULT_STOMP_PORT + "?" + TransportConstants.DISABLE_STOMP_SERVER_HEADER + "=true").start();
conn = StompClientConnectionFactory.createClientConnection(uri);
ClientStompFrame frame = conn.connect(defUser, defPass);
conn.disconnect();
assertTrue(frame.getHeader(Stomp.Headers.Connected.SERVER) == null);
}
}
@Test
public void testSendAndReceive() throws Exception {
conn.connect(defUser, defPass);