Moved the camel-activemq module into the activemq-core module to break the circular dependency between the activemq and camel projects.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@573615 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-09-07 15:44:45 +00:00
parent b732d3d114
commit 5f96839259
19 changed files with 1166 additions and 18 deletions

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import org.apache.camel.CamelContext;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.jms.JmsConfiguration;
/**
* The <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
*
* @version $Revision$
*/
public class ActiveMQComponent extends JmsComponent {
/**
* Creates an <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
*
* @return the created component
*/
public static ActiveMQComponent activeMQComponent() {
return new ActiveMQComponent();
}
/**
* Creates an <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
* connecting to the given <a href="http://activemq.apache.org/configuring-transports.html">broker URL</a>
*
* @param brokerURL the URL to connect to
* @return the created component
*/
public static ActiveMQComponent activeMQComponent(String brokerURL) {
ActiveMQComponent answer = new ActiveMQComponent();
answer.getConfiguration().setBrokerURL(brokerURL);
return answer;
}
public ActiveMQComponent() {
}
public ActiveMQComponent(CamelContext context) {
super(context);
}
public ActiveMQComponent(ActiveMQConfiguration configuration) {
super(configuration);
}
@Override
public ActiveMQConfiguration getConfiguration() {
return (ActiveMQConfiguration) super.getConfiguration();
}
public void setBrokerURL(String brokerURL) {
getConfiguration().setBrokerURL(brokerURL);
}
@Override
protected JmsConfiguration createConfiguration() {
return new ActiveMQConfiguration();
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.apache.camel.component.jms.JmsConfiguration;
import javax.jms.ConnectionFactory;
/**
* @version $Revision$
*/
public class ActiveMQConfiguration extends JmsConfiguration {
private String brokerURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
public ActiveMQConfiguration() {
}
public String getBrokerURL() {
return brokerURL;
}
/**
* Sets the broker URL to use to connect to ActiveMQ using the
* <a href="http://activemq.apache.org/configuring-transports.html">ActiveMQ URI format</a>
*
* @param brokerURL the URL of the broker.
*/
public void setBrokerURL(String brokerURL) {
this.brokerURL = brokerURL;
}
@Override
public ActiveMQConnectionFactory getListenerConnectionFactory() {
return (ActiveMQConnectionFactory) super.getListenerConnectionFactory();
}
@Override
public void setListenerConnectionFactory(ConnectionFactory listenerConnectionFactory) {
if (listenerConnectionFactory instanceof ActiveMQConnectionFactory) {
super.setListenerConnectionFactory(listenerConnectionFactory);
}
else {
throw new IllegalArgumentException("ConnectionFactory " + listenerConnectionFactory
+ " is not an instanceof " + ActiveMQConnectionFactory.class.getName());
}
}
@Override
protected ConnectionFactory createListenerConnectionFactory() {
ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory();
answer.setBrokerURL(getBrokerURL());
return answer;
}
@Override
protected ConnectionFactory createTemplateConnectionFactory() {
return new PooledConnectionFactory(getListenerConnectionFactory());
}
}

View File

@ -0,0 +1,42 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.camel.Converter;
/**
* @version $Revision$
*/
@Converter
public class ActiveMQConverter {
/**
* Converts a URL in ActiveMQ syntax to a destination such as to support
* "queue://foo.bar" or 'topic://bar.whatnot". Things default to queues if no scheme.
*
* This allows ActiveMQ destinations to be passed around as Strings and converted back again.
*
* @param name is the name of the queue or the full URI using prefixes queue:// or topic://
* @return the ActiveMQ destination
*/
@Converter
public static ActiveMQDestination toDestination(String name) {
return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
}
}

View File

@ -0,0 +1,74 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.*;
import org.apache.camel.Converter;
import org.apache.camel.Exchange;
import org.apache.camel.component.jms.JmsBinding;
import javax.jms.MessageNotWriteableException;
import javax.jms.JMSException;
import java.io.Serializable;
/**
* @version $Revision$
*/
@Converter
public class ActiveMQMessageConverter {
private JmsBinding binding = new JmsBinding();
/**
* Converts the inbound message exchange to an ActiveMQ JMS message
*
* @return the ActiveMQ message
*/
@Converter
public ActiveMQMessage toMessage(Exchange exchange) throws JMSException {
ActiveMQMessage message = createActiveMQMessage(exchange);
getBinding().appendJmsProperties(message, exchange);
return message;
}
private static ActiveMQMessage createActiveMQMessage(Exchange exchange) throws JMSException {
Object body = exchange.getIn().getBody();
if (body instanceof String) {
ActiveMQTextMessage answer = new ActiveMQTextMessage();
answer.setText((String) body);
return answer;
} else if (body instanceof Serializable) {
ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
answer.setObject((Serializable) body);
return answer;
} else {
return new ActiveMQMessage();
}
}
// Properties
//-------------------------------------------------------------------------
public JmsBinding getBinding() {
return binding;
}
public void setBinding(JmsBinding binding) {
this.binding = binding;
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultComponent;
import java.io.File;
import java.util.Map;
/**
* The <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
*
* @version $Revision$
*/
public class JournalComponent extends DefaultComponent<Exchange> {
@Override
protected Endpoint<Exchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
JournalEndpoint endpoint = new JournalEndpoint(uri, this, new File(remaining));
setProperties(endpoint, parameters);
return endpoint;
}
}

View File

@ -0,0 +1,227 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.util.ByteSequence;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ExchangePattern;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class JournalEndpoint extends DefaultEndpoint<Exchange> {
private static final transient Log LOG = LogFactory.getLog(JournalEndpoint.class);
private final File directory;
private final AtomicReference<DefaultConsumer<Exchange>> consumer = new AtomicReference<DefaultConsumer<Exchange>>();
private final Object activationMutex = new Object();
private int referenceCount;
private AsyncDataManager dataManager;
private Thread thread;
private Location lastReadLocation;
private long idleDelay = 1000;
private boolean syncProduce = true;
private boolean syncConsume;
public JournalEndpoint(String uri, JournalComponent journalComponent, File directory) {
super(uri, journalComponent.getCamelContext());
this.directory = directory;
}
public boolean isSingleton() {
return true;
}
public File getDirectory() {
return directory;
}
public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
return new DefaultConsumer<Exchange>(this, processor) {
@Override
public void start() throws Exception {
super.start();
activateConsumer(this);
}
@Override
public void stop() throws Exception {
deactivateConsumer(this);
super.stop();
}
};
}
protected void decrementReference() throws IOException {
synchronized (activationMutex) {
referenceCount--;
if (referenceCount == 0) {
LOG.debug("Closing data manager: " + directory);
LOG.debug("Last mark at: " + lastReadLocation);
dataManager.close();
dataManager = null;
}
}
}
protected void incrementReference() throws IOException {
synchronized (activationMutex) {
referenceCount++;
if (referenceCount == 1) {
LOG.debug("Opening data manager: " + directory);
dataManager = new AsyncDataManager();
dataManager.setDirectory(directory);
dataManager.start();
lastReadLocation = dataManager.getMark();
LOG.debug("Last mark at: " + lastReadLocation);
}
}
}
protected void deactivateConsumer(DefaultConsumer<Exchange> consumer) throws IOException {
synchronized (activationMutex) {
if (this.consumer.get() != consumer) {
throw new RuntimeCamelException("Consumer was not active.");
}
this.consumer.set(null);
try {
thread.join();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
decrementReference();
}
}
protected void activateConsumer(DefaultConsumer<Exchange> consumer) throws IOException {
synchronized (activationMutex) {
if (this.consumer.get() != null) {
throw new RuntimeCamelException("Consumer already active: journal endpoints only support 1 active consumer");
}
incrementReference();
this.consumer.set(consumer);
thread = new Thread() {
@Override
public void run() {
dispatchToConsumer();
}
};
thread.setName("Dipatch thread: " + getEndpointUri());
thread.setDaemon(true);
thread.start();
}
}
protected void dispatchToConsumer() {
try {
DefaultConsumer<Exchange> consumer;
while ((consumer = this.consumer.get()) != null) {
// See if there is a new record to process
Location location = dataManager.getNextLocation(lastReadLocation);
if (location != null) {
// Send it on.
ByteSequence read = dataManager.read(location);
Exchange exchange = createExchange();
exchange.getIn().setBody(read);
exchange.getIn().setHeader("journal", getEndpointUri());
exchange.getIn().setHeader("location", location);
consumer.getProcessor().process(exchange);
// Setting the mark makes the data manager forget about
// everything
// before that record.
if (LOG.isDebugEnabled()) {
LOG.debug("Consumed record at: " + location);
}
dataManager.setMark(location, syncConsume);
lastReadLocation = location;
} else {
// Avoid a tight CPU loop if there is no new record to read.
LOG.debug("Sleeping due to no records being available.");
Thread.sleep(idleDelay);
}
}
} catch (Throwable e) {
e.printStackTrace();
}
}
public Producer<Exchange> createProducer() throws Exception {
return new DefaultProducer<Exchange>(this) {
public void process(Exchange exchange) throws Exception {
incrementReference();
try {
ByteSequence body = exchange.getIn().getBody(ByteSequence.class);
if (body == null) {
byte[] bytes = exchange.getIn().getBody(byte[].class);
if (bytes != null) {
body = new ByteSequence(bytes);
}
}
if (body == null) {
throw new CamelExchangeException("In body message could not be converted to a ByteSequence or a byte array.", exchange);
}
dataManager.write(body, syncProduce);
} finally {
decrementReference();
}
}
};
}
public boolean isSyncConsume() {
return syncConsume;
}
public void setSyncConsume(boolean syncConsume) {
this.syncConsume = syncConsume;
}
public boolean isSyncProduce() {
return syncProduce;
}
public void setSyncProduce(boolean syncProduce) {
this.syncProduce = syncProduce;
}
boolean isOpen() {
synchronized (activationMutex) {
return referenceCount > 0;
}
}
}

View File

@ -0,0 +1,25 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
Defines the <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
</body>
</html>

View File

@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.activemq.camel.component

View File

@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
class=org.apache.activemq.camel.component.ActiveMQComponent

View File

@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
class=org.apache.activemq.camel.component.JournalComponent

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.component.jms.JmsConsumer;
import org.apache.camel.component.jms.JmsEndpoint;
import org.apache.camel.component.jms.JmsProducer;
import org.apache.camel.processor.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
/**
* @version $Revision$
*/
public class ActiveMQConfigureTest extends ContextTestSupport {
public void testJmsTemplateUsesPoolingConnectionFactory() throws Exception {
JmsEndpoint endpoint = resolveMandatoryEndpoint("activemq:test.foo");
JmsProducer producer = endpoint.createProducer();
JmsTemplate template = assertIsInstanceOf(JmsTemplate.class, producer.getTemplate());
assertIsInstanceOf(PooledConnectionFactory.class, template.getConnectionFactory());
assertEquals("pubSubDomain", false, template.isPubSubDomain());
}
public void testListenerContainerUsesSpringConnectionFactory() throws Exception {
JmsEndpoint endpoint = resolveMandatoryEndpoint("activemq:topic:test.foo");
JmsConsumer consumer = endpoint.createConsumer(new Logger());
AbstractMessageListenerContainer listenerContainer = consumer.getListenerContainer();
assertIsInstanceOf(ActiveMQConnectionFactory.class, listenerContainer.getConnectionFactory());
assertEquals("pubSubDomain", true, listenerContainer.isPubSubDomain());
}
@Override
protected JmsEndpoint resolveMandatoryEndpoint(String uri) {
Endpoint endpoint = super.resolveMandatoryEndpoint(uri);
return assertIsInstanceOf(JmsEndpoint.class, endpoint);
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
import org.apache.camel.component.jms.JmsExchange;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.mock.AssertionClause;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.Message;
import javax.jms.Destination;
import java.util.Date;
import java.util.List;
/**
* @version $Revision$
*/
public class ActiveMQJmsHeaderRouteTest extends ContextTestSupport {
private static final transient Log LOG = LogFactory.getLog(ActiveMQJmsHeaderRouteTest.class);
protected Object expectedBody = "<time>" + new Date() + "</time>";
protected ActiveMQQueue replyQueue = new ActiveMQQueue("test.reply.queue");
protected String correlationID = "ABC-123";
protected String messageType = getClass().getName();
public void testForwardingAMessageAcrossJMSKeepingCustomJMSHeaders() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedBodiesReceived(expectedBody);
AssertionClause firstMessageExpectations = resultEndpoint.message(0);
firstMessageExpectations.header("cheese").isEqualTo(123);
firstMessageExpectations.header("JMSReplyTo").isEqualTo(replyQueue);
firstMessageExpectations.header("JMSCorrelationID").isEqualTo(correlationID);
firstMessageExpectations.header("JMSType").isEqualTo(messageType);
template.sendBodyAndHeader("activemq:test.a", expectedBody, "cheese", 123);
resultEndpoint.assertIsSatisfied();
List<Exchange> list = resultEndpoint.getReceivedExchanges();
Exchange exchange = list.get(0);
Object replyTo = exchange.getIn().getHeader("JMSReplyTo");
LOG.info("Reply to is: " + replyTo);
Destination destination = assertIsInstanceOf(Destination.class, replyTo);
assertEquals("ReplyTo", replyQueue.toString(), destination.toString());
}
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
// START SNIPPET: example
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
// END SNIPPET: example
return camelContext;
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
from("activemq:test.a").process(new Processor() {
public void process(Exchange exchange) throws Exception {
// lets set the custom JMS headers using the JMS API
JmsExchange jmsExchange = assertIsInstanceOf(JmsExchange.class, exchange);
Message inMessage = jmsExchange.getInMessage();
inMessage.setJMSReplyTo(replyQueue);
inMessage.setJMSCorrelationID(correlationID);
inMessage.setJMSType(messageType);
}
}).to("activemq:test.b");
from("activemq:test.b").to("mock:result");
}
};
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
import org.apache.camel.component.mock.AssertionClause;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.Destination;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @version $Revision$
*/
public class ActiveMQReplyToHeaderUsingConverterTest extends ContextTestSupport {
private static final transient Log LOG = LogFactory.getLog(ActiveMQReplyToHeaderUsingConverterTest.class);
protected Object expectedBody = "<time>" + new Date() + "</time>";
protected String replyQueueName = "queue://test.my.reply.queue";
protected String correlationID = "ABC-123";
protected String groupID = "GROUP-XYZ";
protected String messageType = getClass().getName();
public void testSendingAMessageFromCamelSetsCustomJmsHeaders() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedBodiesReceived(expectedBody);
AssertionClause firstMessage = resultEndpoint.message(0);
firstMessage.header("cheese").isEqualTo(123);
firstMessage.header("JMSCorrelationID").isEqualTo(correlationID);
firstMessage.header("JMSReplyTo").isEqualTo(ActiveMQConverter.toDestination(replyQueueName));
firstMessage.header("JMSType").isEqualTo(messageType);
firstMessage.header("JMSXGroupID").isEqualTo(groupID);
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("cheese", 123);
headers.put("JMSReplyTo", replyQueueName);
headers.put("JMSCorrelationID", correlationID);
headers.put("JMSType", messageType);
headers.put("JMSXGroupID", groupID);
template.sendBodyAndHeaders("activemq:test.a", expectedBody, headers);
resultEndpoint.assertIsSatisfied();
List<Exchange> list = resultEndpoint.getReceivedExchanges();
Exchange exchange = list.get(0);
Message in = exchange.getIn();
Object replyTo = in.getHeader("JMSReplyTo");
LOG.info("Reply to is: " + replyTo);
Destination destination = assertIsInstanceOf(Destination.class, replyTo);
assertEquals("ReplyTo", replyQueueName, destination.toString());
assertMessageHeader(in, "cheese", 123);
assertMessageHeader(in, "JMSCorrelationID", correlationID);
assertMessageHeader(in, "JMSType", messageType);
assertMessageHeader(in, "JMSXGroupID", groupID);
}
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
// START SNIPPET: example
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
// END SNIPPET: example
return camelContext;
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
from("activemq:test.a").to("activemq:test.b");
from("activemq:test.b").to("mock:result");
}
};
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
import org.apache.camel.component.jms.JmsEndpoint;
import org.apache.camel.component.mock.MockEndpoint;
/**
* @version $Revision$
*/
public class ActiveMQRouteTest extends ContextTestSupport {
protected MockEndpoint resultEndpoint;
protected String startEndpointUri = "activemq:queue:test.a";
public void testJmsRouteWithTextMessage() throws Exception {
String expectedBody = "Hello there!";
resultEndpoint.expectedBodiesReceived(expectedBody);
resultEndpoint.message(0).header("cheese").isEqualTo(123);
sendExchange(expectedBody);
resultEndpoint.assertIsSatisfied();
}
protected void sendExchange(final Object expectedBody) {
template.sendBodyAndHeader(startEndpointUri, expectedBody, "cheese", 123);
}
@Override
protected void setUp() throws Exception {
super.setUp();
resultEndpoint = (MockEndpoint) context.getEndpoint("mock:result");
}
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
// START SNIPPET: example
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
// END SNIPPET: example
return camelContext;
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
from(startEndpointUri).to("activemq:queue:test.b");
from("activemq:queue:test.b").to("mock:result");
JmsEndpoint endpoint1 = (JmsEndpoint) endpoint("activemq:topic:quote.IONA");
endpoint1.getConfiguration().setTransacted(true);
from(endpoint1).to("mock:transactedClient");
JmsEndpoint endpoint2 = (JmsEndpoint) endpoint("activemq:topic:quote.IONA");
endpoint1.getConfiguration().setTransacted(true);
from(endpoint2).to("mock:nonTrasnactedClient");
}
};
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import java.io.File;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
/**
* @version $Revision$
*/
public class JournalConfigureTest extends ContextTestSupport {
public void testDefaltConfig() throws Exception {
JournalEndpoint endpoint = resolveMandatoryEndpoint("activemq.journal:target/test");
assertEquals("directory", new File("target", "test"), endpoint.getDirectory());
assertEquals("syncConsume", false, endpoint.isSyncConsume());
assertEquals("syncProduce", true, endpoint.isSyncProduce());
}
public void testConfigViaOptions() throws Exception {
JournalEndpoint endpoint = resolveMandatoryEndpoint("activemq.journal:target/test?syncConsume=true&syncProduce=false");
assertEquals("directory", new File("target", "test"), endpoint.getDirectory());
assertEquals("syncConsume", true, endpoint.isSyncConsume());
assertEquals("syncProduce", false, endpoint.isSyncProduce());
}
@Override
protected JournalEndpoint resolveMandatoryEndpoint(String uri) {
Endpoint endpoint = super.resolveMandatoryEndpoint(uri);
return assertIsInstanceOf(JournalEndpoint.class, endpoint);
}
}

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
/**
* Used to get an idea of what kind of performance can be expected from
* the journal.
*
* @version $Revision$
*/
public class JournalRoutePerformance extends ContextTestSupport {
AtomicLong produceCounter = new AtomicLong();
AtomicLong consumeCounter = new AtomicLong();
AtomicBoolean running = new AtomicBoolean(true);
public void testPerformance() throws Exception {
int payLoadSize = 1024;
int concurrentProducers = 50;
long delayBetweenSample = 1000;
long perfTestDuration = 1000 * 60; // 1 min
StringBuffer t = new StringBuffer();
for (int i = 0; i < payLoadSize; i++) {
t.append('a' + (i % 26));
}
final byte[] payload = t.toString().getBytes("UTF-8");
for (int i = 0; i < concurrentProducers; i++) {
Thread thread = new Thread("Producer: " + i) {
@Override
public void run() {
while (running.get()) {
template.sendBody("direct:in", payload);
produceCounter.incrementAndGet();
}
}
};
thread.start();
}
long produceTotal = 0;
long consumeTotal = 0;
long start = System.currentTimeMillis();
long end = start + perfTestDuration;
while (System.currentTimeMillis() < end) {
Thread.sleep(delayBetweenSample);
long totalTime = System.currentTimeMillis() - start;
long p = produceCounter.getAndSet(0);
long c = consumeCounter.getAndSet(0);
produceTotal += p;
consumeTotal += c;
System.out.println("Interval Produced " + stat(p, delayBetweenSample) + " m/s, Consumed " + stat(c, delayBetweenSample) + " m/s");
System.out.println("Total Produced " + stat(produceTotal, totalTime) + " m/s, Consumed " + stat(consumeTotal, totalTime) + " m/s");
}
running.set(false);
}
private String stat(long pd, long delayBetweenSample) {
return "" + (1.0 * pd / delayBetweenSample) * 1000.0;
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
from("direct:in").to("activemq.journal:target/perf-test");
from("activemq.journal:target/perf-test").process(new Processor() {
public void process(Exchange exchange) throws Exception {
consumeCounter.incrementAndGet();
}
});
}
};
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import java.util.List;
import org.apache.activemq.util.ByteSequence;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.AssertionClause;
import org.apache.camel.component.mock.MockEndpoint;
/**
* @version $Revision$
*/
public class JournalRouteTest extends ContextTestSupport {
public void testSimpleJournalRoute() throws Exception {
byte[] payload = "Hello World".getBytes();
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:out", MockEndpoint.class);
resultEndpoint.expectedMessageCount(1);
AssertionClause firstMessageExpectations = resultEndpoint.message(0);
firstMessageExpectations.header("journal").isEqualTo("activemq.journal:target/test.a");
firstMessageExpectations.header("location").isNotNull();
firstMessageExpectations.body().isInstanceOf(ByteSequence.class);
template.sendBody("direct:in", payload);
resultEndpoint.assertIsSatisfied();
List<Exchange> list = resultEndpoint.getReceivedExchanges();
Exchange exchange = list.get(0);
ByteSequence body = (ByteSequence)exchange.getIn().getBody();
body.compact(); // trims the byte array to the actual size.
assertEquals("body", new String(payload), new String(body.data));
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
from("direct:in").to("activemq.journal:target/test.a");
from("activemq.journal:target/test.a").to("mock:out");
}
};
}
}

View File

@ -116,10 +116,6 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-jms</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-activemq</artifactId>
</dependency>
<!-- for the XML parsing -->
<dependency>
<groupId>javax.xml</groupId>

13
pom.xml
View File

@ -400,19 +400,6 @@
<artifactId>camel-jms</artifactId>
<version>${camel-version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-activemq</artifactId>
<version>${camel-version}</version>
<!-- lets swap out the version of AMQ that camel released with -->
<exclusions>
<exclusion>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- for the XML parsing -->
<dependency>