Move STOMP unit tests into the STOMP module and clean them up:

All tests now use a common StompTestSupport base class to remove cut and paste test code.
All tests are now using JUnit 4 tests.
All tests are run with AutoFailSupport on so they won't hang.
All tests use dynamic port assignment so they shouldn't clash with others.

Cleaned up the pom file

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1408161 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-11-12 00:15:50 +00:00
parent fdd0534fcb
commit 1236bbf6f4
39 changed files with 1487 additions and 1276 deletions

View File

@ -1,77 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<bean class="org.apache.activemq.util.XStreamFactoryBean" name="xstream">
<property name="annotatedClass"><value>org.apache.activemq.transport.stomp.SamplePojo</value></property>
</bean>
<bean id="springContext" class="org.apache.activemq.spring.SpringBrokerContext" />
<broker useJmx="true" schedulerSupport="true" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true" brokerContext="#springContext">
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="system" password="manager"
groups="users,admins"/>
<authenticationUser username="user" password="password"
groups="users"/>
<authenticationUser username="guest" password="password" groups="guests"/>
</users>
</simpleAuthenticationPlugin>
<!-- lets configure a destination based authorization mechanism -->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
<transportConnectors>
<transportConnector name="stomp+nio" uri="stomp+nio://localhost:61612"/>
</transportConnectors>
</broker>
</beans>

View File

@ -1,79 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<bean class="org.apache.activemq.util.XStreamFactoryBean" name="xstream">
<property name="annotatedClass"><value>org.apache.activemq.transport.stomp.SamplePojo</value></property>
</bean>
<bean id="springContext" class="org.apache.activemq.spring.SpringBrokerContext" />
<broker start="false" useJmx="true" schedulerSupport="true" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true" brokerContext="#springContext">
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="system" password="manager"
groups="users,admins"/>
<authenticationUser username="user" password="password"
groups="users"/>
<authenticationUser username="guest" password="password" groups="guests"/>
</users>
</simpleAuthenticationPlugin>
<!-- lets configure a destination based authorization mechanism -->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
<transportConnectors>
<transportConnector name="stomp+ssl" uri="stomp+ssl://localhost:61612"/>
<transportConnector name="stomp+nio+ssl" uri="stomp+nio+ssl://localhost:61613"/>
</transportConnectors>
</broker>
</beans>

View File

@ -1,78 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.apache.activemq.util.XStreamFactoryBean" name="xstream">
<property name="annotatedClass"><value>org.apache.activemq.transport.stomp.SamplePojo</value></property>
</bean>
<bean id="springContext" class="org.apache.activemq.spring.SpringBrokerContext" />
<!-- lets create an embedded ActiveMQ Broker -->
<amq:broker useJmx="true" schedulerSupport="true" persistent="false" start="false" brokerContext="#springContext">
<amq:plugins>
<amq:jaasCertificateAuthenticationPlugin configuration="cert-login"/>
<!-- lets configure a destination based authorization mechanism -->
<amq:authorizationPlugin>
<amq:map>
<amq:authorizationMap>
<amq:authorizationEntries>
<amq:authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<amq:authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
<amq:authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<amq:authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<amq:authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
<amq:authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<amq:authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
</amq:authorizationEntries>
<!-- let's assign roles to temporary destinations. comment this entry if we don't want any roles assigned to temp destinations -->
<amq:tempDestinationAuthorizationEntry>
<amq:tempDestinationAuthorizationEntry read="tempDestinationAdmins" write="tempDestinationAdmins" admin="tempDestinationAdmins"/>
</amq:tempDestinationAuthorizationEntry>
</amq:authorizationMap>
</amq:map>
</amq:authorizationPlugin>
</amq:plugins>
<amq:sslContext>
<amq:sslContext
keyStore="server.keystore" keyStorePassword="password"
trustStore="client.keystore" trustStorePassword="password"/>
</amq:sslContext>
<amq:transportConnectors>
<amq:transportConnector name="stomp+ssl" uri="stomp+ssl://localhost:61612?needClientAuth=true"/>
<amq:transportConnector name="ssl" uri="ssl://localhost:61617?needClientAuth=true"/>
</amq:transportConnectors>
</amq:broker>
</beans>

View File

@ -1,76 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<bean class="org.apache.activemq.util.XStreamFactoryBean" name="xstream">
<property name="annotatedClass"><value>org.apache.activemq.transport.stomp.SamplePojo</value></property>
</bean>
<bean id="springContext" class="org.apache.activemq.spring.SpringBrokerContext" />
<broker useJmx="true" schedulerSupport="true" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true" brokerContext="#springContext">
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="system" password="manager"
groups="users,admins"/>
<authenticationUser username="user" password="password"
groups="users"/>
<authenticationUser username="guest" password="password" groups="guests"/>
</users>
</simpleAuthenticationPlugin>
<!-- lets configure a destination based authorization mechanism -->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
<transportConnectors>
<transportConnector name="stomp" uri="stomp://localhost:61613?trace=true&amp;transport.keepAlive=true&amp;transport.soLinger=0"/>
</transportConnectors>
</broker>
</beans>

View File

@ -38,60 +38,15 @@
<!-- =============================== -->
<!-- Required Dependencies -->
<!-- =============================== -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.activemq.protobuf</groupId>
<artifactId>activemq-protobuf</artifactId>
<optional>false</optional>
</dependency>
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<artifactId>activemq-broker</artifactId>
</dependency>
<!-- =============================== -->
<!-- Optional Dependencies -->
<!-- =============================== -->
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jaas</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jta_1.0.1B_spec</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-j2ee-management_1.1_spec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-annotation_1.0_spec</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jacc_1.1_spec</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
@ -103,34 +58,26 @@
<optional>true</optional>
</dependency>
<!-- for XML parsing -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>xalan</groupId>
<artifactId>xalan</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
<!-- =============================== -->
<!-- Testing Dependencies -->
<!-- =============================== -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-kahadb-store</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-broker</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-jaas</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@ -165,36 +112,6 @@
</resource>
</resources>
<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings only.
It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.activemq.protobuf</groupId>
<artifactId>activemq-protobuf</artifactId>
<versionRange>[0.0.0,)</versionRange>
<goals>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore />
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
@ -202,7 +119,7 @@
<forkMode>always</forkMode>
<argLine>${surefire.argLine}</argLine>
<runOrder>alphabetical</runOrder>
<failIfNoTests>false</failIfNoTests>
<systemProperties>
<property>
<name>org.apache.activemq.default.directory.prefix</name>
@ -225,17 +142,6 @@
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.activemq.protobuf</groupId>
<artifactId>activemq-protobuf</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>

View File

@ -37,7 +37,6 @@ public class SamplePojo implements Serializable {
this.city = city;
}
public String getCity() {
return city;
}

View File

@ -18,16 +18,16 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
public class Stomp11NIOSSLTest extends Stomp11Test {
protected void setUp() throws Exception {
bindAddress = "stomp+nio+ssl://localhost:61613";
confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml";
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
@ -37,8 +37,15 @@ public class Stomp11NIOSSLTest extends Stomp11Test {
super.setUp();
}
protected Socket createSocket(URI connectUri) throws IOException {
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio+ssl://0.0.0.0:"+nioSslPort);
nioSslPort = connector.getConnectUri().getPort();
}
@Override
protected Socket createSocket() throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", connectUri.getPort());
return factory.createSocket("127.0.0.1", this.nioSslPort);
}
}

View File

@ -16,12 +16,21 @@
*/
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import org.apache.activemq.broker.TransportConnector;
public class Stomp11NIOTest extends Stomp11Test {
@Override
protected void setUp() throws Exception {
bindAddress = "stomp+nio://localhost:61612";
confUri = "xbean:org/apache/activemq/transport/stomp/niostomp-auth-broker.xml";
super.setUp();
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio://0.0.0.0:"+nioPort);
nioPort = connector.getConnectUri().getPort();
}
@Override
protected Socket createSocket() throws IOException {
return new Socket("127.0.0.1", this.nioPort);
}
}

View File

@ -18,26 +18,16 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
/**
*
*/
public class Stomp11SslAuthTest extends Stomp11Test {
protected void setUp() throws Exception {
// Test mutual authentication on both stomp and standard ssl transports
bindAddress = "stomp+ssl://localhost:61612";
confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-mutual-auth-broker.xml";
jmsUri="ssl://localhost:61617";
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
@ -48,9 +38,23 @@ public class Stomp11SslAuthTest extends Stomp11Test {
super.setUp();
}
protected Socket createSocket(URI connectUri) throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", connectUri.getPort());
@Override
protected void addOpenWireConnector() throws Exception {
TransportConnector connector = brokerService.addConnector(
"ssl://0.0.0.0:0?needClientAuth=true");
jmsUri = connector.getPublishableConnectString();
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector(
"stomp+ssl://0.0.0.0:"+port+"?needClientAuth=true");
sslPort = connector.getConnectUri().getPort();
}
@Override
protected Socket createSocket() throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", this.sslPort);
}
}

View File

