mirror of https://github.com/apache/activemq.git
Cleanup some unused dependencies and trash
(cherry picked from commit 3791d17e1f
)
This commit is contained in:
parent
30314f162c
commit
769ae5adae
70
pom.xml
70
pom.xml
|
@ -41,14 +41,10 @@
|
|||
<activemq-protobuf-version>1.1</activemq-protobuf-version>
|
||||
<activesoap-version>1.3</activesoap-version>
|
||||
<annogen-version>0.1.0</annogen-version>
|
||||
<ant-bundle-version>1.10.7_1</ant-bundle-version>
|
||||
<aopalliance-version>1.0</aopalliance-version>
|
||||
<aries-version>1.1.0</aries-version>
|
||||
<aries-transaction-version>1.1.1</aries-transaction-version>
|
||||
<axion-version>1.0-M3-dev</axion-version>
|
||||
<camel-version>2.25.2</camel-version>
|
||||
<camel-version-range>[2.20,3)</camel-version-range>
|
||||
<cglib-version>2.2</cglib-version>
|
||||
<commons-beanutils-version>1.9.4</commons-beanutils-version>
|
||||
<commons-collections-version>3.2.2</commons-collections-version>
|
||||
<commons-daemon-version>1.2.3</commons-daemon-version>
|
||||
|
@ -62,13 +58,10 @@
|
|||
<directory-version>2.0.0.AM25</directory-version>
|
||||
<ecj.version>3.17.0</ecj.version>
|
||||
<ftpserver-version>1.1.1</ftpserver-version>
|
||||
<geronimo-version>1.0</geronimo-version>
|
||||
<guava-version>28.2-jre</guava-version>
|
||||
<hadoop-version>1.2.1</hadoop-version>
|
||||
<hawtbuf-version>1.11</hawtbuf-version>
|
||||
<hawtdispatch-version>1.22</hawtdispatch-version>
|
||||
<howl-version>0.1.8</howl-version>
|
||||
<hsqldb-version>1.8.0.12</hsqldb-version>
|
||||
<httpclient-version>4.5.13</httpclient-version>
|
||||
<httpcore-version>4.4.13</httpcore-version>
|
||||
<insight-version>1.2.0.Beta4</insight-version>
|
||||
|
@ -88,7 +81,6 @@
|
|||
<json-simple-version>1.1.1</json-simple-version>
|
||||
<junit-version>4.13.1</junit-version>
|
||||
<hamcrest-version>1.3</hamcrest-version>
|
||||
<jxta-version>2.0</jxta-version>
|
||||
<karaf-version>4.2.10</karaf-version>
|
||||
<leveldb-api-version>0.9</leveldb-api-version>
|
||||
<leveldb-version>0.9</leveldb-version>
|
||||
|
@ -98,10 +90,8 @@
|
|||
<owasp-dependency-check-version>5.2.4</owasp-dependency-check-version>
|
||||
<powermock-version>1.6.5</powermock-version>
|
||||
<mqtt-client-version>1.16</mqtt-client-version>
|
||||
<openjpa-version>1.2.0</openjpa-version>
|
||||
<org-apache-derby-version>10.14.2.0</org-apache-derby-version>
|
||||
<osgi-version>6.0.0</osgi-version>
|
||||
<p2psockets-version>1.1.2</p2psockets-version>
|
||||
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
|
||||
<zookeeper-version>3.4.14</zookeeper-version>
|
||||
<qpid-proton-version>0.33.8</qpid-proton-version>
|
||||
|
@ -111,8 +101,6 @@
|
|||
<netty-all-version>4.1.53.Final</netty-all-version>
|
||||
<regexp-version>1.3</regexp-version>
|
||||
<rome-version>1.15.0</rome-version>
|
||||
<saxon-version>9.5.1-5</saxon-version>
|
||||
<saxon-bundle-version>9.5.1-5_1</saxon-bundle-version>
|
||||
<scala-plugin-version>3.1.0</scala-plugin-version>
|
||||
<scala-version>2.11.11</scala-version>
|
||||
<shiro-version>1.7.0</shiro-version>
|
||||
|
@ -123,8 +111,6 @@
|
|||
<taglibs-version>1.2.5</taglibs-version>
|
||||
<velocity-version>2.2</velocity-version>
|
||||
<xalan-version>2.7.2</xalan-version>
|
||||
<xmlbeans-version>3.1.0</xmlbeans-version>
|
||||
<xmlbeans-bundle-version>2.6.0_2</xmlbeans-bundle-version>
|
||||
<xpp3-version>1.1.4c</xpp3-version>
|
||||
<xstream-version>1.4.15</xstream-version>
|
||||
<xbean-version>4.18</xbean-version>
|
||||
|
@ -693,12 +679,6 @@
|
|||
<version>${commons-collections-version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.openjpa</groupId>
|
||||
<artifactId>openjpa-persistence-jdbc</artifactId>
|
||||
<version>${openjpa-version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Optional Shiro Support -->
|
||||
<dependency>
|
||||
<groupId>org.apache.shiro</groupId>
|
||||
|
@ -842,15 +822,6 @@
|
|||
<version>${regexp-version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Optional HSQL DB Support -->
|
||||
<!--
|
||||
<dependency>
|
||||
<groupId>hsqldb</groupId>
|
||||
<artifactId>hsqldb</artifactId>
|
||||
<version>${hsqldb-version}</version>
|
||||
</dependency>
|
||||
-->
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-dbcp2</artifactId>
|
||||
|
@ -863,15 +834,6 @@
|
|||
<version>${commons-pool2-version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Optional Journal Implementation -->
|
||||
<!--
|
||||
<dependency>
|
||||
<groupId>howl</groupId>
|
||||
<artifactId>howl-logger</artifactId>
|
||||
<version>${howl-version}</version>
|
||||
</dependency>
|
||||
-->
|
||||
|
||||
<!-- Optional Jabber support -->
|
||||
<dependency>
|
||||
<groupId>activemq</groupId>
|
||||
|
@ -885,21 +847,6 @@
|
|||
<version>1.5.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- =============================== -->
|
||||
<!-- XML processing dependencies -->
|
||||
<!-- =============================== -->
|
||||
<!-- For XMLBeans -->
|
||||
<dependency>
|
||||
<groupId>org.apache.xmlbeans</groupId>
|
||||
<artifactId>xmlbeans</artifactId>
|
||||
<version>${xmlbeans-version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.xmlbeans</groupId>
|
||||
<artifactId>xmlbeans-xpath</artifactId>
|
||||
<version>${xmlbeans-version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- To use XPath using JAXP 1.3 (std in Java 5) -->
|
||||
<dependency>
|
||||
<groupId>activesoap</groupId>
|
||||
|
@ -991,12 +938,6 @@
|
|||
<version>${taglibs-version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>aopalliance</groupId>
|
||||
<artifactId>aopalliance</artifactId>
|
||||
<version>${aopalliance-version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.jasypt</groupId>
|
||||
<artifactId>jasypt</artifactId>
|
||||
|
@ -1088,17 +1029,6 @@
|
|||
<version>${mqtt-client-version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>p2psockets</groupId>
|
||||
<artifactId>p2psockets-core</artifactId>
|
||||
<version>${p2psockets-version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>jxta</groupId>
|
||||
<artifactId>jxta</artifactId>
|
||||
<version>${jxta-version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
|
|
|
@ -1,249 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-parent</artifactId>
|
||||
<version>5.9-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>activemq-optional</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>ActiveMQ :: Optional</name>
|
||||
<description>Optional ActiveMQ features</description>
|
||||
|
||||
<dependencies>
|
||||
<!-- activemq -->
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>activemq-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>activemq-broker</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>activeio-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>activemq-console</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-jms</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>aopalliance</groupId>
|
||||
<artifactId>aopalliance</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.thoughtworks.xstream</groupId>
|
||||
<artifactId>xstream</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>xpp3</groupId>
|
||||
<artifactId>xpp3</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.aggregate</groupId>
|
||||
<artifactId>jetty-all-server</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-webapp</artifactId>
|
||||
<version>${jetty-version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-websocket</artifactId>
|
||||
<version>${jetty-version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpclient</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-pool2</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-collections</groupId>
|
||||
<artifactId>commons-collections</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-logging</groupId>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!-- log4j jms appender and test tool needs this -->
|
||||
<dependency>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
<scope>compile</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>compile</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.xmlbeans</groupId>
|
||||
<artifactId>xmlbeans</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.xmlbeans</groupId>
|
||||
<artifactId>xmlbeans-xpath</artifactId>
|
||||
<optional>true</optional>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>net.sf.saxon</groupId>
|
||||
<artifactId>saxon</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>net.sf.saxon</groupId>
|
||||
<artifactId>Saxon-HE</artifactId>
|
||||
<version>${saxon-version}</version>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>xerces</groupId>
|
||||
<artifactId>xercesImpl</artifactId>
|
||||
<version>${xerces-version}</version>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>xalan</groupId>
|
||||
<artifactId>xalan</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-oxm</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.xbean</groupId>
|
||||
<artifactId>xbean-spring</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jettison</groupId>
|
||||
<artifactId>jettison</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!--
|
||||
TODO: Not needed, but OSGi bundle on the way: https://issues.apache.org/jira/browse/SMX4-1238
|
||||
<dependency>
|
||||
<groupId>net.sf.josql</groupId>
|
||||
<artifactId>gentlyweb-utils</artifactId>
|
||||
<version>1.5</version>
|
||||
</dependency>
|
||||
-->
|
||||
<dependency>
|
||||
<groupId>org.seleniumhq.selenium</groupId>
|
||||
<artifactId>selenium-java</artifactId>
|
||||
<version>2.25.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.seleniumhq.selenium</groupId>
|
||||
<artifactId>selenium-chrome-driver</artifactId>
|
||||
<version>2.25.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.seleniumhq.selenium</groupId>
|
||||
<artifactId>selenium-firefox-driver</artifactId>
|
||||
<version>2.25.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
|
||||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.felix</groupId>
|
||||
<artifactId>maven-bundle-plugin</artifactId>
|
||||
<configuration>
|
||||
<instructions>
|
||||
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
|
||||
<Fragment-Host>org.apache.activemq.activemq-core</Fragment-Host>
|
||||
<Export-Package>
|
||||
org.apache.activemq.transport.http*;version=${project.version};-noimport:=;-split-package:=merge-last,
|
||||
org.apache.activemq.transport.https*;version=${project.version};-noimport:=;-split-package:=merge-last
|
||||
</Export-Package>
|
||||
<Import-Package>
|
||||
org.eclipse.jetty*;version="[7.6,8.0)";resolution:=optional,
|
||||
!org.apache.activemq.transport.ws*;version=${project.version},
|
||||
!org.apache.activemq.transport.xstream;version=${project.version},
|
||||
!org.apache.activemq.transport.util;version=${project.version},
|
||||
org.apache.activemq*;version=${project.version};resolution:=optional
|
||||
</Import-Package>
|
||||
</instructions>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>bundle-manifest</id>
|
||||
<phase>process-classes</phase>
|
||||
<goals>
|
||||
<goal>manifest</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<!-- Configure which tests are included/excuded -->
|
||||
<plugin>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
|
@ -1,220 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.benchmark;
|
||||
|
||||
import java.text.NumberFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.util.IdGenerator;
|
||||
|
||||
/**
|
||||
* Abstract base class for some simple benchmark tools
|
||||
*/
|
||||
public class BenchmarkSupport {
|
||||
|
||||
protected int connectionCount = 1;
|
||||
protected int batch = 1000;
|
||||
protected Destination destination;
|
||||
protected String[] subjects;
|
||||
|
||||
private boolean topic = true;
|
||||
private boolean durable;
|
||||
private ActiveMQConnectionFactory factory;
|
||||
private String url;
|
||||
private int counter;
|
||||
private List<Object> resources = new ArrayList<Object>();
|
||||
private NumberFormat formatter = NumberFormat.getInstance();
|
||||
private AtomicInteger connectionCounter = new AtomicInteger(0);
|
||||
private IdGenerator idGenerator = new IdGenerator();
|
||||
private boolean timerLoop;
|
||||
|
||||
public BenchmarkSupport() {
|
||||
}
|
||||
|
||||
public void start() {
|
||||
System.out.println("Using: " + connectionCount + " connection(s)");
|
||||
subjects = new String[connectionCount];
|
||||
for (int i = 0; i < connectionCount; i++) {
|
||||
subjects[i] = "BENCHMARK.FEED" + i;
|
||||
}
|
||||
if (useTimerLoop()) {
|
||||
Thread timer = new Thread() {
|
||||
public void run() {
|
||||
timerLoop();
|
||||
}
|
||||
};
|
||||
timer.start();
|
||||
}
|
||||
}
|
||||
|
||||
public String getUrl() {
|
||||
return url;
|
||||
}
|
||||
|
||||
public void setUrl(String url) {
|
||||
this.url = url;
|
||||
}
|
||||
|
||||
public boolean isTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
public void setTopic(boolean topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
public ActiveMQConnectionFactory getFactory() {
|
||||
return factory;
|
||||
}
|
||||
|
||||
public void setFactory(ActiveMQConnectionFactory factory) {
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
public void setSubject(String subject) {
|
||||
connectionCount = 1;
|
||||
subjects = new String[] {
|
||||
subject
|
||||
};
|
||||
}
|
||||
|
||||
public boolean isDurable() {
|
||||
return durable;
|
||||
}
|
||||
|
||||
public void setDurable(boolean durable) {
|
||||
this.durable = durable;
|
||||
}
|
||||
|
||||
public int getConnectionCount() {
|
||||
return connectionCount;
|
||||
}
|
||||
|
||||
public void setConnectionCount(int connectionCount) {
|
||||
this.connectionCount = connectionCount;
|
||||
}
|
||||
|
||||
protected Session createSession() throws JMSException {
|
||||
if (factory == null) {
|
||||
factory = createFactory();
|
||||
}
|
||||
Connection connection = factory.createConnection();
|
||||
int value = connectionCounter.incrementAndGet();
|
||||
System.out.println("Created connection: " + value + " = " + connection);
|
||||
if (durable) {
|
||||
connection.setClientID(idGenerator.generateId());
|
||||
}
|
||||
addResource(connection);
|
||||
connection.start();
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
addResource(session);
|
||||
return session;
|
||||
}
|
||||
|
||||
protected ActiveMQConnectionFactory createFactory() {
|
||||
ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory(getUrl());
|
||||
return answer;
|
||||
}
|
||||
|
||||
protected synchronized void count(int count) {
|
||||
counter += count;
|
||||
/*
|
||||
* if (counter > batch) { counter = 0; long current =
|
||||
* System.currentTimeMillis(); double end = current - time; end /= 1000;
|
||||
* time = current; System.out.println("Processed " + batch + " messages
|
||||
* in " + end + " (secs)"); }
|
||||
*/
|
||||
}
|
||||
|
||||
protected synchronized int resetCount() {
|
||||
int answer = counter;
|
||||
counter = 0;
|
||||
return answer;
|
||||
}
|
||||
|
||||
protected void timerLoop() {
|
||||
int times = 0;
|
||||
int total = 0;
|
||||
int dumpVmStatsFrequency = 10;
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
int processed = resetCount();
|
||||
double average = 0;
|
||||
if (processed > 0) {
|
||||
total += processed;
|
||||
times++;
|
||||
}
|
||||
if (times > 0) {
|
||||
average = total / (double) times;
|
||||
}
|
||||
|
||||
System.out.println(getClass().getName() + " Processed: " + processed + " messages this second. Average: " + average);
|
||||
|
||||
if ((times % dumpVmStatsFrequency) == 0 && times != 0) {
|
||||
System.out.println("Used memory: " + asMemoryString(runtime.totalMemory() - runtime.freeMemory()) + " Free memory: " + asMemoryString(runtime.freeMemory()) + " Total memory: "
|
||||
+ asMemoryString(runtime.totalMemory()) + " Max memory: " + asMemoryString(runtime.maxMemory()));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
protected String asMemoryString(long value) {
|
||||
return formatter.format(value / 1024) + " K";
|
||||
}
|
||||
|
||||
protected boolean useTimerLoop() {
|
||||
return timerLoop;
|
||||
}
|
||||
|
||||
protected Destination createDestination(Session session, String subject) throws JMSException {
|
||||
if (topic) {
|
||||
return session.createTopic(subject);
|
||||
} else {
|
||||
return session.createQueue(subject);
|
||||
}
|
||||
}
|
||||
|
||||
protected void addResource(Object resource) {
|
||||
resources.add(resource);
|
||||
}
|
||||
|
||||
public int getCounter() {
|
||||
return counter;
|
||||
}
|
||||
|
||||
public void setTimerLoop(boolean timerLoop) {
|
||||
this.timerLoop = timerLoop;
|
||||
}
|
||||
|
||||
protected static boolean parseBoolean(String text) {
|
||||
return text.equalsIgnoreCase("true");
|
||||
}
|
||||
}
|
|
@ -1,104 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.benchmark;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
|
||||
public class Consumer extends BenchmarkSupport implements MessageListener {
|
||||
|
||||
public Consumer() {
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
Consumer tool = new Consumer();
|
||||
if (args.length > 0) {
|
||||
tool.setUrl(args[0]);
|
||||
}
|
||||
if (args.length > 1) {
|
||||
tool.setTopic(parseBoolean(args[1]));
|
||||
}
|
||||
if (args.length > 2) {
|
||||
tool.setSubject(args[2]);
|
||||
}
|
||||
if (args.length > 3) {
|
||||
tool.setDurable(parseBoolean(args[3]));
|
||||
}
|
||||
if (args.length > 4) {
|
||||
tool.setConnectionCount(Integer.parseInt(args[4]));
|
||||
}
|
||||
|
||||
try {
|
||||
tool.run();
|
||||
} catch (Exception e) {
|
||||
System.out.println("Caught: " + e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public void run() throws JMSException {
|
||||
start();
|
||||
subscribe();
|
||||
}
|
||||
|
||||
protected void subscribe() throws JMSException {
|
||||
for (int i = 0; i < subjects.length; i++) {
|
||||
subscribe(subjects[i]);
|
||||
}
|
||||
}
|
||||
|
||||
protected void subscribe(String subject) throws JMSException {
|
||||
Session session = createSession();
|
||||
|
||||
Destination destination = createDestination(session, subject);
|
||||
|
||||
System.out.println("Consuming on : " + destination + " of type: " + destination.getClass().getName());
|
||||
|
||||
MessageConsumer consumer = null;
|
||||
if (isDurable() && isTopic()) {
|
||||
consumer = session.createDurableSubscriber((Topic)destination, getClass().getName());
|
||||
} else {
|
||||
consumer = session.createConsumer(destination);
|
||||
}
|
||||
consumer.setMessageListener(this);
|
||||
addResource(consumer);
|
||||
}
|
||||
|
||||
public void onMessage(Message message) {
|
||||
try {
|
||||
TextMessage textMessage = (TextMessage)message;
|
||||
|
||||
// lets force the content to be deserialized
|
||||
textMessage.getText();
|
||||
count(1);
|
||||
|
||||
// lets count the messages
|
||||
|
||||
// message.acknowledge();
|
||||
} catch (JMSException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,183 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.benchmark;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
public class Producer extends BenchmarkSupport {
|
||||
|
||||
int loops = -1;
|
||||
int loopSize = 1000;
|
||||
private int messageSize = 1000;
|
||||
|
||||
public Producer() {
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
Producer tool = new Producer();
|
||||
if (args.length > 0) {
|
||||
tool.setUrl(args[0]);
|
||||
}
|
||||
if (args.length > 1) {
|
||||
tool.setTopic(parseBoolean(args[1]));
|
||||
}
|
||||
if (args.length > 2) {
|
||||
tool.setSubject(args[2]);
|
||||
}
|
||||
if (args.length > 3) {
|
||||
tool.setDurable(parseBoolean(args[3]));
|
||||
}
|
||||
if (args.length > 4) {
|
||||
tool.setMessageSize(Integer.parseInt(args[4]));
|
||||
}
|
||||
if (args.length > 5) {
|
||||
tool.setConnectionCount(Integer.parseInt(args[5]));
|
||||
}
|
||||
try {
|
||||
tool.run();
|
||||
} catch (Exception e) {
|
||||
System.out.println("Caught: " + e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public void run() throws Exception {
|
||||
start();
|
||||
publish();
|
||||
}
|
||||
|
||||
// Properties
|
||||
// -------------------------------------------------------------------------
|
||||
public int getMessageSize() {
|
||||
return messageSize;
|
||||
}
|
||||
|
||||
public void setMessageSize(int messageSize) {
|
||||
this.messageSize = messageSize;
|
||||
}
|
||||
|
||||
public int getLoopSize() {
|
||||
return loopSize;
|
||||
}
|
||||
|
||||
public void setLoopSize(int loopSize) {
|
||||
this.loopSize = loopSize;
|
||||
}
|
||||
|
||||
// Implementation methods
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
protected void publish() throws Exception {
|
||||
final String text = getMessage();
|
||||
|
||||
System.out.println("Publishing to: " + subjects.length + " subject(s)");
|
||||
|
||||
for (int i = 0; i < subjects.length; i++) {
|
||||
final String subject = subjects[i];
|
||||
Thread thread = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
publish(text, subject);
|
||||
} catch (JMSException e) {
|
||||
System.out.println("Caught: " + e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
thread.start();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected String getMessage() {
|
||||
StringBuffer buffer = new StringBuffer();
|
||||
for (int i = 0; i < messageSize; i++) {
|
||||
char ch = 'X';
|
||||
buffer.append(ch);
|
||||
}
|
||||
return buffer.toString();
|
||||
}
|
||||
|
||||
protected void publish(String text, String subject) throws JMSException {
|
||||
Session session = createSession();
|
||||
|
||||
Destination destination = createDestination(session, subject);
|
||||
|
||||
MessageProducer publisher = session.createProducer(destination);
|
||||
if (isDurable()) {
|
||||
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
} else {
|
||||
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
}
|
||||
|
||||
System.out.println("Starting publisher on : " + destination + " of type: " + destination.getClass().getName());
|
||||
System.out.println("Message length: " + text.length());
|
||||
|
||||
if (loops <= 0) {
|
||||
while (true) {
|
||||
publishLoop(session, publisher, text);
|
||||
}
|
||||
} else {
|
||||
for (int i = 0; i < loops; i++) {
|
||||
publishLoop(session, publisher, text);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void publishLoop(Session session, MessageProducer publisher, String text) throws JMSException {
|
||||
for (int i = 0; i < loopSize; i++) {
|
||||
Message message = session.createTextMessage(text);
|
||||
|
||||
publisher.send(message);
|
||||
count(1);
|
||||
}
|
||||
}
|
||||
|
||||
protected String loadFile(String file) throws IOException {
|
||||
System.out.println("Loading file: " + file);
|
||||
|
||||
StringBuffer buffer = new StringBuffer();
|
||||
BufferedReader in = new BufferedReader(new FileReader(file));
|
||||
while (true) {
|
||||
String line = in.readLine();
|
||||
if (line == null) {
|
||||
break;
|
||||
}
|
||||
buffer.append(line);
|
||||
buffer.append(File.separator);
|
||||
}
|
||||
in.close();
|
||||
return buffer.toString();
|
||||
}
|
||||
|
||||
public int getLoops() {
|
||||
return loops;
|
||||
}
|
||||
|
||||
public void setLoops(int loops) {
|
||||
this.loops = loops;
|
||||
}
|
||||
}
|
|
@ -1,80 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.benchmark;
|
||||
|
||||
public class ProducerConsumer extends Producer {
|
||||
|
||||
private Consumer consumer = new Consumer();
|
||||
|
||||
public ProducerConsumer() {
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
ProducerConsumer tool = new ProducerConsumer();
|
||||
if (args.length > 0) {
|
||||
tool.setUrl(args[0]);
|
||||
}
|
||||
if (args.length > 1) {
|
||||
tool.setTopic(parseBoolean(args[1]));
|
||||
}
|
||||
if (args.length > 2) {
|
||||
tool.setSubject(args[2]);
|
||||
}
|
||||
if (args.length > 3) {
|
||||
tool.setDurable(Boolean.getBoolean(args[3]));
|
||||
}
|
||||
if (args.length > 4) {
|
||||
tool.setConnectionCount(Integer.parseInt(args[4]));
|
||||
}
|
||||
try {
|
||||
tool.run();
|
||||
} catch (Exception e) {
|
||||
System.out.println("Caught: " + e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public void run() throws Exception {
|
||||
consumer.start();
|
||||
consumer.subscribe();
|
||||
start();
|
||||
publish();
|
||||
}
|
||||
|
||||
public void setTopic(boolean topic) {
|
||||
super.setTopic(topic);
|
||||
consumer.setTopic(topic);
|
||||
}
|
||||
|
||||
public void setSubject(String subject) {
|
||||
super.setSubject(subject);
|
||||
consumer.setSubject(subject);
|
||||
}
|
||||
|
||||
public void setUrl(String url) {
|
||||
super.setUrl(url);
|
||||
consumer.setUrl(url);
|
||||
}
|
||||
|
||||
protected boolean useTimerLoop() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public Consumer getConsumer() {
|
||||
return consumer;
|
||||
}
|
||||
}
|
|
@ -1,362 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.tool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class AcidTestTool extends TestCase {
|
||||
|
||||
|
||||
// Worker configuration.
|
||||
protected int recordSize = 1024;
|
||||
protected int batchSize = 5;
|
||||
protected int workerThinkTime = 500;
|
||||
protected Destination target;
|
||||
|
||||
private Random random = new Random();
|
||||
private byte data[];
|
||||
private int workerCount = 10;
|
||||
private AtomicBoolean ignoreJMSErrors = new AtomicBoolean(false);
|
||||
private ActiveMQConnectionFactory factory;
|
||||
private Connection connection;
|
||||
private AtomicInteger publishedBatches = new AtomicInteger(0);
|
||||
private AtomicInteger consumedBatches = new AtomicInteger(0);
|
||||
private List<Throwable> errors = Collections.synchronizedList(new ArrayList<Throwable>());
|
||||
|
||||
private interface Worker extends Runnable {
|
||||
boolean waitForExit(long i) throws InterruptedException;
|
||||
}
|
||||
|
||||
private final class ProducerWorker implements Worker {
|
||||
|
||||
private Session session;
|
||||
private MessageProducer producer;
|
||||
private BytesMessage message;
|
||||
private CountDownLatch doneLatch = new CountDownLatch(1);
|
||||
|
||||
ProducerWorker(Session session, String workerId) throws JMSException {
|
||||
this.session = session;
|
||||
producer = session.createProducer(target);
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
message = session.createBytesMessage();
|
||||
message.setStringProperty("workerId", workerId);
|
||||
message.writeBytes(data);
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
for (int batchId = 0; true; batchId++) {
|
||||
// System.out.println("Sending batch: "+workerId+"
|
||||
// "+batchId);
|
||||
for (int msgId = 0; msgId < batchSize; msgId++) {
|
||||
// Sleep some random amount of time less than
|
||||
// workerThinkTime
|
||||
try {
|
||||
Thread.sleep(random.nextInt(workerThinkTime));
|
||||
} catch (InterruptedException e1) {
|
||||
return;
|
||||
}
|
||||
|
||||
message.setIntProperty("batch-id", batchId);
|
||||
message.setIntProperty("msg-id", msgId);
|
||||
|
||||
producer.send(message);
|
||||
}
|
||||
session.commit();
|
||||
publishedBatches.incrementAndGet();
|
||||
// System.out.println("Commited send batch: "+workerId+"
|
||||
// "+batchId);
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
if (!ignoreJMSErrors.get()) {
|
||||
e.printStackTrace();
|
||||
errors.add(e);
|
||||
}
|
||||
return;
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
errors.add(e);
|
||||
return;
|
||||
} finally {
|
||||
System.out.println("Producer exiting.");
|
||||
doneLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean waitForExit(long i) throws InterruptedException {
|
||||
return doneLatch.await(i, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private final class ConsumerWorker implements Worker {
|
||||
|
||||
private Session session;
|
||||
private MessageConsumer consumer;
|
||||
private final long timeout;
|
||||
private CountDownLatch doneLatch = new CountDownLatch(1);
|
||||
|
||||
ConsumerWorker(Session session, String workerId, long timeout) throws JMSException {
|
||||
this.session = session;
|
||||
this.timeout = timeout;
|
||||
consumer = session.createConsumer(target, "workerId='" + workerId + "'");
|
||||
}
|
||||
|
||||
public void run() {
|
||||
|
||||
try {
|
||||
int batchId = 0;
|
||||
while (true) {
|
||||
for (int msgId = 0; msgId < batchSize; msgId++) {
|
||||
|
||||
// Sleep some random amount of time less than
|
||||
// workerThinkTime
|
||||
try {
|
||||
Thread.sleep(random.nextInt(workerThinkTime));
|
||||
} catch (InterruptedException e1) {
|
||||
return;
|
||||
}
|
||||
|
||||
Message message = consumer.receive(timeout);
|
||||
if (msgId > 0) {
|
||||
assertNotNull(message);
|
||||
assertEquals(message.getIntProperty("batch-id"), batchId);
|
||||
assertEquals(message.getIntProperty("msg-id"), msgId);
|
||||
} else {
|
||||
if (message == null) {
|
||||
System.out.println("At end of batch an don't have a next batch to process. done.");
|
||||
return;
|
||||
}
|
||||
assertEquals(msgId, message.getIntProperty("msg-id"));
|
||||
batchId = message.getIntProperty("batch-id");
|
||||
// System.out.println("Receiving batch: "+workerId+"
|
||||
// "+batchId);
|
||||
}
|
||||
|
||||
}
|
||||
session.commit();
|
||||
consumedBatches.incrementAndGet();
|
||||
// System.out.println("Commited receive batch: "+workerId+"
|
||||
// "+batchId);
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
if (!ignoreJMSErrors.get()) {
|
||||
e.printStackTrace();
|
||||
errors.add(e);
|
||||
}
|
||||
return;
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
errors.add(e);
|
||||
return;
|
||||
} finally {
|
||||
System.out.println("Consumer exiting.");
|
||||
doneLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean waitForExit(long i) throws InterruptedException {
|
||||
return doneLatch.await(i, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @see junit.framework.TestCase#setUp()
|
||||
*/
|
||||
protected void setUp() throws Exception {
|
||||
factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
this.target = new ActiveMQQueue(getClass().getName());
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
try {
|
||||
connection.close();
|
||||
} catch (Throwable ignore) {
|
||||
}
|
||||
connection = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws InterruptedException
|
||||
* @throws JMSException
|
||||
* @throws JMSException
|
||||
*/
|
||||
private void reconnect() throws InterruptedException, JMSException {
|
||||
if (connection != null) {
|
||||
try {
|
||||
connection.close();
|
||||
} catch (Throwable ignore) {
|
||||
}
|
||||
connection = null;
|
||||
}
|
||||
|
||||
long reconnectDelay = 1000;
|
||||
|
||||
while (connection == null) {
|
||||
if (reconnectDelay > 1000 * 10) {
|
||||
reconnectDelay = 1000 * 10;
|
||||
}
|
||||
try {
|
||||
connection = factory.createConnection();
|
||||
connection.start();
|
||||
} catch (JMSException e) {
|
||||
Thread.sleep(reconnectDelay);
|
||||
reconnectDelay *= 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws Throwable
|
||||
* @throws IOException
|
||||
*/
|
||||
public void testAcidTransactions() throws Throwable {
|
||||
|
||||
System.out.println("Client threads write records using: Record Size: " + recordSize + ", Batch Size: " + batchSize + ", Worker Think Time: " + workerThinkTime);
|
||||
|
||||
// Create the record and fill it with some values.
|
||||
data = new byte[recordSize];
|
||||
for (int i = 0; i < data.length; i++) {
|
||||
data[i] = (byte)i;
|
||||
}
|
||||
|
||||
System.out.println("==============================================");
|
||||
System.out.println("===> Start the server now.");
|
||||
System.out.println("==============================================");
|
||||
reconnect();
|
||||
|
||||
System.out.println("Starting " + workerCount + " Workers...");
|
||||
ArrayList<Worker> workers = new ArrayList<Worker>();
|
||||
for (int i = 0; i < workerCount; i++) {
|
||||
String workerId = "worker-" + i;
|
||||
|
||||
Worker w = new ConsumerWorker(connection.createSession(true, Session.SESSION_TRANSACTED), workerId, 1000 * 5);
|
||||
workers.add(w);
|
||||
new Thread(w, "Consumer:" + workerId).start();
|
||||
|
||||
w = new ProducerWorker(connection.createSession(true, Session.SESSION_TRANSACTED), workerId);
|
||||
workers.add(w);
|
||||
new Thread(w, "Producer:" + workerId).start();
|
||||
}
|
||||
|
||||
System.out.println("Waiting for " + (workerCount * 10) + " batches to be delivered.");
|
||||
|
||||
//
|
||||
// Wait for about 5 batches of messages per worker to be consumed before
|
||||
// restart.
|
||||
//
|
||||
while (publishedBatches.get() < workerCount * 5) {
|
||||
System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get());
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
System.out.println("==============================================");
|
||||
System.out.println("===> Server is under load now. Kill it!");
|
||||
System.out.println("==============================================");
|
||||
ignoreJMSErrors.set(true);
|
||||
|
||||
// Wait for all the workers to finish.
|
||||
System.out.println("Waiting for all workers to exit due to server shutdown.");
|
||||
for (Iterator<Worker> iter = workers.iterator(); iter.hasNext();) {
|
||||
Worker worker = iter.next();
|
||||
while (!worker.waitForExit(1000)) {
|
||||
System.out.println("==============================================");
|
||||
System.out.println("===> Server is under load now. Kill it!");
|
||||
System.out.println("==============================================");
|
||||
System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get());
|
||||
}
|
||||
}
|
||||
workers.clear();
|
||||
|
||||
// No errors should have occurred so far.
|
||||
if (errors.size() > 0) {
|
||||
throw errors.get(0);
|
||||
}
|
||||
|
||||
System.out.println("==============================================");
|
||||
System.out.println("===> Start the server now.");
|
||||
System.out.println("==============================================");
|
||||
reconnect();
|
||||
|
||||
System.out.println("Restarted.");
|
||||
|
||||
// Validate the all transactions were commited as a uow. Looking for
|
||||
// partial commits.
|
||||
for (int i = 0; i < workerCount; i++) {
|
||||
String workerId = "worker-" + i;
|
||||
Worker w = new ConsumerWorker(connection.createSession(true, Session.SESSION_TRANSACTED), workerId, 5 * 1000);
|
||||
workers.add(w);
|
||||
new Thread(w, "Consumer:" + workerId).start();
|
||||
}
|
||||
|
||||
System.out.println("Waiting for restarted consumers to finish consuming all messages..");
|
||||
for (Iterator<Worker> iter = workers.iterator(); iter.hasNext();) {
|
||||
Worker worker = iter.next();
|
||||
while (!worker.waitForExit(1000 * 5)) {
|
||||
System.out.println("Waiting for restarted consumers to finish consuming all messages..");
|
||||
System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get());
|
||||
}
|
||||
}
|
||||
workers.clear();
|
||||
|
||||
System.out.println("Workers finished..");
|
||||
System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get());
|
||||
|
||||
if (errors.size() > 0) {
|
||||
throw errors.get(0);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
try {
|
||||
AcidTestTool tool = new AcidTestTool();
|
||||
tool.setUp();
|
||||
tool.testAcidTransactions();
|
||||
tool.tearDown();
|
||||
} catch (Throwable e) {
|
||||
System.out.println("Test Failed: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,133 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.tool;
|
||||
|
||||
import java.io.IOException;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
|
||||
/**
|
||||
* A simple tool for consuming messages
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class ConsumerTool extends ToolSupport implements MessageListener {
|
||||
|
||||
protected int count;
|
||||
protected int dumpCount = 10;
|
||||
protected boolean verbose = true;
|
||||
protected int maxiumMessages;
|
||||
private boolean pauseBeforeShutdown;
|
||||
|
||||
public static void main(String[] args) {
|
||||
ConsumerTool tool = new ConsumerTool();
|
||||
if (args.length > 0) {
|
||||
tool.url = args[0];
|
||||
}
|
||||
if (args.length > 1) {
|
||||
tool.topic = args[1].equalsIgnoreCase("true");
|
||||
}
|
||||
if (args.length > 2) {
|
||||
tool.subject = args[2];
|
||||
}
|
||||
if (args.length > 3) {
|
||||
tool.durable = args[3].equalsIgnoreCase("true");
|
||||
}
|
||||
if (args.length > 4) {
|
||||
tool.maxiumMessages = Integer.parseInt(args[4]);
|
||||
}
|
||||
tool.run();
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
System.out.println("Connecting to URL: " + url);
|
||||
System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
|
||||
System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
|
||||
|
||||
Connection connection = createConnection();
|
||||
Session session = createSession(connection);
|
||||
MessageConsumer consumer = null;
|
||||
if (durable && topic) {
|
||||
consumer = session.createDurableSubscriber((Topic)destination, consumerName);
|
||||
} else {
|
||||
consumer = session.createConsumer(destination);
|
||||
}
|
||||
if (maxiumMessages <= 0) {
|
||||
consumer.setMessageListener(this);
|
||||
}
|
||||
connection.start();
|
||||
|
||||
if (maxiumMessages > 0) {
|
||||
consumeMessagesAndClose(connection, session, consumer);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
System.out.println("Caught: " + e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public void onMessage(Message message) {
|
||||
try {
|
||||
if (message instanceof TextMessage) {
|
||||
TextMessage txtMsg = (TextMessage)message;
|
||||
if (verbose) {
|
||||
|
||||
String msg = txtMsg.getText();
|
||||
if (msg.length() > 50) {
|
||||
msg = msg.substring(0, 50) + "...";
|
||||
}
|
||||
|
||||
System.out.println("Received: " + msg);
|
||||
}
|
||||
} else {
|
||||
if (verbose) {
|
||||
System.out.println("Received: " + message);
|
||||
}
|
||||
}
|
||||
/*
|
||||
* if (++count % dumpCount == 0) { dumpStats(connection); }
|
||||
*/
|
||||
} catch (JMSException e) {
|
||||
System.out.println("Caught: " + e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
|
||||
System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
|
||||
|
||||
for (int i = 0; i < maxiumMessages; i++) {
|
||||
Message message = consumer.receive();
|
||||
onMessage(message);
|
||||
}
|
||||
System.out.println("Closing connection");
|
||||
consumer.close();
|
||||
session.close();
|
||||
connection.close();
|
||||
if (pauseBeforeShutdown) {
|
||||
System.out.println("Press return to shut down");
|
||||
System.in.read();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,43 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.tool;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Queue;
|
||||
import javax.naming.InitialContext;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class JndiProducerTool extends ProducerTool {
|
||||
|
||||
public static void main(String[] args) {
|
||||
runTool(args, new JndiProducerTool());
|
||||
}
|
||||
|
||||
protected Connection createConnection() throws Exception {
|
||||
InitialContext jndiContext = new InitialContext();
|
||||
|
||||
ConnectionFactory queueConnectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
|
||||
Connection connection = queueConnectionFactory.createConnection();
|
||||
destination = (Queue) jndiContext.lookup(subject);
|
||||
return connection;
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -1,131 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.tool;
|
||||
|
||||
import java.util.Date;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
/**
|
||||
* A simple tool for publishing messages
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class ProducerTool extends ToolSupport {
|
||||
|
||||
protected int messageCount = 10;
|
||||
protected long sleepTime;
|
||||
protected boolean verbose = true;
|
||||
protected int messageSize = 255;
|
||||
|
||||
public static void main(String[] args) {
|
||||
runTool(args, new ProducerTool());
|
||||
}
|
||||
|
||||
protected static void runTool(String[] args, ProducerTool tool) {
|
||||
if (args.length > 0) {
|
||||
tool.url = args[0];
|
||||
}
|
||||
if (args.length > 1) {
|
||||
tool.topic = args[1].equalsIgnoreCase("true");
|
||||
}
|
||||
if (args.length > 2) {
|
||||
tool.subject = args[2];
|
||||
}
|
||||
if (args.length > 3) {
|
||||
tool.durable = args[3].equalsIgnoreCase("true");
|
||||
}
|
||||
if (args.length > 4) {
|
||||
tool.messageCount = Integer.parseInt(args[4]);
|
||||
}
|
||||
if (args.length > 5) {
|
||||
tool.messageSize = Integer.parseInt(args[5]);
|
||||
}
|
||||
tool.run();
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
System.out.println("Connecting to URL: " + url);
|
||||
System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
|
||||
System.out.println("Using " + (durable ? "durable" : "non-durable") + " publishing");
|
||||
|
||||
Connection connection = createConnection();
|
||||
Session session = createSession(connection);
|
||||
MessageProducer producer = createProducer(session);
|
||||
// connection.start();
|
||||
|
||||
sendLoop(session, producer);
|
||||
|
||||
System.out.println("Done.");
|
||||
close(connection, session);
|
||||
} catch (Exception e) {
|
||||
System.out.println("Caught: " + e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
protected MessageProducer createProducer(Session session) throws JMSException {
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
if (durable) {
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
} else {
|
||||
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
}
|
||||
return producer;
|
||||
}
|
||||
|
||||
protected void sendLoop(Session session, MessageProducer producer) throws Exception {
|
||||
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
|
||||
TextMessage message = session.createTextMessage(createMessageText(i));
|
||||
|
||||
if (verbose) {
|
||||
String msg = message.getText();
|
||||
if (msg.length() > 50) {
|
||||
msg = msg.substring(0, 50) + "...";
|
||||
}
|
||||
System.out.println("Sending message: " + msg);
|
||||
}
|
||||
|
||||
producer.send(message);
|
||||
Thread.sleep(sleepTime);
|
||||
}
|
||||
producer.send(session.createMessage());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param i
|
||||
* @return
|
||||
*/
|
||||
private String createMessageText(int index) {
|
||||
StringBuffer buffer = new StringBuffer(messageSize);
|
||||
buffer.append("Message: " + index + " sent at: " + new Date());
|
||||
if (buffer.length() > messageSize) {
|
||||
return buffer.substring(0, messageSize);
|
||||
}
|
||||
for (int i = buffer.length(); i < messageSize; i++) {
|
||||
buffer.append(' ');
|
||||
}
|
||||
return buffer.toString();
|
||||
}
|
||||
}
|
|
@ -1,81 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.tool;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.util.IndentPrinter;
|
||||
|
||||
/**
|
||||
* Abstract base class useful for implementation inheritence
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class ToolSupport {
|
||||
|
||||
protected Destination destination;
|
||||
protected String subject = "TOOL.DEFAULT";
|
||||
protected boolean topic = true;
|
||||
protected String user = ActiveMQConnection.DEFAULT_USER;
|
||||
protected String pwd = ActiveMQConnection.DEFAULT_PASSWORD;
|
||||
protected String url = ActiveMQConnection.DEFAULT_BROKER_URL;
|
||||
protected boolean transacted;
|
||||
protected boolean durable;
|
||||
protected String clientID = getClass().getName();
|
||||
protected int ackMode = Session.AUTO_ACKNOWLEDGE;
|
||||
protected String consumerName = "James";
|
||||
|
||||
protected Session createSession(Connection connection) throws Exception {
|
||||
if (durable) {
|
||||
connection.setClientID(clientID);
|
||||
}
|
||||
Session session = connection.createSession(transacted, ackMode);
|
||||
if (topic) {
|
||||
destination = session.createTopic(subject);
|
||||
} else {
|
||||
destination = session.createQueue(subject);
|
||||
}
|
||||
return session;
|
||||
}
|
||||
|
||||
protected Connection createConnection() throws JMSException, Exception {
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
|
||||
return connectionFactory.createConnection();
|
||||
}
|
||||
|
||||
protected void close(Connection connection, Session session) throws JMSException {
|
||||
// lets dump the stats
|
||||
dumpStats(connection);
|
||||
|
||||
if (session != null) {
|
||||
session.close();
|
||||
}
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected void dumpStats(Connection connection) {
|
||||
ActiveMQConnection c = (ActiveMQConnection)connection;
|
||||
c.getConnectionStats().dump(new IndentPrinter());
|
||||
}
|
||||
}
|
|
@ -1,61 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.tool;
|
||||
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.bio.SocketConnector;
|
||||
import org.eclipse.jetty.webapp.WebAppContext;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public final class WebServer {
|
||||
|
||||
public static final int PORT = 8080;
|
||||
// public static final String WEBAPP_DIR = "target/activemq";
|
||||
public static final String WEBAPP_DIR = "src/webapp";
|
||||
public static final String WEBAPP_CTX = "/";
|
||||
|
||||
private WebServer() {
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Server server = new Server();
|
||||
Connector context = new SocketConnector();
|
||||
context.setServer(server);
|
||||
context.setPort(PORT);
|
||||
|
||||
String webappDir = WEBAPP_DIR;
|
||||
if (args.length > 0) {
|
||||
webappDir = args[0];
|
||||
}
|
||||
|
||||
WebAppContext webapp = new WebAppContext();
|
||||
webapp.setServer(server);
|
||||
webapp.setContextPath(WEBAPP_CTX);
|
||||
webapp.setResourceBase(webappDir);
|
||||
|
||||
server.setHandler(webapp);
|
||||
|
||||
server.setConnectors(new Connector[] {
|
||||
context
|
||||
});
|
||||
server.start();
|
||||
|
||||
}
|
||||
}
|
|
@ -1,145 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.util.oxm;
|
||||
|
||||
import java.io.Serializable;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.MessageTransformerSupport;
|
||||
|
||||
/**
|
||||
* Abstract class used as a base for implementing transformers from object to text messages (in XML/JSON format)
|
||||
* and vice versa using.
|
||||
* Supports plugging of custom marshallers
|
||||
*/
|
||||
public abstract class AbstractXMLMessageTransformer extends
|
||||
MessageTransformerSupport {
|
||||
|
||||
protected MessageTransform transformType;
|
||||
|
||||
/**
|
||||
* Defines the type of transformation. If XML (default), - producer
|
||||
* transformation transforms from Object to XML. - consumer transformation
|
||||
* transforms from XML to Object. If OBJECT, - producer transformation
|
||||
* transforms from XML to Object. - consumer transformation transforms from
|
||||
* Object to XML. If ADAPTIVE, - producer transformation transforms from
|
||||
* Object to XML, or XML to Object depending on the type of the original
|
||||
* message - consumer transformation transforms from XML to Object, or
|
||||
* Object to XML depending on the type of the original message
|
||||
*/
|
||||
public enum MessageTransform {
|
||||
XML, OBJECT, ADAPTIVE
|
||||
};
|
||||
|
||||
|
||||
public AbstractXMLMessageTransformer() {
|
||||
this(MessageTransform.XML);
|
||||
}
|
||||
|
||||
public AbstractXMLMessageTransformer(MessageTransform transformType) {
|
||||
this.transformType = transformType;
|
||||
}
|
||||
|
||||
public Message consumerTransform(Session session, MessageConsumer consumer, Message message) throws JMSException {
|
||||
switch (transformType) {
|
||||
case XML:
|
||||
return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : message;
|
||||
case OBJECT:
|
||||
return (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message;
|
||||
case ADAPTIVE:
|
||||
return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message;
|
||||
default:
|
||||
}
|
||||
return message;
|
||||
}
|
||||
|
||||
public Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException {
|
||||
switch (transformType) {
|
||||
case XML:
|
||||
return (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message;
|
||||
case OBJECT:
|
||||
return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : message;
|
||||
case ADAPTIVE:
|
||||
return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message;
|
||||
default:
|
||||
}
|
||||
return message;
|
||||
}
|
||||
|
||||
public MessageTransform getTransformType() {
|
||||
return transformType;
|
||||
}
|
||||
|
||||
public void setTransformType(MessageTransform transformType) {
|
||||
this.transformType = transformType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms an incoming XML encoded {@link TextMessage} to an
|
||||
* {@link ObjectMessage}
|
||||
*
|
||||
* @param session - JMS session currently being used
|
||||
* @param textMessage - text message to transform to object message
|
||||
* @return ObjectMessage
|
||||
* @throws JMSException
|
||||
*/
|
||||
protected ObjectMessage textToObject(Session session, TextMessage textMessage) throws JMSException {
|
||||
Object object = unmarshall(session, textMessage);
|
||||
if (object instanceof Serializable) {
|
||||
ObjectMessage answer = session.createObjectMessage((Serializable)object);
|
||||
copyProperties(textMessage, answer);
|
||||
return answer;
|
||||
} else {
|
||||
throw new JMSException("Object is not serializable: " + object);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms an incoming {@link ObjectMessage} to an XML encoded
|
||||
* {@link TextMessage}
|
||||
*
|
||||
* @param session - JMS session currently being used
|
||||
* @param objectMessage - object message to transform to text message
|
||||
* @return XML encoded TextMessage
|
||||
* @throws JMSException
|
||||
*/
|
||||
protected TextMessage objectToText(Session session, ObjectMessage objectMessage) throws JMSException {
|
||||
TextMessage answer = session.createTextMessage(marshall(session, objectMessage));
|
||||
copyProperties(objectMessage, answer);
|
||||
return answer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Marshalls the Object in the {@link ObjectMessage} to a string using XML
|
||||
* encoding
|
||||
*/
|
||||
protected abstract String marshall(Session session, ObjectMessage objectMessage) throws JMSException;
|
||||
|
||||
/**
|
||||
* Unmarshalls the XML encoded message in the {@link TextMessage} to an
|
||||
* Object
|
||||
*/
|
||||
protected abstract Object unmarshall(Session session, TextMessage textMessage) throws JMSException;
|
||||
|
||||
}
|
|
@ -1,85 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.util.oxm;
|
||||
|
||||
import java.io.StringReader;
|
||||
import java.io.StringWriter;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.xml.transform.Result;
|
||||
import javax.xml.transform.Source;
|
||||
import javax.xml.transform.stream.StreamResult;
|
||||
import javax.xml.transform.stream.StreamSource;
|
||||
|
||||
import org.springframework.oxm.support.AbstractMarshaller;
|
||||
|
||||
|
||||
/**
|
||||
* Transforms object messages to text messages and vice versa using Spring OXM.
|
||||
*
|
||||
*/
|
||||
public class OXMMessageTransformer extends AbstractXMLMessageTransformer {
|
||||
|
||||
/**
|
||||
* OXM marshaller used to marshall/unmarshall messages
|
||||
*/
|
||||
private AbstractMarshaller marshaller;
|
||||
|
||||
public AbstractMarshaller getMarshaller() {
|
||||
return marshaller;
|
||||
}
|
||||
|
||||
public void setMarshaller(AbstractMarshaller marshaller) {
|
||||
this.marshaller = marshaller;
|
||||
}
|
||||
|
||||
/**
|
||||
* Marshalls the Object in the {@link ObjectMessage} to a string using XML
|
||||
* encoding
|
||||
*/
|
||||
protected String marshall(Session session, ObjectMessage objectMessage)
|
||||
throws JMSException {
|
||||
try {
|
||||
StringWriter writer = new StringWriter();
|
||||
Result result = new StreamResult(writer);
|
||||
marshaller.marshal(objectMessage.getObject(), result);
|
||||
writer.flush();
|
||||
return writer.toString();
|
||||
} catch (Exception e) {
|
||||
throw new JMSException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unmarshalls the XML encoded message in the {@link TextMessage} to an
|
||||
* Object
|
||||
*/
|
||||
protected Object unmarshall(Session session, TextMessage textMessage)
|
||||
throws JMSException {
|
||||
try {
|
||||
String text = textMessage.getText();
|
||||
Source source = new StreamSource(new StringReader(text));
|
||||
return marshaller.unmarshal(source);
|
||||
} catch (Exception e) {
|
||||
throw new JMSException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,109 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.util.oxm;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.io.StringReader;
|
||||
import java.io.StringWriter;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import com.thoughtworks.xstream.XStream;
|
||||
import com.thoughtworks.xstream.io.HierarchicalStreamDriver;
|
||||
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
|
||||
import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
|
||||
import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
|
||||
import com.thoughtworks.xstream.io.xml.XppReader;
|
||||
|
||||
import org.xmlpull.mxp1.MXParser;
|
||||
|
||||
/**
|
||||
* Transforms object messages to text messages and vice versa using
|
||||
* {@link XStream}
|
||||
*
|
||||
*/
|
||||
public class XStreamMessageTransformer extends AbstractXMLMessageTransformer {
|
||||
|
||||
private XStream xStream;
|
||||
|
||||
/**
|
||||
* Specialized driver to be used with stream readers and writers
|
||||
*/
|
||||
private HierarchicalStreamDriver streamDriver;
|
||||
|
||||
// Properties
|
||||
// -------------------------------------------------------------------------
|
||||
public XStream getXStream() {
|
||||
if (xStream == null) {
|
||||
xStream = createXStream();
|
||||
}
|
||||
return xStream;
|
||||
}
|
||||
|
||||
public void setXStream(XStream xStream) {
|
||||
this.xStream = xStream;
|
||||
}
|
||||
|
||||
public HierarchicalStreamDriver getStreamDriver() {
|
||||
return streamDriver;
|
||||
}
|
||||
|
||||
public void setStreamDriver(HierarchicalStreamDriver streamDriver) {
|
||||
this.streamDriver = streamDriver;
|
||||
}
|
||||
|
||||
// Implementation methods
|
||||
// -------------------------------------------------------------------------
|
||||
protected XStream createXStream() {
|
||||
return new XStream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Marshalls the Object in the {@link ObjectMessage} to a string using XML
|
||||
* encoding
|
||||
*/
|
||||
protected String marshall(Session session, ObjectMessage objectMessage) throws JMSException {
|
||||
Serializable object = objectMessage.getObject();
|
||||
StringWriter buffer = new StringWriter();
|
||||
HierarchicalStreamWriter out;
|
||||
if (streamDriver != null) {
|
||||
out = streamDriver.createWriter(buffer);
|
||||
} else {
|
||||
out = new PrettyPrintWriter(buffer);
|
||||
}
|
||||
getXStream().marshal(object, out);
|
||||
return buffer.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Unmarshalls the XML encoded message in the {@link TextMessage} to an
|
||||
* Object
|
||||
*/
|
||||
protected Object unmarshall(Session session, TextMessage textMessage) throws JMSException {
|
||||
HierarchicalStreamReader in;
|
||||
if (streamDriver != null) {
|
||||
in = streamDriver.createReader(new StringReader(textMessage.getText()));
|
||||
} else {
|
||||
in = new XppReader(new StringReader(textMessage.getText()), new MXParser());
|
||||
}
|
||||
return getXStream().unmarshal(in);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,237 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.util.oxm;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQMessageConsumer;
|
||||
import org.apache.activemq.MessageTransformer;
|
||||
import org.apache.activemq.util.xstream.SamplePojo;
|
||||
|
||||
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.ADAPTIVE;
|
||||
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.OBJECT;
|
||||
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.XML;
|
||||
|
||||
public abstract class AbstractXMLMessageTransformerTest extends TestCase {
|
||||
protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
|
||||
protected Connection connection;
|
||||
protected long timeout = 5000;
|
||||
|
||||
protected Connection createConnection(MessageTransformer transformer) throws Exception {
|
||||
connectionFactory.setTransformer(transformer);
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.start();
|
||||
return connection;
|
||||
}
|
||||
|
||||
protected abstract AbstractXMLMessageTransformer createTransformer();
|
||||
|
||||
public void testSendObjectMessageReceiveAsTextMessageAndObjectMessage() throws Exception {
|
||||
AbstractXMLMessageTransformer transformer = createTransformer();
|
||||
transformer.setTransformType(XML);
|
||||
connection = createConnection(transformer);
|
||||
|
||||
// lets create the consumers
|
||||
Session objectSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = objectSession.createTopic(getClass().getName());
|
||||
MessageConsumer objectConsumer = objectSession.createConsumer(destination);
|
||||
|
||||
Session textSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer textConsumer = textSession.createConsumer(destination);
|
||||
// lets clear the transformer on this consumer so we see the message as
|
||||
// it really is
|
||||
((ActiveMQMessageConsumer)textConsumer).setTransformer(null);
|
||||
|
||||
// send a message
|
||||
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = producerSession.createProducer(destination);
|
||||
|
||||
ObjectMessage request = producerSession.createObjectMessage(new SamplePojo("James", "London"));
|
||||
producer.send(request);
|
||||
|
||||
// lets consume it as an object message
|
||||
Message message = objectConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
|
||||
ObjectMessage objectMessage = (ObjectMessage)message;
|
||||
Object object = objectMessage.getObject();
|
||||
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
|
||||
SamplePojo body = (SamplePojo)object;
|
||||
assertEquals("name", "James", body.getName());
|
||||
assertEquals("city", "London", body.getCity());
|
||||
|
||||
// lets consume it as a text message
|
||||
message = textConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
|
||||
TextMessage textMessage = (TextMessage)message;
|
||||
String text = textMessage.getText();
|
||||
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
|
||||
System.out.println("Received XML...");
|
||||
System.out.println(text);
|
||||
}
|
||||
|
||||
public void testSendTextMessageReceiveAsObjectMessageAndTextMessage() throws Exception {
|
||||
AbstractXMLMessageTransformer transformer = createTransformer();
|
||||
transformer.setTransformType(OBJECT);
|
||||
connection = createConnection(transformer);
|
||||
|
||||
// lets create the consumers
|
||||
Session textSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = textSession.createTopic(getClass().getName());
|
||||
MessageConsumer textConsumer = textSession.createConsumer(destination);
|
||||
|
||||
Session objectSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer objectConsumer = objectSession.createConsumer(destination);
|
||||
// lets clear the transformer on this consumer so we see the message as
|
||||
// it really is
|
||||
((ActiveMQMessageConsumer)objectConsumer).setTransformer(null);
|
||||
|
||||
// send a message
|
||||
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = producerSession.createProducer(destination);
|
||||
|
||||
String xmlText = "<org.apache.activemq.util.xstream.SamplePojo>"
|
||||
+ "<name>James</name>"
|
||||
+ "<city>London</city>"
|
||||
+ "</org.apache.activemq.util.xstream.SamplePojo>";
|
||||
|
||||
TextMessage request = producerSession.createTextMessage(xmlText);
|
||||
producer.send(request);
|
||||
|
||||
Message message;
|
||||
// lets consume it as a text message
|
||||
message = textConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
|
||||
TextMessage textMessage = (TextMessage)message;
|
||||
String text = textMessage.getText();
|
||||
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
|
||||
|
||||
// lets consume it as an object message
|
||||
message = objectConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
|
||||
ObjectMessage objectMessage = (ObjectMessage)message;
|
||||
Object object = objectMessage.getObject();
|
||||
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
|
||||
SamplePojo body = (SamplePojo)object;
|
||||
assertEquals("name", "James", body.getName());
|
||||
assertEquals("city", "London", body.getCity());
|
||||
|
||||
}
|
||||
|
||||
public void testAdaptiveTransform() throws Exception {
|
||||
AbstractXMLMessageTransformer transformer = createTransformer();
|
||||
transformer.setTransformType(ADAPTIVE);
|
||||
connection = createConnection(transformer);
|
||||
|
||||
// lets create the consumers
|
||||
Session adaptiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = adaptiveSession.createTopic(getClass().getName());
|
||||
MessageConsumer adaptiveConsumer = adaptiveSession.createConsumer(destination);
|
||||
|
||||
Session origSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer origConsumer = origSession.createConsumer(destination);
|
||||
// lets clear the transformer on this consumer so we see the message as
|
||||
// it really is
|
||||
((ActiveMQMessageConsumer)origConsumer).setTransformer(null);
|
||||
|
||||
// Create producer
|
||||
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = producerSession.createProducer(destination);
|
||||
|
||||
Message message;
|
||||
ObjectMessage objectMessage;
|
||||
TextMessage textMessage;
|
||||
SamplePojo body;
|
||||
Object object;
|
||||
String text;
|
||||
|
||||
// Send a text message
|
||||
String xmlText = "<org.apache.activemq.util.xstream.SamplePojo>"
|
||||
+ "<name>James</name>"
|
||||
+ "<city>London</city>"
|
||||
+ "</org.apache.activemq.util.xstream.SamplePojo>";
|
||||
|
||||
TextMessage txtRequest = producerSession.createTextMessage(xmlText);
|
||||
producer.send(txtRequest);
|
||||
|
||||
// lets consume it as a text message
|
||||
message = adaptiveConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
|
||||
textMessage = (TextMessage)message;
|
||||
text = textMessage.getText();
|
||||
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
|
||||
|
||||
// lets consume it as an object message
|
||||
message = origConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
|
||||
objectMessage = (ObjectMessage)message;
|
||||
object = objectMessage.getObject();
|
||||
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
|
||||
body = (SamplePojo)object;
|
||||
assertEquals("name", "James", body.getName());
|
||||
assertEquals("city", "London", body.getCity());
|
||||
|
||||
// Send object message
|
||||
ObjectMessage objRequest = producerSession.createObjectMessage(new SamplePojo("James", "London"));
|
||||
producer.send(objRequest);
|
||||
|
||||
// lets consume it as an object message
|
||||
message = adaptiveConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
|
||||
objectMessage = (ObjectMessage)message;
|
||||
object = objectMessage.getObject();
|
||||
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
|
||||
body = (SamplePojo)object;
|
||||
assertEquals("name", "James", body.getName());
|
||||
assertEquals("city", "London", body.getCity());
|
||||
|
||||
// lets consume it as a text message
|
||||
message = origConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
|
||||
textMessage = (TextMessage)message;
|
||||
text = textMessage.getText();
|
||||
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
|
||||
System.out.println("Received XML...");
|
||||
System.out.println(text);
|
||||
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.util.oxm;
|
||||
|
||||
import org.springframework.oxm.xstream.XStreamMarshaller;
|
||||
|
||||
public class OXMMessageTransformTest extends AbstractXMLMessageTransformerTest {
|
||||
|
||||
protected AbstractXMLMessageTransformer createTransformer() {
|
||||
OXMMessageTransformer transformer = new OXMMessageTransformer();
|
||||
transformer.setMarshaller(new XStreamMarshaller());
|
||||
return transformer;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,135 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.util.oxm;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
|
||||
import org.apache.activemq.ActiveMQMessageConsumer;
|
||||
import org.apache.activemq.util.xstream.SamplePojo;
|
||||
|
||||
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.ADAPTIVE;
|
||||
|
||||
public class XStreamMessageTransformTest extends
|
||||
AbstractXMLMessageTransformerTest {
|
||||
|
||||
protected AbstractXMLMessageTransformer createTransformer() {
|
||||
return new XStreamMessageTransformer();
|
||||
}
|
||||
|
||||
public void testStreamDriverTransform() throws Exception {
|
||||
XStreamMessageTransformer transformer = (XStreamMessageTransformer) createTransformer();
|
||||
transformer.setTransformType(ADAPTIVE);
|
||||
transformer.setStreamDriver(new JettisonMappedXmlDriver());
|
||||
connection = createConnection(transformer);
|
||||
|
||||
// lets create the consumers
|
||||
Session adaptiveSession = connection.createSession(false,
|
||||
Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = adaptiveSession.createTopic(getClass()
|
||||
.getName());
|
||||
MessageConsumer adaptiveConsumer = adaptiveSession
|
||||
.createConsumer(destination);
|
||||
|
||||
Session origSession = connection.createSession(false,
|
||||
Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer origConsumer = origSession.createConsumer(destination);
|
||||
// lets clear the transformer on this consumer so we see the message as
|
||||
// it really is
|
||||
((ActiveMQMessageConsumer) origConsumer).setTransformer(null);
|
||||
|
||||
// Create producer
|
||||
Session producerSession = connection.createSession(false,
|
||||
Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = producerSession.createProducer(destination);
|
||||
|
||||
Message message;
|
||||
ObjectMessage objectMessage;
|
||||
TextMessage textMessage;
|
||||
SamplePojo body;
|
||||
Object object;
|
||||
String text;
|
||||
|
||||
// Send a text message
|
||||
String xmlText = "{\"org.apache.activemq.util.xstream.SamplePojo\":{\"name\":\"James\",\"city\":\"London\"}}";
|
||||
|
||||
TextMessage txtRequest = producerSession.createTextMessage(xmlText);
|
||||
producer.send(txtRequest);
|
||||
|
||||
// lets consume it as a text message
|
||||
message = adaptiveConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be a TextMessage but was: " + message,
|
||||
message instanceof TextMessage);
|
||||
textMessage = (TextMessage) message;
|
||||
text = textMessage.getText();
|
||||
assertTrue("Text should be non-empty!", text != null
|
||||
&& text.length() > 0);
|
||||
|
||||
// lets consume it as an object message
|
||||
message = origConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be an ObjectMessage but was: " + message,
|
||||
message instanceof ObjectMessage);
|
||||
objectMessage = (ObjectMessage) message;
|
||||
object = objectMessage.getObject();
|
||||
assertTrue("object payload of wrong type: " + object,
|
||||
object instanceof SamplePojo);
|
||||
body = (SamplePojo) object;
|
||||
assertEquals("name", "James", body.getName());
|
||||
assertEquals("city", "London", body.getCity());
|
||||
|
||||
// Send object message
|
||||
ObjectMessage objRequest = producerSession
|
||||
.createObjectMessage(new SamplePojo("James", "London"));
|
||||
producer.send(objRequest);
|
||||
|
||||
// lets consume it as an object message
|
||||
message = adaptiveConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be an ObjectMessage but was: " + message,
|
||||
message instanceof ObjectMessage);
|
||||
objectMessage = (ObjectMessage) message;
|
||||
object = objectMessage.getObject();
|
||||
assertTrue("object payload of wrong type: " + object,
|
||||
object instanceof SamplePojo);
|
||||
body = (SamplePojo) object;
|
||||
assertEquals("name", "James", body.getName());
|
||||
assertEquals("city", "London", body.getCity());
|
||||
|
||||
// lets consume it as a text message
|
||||
message = origConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be a TextMessage but was: " + message,
|
||||
message instanceof TextMessage);
|
||||
textMessage = (TextMessage) message;
|
||||
text = textMessage.getText();
|
||||
assertTrue("Text should be non-empty!", text != null
|
||||
&& text.length() > 0);
|
||||
System.out.println("Received JSON...");
|
||||
System.out.println(text);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -1,52 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.util.xstream;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class SamplePojo implements Serializable {
|
||||
private String name;
|
||||
private String city;
|
||||
|
||||
public SamplePojo() {
|
||||
}
|
||||
|
||||
public SamplePojo(String name, String city) {
|
||||
this.name = name;
|
||||
this.city = city;
|
||||
}
|
||||
|
||||
|
||||
public String getCity() {
|
||||
return city;
|
||||
}
|
||||
|
||||
public void setCity(String city) {
|
||||
this.city = city;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
}
|
|
@ -1,315 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.util.xstream;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQMessageConsumer;
|
||||
|
||||
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.ADAPTIVE;
|
||||
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.OBJECT;
|
||||
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.XML;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class XStreamTransformTest extends TestCase {
|
||||
protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&broker.useJmx=false");
|
||||
protected Connection connection;
|
||||
protected long timeout = 5000;
|
||||
|
||||
public void testSendObjectMessageReceiveAsTextMessageAndObjectMessage() throws Exception {
|
||||
org.apache.activemq.util.oxm.XStreamMessageTransformer transformer = new org.apache.activemq.util.oxm.XStreamMessageTransformer();
|
||||
transformer.setTransformType(XML);
|
||||
connectionFactory.setTransformer(transformer);
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.start();
|
||||
|
||||
// lets create the consumers
|
||||
Session objectSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = objectSession.createTopic(getClass().getName());
|
||||
MessageConsumer objectConsumer = objectSession.createConsumer(destination);
|
||||
|
||||
Session textSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer textConsumer = textSession.createConsumer(destination);
|
||||
// lets clear the transformer on this consumer so we see the message as
|
||||
// it really is
|
||||
((ActiveMQMessageConsumer)textConsumer).setTransformer(null);
|
||||
|
||||
// send a message
|
||||
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = producerSession.createProducer(destination);
|
||||
|
||||
ObjectMessage request = producerSession.createObjectMessage(new SamplePojo("James", "London"));
|
||||
producer.send(request);
|
||||
|
||||
// lets consume it as an object message
|
||||
Message message = objectConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
|
||||
ObjectMessage objectMessage = (ObjectMessage)message;
|
||||
Object object = objectMessage.getObject();
|
||||
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
|
||||
SamplePojo body = (SamplePojo)object;
|
||||
assertEquals("name", "James", body.getName());
|
||||
assertEquals("city", "London", body.getCity());
|
||||
|
||||
// lets consume it as a text message
|
||||
message = textConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
|
||||
TextMessage textMessage = (TextMessage)message;
|
||||
String text = textMessage.getText();
|
||||
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
|
||||
System.out.println("Received XML...");
|
||||
System.out.println(text);
|
||||
}
|
||||
|
||||
public void testSendTextMessageReceiveAsObjectMessageAndTextMessage() throws Exception {
|
||||
org.apache.activemq.util.oxm.XStreamMessageTransformer transformer = new org.apache.activemq.util.oxm.XStreamMessageTransformer();
|
||||
transformer.setTransformType(OBJECT);
|
||||
connectionFactory.setTransformer(transformer);
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.start();
|
||||
|
||||
// lets create the consumers
|
||||
Session textSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = textSession.createTopic(getClass().getName());
|
||||
MessageConsumer textConsumer = textSession.createConsumer(destination);
|
||||
|
||||
Session objectSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer objectConsumer = objectSession.createConsumer(destination);
|
||||
// lets clear the transformer on this consumer so we see the message as
|
||||
// it really is
|
||||
((ActiveMQMessageConsumer)objectConsumer).setTransformer(null);
|
||||
|
||||
// send a message
|
||||
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = producerSession.createProducer(destination);
|
||||
|
||||
String xmlText = "<org.apache.activemq.util.xstream.SamplePojo>"
|
||||
+ "<name>James</name>"
|
||||
+ "<city>London</city>"
|
||||
+ "</org.apache.activemq.util.xstream.SamplePojo>";
|
||||
|
||||
TextMessage request = producerSession.createTextMessage(xmlText);
|
||||
producer.send(request);
|
||||
|
||||
Message message;
|
||||
// lets consume it as a text message
|
||||
message = textConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
|
||||
TextMessage textMessage = (TextMessage)message;
|
||||
String text = textMessage.getText();
|
||||
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
|
||||
|
||||
// lets consume it as an object message
|
||||
message = objectConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
|
||||
ObjectMessage objectMessage = (ObjectMessage)message;
|
||||
Object object = objectMessage.getObject();
|
||||
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
|
||||
SamplePojo body = (SamplePojo)object;
|
||||
assertEquals("name", "James", body.getName());
|
||||
assertEquals("city", "London", body.getCity());
|
||||
|
||||
}
|
||||
|
||||
public void testAdaptiveTransform() throws Exception {
|
||||
org.apache.activemq.util.oxm.XStreamMessageTransformer transformer = new org.apache.activemq.util.oxm.XStreamMessageTransformer();
|
||||
transformer.setTransformType(ADAPTIVE);
|
||||
connectionFactory.setTransformer(transformer);
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.start();
|
||||
|
||||
// lets create the consumers
|
||||
Session adaptiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = adaptiveSession.createTopic(getClass().getName());
|
||||
MessageConsumer adaptiveConsumer = adaptiveSession.createConsumer(destination);
|
||||
|
||||
Session origSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer origConsumer = origSession.createConsumer(destination);
|
||||
// lets clear the transformer on this consumer so we see the message as
|
||||
// it really is
|
||||
((ActiveMQMessageConsumer)origConsumer).setTransformer(null);
|
||||
|
||||
// Create producer
|
||||
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = producerSession.createProducer(destination);
|
||||
|
||||
Message message;
|
||||
ObjectMessage objectMessage;
|
||||
TextMessage textMessage;
|
||||
SamplePojo body;
|
||||
Object object;
|
||||
String text;
|
||||
|
||||
// Send a text message
|
||||
String xmlText = "<org.apache.activemq.util.xstream.SamplePojo>"
|
||||
+ "<name>James</name>"
|
||||
+ "<city>London</city>"
|
||||
+ "</org.apache.activemq.util.xstream.SamplePojo>";
|
||||
|
||||
TextMessage txtRequest = producerSession.createTextMessage(xmlText);
|
||||
producer.send(txtRequest);
|
||||
|
||||
// lets consume it as a text message
|
||||
message = adaptiveConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
|
||||
textMessage = (TextMessage)message;
|
||||
text = textMessage.getText();
|
||||
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
|
||||
|
||||
// lets consume it as an object message
|
||||
message = origConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
|
||||
objectMessage = (ObjectMessage)message;
|
||||
object = objectMessage.getObject();
|
||||
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
|
||||
body = (SamplePojo)object;
|
||||
assertEquals("name", "James", body.getName());
|
||||
assertEquals("city", "London", body.getCity());
|
||||
|
||||
// Send object message
|
||||
ObjectMessage objRequest = producerSession.createObjectMessage(new SamplePojo("James", "London"));
|
||||
producer.send(objRequest);
|
||||
|
||||
// lets consume it as an object message
|
||||
message = adaptiveConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
|
||||
objectMessage = (ObjectMessage)message;
|
||||
object = objectMessage.getObject();
|
||||
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
|
||||
body = (SamplePojo)object;
|
||||
assertEquals("name", "James", body.getName());
|
||||
assertEquals("city", "London", body.getCity());
|
||||
|
||||
// lets consume it as a text message
|
||||
message = origConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
|
||||
textMessage = (TextMessage)message;
|
||||
text = textMessage.getText();
|
||||
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
|
||||
System.out.println("Received XML...");
|
||||
System.out.println(text);
|
||||
|
||||
}
|
||||
|
||||
public void testStreamDriverTransform() throws Exception {
|
||||
org.apache.activemq.util.oxm.XStreamMessageTransformer transformer = new org.apache.activemq.util.oxm.XStreamMessageTransformer();
|
||||
transformer.setTransformType(ADAPTIVE);
|
||||
transformer.setStreamDriver(new JettisonMappedXmlDriver());
|
||||
connectionFactory.setTransformer(transformer);
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.start();
|
||||
|
||||
// lets create the consumers
|
||||
Session adaptiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = adaptiveSession.createTopic(getClass().getName());
|
||||
MessageConsumer adaptiveConsumer = adaptiveSession.createConsumer(destination);
|
||||
|
||||
Session origSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer origConsumer = origSession.createConsumer(destination);
|
||||
// lets clear the transformer on this consumer so we see the message as
|
||||
// it really is
|
||||
((ActiveMQMessageConsumer)origConsumer).setTransformer(null);
|
||||
|
||||
// Create producer
|
||||
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = producerSession.createProducer(destination);
|
||||
|
||||
Message message;
|
||||
ObjectMessage objectMessage;
|
||||
TextMessage textMessage;
|
||||
SamplePojo body;
|
||||
Object object;
|
||||
String text;
|
||||
|
||||
// Send a text message
|
||||
String xmlText = "{\"org.apache.activemq.util.xstream.SamplePojo\":{\"name\":\"James\",\"city\":\"London\"}}";
|
||||
|
||||
TextMessage txtRequest = producerSession.createTextMessage(xmlText);
|
||||
producer.send(txtRequest);
|
||||
|
||||
// lets consume it as a text message
|
||||
message = adaptiveConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
|
||||
textMessage = (TextMessage)message;
|
||||
text = textMessage.getText();
|
||||
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
|
||||
|
||||
// lets consume it as an object message
|
||||
message = origConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
|
||||
objectMessage = (ObjectMessage)message;
|
||||
object = objectMessage.getObject();
|
||||
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
|
||||
body = (SamplePojo)object;
|
||||
assertEquals("name", "James", body.getName());
|
||||
assertEquals("city", "London", body.getCity());
|
||||
|
||||
// Send object message
|
||||
ObjectMessage objRequest = producerSession.createObjectMessage(new SamplePojo("James", "London"));
|
||||
producer.send(objRequest);
|
||||
|
||||
// lets consume it as an object message
|
||||
message = adaptiveConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
|
||||
objectMessage = (ObjectMessage)message;
|
||||
object = objectMessage.getObject();
|
||||
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
|
||||
body = (SamplePojo)object;
|
||||
assertEquals("name", "James", body.getName());
|
||||
assertEquals("city", "London", body.getCity());
|
||||
|
||||
// lets consume it as a text message
|
||||
message = origConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
|
||||
textMessage = (TextMessage)message;
|
||||
text = textMessage.getText();
|
||||
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
|
||||
System.out.println("Received JSON...");
|
||||
System.out.println(text);
|
||||
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,38 +0,0 @@
|
|||
## ---------------------------------------------------------------------------
|
||||
## Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
## contributor license agreements. See the NOTICE file distributed with
|
||||
## this work for additional information regarding copyright ownership.
|
||||
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
## (the "License"); you may not use this file except in compliance with
|
||||
## the License. You may obtain a copy of the License at
|
||||
##
|
||||
## http://www.apache.org/licenses/LICENSE-2.0
|
||||
##
|
||||
## Unless required by applicable law or agreed to in writing, software
|
||||
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
## See the License for the specific language governing permissions and
|
||||
## limitations under the License.
|
||||
## ---------------------------------------------------------------------------
|
||||
|
||||
#
|
||||
# The logging properties used during tests..
|
||||
#
|
||||
log4j.rootLogger=INFO, out, stdout
|
||||
|
||||
log4j.logger.org.apache.activemq.spring=WARN
|
||||
#log4j.logger.org.apache.activemq=DEBUG
|
||||
#log4j.logger.org.eclipse.jetty.io.nio.ssl=DEBUG
|
||||
#log4j.logger.org.apache.http=INFO
|
||||
|
||||
# CONSOLE appender not used by default
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c - %m%n
|
||||
|
||||
# File appender
|
||||
log4j.appender.out=org.apache.log4j.FileAppender
|
||||
log4j.appender.out.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
|
||||
log4j.appender.out.file=target/activemq-test.log
|
||||
log4j.appender.out.append=true
|
Loading…
Reference in New Issue