Merge pull request #606 from jbonofre/CLEANUP

Cleanup some unused dependencies and trash
This commit is contained in:
Jean-Baptiste Onofré 2021-01-09 07:31:23 +01:00 committed by GitHub
commit 5bcfd53470
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 0 additions and 2863 deletions

70
pom.xml
View File

@ -41,14 +41,10 @@
<activemq-protobuf-version>1.1</activemq-protobuf-version>
<activesoap-version>1.3</activesoap-version>
<annogen-version>0.1.0</annogen-version>
<ant-bundle-version>1.10.7_1</ant-bundle-version>
<aopalliance-version>1.0</aopalliance-version>
<aries-version>1.1.0</aries-version>
<aries-transaction-version>1.1.1</aries-transaction-version>
<axion-version>1.0-M3-dev</axion-version>
<camel-version>2.25.2</camel-version>
<camel-version-range>[2.20,3)</camel-version-range>
<cglib-version>2.2</cglib-version>
<commons-beanutils-version>1.9.4</commons-beanutils-version>
<commons-collections-version>3.2.2</commons-collections-version>
<commons-daemon-version>1.2.3</commons-daemon-version>
@ -62,13 +58,10 @@
<directory-version>2.0.0.AM25</directory-version>
<ecj.version>3.17.0</ecj.version>
<ftpserver-version>1.1.1</ftpserver-version>
<geronimo-version>1.0</geronimo-version>
<guava-version>28.2-jre</guava-version>
<hadoop-version>1.2.1</hadoop-version>
<hawtbuf-version>1.11</hawtbuf-version>
<hawtdispatch-version>1.22</hawtdispatch-version>
<howl-version>0.1.8</howl-version>
<hsqldb-version>1.8.0.12</hsqldb-version>
<httpclient-version>4.5.13</httpclient-version>
<httpcore-version>4.4.13</httpcore-version>
<insight-version>1.2.0.Beta4</insight-version>
@ -88,7 +81,6 @@
<json-simple-version>1.1.1</json-simple-version>
<junit-version>4.13.1</junit-version>
<hamcrest-version>1.3</hamcrest-version>
<jxta-version>2.0</jxta-version>
<karaf-version>4.2.10</karaf-version>
<leveldb-api-version>0.9</leveldb-api-version>
<leveldb-version>0.9</leveldb-version>
@ -98,10 +90,8 @@
<owasp-dependency-check-version>5.2.4</owasp-dependency-check-version>
<powermock-version>1.6.5</powermock-version>
<mqtt-client-version>1.16</mqtt-client-version>
<openjpa-version>1.2.0</openjpa-version>
<org-apache-derby-version>10.14.2.0</org-apache-derby-version>
<osgi-version>6.0.0</osgi-version>
<p2psockets-version>1.1.2</p2psockets-version>
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
<zookeeper-version>3.4.14</zookeeper-version>
<qpid-proton-version>0.33.8</qpid-proton-version>
@ -111,8 +101,6 @@
<netty-all-version>4.1.53.Final</netty-all-version>
<regexp-version>1.3</regexp-version>
<rome-version>1.15.0</rome-version>
<saxon-version>9.5.1-5</saxon-version>
<saxon-bundle-version>9.5.1-5_1</saxon-bundle-version>
<scala-plugin-version>3.1.0</scala-plugin-version>
<scala-version>2.11.11</scala-version>
<shiro-version>1.7.0</shiro-version>
@ -123,8 +111,6 @@
<taglibs-version>1.2.5</taglibs-version>
<velocity-version>2.2</velocity-version>
<xalan-version>2.7.2</xalan-version>
<xmlbeans-version>3.1.0</xmlbeans-version>
<xmlbeans-bundle-version>2.6.0_2</xmlbeans-bundle-version>
<xpp3-version>1.1.4c</xpp3-version>
<xstream-version>1.4.15</xstream-version>
<xbean-version>4.18</xbean-version>
@ -693,12 +679,6 @@
<version>${commons-collections-version}</version>
</dependency>
<dependency>
<groupId>org.apache.openjpa</groupId>
<artifactId>openjpa-persistence-jdbc</artifactId>
<version>${openjpa-version}</version>
</dependency>
<!-- Optional Shiro Support -->
<dependency>
<groupId>org.apache.shiro</groupId>
@ -842,15 +822,6 @@
<version>${regexp-version}</version>
</dependency>
<!-- Optional HSQL DB Support -->
<!--
<dependency>
<groupId>hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>${hsqldb-version}</version>
</dependency>
-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
@ -863,15 +834,6 @@
<version>${commons-pool2-version}</version>
</dependency>
<!-- Optional Journal Implementation -->
<!--
<dependency>
<groupId>howl</groupId>
<artifactId>howl-logger</artifactId>
<version>${howl-version}</version>
</dependency>
-->
<!-- Optional Jabber support -->
<dependency>
<groupId>activemq</groupId>
@ -885,21 +847,6 @@
<version>1.5.0</version>
</dependency>
<!-- =============================== -->
<!-- XML processing dependencies -->
<!-- =============================== -->
<!-- For XMLBeans -->
<dependency>
<groupId>org.apache.xmlbeans</groupId>
<artifactId>xmlbeans</artifactId>
<version>${xmlbeans-version}</version>
</dependency>
<dependency>
<groupId>org.apache.xmlbeans</groupId>
<artifactId>xmlbeans-xpath</artifactId>
<version>${xmlbeans-version}</version>
</dependency>
<!-- To use XPath using JAXP 1.3 (std in Java 5) -->
<dependency>
<groupId>activesoap</groupId>
@ -991,12 +938,6 @@
<version>${taglibs-version}</version>
</dependency>
<dependency>
<groupId>aopalliance</groupId>
<artifactId>aopalliance</artifactId>
<version>${aopalliance-version}</version>
</dependency>
<dependency>
<groupId>org.jasypt</groupId>
<artifactId>jasypt</artifactId>
@ -1088,17 +1029,6 @@
<version>${mqtt-client-version}</version>
</dependency>
<dependency>
<groupId>p2psockets</groupId>
<artifactId>p2psockets-core</artifactId>
<version>${p2psockets-version}</version>
</dependency>
<dependency>
<groupId>jxta</groupId>
<artifactId>jxta</artifactId>
<version>${jxta-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View File

@ -1,249 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.9-SNAPSHOT</version>
</parent>
<artifactId>activemq-optional</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ :: Optional</name>
<description>Optional ActiveMQ features</description>
<dependencies>
<!-- activemq -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-client</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-broker</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activeio-core</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-console</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>aopalliance</groupId>
<artifactId>aopalliance</artifactId>
</dependency>
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
</dependency>
<dependency>
<groupId>xpp3</groupId>
<artifactId>xpp3</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all-server</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jetty-version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-websocket</artifactId>
<version>${jetty-version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<scope>test</scope>
</dependency>
<!-- log4j jms appender and test tool needs this -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.xmlbeans</groupId>
<artifactId>xmlbeans</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.xmlbeans</groupId>
<artifactId>xmlbeans-xpath</artifactId>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>net.sf.saxon</groupId>
<artifactId>saxon</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
<version>${saxon-version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
<version>${xerces-version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>xalan</groupId>
<artifactId>xalan</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<scope>test</scope>
</dependency>
<!--
TODO: Not needed, but OSGi bundle on the way: https://issues.apache.org/jira/browse/SMX4-1238
<dependency>
<groupId>net.sf.josql</groupId>
<artifactId>gentlyweb-utils</artifactId>
<version>1.5</version>
</dependency>
-->
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
<artifactId>selenium-java</artifactId>
<version>2.25.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
<artifactId>selenium-chrome-driver</artifactId>
<version>2.25.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
<artifactId>selenium-firefox-driver</artifactId>
<version>2.25.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<configuration>
<instructions>
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Fragment-Host>org.apache.activemq.activemq-core</Fragment-Host>
<Export-Package>
org.apache.activemq.transport.http*;version=${project.version};-noimport:=;-split-package:=merge-last,
org.apache.activemq.transport.https*;version=${project.version};-noimport:=;-split-package:=merge-last
</Export-Package>
<Import-Package>
org.eclipse.jetty*;version="[7.6,8.0)";resolution:=optional,
!org.apache.activemq.transport.ws*;version=${project.version},
!org.apache.activemq.transport.xstream;version=${project.version},
!org.apache.activemq.transport.util;version=${project.version},
org.apache.activemq*;version=${project.version};resolution:=optional
</Import-Package>
</instructions>
</configuration>
<executions>
<execution>
<id>bundle-manifest</id>
<phase>process-classes</phase>
<goals>
<goal>manifest</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Configure which tests are included/excuded -->
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -1,220 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.benchmark;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IdGenerator;
/**
* Abstract base class for some simple benchmark tools
*/
public class BenchmarkSupport {
protected int connectionCount = 1;
protected int batch = 1000;
protected Destination destination;
protected String[] subjects;
private boolean topic = true;
private boolean durable;
private ActiveMQConnectionFactory factory;
private String url;
private int counter;
private List<Object> resources = new ArrayList<Object>();
private NumberFormat formatter = NumberFormat.getInstance();
private AtomicInteger connectionCounter = new AtomicInteger(0);
private IdGenerator idGenerator = new IdGenerator();
private boolean timerLoop;
public BenchmarkSupport() {
}
public void start() {
System.out.println("Using: " + connectionCount + " connection(s)");
subjects = new String[connectionCount];
for (int i = 0; i < connectionCount; i++) {
subjects[i] = "BENCHMARK.FEED" + i;
}
if (useTimerLoop()) {
Thread timer = new Thread() {
public void run() {
timerLoop();
}
};
timer.start();
}
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public boolean isTopic() {
return topic;
}
public void setTopic(boolean topic) {
this.topic = topic;
}
public ActiveMQConnectionFactory getFactory() {
return factory;
}
public void setFactory(ActiveMQConnectionFactory factory) {
this.factory = factory;
}
public void setSubject(String subject) {
connectionCount = 1;
subjects = new String[] {
subject
};
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public int getConnectionCount() {
return connectionCount;
}
public void setConnectionCount(int connectionCount) {
this.connectionCount = connectionCount;
}
protected Session createSession() throws JMSException {
if (factory == null) {
factory = createFactory();
}
Connection connection = factory.createConnection();
int value = connectionCounter.incrementAndGet();
System.out.println("Created connection: " + value + " = " + connection);
if (durable) {
connection.setClientID(idGenerator.generateId());
}
addResource(connection);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
addResource(session);
return session;
}
protected ActiveMQConnectionFactory createFactory() {
ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory(getUrl());
return answer;
}
protected synchronized void count(int count) {
counter += count;
/*
* if (counter > batch) { counter = 0; long current =
* System.currentTimeMillis(); double end = current - time; end /= 1000;
* time = current; System.out.println("Processed " + batch + " messages
* in " + end + " (secs)"); }
*/
}
protected synchronized int resetCount() {
int answer = counter;
counter = 0;
return answer;
}
protected void timerLoop() {
int times = 0;
int total = 0;
int dumpVmStatsFrequency = 10;
Runtime runtime = Runtime.getRuntime();
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int processed = resetCount();
double average = 0;
if (processed > 0) {
total += processed;
times++;
}
if (times > 0) {
average = total / (double) times;
}
System.out.println(getClass().getName() + " Processed: " + processed + " messages this second. Average: " + average);
if ((times % dumpVmStatsFrequency) == 0 && times != 0) {
System.out.println("Used memory: " + asMemoryString(runtime.totalMemory() - runtime.freeMemory()) + " Free memory: " + asMemoryString(runtime.freeMemory()) + " Total memory: "
+ asMemoryString(runtime.totalMemory()) + " Max memory: " + asMemoryString(runtime.maxMemory()));
}
}
}
protected String asMemoryString(long value) {
return formatter.format(value / 1024) + " K";
}
protected boolean useTimerLoop() {
return timerLoop;
}
protected Destination createDestination(Session session, String subject) throws JMSException {
if (topic) {
return session.createTopic(subject);
} else {
return session.createQueue(subject);
}
}
protected void addResource(Object resource) {
resources.add(resource);
}
public int getCounter() {
return counter;
}
public void setTimerLoop(boolean timerLoop) {
this.timerLoop = timerLoop;
}
protected static boolean parseBoolean(String text) {
return text.equalsIgnoreCase("true");
}
}

View File

@ -1,104 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.benchmark;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
public class Consumer extends BenchmarkSupport implements MessageListener {
public Consumer() {
}
public static void main(String[] args) {
Consumer tool = new Consumer();
if (args.length > 0) {
tool.setUrl(args[0]);
}
if (args.length > 1) {
tool.setTopic(parseBoolean(args[1]));
}
if (args.length > 2) {
tool.setSubject(args[2]);
}
if (args.length > 3) {
tool.setDurable(parseBoolean(args[3]));
}
if (args.length > 4) {
tool.setConnectionCount(Integer.parseInt(args[4]));
}
try {
tool.run();
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public void run() throws JMSException {
start();
subscribe();
}
protected void subscribe() throws JMSException {
for (int i = 0; i < subjects.length; i++) {
subscribe(subjects[i]);
}
}
protected void subscribe(String subject) throws JMSException {
Session session = createSession();
Destination destination = createDestination(session, subject);
System.out.println("Consuming on : " + destination + " of type: " + destination.getClass().getName());
MessageConsumer consumer = null;
if (isDurable() && isTopic()) {
consumer = session.createDurableSubscriber((Topic)destination, getClass().getName());
} else {
consumer = session.createConsumer(destination);
}
consumer.setMessageListener(this);
addResource(consumer);
}
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage)message;
// lets force the content to be deserialized
textMessage.getText();
count(1);
// lets count the messages
// message.acknowledge();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

View File

@ -1,183 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.benchmark;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
public class Producer extends BenchmarkSupport {
int loops = -1;
int loopSize = 1000;
private int messageSize = 1000;
public Producer() {
}
public static void main(String[] args) {
Producer tool = new Producer();
if (args.length > 0) {
tool.setUrl(args[0]);
}
if (args.length > 1) {
tool.setTopic(parseBoolean(args[1]));
}
if (args.length > 2) {
tool.setSubject(args[2]);
}
if (args.length > 3) {
tool.setDurable(parseBoolean(args[3]));
}
if (args.length > 4) {
tool.setMessageSize(Integer.parseInt(args[4]));
}
if (args.length > 5) {
tool.setConnectionCount(Integer.parseInt(args[5]));
}
try {
tool.run();
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public void run() throws Exception {
start();
publish();
}
// Properties
// -------------------------------------------------------------------------
public int getMessageSize() {
return messageSize;
}
public void setMessageSize(int messageSize) {
this.messageSize = messageSize;
}
public int getLoopSize() {
return loopSize;
}
public void setLoopSize(int loopSize) {
this.loopSize = loopSize;
}
// Implementation methods
// -------------------------------------------------------------------------
protected void publish() throws Exception {
final String text = getMessage();
System.out.println("Publishing to: " + subjects.length + " subject(s)");
for (int i = 0; i < subjects.length; i++) {
final String subject = subjects[i];
Thread thread = new Thread() {
public void run() {
try {
publish(text, subject);
} catch (JMSException e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
};
thread.start();
}
}
protected String getMessage() {
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < messageSize; i++) {
char ch = 'X';
buffer.append(ch);
}
return buffer.toString();
}
protected void publish(String text, String subject) throws JMSException {
Session session = createSession();
Destination destination = createDestination(session, subject);
MessageProducer publisher = session.createProducer(destination);
if (isDurable()) {
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
} else {
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
System.out.println("Starting publisher on : " + destination + " of type: " + destination.getClass().getName());
System.out.println("Message length: " + text.length());
if (loops <= 0) {
while (true) {
publishLoop(session, publisher, text);
}
} else {
for (int i = 0; i < loops; i++) {
publishLoop(session, publisher, text);
}
}
}
protected void publishLoop(Session session, MessageProducer publisher, String text) throws JMSException {
for (int i = 0; i < loopSize; i++) {
Message message = session.createTextMessage(text);
publisher.send(message);
count(1);
}
}
protected String loadFile(String file) throws IOException {
System.out.println("Loading file: " + file);
StringBuffer buffer = new StringBuffer();
BufferedReader in = new BufferedReader(new FileReader(file));
while (true) {
String line = in.readLine();
if (line == null) {
break;
}
buffer.append(line);
buffer.append(File.separator);
}
in.close();
return buffer.toString();
}
public int getLoops() {
return loops;
}
public void setLoops(int loops) {
this.loops = loops;
}
}

View File

@ -1,80 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.benchmark;
public class ProducerConsumer extends Producer {
private Consumer consumer = new Consumer();
public ProducerConsumer() {
}
public static void main(String[] args) {
ProducerConsumer tool = new ProducerConsumer();
if (args.length > 0) {
tool.setUrl(args[0]);
}
if (args.length > 1) {
tool.setTopic(parseBoolean(args[1]));
}
if (args.length > 2) {
tool.setSubject(args[2]);
}
if (args.length > 3) {
tool.setDurable(Boolean.getBoolean(args[3]));
}
if (args.length > 4) {
tool.setConnectionCount(Integer.parseInt(args[4]));
}
try {
tool.run();
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public void run() throws Exception {
consumer.start();
consumer.subscribe();
start();
publish();
}
public void setTopic(boolean topic) {
super.setTopic(topic);
consumer.setTopic(topic);
}
public void setSubject(String subject) {
super.setSubject(subject);
consumer.setSubject(subject);
}
public void setUrl(String url) {
super.setUrl(url);
consumer.setUrl(url);
}
protected boolean useTimerLoop() {
return false;
}
public Consumer getConsumer() {
return consumer;
}
}

View File

@ -1,362 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
/**
*
*/
public class AcidTestTool extends TestCase {
// Worker configuration.
protected int recordSize = 1024;
protected int batchSize = 5;
protected int workerThinkTime = 500;
protected Destination target;
private Random random = new Random();
private byte data[];
private int workerCount = 10;
private AtomicBoolean ignoreJMSErrors = new AtomicBoolean(false);
private ActiveMQConnectionFactory factory;
private Connection connection;
private AtomicInteger publishedBatches = new AtomicInteger(0);
private AtomicInteger consumedBatches = new AtomicInteger(0);
private List<Throwable> errors = Collections.synchronizedList(new ArrayList<Throwable>());
private interface Worker extends Runnable {
boolean waitForExit(long i) throws InterruptedException;
}
private final class ProducerWorker implements Worker {
private Session session;
private MessageProducer producer;
private BytesMessage message;
private CountDownLatch doneLatch = new CountDownLatch(1);
ProducerWorker(Session session, String workerId) throws JMSException {
this.session = session;
producer = session.createProducer(target);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
message = session.createBytesMessage();
message.setStringProperty("workerId", workerId);
message.writeBytes(data);
}
public void run() {
try {
for (int batchId = 0; true; batchId++) {
// System.out.println("Sending batch: "+workerId+"
// "+batchId);
for (int msgId = 0; msgId < batchSize; msgId++) {
// Sleep some random amount of time less than
// workerThinkTime
try {
Thread.sleep(random.nextInt(workerThinkTime));
} catch (InterruptedException e1) {
return;
}
message.setIntProperty("batch-id", batchId);
message.setIntProperty("msg-id", msgId);
producer.send(message);
}
session.commit();
publishedBatches.incrementAndGet();
// System.out.println("Commited send batch: "+workerId+"
// "+batchId);
}
} catch (JMSException e) {
if (!ignoreJMSErrors.get()) {
e.printStackTrace();
errors.add(e);
}
return;
} catch (Throwable e) {
e.printStackTrace();
errors.add(e);
return;
} finally {
System.out.println("Producer exiting.");
doneLatch.countDown();
}
}
public boolean waitForExit(long i) throws InterruptedException {
return doneLatch.await(i, TimeUnit.MILLISECONDS);
}
}
private final class ConsumerWorker implements Worker {
private Session session;
private MessageConsumer consumer;
private final long timeout;
private CountDownLatch doneLatch = new CountDownLatch(1);
ConsumerWorker(Session session, String workerId, long timeout) throws JMSException {
this.session = session;
this.timeout = timeout;
consumer = session.createConsumer(target, "workerId='" + workerId + "'");
}
public void run() {
try {
int batchId = 0;
while (true) {
for (int msgId = 0; msgId < batchSize; msgId++) {
// Sleep some random amount of time less than
// workerThinkTime
try {
Thread.sleep(random.nextInt(workerThinkTime));
} catch (InterruptedException e1) {
return;
}
Message message = consumer.receive(timeout);
if (msgId > 0) {
assertNotNull(message);
assertEquals(message.getIntProperty("batch-id"), batchId);
assertEquals(message.getIntProperty("msg-id"), msgId);
} else {
if (message == null) {
System.out.println("At end of batch an don't have a next batch to process. done.");
return;
}
assertEquals(msgId, message.getIntProperty("msg-id"));
batchId = message.getIntProperty("batch-id");
// System.out.println("Receiving batch: "+workerId+"
// "+batchId);
}
}
session.commit();
consumedBatches.incrementAndGet();
// System.out.println("Commited receive batch: "+workerId+"
// "+batchId);
}
} catch (JMSException e) {
if (!ignoreJMSErrors.get()) {
e.printStackTrace();
errors.add(e);
}
return;
} catch (Throwable e) {
e.printStackTrace();
errors.add(e);
return;
} finally {
System.out.println("Consumer exiting.");
doneLatch.countDown();
}
}
public boolean waitForExit(long i) throws InterruptedException {
return doneLatch.await(i, TimeUnit.MILLISECONDS);
}
}
/**
* @see junit.framework.TestCase#setUp()
*/
protected void setUp() throws Exception {
factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
this.target = new ActiveMQQueue(getClass().getName());
}
protected void tearDown() throws Exception {
if (connection != null) {
try {
connection.close();
} catch (Throwable ignore) {
}
connection = null;
}
}
/**
* @throws InterruptedException
* @throws JMSException
* @throws JMSException
*/
private void reconnect() throws InterruptedException, JMSException {
if (connection != null) {
try {
connection.close();
} catch (Throwable ignore) {
}
connection = null;
}
long reconnectDelay = 1000;
while (connection == null) {
if (reconnectDelay > 1000 * 10) {
reconnectDelay = 1000 * 10;
}
try {
connection = factory.createConnection();
connection.start();
} catch (JMSException e) {
Thread.sleep(reconnectDelay);
reconnectDelay *= 2;
}
}
}
/**
* @throws Throwable
* @throws IOException
*/
public void testAcidTransactions() throws Throwable {
System.out.println("Client threads write records using: Record Size: " + recordSize + ", Batch Size: " + batchSize + ", Worker Think Time: " + workerThinkTime);
// Create the record and fill it with some values.
data = new byte[recordSize];
for (int i = 0; i < data.length; i++) {
data[i] = (byte)i;
}
System.out.println("==============================================");
System.out.println("===> Start the server now.");
System.out.println("==============================================");
reconnect();
System.out.println("Starting " + workerCount + " Workers...");
ArrayList<Worker> workers = new ArrayList<Worker>();
for (int i = 0; i < workerCount; i++) {
String workerId = "worker-" + i;
Worker w = new ConsumerWorker(connection.createSession(true, Session.SESSION_TRANSACTED), workerId, 1000 * 5);
workers.add(w);
new Thread(w, "Consumer:" + workerId).start();
w = new ProducerWorker(connection.createSession(true, Session.SESSION_TRANSACTED), workerId);
workers.add(w);
new Thread(w, "Producer:" + workerId).start();
}
System.out.println("Waiting for " + (workerCount * 10) + " batches to be delivered.");
//
// Wait for about 5 batches of messages per worker to be consumed before
// restart.
//
while (publishedBatches.get() < workerCount * 5) {
System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get());
Thread.sleep(1000);
}
System.out.println("==============================================");
System.out.println("===> Server is under load now. Kill it!");
System.out.println("==============================================");
ignoreJMSErrors.set(true);
// Wait for all the workers to finish.
System.out.println("Waiting for all workers to exit due to server shutdown.");
for (Iterator<Worker> iter = workers.iterator(); iter.hasNext();) {
Worker worker = iter.next();
while (!worker.waitForExit(1000)) {
System.out.println("==============================================");
System.out.println("===> Server is under load now. Kill it!");
System.out.println("==============================================");
System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get());
}
}
workers.clear();
// No errors should have occurred so far.
if (errors.size() > 0) {
throw errors.get(0);
}
System.out.println("==============================================");
System.out.println("===> Start the server now.");
System.out.println("==============================================");
reconnect();
System.out.println("Restarted.");
// Validate the all transactions were commited as a uow. Looking for
// partial commits.
for (int i = 0; i < workerCount; i++) {
String workerId = "worker-" + i;
Worker w = new ConsumerWorker(connection.createSession(true, Session.SESSION_TRANSACTED), workerId, 5 * 1000);
workers.add(w);
new Thread(w, "Consumer:" + workerId).start();
}
System.out.println("Waiting for restarted consumers to finish consuming all messages..");
for (Iterator<Worker> iter = workers.iterator(); iter.hasNext();) {
Worker worker = iter.next();
while (!worker.waitForExit(1000 * 5)) {
System.out.println("Waiting for restarted consumers to finish consuming all messages..");
System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get());
}
}
workers.clear();
System.out.println("Workers finished..");
System.out.println("Stats: Produced Batches: " + this.publishedBatches.get() + ", Consumed Batches: " + this.consumedBatches.get());
if (errors.size() > 0) {
throw errors.get(0);
}
}
public static void main(String[] args) {
try {
AcidTestTool tool = new AcidTestTool();
tool.setUp();
tool.testAcidTransactions();
tool.tearDown();
} catch (Throwable e) {
System.out.println("Test Failed: " + e.getMessage());
e.printStackTrace();
}
}
}

View File

@ -1,133 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
/**
* A simple tool for consuming messages
*
*
*/
public class ConsumerTool extends ToolSupport implements MessageListener {
protected int count;
protected int dumpCount = 10;
protected boolean verbose = true;
protected int maxiumMessages;
private boolean pauseBeforeShutdown;
public static void main(String[] args) {
ConsumerTool tool = new ConsumerTool();
if (args.length > 0) {
tool.url = args[0];
}
if (args.length > 1) {
tool.topic = args[1].equalsIgnoreCase("true");
}
if (args.length > 2) {
tool.subject = args[2];
}
if (args.length > 3) {
tool.durable = args[3].equalsIgnoreCase("true");
}
if (args.length > 4) {
tool.maxiumMessages = Integer.parseInt(args[4]);
}
tool.run();
}
public void run() {
try {
System.out.println("Connecting to URL: " + url);
System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
Connection connection = createConnection();
Session session = createSession(connection);
MessageConsumer consumer = null;
if (durable && topic) {
consumer = session.createDurableSubscriber((Topic)destination, consumerName);
} else {
consumer = session.createConsumer(destination);
}
if (maxiumMessages <= 0) {
consumer.setMessageListener(this);
}
connection.start();
if (maxiumMessages > 0) {
consumeMessagesAndClose(connection, session, consumer);
}
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage)message;
if (verbose) {
String msg = txtMsg.getText();
if (msg.length() > 50) {
msg = msg.substring(0, 50) + "...";
}
System.out.println("Received: " + msg);
}
} else {
if (verbose) {
System.out.println("Received: " + message);
}
}
/*
* if (++count % dumpCount == 0) { dumpStats(connection); }
*/
} catch (JMSException e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
for (int i = 0; i < maxiumMessages; i++) {
Message message = consumer.receive();
onMessage(message);
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
if (pauseBeforeShutdown) {
System.out.println("Press return to shut down");
System.in.read();
}
}
}

View File

@ -1,43 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.naming.InitialContext;
/**
*
*/
public class JndiProducerTool extends ProducerTool {
public static void main(String[] args) {
runTool(args, new JndiProducerTool());
}
protected Connection createConnection() throws Exception {
InitialContext jndiContext = new InitialContext();
ConnectionFactory queueConnectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
Connection connection = queueConnectionFactory.createConnection();
destination = (Queue) jndiContext.lookup(subject);
return connection;
}
}

View File

@ -1,131 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* A simple tool for publishing messages
*
*
*/
public class ProducerTool extends ToolSupport {
protected int messageCount = 10;
protected long sleepTime;
protected boolean verbose = true;
protected int messageSize = 255;
public static void main(String[] args) {
runTool(args, new ProducerTool());
}
protected static void runTool(String[] args, ProducerTool tool) {
if (args.length > 0) {
tool.url = args[0];
}
if (args.length > 1) {
tool.topic = args[1].equalsIgnoreCase("true");
}
if (args.length > 2) {
tool.subject = args[2];
}
if (args.length > 3) {
tool.durable = args[3].equalsIgnoreCase("true");
}
if (args.length > 4) {
tool.messageCount = Integer.parseInt(args[4]);
}
if (args.length > 5) {
tool.messageSize = Integer.parseInt(args[5]);
}
tool.run();
}
public void run() {
try {
System.out.println("Connecting to URL: " + url);
System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
System.out.println("Using " + (durable ? "durable" : "non-durable") + " publishing");
Connection connection = createConnection();
Session session = createSession(connection);
MessageProducer producer = createProducer(session);
// connection.start();
sendLoop(session, producer);
System.out.println("Done.");
close(connection, session);
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
protected MessageProducer createProducer(Session session) throws JMSException {
MessageProducer producer = session.createProducer(destination);
if (durable) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
return producer;
}
protected void sendLoop(Session session, MessageProducer producer) throws Exception {
for (int i = 0; i < messageCount; i++) {
TextMessage message = session.createTextMessage(createMessageText(i));
if (verbose) {
String msg = message.getText();
if (msg.length() > 50) {
msg = msg.substring(0, 50) + "...";
}
System.out.println("Sending message: " + msg);
}
producer.send(message);
Thread.sleep(sleepTime);
}
producer.send(session.createMessage());
}
/**
* @param i
* @return
*/
private String createMessageText(int index) {
StringBuffer buffer = new StringBuffer(messageSize);
buffer.append("Message: " + index + " sent at: " + new Date());
if (buffer.length() > messageSize) {
return buffer.substring(0, messageSize);
}
for (int i = buffer.length(); i < messageSize; i++) {
buffer.append(' ');
}
return buffer.toString();
}
}

View File

@ -1,81 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;
/**
* Abstract base class useful for implementation inheritence
*
*
*/
public class ToolSupport {
protected Destination destination;
protected String subject = "TOOL.DEFAULT";
protected boolean topic = true;
protected String user = ActiveMQConnection.DEFAULT_USER;
protected String pwd = ActiveMQConnection.DEFAULT_PASSWORD;
protected String url = ActiveMQConnection.DEFAULT_BROKER_URL;
protected boolean transacted;
protected boolean durable;
protected String clientID = getClass().getName();
protected int ackMode = Session.AUTO_ACKNOWLEDGE;
protected String consumerName = "James";
protected Session createSession(Connection connection) throws Exception {
if (durable) {
connection.setClientID(clientID);
}
Session session = connection.createSession(transacted, ackMode);
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}
return session;
}
protected Connection createConnection() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
return connectionFactory.createConnection();
}
protected void close(Connection connection, Session session) throws JMSException {
// lets dump the stats
dumpStats(connection);
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
protected void dumpStats(Connection connection) {
ActiveMQConnection c = (ActiveMQConnection)connection;
c.getConnectionStats().dump(new IndentPrinter());
}
}

View File

@ -1,61 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.bio.SocketConnector;
import org.eclipse.jetty.webapp.WebAppContext;
/**
*
*/
public final class WebServer {
public static final int PORT = 8080;
// public static final String WEBAPP_DIR = "target/activemq";
public static final String WEBAPP_DIR = "src/webapp";
public static final String WEBAPP_CTX = "/";
private WebServer() {
}
public static void main(String[] args) throws Exception {
Server server = new Server();
Connector context = new SocketConnector();
context.setServer(server);
context.setPort(PORT);
String webappDir = WEBAPP_DIR;
if (args.length > 0) {
webappDir = args[0];
}
WebAppContext webapp = new WebAppContext();
webapp.setServer(server);
webapp.setContextPath(WEBAPP_CTX);
webapp.setResourceBase(webappDir);
server.setHandler(webapp);
server.setConnectors(new Connector[] {
context
});
server.start();
}
}

View File

@ -1,145 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util.oxm;
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.MessageTransformerSupport;
/**
* Abstract class used as a base for implementing transformers from object to text messages (in XML/JSON format)
* and vice versa using.
* Supports plugging of custom marshallers
*/
public abstract class AbstractXMLMessageTransformer extends
MessageTransformerSupport {
protected MessageTransform transformType;
/**
* Defines the type of transformation. If XML (default), - producer
* transformation transforms from Object to XML. - consumer transformation
* transforms from XML to Object. If OBJECT, - producer transformation
* transforms from XML to Object. - consumer transformation transforms from
* Object to XML. If ADAPTIVE, - producer transformation transforms from
* Object to XML, or XML to Object depending on the type of the original
* message - consumer transformation transforms from XML to Object, or
* Object to XML depending on the type of the original message
*/
public enum MessageTransform {
XML, OBJECT, ADAPTIVE
};
public AbstractXMLMessageTransformer() {
this(MessageTransform.XML);
}
public AbstractXMLMessageTransformer(MessageTransform transformType) {
this.transformType = transformType;
}
public Message consumerTransform(Session session, MessageConsumer consumer, Message message) throws JMSException {
switch (transformType) {
case XML:
return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : message;
case OBJECT:
return (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message;
case ADAPTIVE:
return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message;
default:
}
return message;
}
public Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException {
switch (transformType) {
case XML:
return (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message;
case OBJECT:
return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : message;
case ADAPTIVE:
return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message;
default:
}
return message;
}
public MessageTransform getTransformType() {
return transformType;
}
public void setTransformType(MessageTransform transformType) {
this.transformType = transformType;
}
/**
* Transforms an incoming XML encoded {@link TextMessage} to an
* {@link ObjectMessage}
*
* @param session - JMS session currently being used
* @param textMessage - text message to transform to object message
* @return ObjectMessage
* @throws JMSException
*/
protected ObjectMessage textToObject(Session session, TextMessage textMessage) throws JMSException {
Object object = unmarshall(session, textMessage);
if (object instanceof Serializable) {
ObjectMessage answer = session.createObjectMessage((Serializable)object);
copyProperties(textMessage, answer);
return answer;
} else {
throw new JMSException("Object is not serializable: " + object);
}
}
/**
* Transforms an incoming {@link ObjectMessage} to an XML encoded
* {@link TextMessage}
*
* @param session - JMS session currently being used
* @param objectMessage - object message to transform to text message
* @return XML encoded TextMessage
* @throws JMSException
*/
protected TextMessage objectToText(Session session, ObjectMessage objectMessage) throws JMSException {
TextMessage answer = session.createTextMessage(marshall(session, objectMessage));
copyProperties(objectMessage, answer);
return answer;
}
/**
* Marshalls the Object in the {@link ObjectMessage} to a string using XML
* encoding
*/
protected abstract String marshall(Session session, ObjectMessage objectMessage) throws JMSException;
/**
* Unmarshalls the XML encoded message in the {@link TextMessage} to an
* Object
*/
protected abstract Object unmarshall(Session session, TextMessage textMessage) throws JMSException;
}

View File

@ -1,85 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util.oxm;
import java.io.StringReader;
import java.io.StringWriter;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.xml.transform.Result;
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import org.springframework.oxm.support.AbstractMarshaller;
/**
* Transforms object messages to text messages and vice versa using Spring OXM.
*
*/
public class OXMMessageTransformer extends AbstractXMLMessageTransformer {
/**
* OXM marshaller used to marshall/unmarshall messages
*/
private AbstractMarshaller marshaller;
public AbstractMarshaller getMarshaller() {
return marshaller;
}
public void setMarshaller(AbstractMarshaller marshaller) {
this.marshaller = marshaller;
}
/**
* Marshalls the Object in the {@link ObjectMessage} to a string using XML
* encoding
*/
protected String marshall(Session session, ObjectMessage objectMessage)
throws JMSException {
try {
StringWriter writer = new StringWriter();
Result result = new StreamResult(writer);
marshaller.marshal(objectMessage.getObject(), result);
writer.flush();
return writer.toString();
} catch (Exception e) {
throw new JMSException(e.getMessage());
}
}
/**
* Unmarshalls the XML encoded message in the {@link TextMessage} to an
* Object
*/
protected Object unmarshall(Session session, TextMessage textMessage)
throws JMSException {
try {
String text = textMessage.getText();
Source source = new StreamSource(new StringReader(text));
return marshaller.unmarshal(source);
} catch (Exception e) {
throw new JMSException(e.getMessage());
}
}
}

View File

@ -1,109 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util.oxm;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.HierarchicalStreamDriver;
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
import com.thoughtworks.xstream.io.xml.XppReader;
import org.xmlpull.mxp1.MXParser;
/**
* Transforms object messages to text messages and vice versa using
* {@link XStream}
*
*/
public class XStreamMessageTransformer extends AbstractXMLMessageTransformer {
private XStream xStream;
/**
* Specialized driver to be used with stream readers and writers
*/
private HierarchicalStreamDriver streamDriver;
// Properties
// -------------------------------------------------------------------------
public XStream getXStream() {
if (xStream == null) {
xStream = createXStream();
}
return xStream;
}
public void setXStream(XStream xStream) {
this.xStream = xStream;
}
public HierarchicalStreamDriver getStreamDriver() {
return streamDriver;
}
public void setStreamDriver(HierarchicalStreamDriver streamDriver) {
this.streamDriver = streamDriver;
}
// Implementation methods
// -------------------------------------------------------------------------
protected XStream createXStream() {
return new XStream();
}
/**
* Marshalls the Object in the {@link ObjectMessage} to a string using XML
* encoding
*/
protected String marshall(Session session, ObjectMessage objectMessage) throws JMSException {
Serializable object = objectMessage.getObject();
StringWriter buffer = new StringWriter();
HierarchicalStreamWriter out;
if (streamDriver != null) {
out = streamDriver.createWriter(buffer);
} else {
out = new PrettyPrintWriter(buffer);
}
getXStream().marshal(object, out);
return buffer.toString();
}
/**
* Unmarshalls the XML encoded message in the {@link TextMessage} to an
* Object
*/
protected Object unmarshall(Session session, TextMessage textMessage) throws JMSException {
HierarchicalStreamReader in;
if (streamDriver != null) {
in = streamDriver.createReader(new StringReader(textMessage.getText()));
} else {
in = new XppReader(new StringReader(textMessage.getText()), new MXParser());
}
return getXStream().unmarshal(in);
}
}

View File

@ -1,237 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util.oxm;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.MessageTransformer;
import org.apache.activemq.util.xstream.SamplePojo;
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.ADAPTIVE;
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.OBJECT;
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.XML;
public abstract class AbstractXMLMessageTransformerTest extends TestCase {
protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
protected Connection connection;
protected long timeout = 5000;
protected Connection createConnection(MessageTransformer transformer) throws Exception {
connectionFactory.setTransformer(transformer);
connection = connectionFactory.createConnection();
connection.start();
return connection;
}
protected abstract AbstractXMLMessageTransformer createTransformer();
public void testSendObjectMessageReceiveAsTextMessageAndObjectMessage() throws Exception {
AbstractXMLMessageTransformer transformer = createTransformer();
transformer.setTransformType(XML);
connection = createConnection(transformer);
// lets create the consumers
Session objectSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = objectSession.createTopic(getClass().getName());
MessageConsumer objectConsumer = objectSession.createConsumer(destination);
Session textSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer textConsumer = textSession.createConsumer(destination);
// lets clear the transformer on this consumer so we see the message as
// it really is
((ActiveMQMessageConsumer)textConsumer).setTransformer(null);
// send a message
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
ObjectMessage request = producerSession.createObjectMessage(new SamplePojo("James", "London"));
producer.send(request);
// lets consume it as an object message
Message message = objectConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
ObjectMessage objectMessage = (ObjectMessage)message;
Object object = objectMessage.getObject();
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
SamplePojo body = (SamplePojo)object;
assertEquals("name", "James", body.getName());
assertEquals("city", "London", body.getCity());
// lets consume it as a text message
message = textConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
TextMessage textMessage = (TextMessage)message;
String text = textMessage.getText();
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
System.out.println("Received XML...");
System.out.println(text);
}
public void testSendTextMessageReceiveAsObjectMessageAndTextMessage() throws Exception {
AbstractXMLMessageTransformer transformer = createTransformer();
transformer.setTransformType(OBJECT);
connection = createConnection(transformer);
// lets create the consumers
Session textSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = textSession.createTopic(getClass().getName());
MessageConsumer textConsumer = textSession.createConsumer(destination);
Session objectSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer objectConsumer = objectSession.createConsumer(destination);
// lets clear the transformer on this consumer so we see the message as
// it really is
((ActiveMQMessageConsumer)objectConsumer).setTransformer(null);
// send a message
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
String xmlText = "<org.apache.activemq.util.xstream.SamplePojo>"
+ "<name>James</name>"
+ "<city>London</city>"
+ "</org.apache.activemq.util.xstream.SamplePojo>";
TextMessage request = producerSession.createTextMessage(xmlText);
producer.send(request);
Message message;
// lets consume it as a text message
message = textConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
TextMessage textMessage = (TextMessage)message;
String text = textMessage.getText();
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
// lets consume it as an object message
message = objectConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
ObjectMessage objectMessage = (ObjectMessage)message;
Object object = objectMessage.getObject();
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
SamplePojo body = (SamplePojo)object;
assertEquals("name", "James", body.getName());
assertEquals("city", "London", body.getCity());
}
public void testAdaptiveTransform() throws Exception {
AbstractXMLMessageTransformer transformer = createTransformer();
transformer.setTransformType(ADAPTIVE);
connection = createConnection(transformer);
// lets create the consumers
Session adaptiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = adaptiveSession.createTopic(getClass().getName());
MessageConsumer adaptiveConsumer = adaptiveSession.createConsumer(destination);
Session origSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer origConsumer = origSession.createConsumer(destination);
// lets clear the transformer on this consumer so we see the message as
// it really is
((ActiveMQMessageConsumer)origConsumer).setTransformer(null);
// Create producer
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
Message message;
ObjectMessage objectMessage;
TextMessage textMessage;
SamplePojo body;
Object object;
String text;
// Send a text message
String xmlText = "<org.apache.activemq.util.xstream.SamplePojo>"
+ "<name>James</name>"
+ "<city>London</city>"
+ "</org.apache.activemq.util.xstream.SamplePojo>";
TextMessage txtRequest = producerSession.createTextMessage(xmlText);
producer.send(txtRequest);
// lets consume it as a text message
message = adaptiveConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
textMessage = (TextMessage)message;
text = textMessage.getText();
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
// lets consume it as an object message
message = origConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
objectMessage = (ObjectMessage)message;
object = objectMessage.getObject();
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
body = (SamplePojo)object;
assertEquals("name", "James", body.getName());
assertEquals("city", "London", body.getCity());
// Send object message
ObjectMessage objRequest = producerSession.createObjectMessage(new SamplePojo("James", "London"));
producer.send(objRequest);
// lets consume it as an object message
message = adaptiveConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
objectMessage = (ObjectMessage)message;
object = objectMessage.getObject();
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
body = (SamplePojo)object;
assertEquals("name", "James", body.getName());
assertEquals("city", "London", body.getCity());
// lets consume it as a text message
message = origConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
textMessage = (TextMessage)message;
text = textMessage.getText();
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
System.out.println("Received XML...");
System.out.println(text);
}
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}
}

View File

@ -1,30 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util.oxm;
import org.springframework.oxm.xstream.XStreamMarshaller;
public class OXMMessageTransformTest extends AbstractXMLMessageTransformerTest {
protected AbstractXMLMessageTransformer createTransformer() {
OXMMessageTransformer transformer = new OXMMessageTransformer();
transformer.setMarshaller(new XStreamMarshaller());
return transformer;
}
}

View File

@ -1,135 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util.oxm;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.util.xstream.SamplePojo;
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.ADAPTIVE;
public class XStreamMessageTransformTest extends
AbstractXMLMessageTransformerTest {
protected AbstractXMLMessageTransformer createTransformer() {
return new XStreamMessageTransformer();
}
public void testStreamDriverTransform() throws Exception {
XStreamMessageTransformer transformer = (XStreamMessageTransformer) createTransformer();
transformer.setTransformType(ADAPTIVE);
transformer.setStreamDriver(new JettisonMappedXmlDriver());
connection = createConnection(transformer);
// lets create the consumers
Session adaptiveSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = adaptiveSession.createTopic(getClass()
.getName());
MessageConsumer adaptiveConsumer = adaptiveSession
.createConsumer(destination);
Session origSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageConsumer origConsumer = origSession.createConsumer(destination);
// lets clear the transformer on this consumer so we see the message as
// it really is
((ActiveMQMessageConsumer) origConsumer).setTransformer(null);
// Create producer
Session producerSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
Message message;
ObjectMessage objectMessage;
TextMessage textMessage;
SamplePojo body;
Object object;
String text;
// Send a text message
String xmlText = "{\"org.apache.activemq.util.xstream.SamplePojo\":{\"name\":\"James\",\"city\":\"London\"}}";
TextMessage txtRequest = producerSession.createTextMessage(xmlText);
producer.send(txtRequest);
// lets consume it as a text message
message = adaptiveConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be a TextMessage but was: " + message,
message instanceof TextMessage);
textMessage = (TextMessage) message;
text = textMessage.getText();
assertTrue("Text should be non-empty!", text != null
&& text.length() > 0);
// lets consume it as an object message
message = origConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be an ObjectMessage but was: " + message,
message instanceof ObjectMessage);
objectMessage = (ObjectMessage) message;
object = objectMessage.getObject();
assertTrue("object payload of wrong type: " + object,
object instanceof SamplePojo);
body = (SamplePojo) object;
assertEquals("name", "James", body.getName());
assertEquals("city", "London", body.getCity());
// Send object message
ObjectMessage objRequest = producerSession
.createObjectMessage(new SamplePojo("James", "London"));
producer.send(objRequest);
// lets consume it as an object message
message = adaptiveConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be an ObjectMessage but was: " + message,
message instanceof ObjectMessage);
objectMessage = (ObjectMessage) message;
object = objectMessage.getObject();
assertTrue("object payload of wrong type: " + object,
object instanceof SamplePojo);
body = (SamplePojo) object;
assertEquals("name", "James", body.getName());
assertEquals("city", "London", body.getCity());
// lets consume it as a text message
message = origConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be a TextMessage but was: " + message,
message instanceof TextMessage);
textMessage = (TextMessage) message;
text = textMessage.getText();
assertTrue("Text should be non-empty!", text != null
&& text.length() > 0);
System.out.println("Received JSON...");
System.out.println(text);
}
}

View File

@ -1,52 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util.xstream;
import java.io.Serializable;
/**
*
*/
public class SamplePojo implements Serializable {
private String name;
private String city;
public SamplePojo() {
}
public SamplePojo(String name, String city) {
this.name = name;
this.city = city;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

View File

@ -1,315 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util.xstream;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.ADAPTIVE;
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.OBJECT;
import static org.apache.activemq.util.oxm.AbstractXMLMessageTransformer.MessageTransform.XML;
/**
*
*/
public class XStreamTransformTest extends TestCase {
protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&broker.useJmx=false");
protected Connection connection;
protected long timeout = 5000;
public void testSendObjectMessageReceiveAsTextMessageAndObjectMessage() throws Exception {
org.apache.activemq.util.oxm.XStreamMessageTransformer transformer = new org.apache.activemq.util.oxm.XStreamMessageTransformer();
transformer.setTransformType(XML);
connectionFactory.setTransformer(transformer);
connection = connectionFactory.createConnection();
connection.start();
// lets create the consumers
Session objectSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = objectSession.createTopic(getClass().getName());
MessageConsumer objectConsumer = objectSession.createConsumer(destination);
Session textSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer textConsumer = textSession.createConsumer(destination);
// lets clear the transformer on this consumer so we see the message as
// it really is
((ActiveMQMessageConsumer)textConsumer).setTransformer(null);
// send a message
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
ObjectMessage request = producerSession.createObjectMessage(new SamplePojo("James", "London"));
producer.send(request);
// lets consume it as an object message
Message message = objectConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
ObjectMessage objectMessage = (ObjectMessage)message;
Object object = objectMessage.getObject();
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
SamplePojo body = (SamplePojo)object;
assertEquals("name", "James", body.getName());
assertEquals("city", "London", body.getCity());
// lets consume it as a text message
message = textConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
TextMessage textMessage = (TextMessage)message;
String text = textMessage.getText();
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
System.out.println("Received XML...");
System.out.println(text);
}
public void testSendTextMessageReceiveAsObjectMessageAndTextMessage() throws Exception {
org.apache.activemq.util.oxm.XStreamMessageTransformer transformer = new org.apache.activemq.util.oxm.XStreamMessageTransformer();
transformer.setTransformType(OBJECT);
connectionFactory.setTransformer(transformer);
connection = connectionFactory.createConnection();
connection.start();
// lets create the consumers
Session textSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = textSession.createTopic(getClass().getName());
MessageConsumer textConsumer = textSession.createConsumer(destination);
Session objectSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer objectConsumer = objectSession.createConsumer(destination);
// lets clear the transformer on this consumer so we see the message as
// it really is
((ActiveMQMessageConsumer)objectConsumer).setTransformer(null);
// send a message
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
String xmlText = "<org.apache.activemq.util.xstream.SamplePojo>"
+ "<name>James</name>"
+ "<city>London</city>"
+ "</org.apache.activemq.util.xstream.SamplePojo>";
TextMessage request = producerSession.createTextMessage(xmlText);
producer.send(request);
Message message;
// lets consume it as a text message
message = textConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
TextMessage textMessage = (TextMessage)message;
String text = textMessage.getText();
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
// lets consume it as an object message
message = objectConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
ObjectMessage objectMessage = (ObjectMessage)message;
Object object = objectMessage.getObject();
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
SamplePojo body = (SamplePojo)object;
assertEquals("name", "James", body.getName());
assertEquals("city", "London", body.getCity());
}
public void testAdaptiveTransform() throws Exception {
org.apache.activemq.util.oxm.XStreamMessageTransformer transformer = new org.apache.activemq.util.oxm.XStreamMessageTransformer();
transformer.setTransformType(ADAPTIVE);
connectionFactory.setTransformer(transformer);
connection = connectionFactory.createConnection();
connection.start();
// lets create the consumers
Session adaptiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = adaptiveSession.createTopic(getClass().getName());
MessageConsumer adaptiveConsumer = adaptiveSession.createConsumer(destination);
Session origSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer origConsumer = origSession.createConsumer(destination);
// lets clear the transformer on this consumer so we see the message as
// it really is
((ActiveMQMessageConsumer)origConsumer).setTransformer(null);
// Create producer
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
Message message;
ObjectMessage objectMessage;
TextMessage textMessage;
SamplePojo body;
Object object;
String text;
// Send a text message
String xmlText = "<org.apache.activemq.util.xstream.SamplePojo>"
+ "<name>James</name>"
+ "<city>London</city>"
+ "</org.apache.activemq.util.xstream.SamplePojo>";
TextMessage txtRequest = producerSession.createTextMessage(xmlText);
producer.send(txtRequest);
// lets consume it as a text message
message = adaptiveConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
textMessage = (TextMessage)message;
text = textMessage.getText();
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
// lets consume it as an object message
message = origConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
objectMessage = (ObjectMessage)message;
object = objectMessage.getObject();
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
body = (SamplePojo)object;
assertEquals("name", "James", body.getName());
assertEquals("city", "London", body.getCity());
// Send object message
ObjectMessage objRequest = producerSession.createObjectMessage(new SamplePojo("James", "London"));
producer.send(objRequest);
// lets consume it as an object message
message = adaptiveConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
objectMessage = (ObjectMessage)message;
object = objectMessage.getObject();
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
body = (SamplePojo)object;
assertEquals("name", "James", body.getName());
assertEquals("city", "London", body.getCity());
// lets consume it as a text message
message = origConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
textMessage = (TextMessage)message;
text = textMessage.getText();
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
System.out.println("Received XML...");
System.out.println(text);
}
public void testStreamDriverTransform() throws Exception {
org.apache.activemq.util.oxm.XStreamMessageTransformer transformer = new org.apache.activemq.util.oxm.XStreamMessageTransformer();
transformer.setTransformType(ADAPTIVE);
transformer.setStreamDriver(new JettisonMappedXmlDriver());
connectionFactory.setTransformer(transformer);
connection = connectionFactory.createConnection();
connection.start();
// lets create the consumers
Session adaptiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = adaptiveSession.createTopic(getClass().getName());
MessageConsumer adaptiveConsumer = adaptiveSession.createConsumer(destination);
Session origSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer origConsumer = origSession.createConsumer(destination);
// lets clear the transformer on this consumer so we see the message as
// it really is
((ActiveMQMessageConsumer)origConsumer).setTransformer(null);
// Create producer
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
Message message;
ObjectMessage objectMessage;
TextMessage textMessage;
SamplePojo body;
Object object;
String text;
// Send a text message
String xmlText = "{\"org.apache.activemq.util.xstream.SamplePojo\":{\"name\":\"James\",\"city\":\"London\"}}";
TextMessage txtRequest = producerSession.createTextMessage(xmlText);
producer.send(txtRequest);
// lets consume it as a text message
message = adaptiveConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
textMessage = (TextMessage)message;
text = textMessage.getText();
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
// lets consume it as an object message
message = origConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
objectMessage = (ObjectMessage)message;
object = objectMessage.getObject();
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
body = (SamplePojo)object;
assertEquals("name", "James", body.getName());
assertEquals("city", "London", body.getCity());
// Send object message
ObjectMessage objRequest = producerSession.createObjectMessage(new SamplePojo("James", "London"));
producer.send(objRequest);
// lets consume it as an object message
message = adaptiveConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
objectMessage = (ObjectMessage)message;
object = objectMessage.getObject();
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
body = (SamplePojo)object;
assertEquals("name", "James", body.getName());
assertEquals("city", "London", body.getCity());
// lets consume it as a text message
message = origConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
textMessage = (TextMessage)message;
text = textMessage.getText();
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
System.out.println("Received JSON...");
System.out.println(text);
}
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}
}

View File

@ -1,38 +0,0 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
#
# The logging properties used during tests..
#
log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.spring=WARN
#log4j.logger.org.apache.activemq=DEBUG
#log4j.logger.org.eclipse.jetty.io.nio.ssl=DEBUG
#log4j.logger.org.apache.http=INFO
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c - %m%n
# File appender
log4j.appender.out=org.apache.log4j.FileAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
log4j.appender.out.file=target/activemq-test.log
log4j.appender.out.append=true