@ -16,13 +16,13 @@
*/
package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -34,78 +34,42 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Stomp11Test extends CombinationTestSupport {
public class Stomp11Test extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(Stomp11Test.class);
protected String bindAddress = "stomp://localhost:61613";
protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
protected String jmsUri = "vm://localhost";
private BrokerService broker;
private StompConnection stompConnection = new StompConnection();
private Connection connection;
private Session session;
private ActiveMQQueue queue;
@Override
protected void setUp() throws Exception {
public void setUp() throws Exception {
broker = BrokerFactory.createBroker(new URI(confUri));
broker.start();
broker.waitUntilStarted();
super.setUp();
stompConnect();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(jmsUri);
connection = cf.createConnection("system", "manager");
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = new ActiveMQQueue(getQueueName());
connection.start();
}
private void stompConnect() throws IOException, URISyntaxException, UnknownHostException {
URI connectUri = new URI(bindAddress);
stompConnection.open(createSocket(connectUri));
}
protected Socket createSocket(URI connectUri) throws IOException {
return new Socket("127.0.0.1", connectUri.getPort());
}
protected String getQueueName() {
return getClass().getName() + "." + getName();
}
@Override
protected void tearDown() throws Exception {
try {
stompDisconnect();
} catch(Exception e) {
// Some tests explicitly disconnect from stomp so can ignore
} finally {
broker.stop();
broker.waitUntilStopped();
}
}
private void stompDisconnect() throws IOException {
if (stompConnection != null) {
stompConnection.close();
stompConnection = null;
}
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
}
@Test
public void testConnect() throws Exception {
String connectFrame = "STOMP\n" +
@ -129,6 +93,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testConnectedNeverEncoded() throws Exception {
String connectFrame = "STOMP\n" +
@ -158,6 +123,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testConnectWithVersionOptions() throws Exception {
String connectFrame = "STOMP\n" +
@ -179,6 +145,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testConnectWithValidFallback() throws Exception {
String connectFrame = "STOMP\n" +
@ -200,6 +167,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testConnectWithInvalidFallback() throws Exception {
String connectFrame = "STOMP\n" +
@ -218,6 +186,7 @@ public class Stomp11Test extends CombinationTestSupport {
assertTrue(f.indexOf("message:") >= 0);
}
@Test
public void testHeartbeats() throws Exception {
String connectFrame = "STOMP\n" +
@ -261,6 +230,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testHeartbeatsDropsIdleConnection() throws Exception {
String connectFrame = "STOMP\n" +
@ -292,6 +262,7 @@ public class Stomp11Test extends CombinationTestSupport {
assertTrue("Broker did close idle connection in time.", (endTime - startTime) >= 1000);
}
@Test
public void testHeartbeatsKeepsConnectionOpen() throws Exception {
String connectFrame = "STOMP\n" +
@ -341,6 +312,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSendAfterMissingHeartbeat() throws Exception {
String connectFrame = "STOMP\n" + "login:system\n" +
@ -371,6 +343,7 @@ public class Stomp11Test extends CombinationTestSupport {
}
}
@Test
public void testRejectInvalidHeartbeats1() throws Exception {
String connectFrame = "STOMP\n" +
@ -390,6 +363,7 @@ public class Stomp11Test extends CombinationTestSupport {
assertTrue(f.indexOf("message:") >= 0);
}
@Test
public void testRejectInvalidHeartbeats2() throws Exception {
String connectFrame = "STOMP\n" +
@ -409,6 +383,7 @@ public class Stomp11Test extends CombinationTestSupport {
assertTrue(f.indexOf("message:") >= 0);
}
@Test
public void testRejectInvalidHeartbeats3() throws Exception {
String connectFrame = "STOMP\n" +
@ -428,6 +403,7 @@ public class Stomp11Test extends CombinationTestSupport {
assertTrue(f.indexOf("message:") >= 0);
}
@Test
public void testSubscribeAndUnsubscribe() throws Exception {
String connectFrame = "STOMP\n" +
@ -473,6 +449,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSubscribeWithNoId() throws Exception {
String connectFrame = "STOMP\n" +
@ -499,6 +476,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testUnsubscribeWithNoId() throws Exception {
String connectFrame = "STOMP\n" +
@ -530,6 +508,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testAckMessageWithId() throws Exception {
String connectFrame = "STOMP\n" +
@ -568,6 +547,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testAckMessageWithNoId() throws Exception {
String connectFrame = "STOMP\n" +
@ -609,6 +589,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testQueueBrowerSubscription() throws Exception {
final int MSG_COUNT = 10;
@ -675,6 +656,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSendMessageWithStandardHeadersEncoded() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -708,6 +690,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSendMessageWithRepeatedEntries() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -737,6 +720,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSubscribeWithMessageSentWithEncodedProperties() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n" + "accept-version:1.1" + "\n\n" + Stomp.NULL;
@ -765,6 +749,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testNackMessage() throws Exception {
String connectFrame = "STOMP\n" +
@ -820,6 +805,7 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testHeaderValuesAreNotWSTrimmed() throws Exception {
stompConnection.setVersion(Stomp.V1_1);
String connectFrame = "STOMP\n" +
@ -862,13 +848,14 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testDurableSubAndUnSubOnTwoTopics() throws Exception {
stompConnection.setVersion(Stomp.V1_1);
String domain = "org.apache.activemq";
ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
BrokerViewMBean view = (BrokerViewMBean)brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
String connectFrame = "STOMP\n" +
"login:system\n" + "passcode:manager\n" + "accept-version:1.1\n" +
@ -951,13 +938,14 @@ public class Stomp11Test extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testMultipleDurableSubsWithOfflineMessages() throws Exception {
stompConnection.setVersion(Stomp.V1_1);
String domain = "org.apache.activemq";
ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
BrokerViewMBean view = (BrokerViewMBean)brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
String connectFrame = "STOMP\n" + "login:system\n" + "passcode:manager\n" +
"accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL;

View File

@ -18,16 +18,16 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
public class Stomp12NIOSSLTest extends Stomp12Test {
protected void setUp() throws Exception {
bindAddress = "stomp+nio+ssl://localhost:61613";
confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml";
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
@ -37,8 +37,15 @@ public class Stomp12NIOSSLTest extends Stomp12Test {
super.setUp();
}
protected Socket createSocket(URI connectUri) throws IOException {
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio+ssl://0.0.0.0:"+nioSslPort);
nioSslPort = connector.getConnectUri().getPort();
}
@Override
protected Socket createSocket() throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", connectUri.getPort());
return factory.createSocket("127.0.0.1", this.nioSslPort);
}
}

View File

@ -16,12 +16,21 @@
*/
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import org.apache.activemq.broker.TransportConnector;
public class Stomp12NIOTest extends Stomp12Test {
@Override
protected void setUp() throws Exception {
bindAddress = "stomp+nio://localhost:61612";
confUri = "xbean:org/apache/activemq/transport/stomp/niostomp-auth-broker.xml";
super.setUp();
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio://0.0.0.0:"+nioPort);
nioPort = connector.getConnectUri().getPort();
}
@Override
protected Socket createSocket() throws IOException {
return new Socket("127.0.0.1", this.nioPort);
}
}

View File

@ -18,26 +18,16 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
/**
*
*/
public class Stomp12SslAuthTest extends Stomp12Test {
protected void setUp() throws Exception {
// Test mutual authentication on both stomp and standard ssl transports
bindAddress = "stomp+ssl://localhost:61612";
confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-mutual-auth-broker.xml";
jmsUri="ssl://localhost:61617";
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
@ -48,9 +38,23 @@ public class Stomp12SslAuthTest extends Stomp12Test {
super.setUp();
}
protected Socket createSocket(URI connectUri) throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", connectUri.getPort());
@Override
protected void addOpenWireConnector() throws Exception {
TransportConnector connector = brokerService.addConnector(
"ssl://0.0.0.0:0?needClientAuth=true");
jmsUri = connector.getPublishableConnectString();
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector(
"stomp+ssl://0.0.0.0:"+port+"?needClientAuth=true");
sslPort = connector.getConnectUri().getPort();
}
@Override
protected Socket createSocket() throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", this.sslPort);
}
}

View File

@ -16,78 +16,50 @@
*/
package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import javax.jms.Connection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Stomp12Test extends CombinationTestSupport {
public class Stomp12Test extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(Stomp12Test.class);
protected String bindAddress = "stomp://localhost:61613";
protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
protected String jmsUri = "vm://localhost";
private BrokerService broker;
private StompConnection stompConnection = new StompConnection();
private Connection connection;
@Override
protected void setUp() throws Exception {
public void setUp() throws Exception {
broker = BrokerFactory.createBroker(new URI(confUri));
broker.start();
broker.waitUntilStarted();
super.setUp();
stompConnect();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(jmsUri);
connection = cf.createConnection("system", "manager");
connection.start();
}
private void stompConnect() throws IOException, URISyntaxException, UnknownHostException {
URI connectUri = new URI(bindAddress);
stompConnection.open(createSocket(connectUri));
}
protected Socket createSocket(URI connectUri) throws IOException {
return new Socket("127.0.0.1", connectUri.getPort());
}
protected String getQueueName() {
return getClass().getName() + "." + getName();
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
}
@Override
protected void tearDown() throws Exception {
try {
stompDisconnect();
} catch(Exception e) {
// Some tests explicitly disconnect from stomp so can ignore
} finally {
broker.stop();
broker.waitUntilStopped();
}
protected Socket createSocket() throws IOException {
return new Socket("127.0.0.1", this.port);
}
private void stompDisconnect() throws IOException {
if (stompConnection != null) {
stompConnection.close();
stompConnection = null;
}
@Override
protected String getQueueName() {
return getClass().getName() + "." + getName();
}
@Test

View File

@ -17,41 +17,36 @@
package org.apache.activemq.transport.stomp;
import junit.framework.TestCase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.URISupport;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.io.File;
import java.net.Socket;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
/**
*
*/
public class StompAdvisoryTest extends TestCase {
public class StompAdvisoryTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompAdvisoryTest.class);
protected ConnectionFactory factory;
protected ActiveMQConnection connection;
protected BrokerService broker;
StompConnection stompConnection;
URI tcpBrokerUri;
URI stompBrokerUri;
@Override
protected void applyBrokerPolicies() throws Exception {
private PolicyEntry createPolicyEntry() {
PolicyEntry policy = new PolicyEntry();
policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true);
@ -61,75 +56,24 @@ public class StompAdvisoryTest extends TestCase {
policy.setAdvisoryWhenFull(true);
policy.setProducerFlowControl(false);
ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
strategy.setLimit(10);
policy.setPendingMessageLimitStrategy(strategy);
return policy;
}
protected BrokerService createBroker() throws Exception {
BrokerService broker = BrokerFactory.createBroker(new URI("broker://()/localhost?useJmx=false"));
broker.setPersistent(false);
PolicyEntry policy = new PolicyEntry();
policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true);
policy.setAdvisoryForSlowConsumers(true);
policy.setAdvisoryWhenFull(true);
policy.setProducerFlowControl(false);
ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
strategy.setLimit(10);
policy.setPendingMessageLimitStrategy(strategy);
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("tcp://localhost:0");
broker.addConnector("stomp://localhost:0");
return broker;
}
protected void setUp() throws Exception {
super.setUp();
if (System.getProperty("basedir") == null) {
File file = new File(".");
System.setProperty("basedir", file.getAbsolutePath());
}
broker = createBroker();
broker.start();
tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri());
LOG.info("Producing using TCP uri: " + tcpBrokerUri);
LOG.info("consuming using STOMP uri: " + stompBrokerUri);
stompConnection = new StompConnection();
stompConnection.open(new Socket("localhost", stompBrokerUri.getPort()));
}
protected void tearDown() throws Exception {
stompConnection.disconnect();
stompConnection.close();
broker.stop();
brokerService.setDestinationPolicy(pMap);
brokerService.setAdvisorySupport(true);
}
@Test
public void testConnectionAdvisory() throws Exception {
Destination dest = new ActiveMQQueue("testConnectionAdvisory");
stompConnection.connect("system", "manager");
stompConnection.subscribe("/topic/ActiveMQ.Advisory.Connection", Stomp.Headers.Subscribe.AckModeValues.AUTO);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
Connection c = cf.createConnection("system", "manager");
c.start();
StompFrame f = stompConnection.receive();
@ -137,7 +81,6 @@ public class StompAdvisoryTest extends TestCase {
assertEquals(f.getAction(),"MESSAGE");
assertTrue("Should have a body", f.getBody().length() > 0);
assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
Map<String,String> headers = f.getHeaders();
c.stop();
c.close();
@ -150,10 +93,9 @@ public class StompAdvisoryTest extends TestCase {
assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
}
@Test
public void testConnectionAdvisoryJSON() throws Exception {
Destination dest = new ActiveMQQueue("testConnectionAdvisory");
HashMap<String, String> subheaders = new HashMap<String, String>(1);
subheaders.put("transformation", Stomp.Transformations.JMS_JSON.toString());
@ -162,8 +104,7 @@ public class StompAdvisoryTest extends TestCase {
Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
Connection c = cf.createConnection("system", "manager");
c.start();
StompFrame f = stompConnection.receive();
@ -171,7 +112,6 @@ public class StompAdvisoryTest extends TestCase {
assertEquals(f.getAction(),"MESSAGE");
assertTrue("Should have a body", f.getBody().length() > 0);
assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
Map<String,String> headers = f.getHeaders();
c.stop();
c.close();
@ -184,10 +124,9 @@ public class StompAdvisoryTest extends TestCase {
assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
}
@Test
public void testConnectionAdvisoryXML() throws Exception {
Destination dest = new ActiveMQQueue("testConnectionAdvisory");
HashMap<String, String> subheaders = new HashMap<String, String>(1);
subheaders.put("transformation", Stomp.Transformations.JMS_XML.toString());
@ -196,8 +135,7 @@ public class StompAdvisoryTest extends TestCase {
Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
Connection c = cf.createConnection("system", "manager");
c.start();
StompFrame f = stompConnection.receive();
@ -205,7 +143,6 @@ public class StompAdvisoryTest extends TestCase {
assertEquals(f.getAction(),"MESSAGE");
assertTrue("Should have a body", f.getBody().length() > 0);
assertTrue(f.getBody().startsWith("<ConnectionInfo>"));
Map<String,String> headers = f.getHeaders();
c.stop();
c.close();
@ -218,6 +155,7 @@ public class StompAdvisoryTest extends TestCase {
assertTrue(f.getBody().startsWith("<ConnectionInfo>"));
}
@Test
public void testConsumerAdvisory() throws Exception {
Destination dest = new ActiveMQQueue("testConsumerAdvisory");
@ -226,12 +164,12 @@ public class StompAdvisoryTest extends TestCase {
stompConnection.subscribe("/topic/ActiveMQ.Advisory.Consumer.>", Stomp.Headers.Subscribe.AckModeValues.AUTO);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
Connection c = cf.createConnection("system", "manager");
c.start();
Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(dest);
assertNotNull(consumer);
StompFrame f = stompConnection.receive();
LOG.debug(f.toString());
@ -243,6 +181,7 @@ public class StompAdvisoryTest extends TestCase {
c.close();
}
@Test
public void testProducerAdvisory() throws Exception {
Destination dest = new ActiveMQQueue("testProducerAdvisory");
@ -251,8 +190,7 @@ public class StompAdvisoryTest extends TestCase {
stompConnection.subscribe("/topic/ActiveMQ.Advisory.Producer.>", Stomp.Headers.Subscribe.AckModeValues.AUTO);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
Connection c = cf.createConnection("system", "manager");
c.start();
Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -270,6 +208,7 @@ public class StompAdvisoryTest extends TestCase {
c.close();
}
@Test
public void testProducerAdvisoryXML() throws Exception {
Destination dest = new ActiveMQQueue("testProducerAdvisoryXML");
@ -282,8 +221,7 @@ public class StompAdvisoryTest extends TestCase {
Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
Connection c = cf.createConnection("system", "manager");
c.start();
Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -301,6 +239,7 @@ public class StompAdvisoryTest extends TestCase {
c.close();
}
@Test
public void testProducerAdvisoryJSON() throws Exception {
Destination dest = new ActiveMQQueue("testProducerAdvisoryJSON");
@ -313,8 +252,7 @@ public class StompAdvisoryTest extends TestCase {
Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders);
// Now connect via openwire and check we get the advisory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
Connection c = factory.createConnection();
Connection c = cf.createConnection("system", "manager");
c.start();
Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -331,5 +269,4 @@ public class StompAdvisoryTest extends TestCase {
c.stop();
c.close();
}
}

View File

@ -16,20 +16,19 @@
*/
package org.apache.activemq.transport.stomp;
import java.util.HashMap;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import org.junit.Test;
public class StompFrameTest {
StompFrame underTest = new StompFrame();
@Test
public void testNoPasscodeInToString() throws Exception {
HashMap headers = new HashMap<String, String>();
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("userName", "bob");
headers.put("passcode", "please");
underTest.setHeaders(headers);

View File

@ -20,9 +20,6 @@ package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -34,30 +31,20 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StompLoadTest {
public class StompLoadTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompLoadTest.class);
protected String bindAddress = "stomp://localhost:61613";
protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
protected String jmsUri = "vm://localhost";
private static final int TASK_COUNT = 100;
private static final int MSG_COUNT = 250; // AMQ-3819: Above 250 or so and the CPU goes bonkers with NOI+SSL.
private BrokerService broker;
protected StompConnection stompConnection = new StompConnection();
protected Connection connection;
protected Session session;
protected ActiveMQQueue queue;
@ -67,26 +54,30 @@ public class StompLoadTest {
private CountDownLatch ready;
private AtomicInteger receiveCount;
@Before
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
}
@Override
public void setUp() throws Exception {
broker = BrokerFactory.createBroker(new URI(confUri));
broker.setDeleteAllMessagesOnStartup(true);
broker.start();
broker.waitUntilStarted();
super.setUp();
stompConnect();
stompConnection.connect("system", "manager");
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(jmsUri);
connection = cf.createConnection("system", "manager");
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = new ActiveMQQueue(getDestinationName());
queue = new ActiveMQQueue(getTopicName());
connection.start();
executor = Executors.newFixedThreadPool(TASK_COUNT, new ThreadFactory() {
private long i = 0;
@Override
public Thread newThread(Runnable runnable) {
this.i++;
final Thread t = new Thread(runnable, "Test Worker " + this.i);
@ -99,16 +90,14 @@ public class StompLoadTest {
receiveCount = new AtomicInteger(0);
}
@After
@Override
public void tearDown() throws Exception {
try {
executor.shutdownNow();
connection.close();
stompDisconnect();
} catch(Exception e) {
} catch (Exception e) {
} finally {
broker.stop();
broker.waitUntilStopped();
super.tearDown();
}
}
@ -128,6 +117,7 @@ public class StompLoadTest {
StompConnection connection = new StompConnection();
try {
stompConnect(connection);
connection.connect("system", "manager");
} catch (Exception e) {
LOG.error("Caught Exception while connecting: " + e.getMessage());
}
@ -141,7 +131,7 @@ public class StompLoadTest {
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("activemq.prefetchSize", "1");
connection.subscribe("/topic/" + getDestinationName(), "auto", headers);
connection.subscribe("/topic/" + getTopicName(), "auto", headers);
ready.await();
// Now that the main test thread is ready we wait a bit to let the tasks
@ -181,7 +171,7 @@ public class StompLoadTest {
for( int ix = 0; ix < MSG_COUNT; ix++) {
frame = "SEND\n" +
"destination:/topic/" + getDestinationName() +
"destination:/topic/" + getTopicName() +
"\nid:" + ix +
"\ncontent-length:5" + " \n\n" +
"\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL;
@ -206,33 +196,4 @@ public class StompLoadTest {
stompConnection.sendFrame(frame);
LOG.info("Test Finished.");
}
protected void stompConnect() throws Exception {
URI connectUri = new URI(bindAddress);
LOG.debug("Attempting connection to: " + bindAddress);
stompConnection.open(createSocket(connectUri));
stompConnection.connect("system", "manager");
}
private void stompConnect(StompConnection connection) throws Exception {
URI connectUri = new URI(bindAddress);
LOG.debug("Attempting connection to: " + bindAddress);
connection.open(createSocket(connectUri));
connection.connect("system", "manager");
}
protected Socket createSocket(URI connectUri) throws IOException {
return new Socket("127.0.0.1", connectUri.getPort());
}
protected String getDestinationName() {
return getClass().getName() + ".Tester";
}
protected void stompDisconnect() throws IOException {
if (stompConnection != null) {
stompConnection.close();
stompConnection = null;
}
}
}

View File

@ -19,47 +19,30 @@ package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.util.HashMap;
import java.util.UUID;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.apache.activemq.broker.TransportConnector;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StompMissingMessageTest {
public class StompMissingMessageTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompMissingMessageTest.class);
protected String bindAddress = "stomp://localhost:61613";
protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
protected String jmsUri = "vm://localhost";
private BrokerService broker;
protected String destination;
@Before
@Override
public void setUp() throws Exception {
broker = BrokerFactory.createBroker(new URI(confUri));
broker.setDeleteAllMessagesOnStartup(true);
broker.start();
broker.waitUntilStarted();
super.setUp();
destination = "/topic/" + getTopicName();
}
@After
public void tearDown() throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
}
@Test
@ -79,7 +62,7 @@ public class StompMissingMessageTest {
public String doTestProducerConsumer(int index) throws Exception {
String message = null;
assertEquals("Should not be any consumers", 0, broker.getAdminView().getTopicSubscribers().length);
assertEquals("Should not be any consumers", 0, brokerService.getAdminView().getTopicSubscribers().length);
StompConnection producer = stompConnect();
StompConnection consumer = stompConnect();
@ -122,7 +105,7 @@ public class StompMissingMessageTest {
public String doTestProducerDurableConsumer(int index) throws Exception {
String message = null;
assertEquals("Should not be any consumers", 0, broker.getAdminView().getTopicSubscribers().length);
assertEquals("Should not be any consumers", 0, brokerService.getAdminView().getTopicSubscribers().length);
StompConnection producer = stompConnect();
StompConnection consumer = stompConnect("test");
@ -193,26 +176,21 @@ public class StompMissingMessageTest {
assertEquals(headers.get(Stomp.Headers.RECEIPT_REQUESTED), receipt);
}
@Override
protected StompConnection stompConnect() throws Exception {
return stompConnect(null);
StompConnection stompConnection = new StompConnection();
stompConnect(stompConnection);
stompConnection.connect("system", "manager", null);
return stompConnection;
}
protected StompConnection stompConnect(String clientId) throws Exception {
StompConnection stompConnection = new StompConnection();
URI connectUri = new URI(bindAddress);
stompConnection.open(createSocket(connectUri));
stompConnect(stompConnection);
stompConnection.connect("system", "manager", clientId);
return stompConnection;
}
protected Socket createSocket(URI connectUri) throws IOException {
return new Socket("127.0.0.1", connectUri.getPort());
}
protected String getTopicName() {
return getClass().getName() + ".Messages";
}
protected void stompDisconnect(StompConnection connection) throws Exception {
if (connection != null) {
String receiptId = UUID.randomUUID().toString();

View File

@ -17,22 +17,21 @@
package org.apache.activemq.transport.stomp;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.net.Socket;
import org.apache.activemq.broker.TransportConnector;
public class StompNIOLoadTest extends StompLoadTest {
@Before
@Override
public void setUp() throws Exception {
bindAddress = "stomp+nio://localhost:61612";
confUri = "xbean:org/apache/activemq/transport/stomp/niostomp-auth-broker.xml";
super.setUp();
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio://0.0.0.0:"+nioPort);
nioPort = connector.getConnectUri().getPort();
}
@After
@Override
public void tearDown() throws Exception {
super.tearDown();
protected Socket createSocket() throws IOException {
return new Socket("127.0.0.1", this.nioPort);
}
}

View File

@ -19,21 +19,16 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.junit.After;
import org.junit.Before;
import org.apache.activemq.broker.TransportConnector;
public class StompNIOSSLLoadTest extends StompLoadTest {
@Before
@Override
public void setUp() throws Exception {
bindAddress = "stomp+nio+ssl://localhost:61613";
confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml";
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
@ -43,15 +38,15 @@ public class StompNIOSSLLoadTest extends StompLoadTest {
super.setUp();
}
@After
@Override
public void tearDown() throws Exception {
super.tearDown();
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio+ssl://0.0.0.0:"+nioSslPort);
nioSslPort = connector.getConnectUri().getPort();
}
@Override
protected Socket createSocket(URI connectUri) throws IOException {
protected Socket createSocket() throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", connectUri.getPort());
return factory.createSocket("127.0.0.1", this.nioSslPort);
}
}

View File

@ -16,17 +16,18 @@
*/
package org.apache.activemq.transport.stomp;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.TransportConnector;
public class StompNIOSSLTest extends StompTest {
protected void setUp() throws Exception {
bindAddress = "stomp+nio+ssl://localhost:61613";
confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml";
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
@ -37,13 +38,14 @@ public class StompNIOSSLTest extends StompTest {
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio+ssl://0.0.0.0:"+nioSslPort);
nioSslPort = connector.getConnectUri().getPort();
}
protected Socket createSocket(URI connectUri) throws IOException {
@Override
protected Socket createSocket() throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", connectUri.getPort());
return factory.createSocket("127.0.0.1", this.nioSslPort);
}
}

View File

@ -16,15 +16,21 @@
*/
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import org.apache.activemq.broker.TransportConnector;
/**
*
*/
public class StompNIOTest extends StompTest {
protected void setUp() throws Exception {
bindAddress = "stomp+nio://localhost:61612";
confUri = "xbean:org/apache/activemq/transport/stomp/niostomp-auth-broker.xml";
super.setUp();
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+nio://0.0.0.0:"+nioPort);
nioPort = connector.getConnectUri().getPort();
}
@Override
protected Socket createSocket() throws IOException {
return new Socket("127.0.0.1", this.nioPort);
}
}

View File

@ -19,21 +19,16 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.junit.After;
import org.junit.Before;
import org.apache.activemq.broker.TransportConnector;
public class StompSSLLoadTest extends StompLoadTest {
@Before
@Override
public void setUp() throws Exception {
bindAddress = "stomp+ssl://localhost:61612";
confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml";
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
@ -43,15 +38,15 @@ public class StompSSLLoadTest extends StompLoadTest {
super.setUp();
}
@After
@Override
public void tearDown() throws Exception {
super.tearDown();
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+ssl://0.0.0.0:"+sslPort);
sslPort = connector.getConnectUri().getPort();
}
@Override
protected Socket createSocket(URI connectUri) throws IOException {
protected Socket createSocket() throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", connectUri.getPort());
return factory.createSocket("127.0.0.1", this.sslPort);
}
}

View File

@ -18,25 +18,18 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.security.JaasCertificateAuthenticationPlugin;
/**
*
*/
public class StompSslAuthTest extends StompTest {
protected void setUp() throws Exception {
// Test mutual authentication on both stomp and standard ssl transports
bindAddress = "stomp+ssl://localhost:61612";
confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-mutual-auth-broker.xml";
jmsUri="ssl://localhost:61617";
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
@ -48,33 +41,63 @@ public class StompSslAuthTest extends StompTest {
super.setUp();
}
protected Socket createSocket(URI connectUri) throws IOException {
@Override
protected BrokerPlugin configureAuthentication() throws Exception {
JaasCertificateAuthenticationPlugin plugin = new JaasCertificateAuthenticationPlugin();
plugin.setConfiguration("cert-login");
return plugin;
}
@Override
protected Socket createSocket() throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", connectUri.getPort());
return factory.createSocket("127.0.0.1", this.sslPort);
}
@Override
protected void addOpenWireConnector() throws Exception {
TransportConnector connector = brokerService.addConnector(
"ssl://0.0.0.0:0?needClientAuth=true");
jmsUri = connector.getPublishableConnectString();
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector(
"stomp+ssl://0.0.0.0:"+port+"?needClientAuth=true");
sslPort = connector.getConnectUri().getPort();
}
// NOOP - These operations handled by jaas cert login module
@Override
public void testSubscribeWithReceiptNotAuthorized() throws Exception {
}
@Override
public void testConnectNotAuthenticatedWrongUser() throws Exception {
}
@Override
public void testConnectNotAuthenticatedWrongPassword() throws Exception {
}
@Override
public void testSendNotAuthorized() throws Exception {
}
@Override
public void testSubscribeNotAuthorized() throws Exception {
}
@Override
public void testJMSXUserIDIsSetInMessage() throws Exception {
}
@Override
public void testJMSXUserIDIsSetInStompMessage() throws Exception {
}
@Override
public void testClientSetMessageIdIsIgnored() throws Exception {
}
}

View File

@ -18,19 +18,16 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
/**
*
*/
import org.apache.activemq.broker.TransportConnector;
public class StompSslTest extends StompTest {
protected void setUp() throws Exception {
bindAddress = "stomp+ssl://localhost:61612";
confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml";
@Override
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
@ -40,8 +37,15 @@ public class StompSslTest extends StompTest {
super.setUp();
}
protected Socket createSocket(URI connectUri) throws IOException {
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp+ssl://0.0.0.0:"+sslPort);
sslPort = connector.getConnectUri().getPort();
}
@Override
protected Socket createSocket() throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", connectUri.getPort());
return factory.createSocket("127.0.0.1", this.sslPort);
}
}

View File

@ -16,53 +16,36 @@
*/
package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*
*/
public class StompSubscriptionRemoveTest extends TestCase {
public class StompSubscriptionRemoveTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompSubscriptionRemoveTest.class);
private static final String COMMAND_MESSAGE = "MESSAGE";
private static final String HEADER_MESSAGE_ID = "message-id";
private StompConnection stompConnection = new StompConnection();
@Test
public void testRemoveSubscriber() throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.addConnector("stomp://localhost:0").setName("Stomp");
broker.addConnector("tcp://localhost:0").setName("Default");
broker.start();
final String stompUri = broker.getConnectorByName("Stomp").getPublishableConnectString();
final int stompPort = new URI(stompUri).getPort();
final String openwireUri = broker.getConnectorByName("Default").getPublishableConnectString();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireUri);
Connection connection = factory.createConnection();
Connection connection = cf.createConnection("system", "manager");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(new ActiveMQQueue(getDestinationName()));
MessageProducer producer = session.createProducer(new ActiveMQQueue(getQueueName()));
Message message = session.createTextMessage("Testas");
for (int idx = 0; idx < 2000; ++idx) {
producer.send(message);
@ -72,13 +55,11 @@ public class StompSubscriptionRemoveTest extends TestCase {
session.close();
connection.close();
stompConnection.open(new Socket("localhost", stompPort));
String connectFrame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
String connectFrame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
stompConnection.receiveFrame();
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n" + Stomp.NULL;
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
int messagesCount = 0;
@ -99,14 +80,15 @@ public class StompSubscriptionRemoveTest extends TestCase {
Thread.sleep(1000);
stompConnection.close();
stompConnection.open(new Socket("localhost", stompPort));
stompDisconnect();
stompConnect();
connectFrame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
connectFrame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
stompConnection.receiveFrame();
frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n" + Stomp.NULL;
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
try {
while (count != 2000) {
@ -126,7 +108,6 @@ public class StompSubscriptionRemoveTest extends TestCase {
stompConnection.sendFrame("DISCONNECT\n\n");
stompConnection.close();
broker.stop();
LOG.info("Total messages received: " + messagesCount);
assertTrue("Messages received after connection loss: " + messagesCount, messagesCount >= 2000);
@ -139,10 +120,6 @@ public class StompSubscriptionRemoveTest extends TestCase {
// Subscription without any connections
}
protected String getDestinationName() {
return getClass().getName() + "." + getName();
}
// These two methods could move to a utility class
protected String getCommand(String frame) {
return frame.substring(0, frame.indexOf('\n') + 1).trim();

View File

@ -16,40 +16,30 @@
*/
package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.TransportConnector;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StompTelnetTest extends CombinationTestSupport {
public class StompTelnetTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompTelnetTest.class);
private BrokerService broker;
@Override
protected void setUp() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
broker.addConnector("stomp://localhost:0");
broker.addConnector("stomp+nio://localhost:0");
broker.start();
broker.waitUntilStarted();
}
@Test
public void testCRLF() throws Exception {
for (TransportConnector connector : broker.getTransportConnectors()) {
for (TransportConnector connector : brokerService.getTransportConnectors()) {
LOG.info("try: " + connector.getConnectUri());
int port = connector.getConnectUri().getPort();
StompConnection stompConnection = new StompConnection();
stompConnection.open(createSocket(connector.getConnectUri()));
stompConnection.open(createSocket(port));
String frame = "CONNECT\r\n\r\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@ -60,13 +50,15 @@ public class StompTelnetTest extends CombinationTestSupport {
}
}
@Test
public void testCRLF11() throws Exception {
for (TransportConnector connector : broker.getTransportConnectors()) {
for (TransportConnector connector : brokerService.getTransportConnectors()) {
LOG.info("try: " + connector.getConnectUri());
int port = connector.getConnectUri().getPort();
StompConnection stompConnection = new StompConnection();
stompConnection.open(createSocket(connector.getConnectUri()));
stompConnection.open(createSocket(port));
String frame = "CONNECT\r\naccept-version:1.1\r\n\r\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@ -77,18 +69,30 @@ public class StompTelnetTest extends CombinationTestSupport {
}
}
protected Socket createSocket(URI connectUri) throws IOException {
return new Socket("127.0.0.1", connectUri.getPort());
}
protected String getQueueName() {
return getClass().getName() + "." + getName();
@Override
protected BrokerPlugin configureAuthentication() throws Exception {
return null;
}
@Override
protected void tearDown() throws Exception {
broker.stop();
broker.waitUntilStopped();
protected BrokerPlugin configureAuthorization() throws Exception {
return null;
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp+nio://0.0.0.0:"+nioPort);
nioPort = connector.getConnectUri().getPort();
}
protected Socket createSocket(int port) throws IOException {
return new Socket("127.0.0.1", port);
}
@Override
protected String getQueueName() {
return getClass().getName() + "." + getName();
}
}

View File

@ -16,12 +16,14 @@
*/
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@ -34,6 +36,7 @@ import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
@ -42,27 +45,19 @@ import javax.jms.TextMessage;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StompTest extends CombinationTestSupport {
public class StompTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompTest.class);
protected String bindAddress = "stomp://localhost:61613";
protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
protected String jmsUri = "vm://localhost";
private BrokerService broker;
protected StompConnection stompConnection = new StompConnection();
protected Connection connection;
protected Session session;
protected ActiveMQQueue queue;
@ -95,7 +90,7 @@ public class StompTest extends CombinationTestSupport {
+ "}}";
@Override
protected void setUp() throws Exception {
public void setUp() throws Exception {
// The order of the entries is different when using ibm jdk 5.
if (System.getProperty("java.vendor").equals("IBM Corporation")
&& System.getProperty("java.version").startsWith("1.5")) {
@ -116,56 +111,32 @@ public class StompTest extends CombinationTestSupport {
+ "]"
+ "}}";
}
broker = BrokerFactory.createBroker(new URI(confUri));
broker.setDeleteAllMessagesOnStartup(true);
broker.start();
broker.waitUntilStarted();
super.setUp();
stompConnect();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(jmsUri);
connection = cf.createConnection("system", "manager");
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = new ActiveMQQueue(getQueueName());
connection.start();
}
protected void stompConnect() throws IOException, URISyntaxException, UnknownHostException {
URI connectUri = new URI(bindAddress);
stompConnection.open(createSocket(connectUri));
}
private void stompConnect(StompConnection connection) throws IOException, URISyntaxException, UnknownHostException {
URI connectUri = new URI(bindAddress);
connection.open(createSocket(connectUri));
}
protected Socket createSocket(URI connectUri) throws IOException {
return new Socket("127.0.0.1", connectUri.getPort());
}
protected String getQueueName() {
return getClass().getName() + "." + getName();
}
@Override
protected void tearDown() throws Exception {
public void tearDown() throws Exception {
try {
connection.close();
stompDisconnect();
} catch(Exception e) {
// Some tests explicitly disconnect from stomp so can ignore
} finally {
broker.stop();
broker.waitUntilStopped();
super.tearDown();
}
}
protected void stompDisconnect() throws IOException {
if (stompConnection != null) {
stompConnection.close();
stompConnection = null;
}
@Override
protected void addStompConnector() throws Exception {
TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
}
public void sendMessage(String msg) throws Exception {
@ -186,6 +157,7 @@ public class StompTest extends CombinationTestSupport {
producer.send(message);
}
@Test
public void testConnect() throws Exception {
String connectFrame = "CONNECT\n" + "login:system\n" + "passcode:manager\n" + "request-id:1\n" + "\n" + Stomp.NULL;
@ -196,6 +168,7 @@ public class StompTest extends CombinationTestSupport {
assertTrue(f.indexOf("response-id:1") >= 0);
}
@Test
public void testSendMessage() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -221,6 +194,7 @@ public class StompTest extends CombinationTestSupport {
assertTrue(Math.abs(tnow - tmsg) < 1000);
}
@Test
public void testJMSXGroupIdCanBeSet() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -240,6 +214,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals("TEST", ((ActiveMQTextMessage)message).getGroupID());
}
@Test
public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
@ -261,6 +236,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals("bar", "123", message.getStringProperty("bar"));
}
@Test
public void testSendMessageWithDelay() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -281,6 +257,7 @@ public class StompTest extends CombinationTestSupport {
assertNotNull(message);
}
@Test
public void testSendMessageWithStandardHeaders() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -310,6 +287,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals("GroupID", "abc", amqMessage.getGroupID());
}
@Test
public void testSendMessageWithNoPriorityReceivesDefault() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -331,11 +309,11 @@ public class StompTest extends CombinationTestSupport {
assertEquals("getJMSPriority", 4, message.getJMSPriority());
}
@Test
public void testReceipts() throws Exception {
StompConnection receiver = new StompConnection();
URI connectUri = new URI(bindAddress);
receiver.open(createSocket(connectUri));
receiver.open(createSocket());
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
receiver.sendFrame(frame);
@ -382,16 +360,15 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSubscriptionReceipts() throws Exception {
final int done = 500;
int count = 0;
int receiptId = 0;
URI connectUri = new URI(bindAddress);
do {
StompConnection sender = new StompConnection();
sender.open(createSocket(connectUri));
sender.open(createSocket());
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
sender.sendFrame(frame);
@ -406,7 +383,7 @@ public class StompTest extends CombinationTestSupport {
sender.disconnect();
StompConnection receiver = new StompConnection();
receiver.open(createSocket(connectUri));
receiver.open(createSocket());
frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
receiver.sendFrame(frame);
@ -433,6 +410,7 @@ public class StompTest extends CombinationTestSupport {
} while (count < done);
}
@Test
public void testSubscribeWithAutoAck() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -444,7 +422,7 @@ public class StompTest extends CombinationTestSupport {
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
sendMessage(getName());
sendMessage(name.getMethodName());
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
@ -453,6 +431,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -482,6 +461,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testBytesMessageWithNulls() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -508,6 +488,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSendMultipleBytesMessages() throws Exception {
final int MSG_COUNT = 50;
@ -540,6 +521,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSubscribeWithMessageSentWithProperties() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -570,6 +552,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testMessagesAreInOrder() throws Exception {
int ctr = 10;
String[] data = new String[ctr];
@ -610,6 +593,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSubscribeWithAutoAckAndSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -632,6 +616,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSubscribeWithAutoAckAndNumericSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -659,6 +644,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSubscribeWithAutoAckAndBooleanSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -686,6 +672,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSubscribeWithAutoAckAnFloatSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -713,6 +700,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSubscribeWithClientAck() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -737,6 +725,7 @@ public class StompTest extends CombinationTestSupport {
assertTrue(message.getJMSRedelivered());
}
@Test
public void testSubscribeWithClientAckedAndContentLength() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -780,6 +769,7 @@ public class StompTest extends CombinationTestSupport {
assertNull(message);
}
@Test
public void testUnsubscribe() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -815,6 +805,7 @@ public class StompTest extends CombinationTestSupport {
}
}
@Test
public void testTransactionCommit() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -837,6 +828,7 @@ public class StompTest extends CombinationTestSupport {
assertNotNull("Should have received a message", message);
}
@Test
public void testTransactionRollback() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -871,6 +863,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals("second message", message.getText().trim());
}
@Test
public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception {
assertClients(1);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -885,6 +878,7 @@ public class StompTest extends CombinationTestSupport {
assertClients(1);
}
@Test
public void testConnectNotAuthenticatedWrongUser() throws Exception {
String frame = "CONNECT\n" + "login: dejanb\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@ -895,6 +889,7 @@ public class StompTest extends CombinationTestSupport {
assertClients(1);
}
@Test
public void testConnectNotAuthenticatedWrongPassword() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode: dejanb\n\n" + Stomp.NULL;
@ -906,6 +901,7 @@ public class StompTest extends CombinationTestSupport {
assertClients(1);
}
@Test
public void testSendNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login:guest\n" + "passcode:password\n\n" + Stomp.NULL;
@ -921,6 +917,7 @@ public class StompTest extends CombinationTestSupport {
assertTrue(f.startsWith("ERROR"));
}
@Test
public void testSubscribeNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login:guest\n" + "passcode:password\n\n" + Stomp.NULL;
@ -936,6 +933,7 @@ public class StompTest extends CombinationTestSupport {
assertTrue(frame.startsWith("ERROR"));
}
@Test
public void testSubscribeWithReceiptNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login:guest\n" + "passcode:password\n\n" + Stomp.NULL;
@ -953,6 +951,7 @@ public class StompTest extends CombinationTestSupport {
assertTrue("Error Frame did not contain receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
}
@Test
public void testSubscribeWithInvalidSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -971,6 +970,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testTransformationUnknownTranslator() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -989,6 +989,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals("Hello World", message.getText());
}
@Test
public void testTransformationFailed() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -1008,6 +1009,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals("Hello World", message.getText());
}
@Test
public void testTransformationSendXMLObject() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -1021,12 +1023,18 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
ObjectMessage message = (ObjectMessage)consumer.receive(2500);
Message message = consumer.receive(2500);
assertNotNull(message);
SamplePojo object = (SamplePojo)message.getObject();
LOG.info("Broke sent: {}", message);
assertTrue(message instanceof ObjectMessage);
ObjectMessage objectMessage = (ObjectMessage)message;
SamplePojo object = (SamplePojo)objectMessage.getObject();
assertEquals("Dejan", object.getName());
}
@Test
public void testTransformationSendJSONObject() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -1046,6 +1054,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals("Dejan", object.getName());
}
@Test
public void testTransformationSubscribeXML() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
@ -1069,6 +1078,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testTransformationReceiveJSONObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
@ -1091,6 +1101,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testTransformationReceiveXMLObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
@ -1114,6 +1125,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testTransformationReceiveObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
@ -1137,6 +1149,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testTransformationReceiveXMLObjectAndMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
@ -1168,6 +1181,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testTransformationReceiveJSONObjectAndMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
@ -1199,6 +1213,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testTransformationSendAndReceiveXmlMap() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -1221,6 +1236,7 @@ public class StompTest extends CombinationTestSupport {
assertTrue(frame.contains("jms-map-xml"));
}
@Test
public void testTransformationSendAndReceiveJsonMap() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -1243,6 +1259,7 @@ public class StompTest extends CombinationTestSupport {
assertTrue(frame.contains("jms-map-json"));
}
@Test
public void testTransformationReceiveBytesMessage() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
@ -1273,6 +1290,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testTransformationNotOverrideSubscription() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
@ -1296,6 +1314,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testTransformationIgnoreTransformation() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
@ -1319,6 +1338,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testTransformationSendXMLMap() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -1337,6 +1357,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals(message.getString("name"), "Dejan");
}
@Test
public void testTransformationSendJSONMap() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -1355,6 +1376,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals(message.getString("name"), "Dejan");
}
@Test
public void testTransformationReceiveXMLMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
@ -1380,6 +1402,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testTransformationReceiveJSONMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
@ -1405,13 +1428,15 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testDurableUnsub() throws Exception {
// get broker JMX view
String domain = "org.apache.activemq";
ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
BrokerViewMBean view = (BrokerViewMBean)
brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
// connect
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\nclient-id:test\n\n" + Stomp.NULL;
@ -1457,13 +1482,15 @@ public class StompTest extends CombinationTestSupport {
assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
}
@Test
public void testDurableSubAttemptOnQueueFails() throws Exception {
// get broker JMX view
String domain = "org.apache.activemq";
ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
BrokerViewMBean view = (BrokerViewMBean)
brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
// connect
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\nclient-id:test\n\n" + Stomp.NULL;
@ -1486,6 +1513,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testMessageIdHeader() throws Exception {
stompConnection.connect("system", "manager");
@ -1498,6 +1526,7 @@ public class StompTest extends CombinationTestSupport {
assertNull(stompMessage.getHeaders().get("transaction"));
}
@Test
public void testPrefetchSizeOfOneClientAck() throws Exception {
stompConnection.connect("system", "manager");
@ -1577,6 +1606,7 @@ public class StompTest extends CombinationTestSupport {
} catch (SocketTimeoutException soe) {}
}
@Test
public void testPrefetchSize() throws Exception {
stompConnection.connect("system", "manager");
@ -1636,6 +1666,7 @@ public class StompTest extends CombinationTestSupport {
stompDisconnect();
}
@Test
public void testTransactionsWithMultipleDestinations() throws Exception {
stompConnection.connect("system", "manager");
@ -1664,6 +1695,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.disconnect();
}
@Test
public void testTempDestination() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -1682,6 +1714,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals("Hello World", message.getBody());
}
@Test
public void testJMSXUserIDIsSetInMessage() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -1701,6 +1734,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals("system", message.getStringProperty(Stomp.Headers.Message.USERID));
}
@Test
public void testJMSXUserIDIsSetInStompMessage() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -1719,6 +1753,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals("system", message.getHeaders().get(Stomp.Headers.Message.USERID));
}
@Test
public void testClientSetMessageIdIsIgnored() throws Exception {
HashMap<String, String> headers = new HashMap<String, String>();
headers.put(Stomp.Headers.Message.MESSAGE_ID, "Thisisnotallowed");
@ -1745,6 +1780,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID));
}
@Test
public void testExpire() throws Exception {
stompConnection.connect("system", "manager");
@ -1761,6 +1797,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals(stompMessage.getHeaders().get(Stomp.Headers.Message.ORIGINAL_DESTINATION), "/queue/" + getQueueName());
}
@Test
public void testDefaultJMSReplyToDest() throws Exception {
stompConnection.connect("system", "manager");
@ -1776,6 +1813,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals("" + stompMessage, stompMessage.getHeaders().get(Stomp.Headers.Send.REPLY_TO), "JustAString");
}
@Test
public void testPersistent() throws Exception {
stompConnection.connect("system", "manager");
@ -1792,6 +1830,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT), "true");
}
@Test
public void testPersistentDefaultValue() throws Exception {
stompConnection.connect("system", "manager");
@ -1806,6 +1845,7 @@ public class StompTest extends CombinationTestSupport {
assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT));
}
@Test
public void testReceiptNewQueue() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -1842,6 +1882,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testTransactedClientAckBrokerStats() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@ -1882,6 +1923,7 @@ public class StompTest extends CombinationTestSupport {
assertEquals(0, queueView.getQueueSize());
}
@Test
public void testReplytoModification() throws Exception {
String replyto = "some destination";
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -1903,6 +1945,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
}
@Test
public void testReplyToDestinationNaming() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -1915,6 +1958,7 @@ public class StompTest extends CombinationTestSupport {
doTestActiveMQReplyToTempDestination("queue");
}
@Test
public void testSendNullBodyTextMessage() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@ -1975,6 +2019,7 @@ public class StompTest extends CombinationTestSupport {
}
}
@Test
public void testReplyToAcrossConnections() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -2048,7 +2093,7 @@ public class StompTest extends CombinationTestSupport {
private BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
ObjectName brokerViewMBean = new ObjectName(
"org.apache.activemq:Type=Broker,BrokerName=localhost");
BrokerViewMBean proxy = (BrokerViewMBean) broker.getManagementContext()
BrokerViewMBean proxy = (BrokerViewMBean) brokerService.getManagementContext()
.newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true);
return proxy;
}
@ -2057,7 +2102,7 @@ public class StompTest extends CombinationTestSupport {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+ ":Type=Queue,Destination=" + name
+ ",BrokerName=localhost");
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
return proxy;
}
@ -2067,15 +2112,16 @@ public class StompTest extends CombinationTestSupport {
{
@Override
public boolean isSatisified() throws Exception {
return broker.getBroker().getClients().length == expected;
return brokerService.getBroker().getClients().length == expected;
}
});
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
org.apache.activemq.broker.Connection[] clients = brokerService.getBroker().getClients();
int actual = clients.length;
assertEquals("Number of clients", expected, actual);
}
@Test
public void testDisconnectDoesNotDeadlockBroker() throws Exception {
for (int i = 0; i < 20; ++i) {
doTestConnectionLeak();
@ -2133,6 +2179,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.close();
}
@Test
public void testHeaderValuesAreTrimmed1_0() throws Exception {
String connectFrame = "CONNECT\n" +
@ -2175,6 +2222,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
@Test
public void testSendReceiveBigMessage() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;

View File

@ -0,0 +1,285 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.security.ProtectionDomain;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.AuthorizationEntry;
import org.apache.activemq.security.AuthorizationMap;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.DefaultAuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.transport.stomp.util.ResourceLoadingSslContext;
import org.apache.activemq.transport.stomp.util.XStreamBrokerContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
public class StompTestSupport {
protected final AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
protected BrokerService brokerService;
protected int port;
protected int sslPort;
protected int nioPort;
protected int nioSslPort;
protected String jmsUri = "vm://localhost";
protected StompConnection stompConnection = new StompConnection();
protected ActiveMQConnectionFactory cf;
@Rule public TestName name = new TestName();
public File basedir() throws IOException {
ProtectionDomain protectionDomain = getClass().getProtectionDomain();
return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
}
public static void main(String[] args) throws Exception {
final StompTestSupport s = new StompTestSupport();
s.sslPort = 5675;
s.port = 5676;
s.nioPort = 5677;
s.nioSslPort = 5678;
s.startBroker();
while(true) {
Thread.sleep(100000);
}
}
public String getName() {
return name.getMethodName();
}
@Before
public void setUp() throws Exception {
autoFailTestSupport.startAutoFailThread();
startBroker();
stompConnect();
}
@After
public void tearDown() throws Exception {
autoFailTestSupport.stopAutoFailThread();
try {
stompDisconnect();
} catch (Exception ex) {
// its okay if the stomp connection is already closed.
} finally {
stopBroker();
}
}
public void startBroker() throws Exception {
createBroker();
XStreamBrokerContext context = new XStreamBrokerContext();
brokerService.setBrokerContext(context);
applyBrokerPolicies();
applyMemoryLimitPolicy();
// Setup SSL context...
File keyStore = new File(basedir(), "src/test/resources/server.keystore");
File trustStore = new File(basedir(), "src/test/resources/client.keystore");
final ResourceLoadingSslContext sslContext = new ResourceLoadingSslContext();
sslContext.setKeyStore(keyStore.getCanonicalPath());
sslContext.setKeyStorePassword("password");
sslContext.setTrustStore(trustStore.getCanonicalPath());
sslContext.setTrustStorePassword("password");
sslContext.afterPropertiesSet();
brokerService.setSslContext(sslContext);
ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
addStompConnector();
addOpenWireConnector();
cf = new ActiveMQConnectionFactory(jmsUri);
BrokerPlugin authenticationPlugin = configureAuthentication();
if (authenticationPlugin != null) {
plugins.add(configureAuthorization());
}
BrokerPlugin authorizationPlugin = configureAuthorization();
if (authorizationPlugin != null) {
plugins.add(configureAuthentication());
}
if (!plugins.isEmpty()) {
BrokerPlugin[] array = new BrokerPlugin[plugins.size()];
brokerService.setPlugins(plugins.toArray(array));
}
brokerService.start();
brokerService.waitUntilStarted();
}
protected void applyMemoryLimitPolicy() throws Exception {
}
protected void createBroker() throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
brokerService.setSchedulerSupport(true);
brokerService.setPopulateJMSXUserID(true);
}
protected BrokerPlugin configureAuthentication() throws Exception {
List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
users.add(new AuthenticationUser("system", "manager", "users,admins"));
users.add(new AuthenticationUser("user", "password", "users"));
users.add(new AuthenticationUser("guest", "password", "guests"));
SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
return authenticationPlugin;
}
protected BrokerPlugin configureAuthorization() throws Exception {
@SuppressWarnings("rawtypes")
List<DestinationMapEntry> authorizationEntries = new ArrayList<DestinationMapEntry>();
AuthorizationEntry entry = new AuthorizationEntry();
entry.setQueue(">");
entry.setRead("admins");
entry.setWrite("admins");
entry.setAdmin("admins");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setQueue("USERS.>");
entry.setRead("users");
entry.setWrite("users");
entry.setAdmin("users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setQueue("GUEST.>");
entry.setRead("guests");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic(">");
entry.setRead("admins");
entry.setWrite("admins");
entry.setAdmin("admins");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("USERS.>");
entry.setRead("users");
entry.setWrite("users");
entry.setAdmin("users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("GUEST.>");
entry.setRead("guests");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("ActiveMQ.Advisory.>");
entry.setRead("guests,users");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
AuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries);
AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap);
return authorizationPlugin;
}
protected void applyBrokerPolicies() throws Exception {
// NOOP here
}
protected void addOpenWireConnector() throws Exception {
}
protected void addStompConnector() throws Exception {
TransportConnector connector = null;
// Subclasses can tailor this list to speed up the test startup / shutdown
connector = brokerService.addConnector("stomp+ssl://0.0.0.0:"+sslPort);
sslPort = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp+nio://0.0.0.0:"+nioPort);
nioPort = connector.getConnectUri().getPort();
connector = brokerService.addConnector("stomp+nio+ssl://0.0.0.0:"+nioSslPort);
nioSslPort = connector.getConnectUri().getPort();
}
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
brokerService = null;
}
}
protected StompConnection stompConnect() throws Exception {
if (stompConnection == null) {
stompConnection = new StompConnection();
}
stompConnection.open(createSocket());
return stompConnection;
}
protected StompConnection stompConnect(StompConnection connection) throws Exception {
connection.open(createSocket());
return stompConnection;
}
protected Socket createSocket() throws IOException {
return new Socket("127.0.0.1", this.port);
}
protected String getQueueName() {
return getClass().getName() + "." + name.getMethodName();
}
protected String getTopicName() {
return getClass().getName() + "." + name.getMethodName();
}
protected void stompDisconnect() throws IOException {
if (stompConnection != null) {
stompConnection.close();
stompConnection = null;
}
}
}

View File

@ -31,62 +31,43 @@ import javax.management.ObjectName;
import junit.framework.Assert;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.usage.SystemUsage;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StompVirtualTopicTest {
public class StompVirtualTopicTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompVirtualTopicTest.class);
private static final int NUM_MSGS = 100000;
private BrokerService broker = null;
private String failMsg = null;
private URI brokerUri;
@Before
public void setUp() throws Exception {
LOG.info("Starting up");
broker = createBroker();
broker.start();
broker.waitUntilStarted();
brokerUri = new URI(broker.getTransportConnectors().get(0).getPublishableConnectString());
}
protected BrokerService createBroker() throws Exception {
BrokerService broker = BrokerFactory.createBroker(new URI("broker://()/localhost"));
broker.setUseJmx(true);
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("stomp://localhost:0?transport.closeAsync=false");
@Override
protected void createBroker() throws Exception {
brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost"));
brokerService.setUseJmx(true);
brokerService.setDeleteAllMessagesOnStartup(true);
File testDataDir = new File("target/activemq-data/StompVirtualTopicTest");
broker.setDataDirectoryFile(testDataDir);
brokerService.setDataDirectoryFile(testDataDir);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(new File(testDataDir, "kahadb"));
broker.setPersistenceAdapter(persistenceAdapter);
applyMemoryLimitPolicy(broker);
return broker;
brokerService.setPersistenceAdapter(persistenceAdapter);
}
private void applyMemoryLimitPolicy(BrokerService broker) {
@Override
protected void applyMemoryLimitPolicy() throws Exception {
final SystemUsage memoryManager = new SystemUsage();
memoryManager.getMemoryUsage().setLimit(5818230784L);
memoryManager.getStoreUsage().setLimit(6442450944L);
memoryManager.getTempUsage().setLimit(3221225472L);
broker.setSystemUsage(memoryManager);
brokerService.setSystemUsage(memoryManager);
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
final PolicyEntry entry = new PolicyEntry();
@ -98,14 +79,7 @@ public class StompVirtualTopicTest {
final PolicyMap policyMap = new PolicyMap();
policyMap.setPolicyEntries(policyEntries);
broker.setDestinationPolicy(policyMap);
}
@After
public void tearDown() throws Exception {
if (broker != null) {
broker.stop();
}
brokerService.setDestinationPolicy(policyMap);
}
@Test
@ -119,9 +93,7 @@ public class StompVirtualTopicTest {
consumerWorker.awaitStartCompleted();
Thread.sleep(500);
StompConnection stompConnection = new StompConnection();
stompConnection.open("localhost", brokerUri.getPort());
stompConnection.sendFrame("CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL);
stompConnection.sendFrame("CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL);
StompFrame frame = stompConnection.receive();
assertTrue(frame.toString().startsWith("CONNECTED"));
@ -173,9 +145,9 @@ public class StompVirtualTopicTest {
class StompConsumer implements Runnable {
final Logger log = LoggerFactory.getLogger(StompConsumer.class);
private StompVirtualTopicTest parent = null;
private CountDownLatch latch = new CountDownLatch(1);
private HashSet<String> received = new HashSet<String>();
private HashSet<String> dups = new HashSet<String>();
private final CountDownLatch latch = new CountDownLatch(1);
private final HashSet<String> received = new HashSet<String>();
private final HashSet<String> dups = new HashSet<String>();
public StompConsumer(StompVirtualTopicTest ref) {
parent = ref;
@ -188,6 +160,7 @@ public class StompVirtualTopicTest {
}
}
@Override
public void run() {
LOG.info("Running Stomp Consumer");
@ -196,8 +169,8 @@ public class StompVirtualTopicTest {
int counter = 0;
try {
stompConnection.open("localhost", brokerUri.getPort());
stompConnection.sendFrame("CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL);
stompConnection.open("localhost", StompVirtualTopicTest.this.port);
stompConnection.sendFrame("CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL);
StompFrame frame = stompConnection.receive();
assertTrue(frame.toString().startsWith("CONNECTED"));
stompConnection.subscribe("/queue/Consumer.A.VirtualTopic.FOO", "auto");
@ -271,7 +244,7 @@ public class StompVirtualTopicTest {
",Destination=Consumer.A.VirtualTopic.FOO" +
",BrokerName=localhost");
QueueViewMBean queue = (QueueViewMBean)
broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
LOG.info("Consumer.A.VirtualTopic.FOO Inflight: " + queue.getInFlightCount() +
", enqueueCount: " + queue.getEnqueueCount() + ", dequeueCount: " +

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp.util;
import java.io.File;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import javax.annotation.PostConstruct;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import org.apache.activemq.broker.SslContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.UrlResource;
import org.springframework.util.ResourceUtils;
/**
* Extends the SslContext so that it's easier to configure from spring.
*/
public class ResourceLoadingSslContext extends SslContext {
private String keyStoreType="jks";
private String trustStoreType="jks";
private String secureRandomAlgorithm="SHA1PRNG";
private String keyStoreAlgorithm=KeyManagerFactory.getDefaultAlgorithm();
private String trustStoreAlgorithm=TrustManagerFactory.getDefaultAlgorithm();
private String keyStore;
private String trustStore;
private String keyStoreKeyPassword;
private String keyStorePassword;
private String trustStorePassword;
/**
*
* @throws Exception
* @org.apache.xbean.InitMethod
*/
@PostConstruct
public void afterPropertiesSet() throws Exception {
keyManagers.addAll(createKeyManagers());
trustManagers.addAll(createTrustManagers());
if( secureRandom == null ) {
secureRandom = createSecureRandom();
}
}
private SecureRandom createSecureRandom() throws NoSuchAlgorithmException {
return SecureRandom.getInstance(secureRandomAlgorithm);
}
private Collection<TrustManager> createTrustManagers() throws Exception {
KeyStore ks = createTrustManagerKeyStore();
if( ks ==null ) {
return new ArrayList<TrustManager>(0);
}
TrustManagerFactory tmf = TrustManagerFactory.getInstance(trustStoreAlgorithm);
tmf.init(ks);
return Arrays.asList(tmf.getTrustManagers());
}
private Collection<KeyManager> createKeyManagers() throws Exception {
KeyStore ks = createKeyManagerKeyStore();
if( ks ==null ) {
return new ArrayList<KeyManager>(0);
}
KeyManagerFactory tmf = KeyManagerFactory.getInstance(keyStoreAlgorithm);
tmf.init(ks, keyStoreKeyPassword == null ? (keyStorePassword==null? null : keyStorePassword.toCharArray()) : keyStoreKeyPassword.toCharArray());
return Arrays.asList(tmf.getKeyManagers());
}
private KeyStore createTrustManagerKeyStore() throws Exception {
if( trustStore ==null ) {
return null;
}
KeyStore ks = KeyStore.getInstance(trustStoreType);
InputStream is=resourceFromString(trustStore).getInputStream();
try {
ks.load(is, trustStorePassword==null? null : trustStorePassword.toCharArray());
} finally {
is.close();
}
return ks;
}
private KeyStore createKeyManagerKeyStore() throws Exception {
if( keyStore ==null ) {
return null;
}
KeyStore ks = KeyStore.getInstance(keyStoreType);
InputStream is=resourceFromString(keyStore).getInputStream();
try {
ks.load(is, keyStorePassword==null? null : keyStorePassword.toCharArray());
} finally {
is.close();
}
return ks;
}
public String getTrustStoreType() {
return trustStoreType;
}
public String getKeyStoreType() {
return keyStoreType;
}
public String getKeyStore() {
return keyStore;
}
public void setKeyStore(String keyStore) throws MalformedURLException {
this.keyStore = keyStore;
}
public String getTrustStore() {
return trustStore;
}
public void setTrustStore(String trustStore) throws MalformedURLException {
this.trustStore = trustStore;
}
public String getKeyStoreAlgorithm() {
return keyStoreAlgorithm;
}
public void setKeyStoreAlgorithm(String keyAlgorithm) {
this.keyStoreAlgorithm = keyAlgorithm;
}
public String getTrustStoreAlgorithm() {
return trustStoreAlgorithm;
}
public void setTrustStoreAlgorithm(String trustAlgorithm) {
this.trustStoreAlgorithm = trustAlgorithm;
}
public String getKeyStoreKeyPassword() {
return keyStoreKeyPassword;
}
public void setKeyStoreKeyPassword(String keyPassword) {
this.keyStoreKeyPassword = keyPassword;
}
public String getKeyStorePassword() {
return keyStorePassword;
}
public void setKeyStorePassword(String keyPassword) {
this.keyStorePassword = keyPassword;
}
public String getTrustStorePassword() {
return trustStorePassword;
}
public void setTrustStorePassword(String trustPassword) {
this.trustStorePassword = trustPassword;
}
public void setKeyStoreType(String keyType) {
this.keyStoreType = keyType;
}
public void setTrustStoreType(String trustType) {
this.trustStoreType = trustType;
}
public String getSecureRandomAlgorithm() {
return secureRandomAlgorithm;
}
public void setSecureRandomAlgorithm(String secureRandomAlgorithm) {
this.secureRandomAlgorithm = secureRandomAlgorithm;
}
public static Resource resourceFromString(String uri) throws MalformedURLException {
Resource resource;
File file = new File(uri);
if (file.exists()) {
resource = new FileSystemResource(uri);
} else if (ResourceUtils.isUrl(uri)) {
resource = new UrlResource(uri);
} else {
resource = new ClassPathResource(uri);
}
return resource;
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp.util;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.transport.stomp.SamplePojo;
import com.thoughtworks.xstream.XStream;
public class XStreamBrokerContext implements BrokerContext {
private final Map<String, XStream> beansMap = new HashMap<String, XStream>();
public XStreamBrokerContext() {
XStream stream = new XStream();
stream.processAnnotations(SamplePojo.class);
beansMap.put("xstream", stream);
}
@Override
public Object getBean(String name) {
return this.beansMap.get(name);
}
@SuppressWarnings("rawtypes")
@Override
public Map getBeansOfType(Class type) {
if (type.equals(XStream.class)) {
return this.beansMap;
}
return null;
}
}

Binary file not shown.

View File

@ -0,0 +1,42 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
#
# The logging properties used during tests..
#
log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
#log4j.logger.org.apache.activemq.transport.failover=TRACE
#log4j.logger.org.apache.activemq.store.jdbc=TRACE
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n
#log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %-10.10X{activemq.broker} %-20.20X{activemq.connector} %-10.10X{activemq.destination} - %m%n
# File appender
log4j.appender.out=org.apache.log4j.FileAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n
#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %-10.10X{activemq.broker} %-20.20X{activemq.connector} %-10.10X{activemq.destination} - %m%n
log4j.appender.out.file=target/activemq-test.log
log4j.appender.out.append=true

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
activemq-domain {
org.apache.activemq.jaas.PropertiesLoginModule required
debug=true
org.apache.activemq.jaas.properties.user="org/apache/activemq/security/users.properties"
org.apache.activemq.jaas.properties.group="org/apache/activemq/security/groups.properties";
};
activemq-guest-domain {
org.apache.activemq.jaas.PropertiesLoginModule sufficient
debug=true
org.apache.activemq.jaas.properties.user="org/apache/activemq/security/users.properties"
org.apache.activemq.jaas.properties.group="org/apache/activemq/security/groups.properties";
org.apache.activemq.jaas.GuestLoginModule sufficient
debug=true
org.apache.activemq.jaas.guest.user="guest"
org.apache.activemq.jaas.guest.group="guests";
};
activemq-guest-when-no-creds-only-domain {
org.apache.activemq.jaas.GuestLoginModule sufficient
debug=true
credentialsInvalidate=true
org.apache.activemq.jaas.guest.user="guest"
org.apache.activemq.jaas.guest.group="guests";
org.apache.activemq.jaas.PropertiesLoginModule requisite
debug=true
org.apache.activemq.jaas.properties.user="org/apache/activemq/security/users.properties"
org.apache.activemq.jaas.properties.group="org/apache/activemq/security/groups.properties";
};
cert-login {
org.apache.activemq.jaas.TextFileCertificateLoginModule required
debug=true
org.apache.activemq.jaas.textfiledn.user="org/apache/activemq/security/users.properties"
org.apache.activemq.jaas.textfiledn.group="org/apache/activemq/security/groups.properties";
};
broker1 {
org.apache.activemq.jaas.TextFileCertificateLoginModule required
debug=true
org.apache.activemq.jaas.textfiledn.user="org/apache/activemq/security/users1.properties"
org.apache.activemq.jaas.textfiledn.group="org/apache/activemq/security/groups.properties";
};
broker2 {
org.apache.activemq.jaas.TextFileCertificateLoginModule required
debug=true
org.apache.activemq.jaas.textfiledn.user="org/apache/activemq/security/users2.properties"
org.apache.activemq.jaas.textfiledn.group="org/apache/activemq/security/groups.properties";
};

View File

@ -0,0 +1,21 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
admins=system,sslclient,client,broker1,broker2
tempDestinationAdmins=system,user,sslclient,client,broker1,broker2
users=system,user,sslclient,client,broker1,broker2
guests=guest

View File

@ -0,0 +1,21 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
system=manager
user=password
guest=password
sslclient=CN=localhost, OU=activemq.org, O=activemq.org, L=LA, ST=CA, C=US

Binary file not shown.