diff --git a/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java b/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java new file mode 100644 index 0000000000..17e5999450 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import org.apache.camel.CamelContext; +import org.apache.camel.component.jms.JmsComponent; +import org.apache.camel.component.jms.JmsConfiguration; + +/** + * The ActiveMQ Component + * + * @version $Revision$ + */ +public class ActiveMQComponent extends JmsComponent { + /** + * Creates an ActiveMQ Component + * + * @return the created component + */ + public static ActiveMQComponent activeMQComponent() { + return new ActiveMQComponent(); + } + + /** + * Creates an ActiveMQ Component + * connecting to the given broker URL + * + * @param brokerURL the URL to connect to + * @return the created component + */ + public static ActiveMQComponent activeMQComponent(String brokerURL) { + ActiveMQComponent answer = new ActiveMQComponent(); + answer.getConfiguration().setBrokerURL(brokerURL); + return answer; + } + + public ActiveMQComponent() { + } + + public ActiveMQComponent(CamelContext context) { + super(context); + } + + public ActiveMQComponent(ActiveMQConfiguration configuration) { + super(configuration); + } + + @Override + public ActiveMQConfiguration getConfiguration() { + return (ActiveMQConfiguration) super.getConfiguration(); + } + + public void setBrokerURL(String brokerURL) { + getConfiguration().setBrokerURL(brokerURL); + } + + + @Override + protected JmsConfiguration createConfiguration() { + return new ActiveMQConfiguration(); + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java b/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java new file mode 100644 index 0000000000..38a16b3bbc --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import org.apache.activemq.pool.PooledConnectionFactory; +import org.apache.activemq.spring.ActiveMQConnectionFactory; +import org.apache.camel.component.jms.JmsConfiguration; + +import javax.jms.ConnectionFactory; + +/** + * @version $Revision$ + */ +public class ActiveMQConfiguration extends JmsConfiguration { + private String brokerURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; + + public ActiveMQConfiguration() { + } + + public String getBrokerURL() { + return brokerURL; + } + + /** + * Sets the broker URL to use to connect to ActiveMQ using the + * ActiveMQ URI format + * + * @param brokerURL the URL of the broker. + */ + public void setBrokerURL(String brokerURL) { + this.brokerURL = brokerURL; + } + + @Override + public ActiveMQConnectionFactory getListenerConnectionFactory() { + return (ActiveMQConnectionFactory) super.getListenerConnectionFactory(); + } + + @Override + public void setListenerConnectionFactory(ConnectionFactory listenerConnectionFactory) { + if (listenerConnectionFactory instanceof ActiveMQConnectionFactory) { + super.setListenerConnectionFactory(listenerConnectionFactory); + } + else { + throw new IllegalArgumentException("ConnectionFactory " + listenerConnectionFactory + + " is not an instanceof " + ActiveMQConnectionFactory.class.getName()); + } + } + + @Override + protected ConnectionFactory createListenerConnectionFactory() { + ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory(); + answer.setBrokerURL(getBrokerURL()); + return answer; + } + + @Override + protected ConnectionFactory createTemplateConnectionFactory() { + return new PooledConnectionFactory(getListenerConnectionFactory()); + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConverter.java b/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConverter.java new file mode 100644 index 0000000000..1dfc66d18f --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConverter.java @@ -0,0 +1,42 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.camel.Converter; + +/** + * @version $Revision$ + */ +@Converter +public class ActiveMQConverter { + /** + * Converts a URL in ActiveMQ syntax to a destination such as to support + * "queue://foo.bar" or 'topic://bar.whatnot". Things default to queues if no scheme. + * + * This allows ActiveMQ destinations to be passed around as Strings and converted back again. + * + * @param name is the name of the queue or the full URI using prefixes queue:// or topic:// + * @return the ActiveMQ destination + */ + @Converter + public static ActiveMQDestination toDestination(String name) { + return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE); + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQMessageConverter.java b/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQMessageConverter.java new file mode 100644 index 0000000000..b41befcc13 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQMessageConverter.java @@ -0,0 +1,74 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.*; +import org.apache.camel.Converter; +import org.apache.camel.Exchange; +import org.apache.camel.component.jms.JmsBinding; + +import javax.jms.MessageNotWriteableException; +import javax.jms.JMSException; +import java.io.Serializable; + +/** + * @version $Revision$ + */ +@Converter +public class ActiveMQMessageConverter { + private JmsBinding binding = new JmsBinding(); + + /** + * Converts the inbound message exchange to an ActiveMQ JMS message + * + * @return the ActiveMQ message + */ + @Converter + public ActiveMQMessage toMessage(Exchange exchange) throws JMSException { + ActiveMQMessage message = createActiveMQMessage(exchange); + getBinding().appendJmsProperties(message, exchange); + return message; + } + + private static ActiveMQMessage createActiveMQMessage(Exchange exchange) throws JMSException { + Object body = exchange.getIn().getBody(); + if (body instanceof String) { + ActiveMQTextMessage answer = new ActiveMQTextMessage(); + answer.setText((String) body); + return answer; + } else if (body instanceof Serializable) { + ActiveMQObjectMessage answer = new ActiveMQObjectMessage(); + answer.setObject((Serializable) body); + return answer; + } else { + return new ActiveMQMessage(); + } + + } + + // Properties + //------------------------------------------------------------------------- + public JmsBinding getBinding() { + return binding; + } + + public void setBinding(JmsBinding binding) { + this.binding = binding; + } +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalComponent.java b/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalComponent.java new file mode 100644 index 0000000000..8af77eec91 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalComponent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultComponent; + +import java.io.File; +import java.util.Map; + +/** + * The ActiveMQ Component + * + * @version $Revision$ + */ +public class JournalComponent extends DefaultComponent { + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { + JournalEndpoint endpoint = new JournalEndpoint(uri, this, new File(remaining)); + setProperties(endpoint, parameters); + return endpoint; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalEndpoint.java b/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalEndpoint.java new file mode 100644 index 0000000000..8baf58a3c9 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalEndpoint.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import java.io.File; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.activemq.kaha.impl.async.AsyncDataManager; +import org.apache.activemq.kaha.impl.async.Location; +import org.apache.activemq.util.ByteSequence; +import org.apache.camel.CamelExchangeException; +import org.apache.camel.Consumer; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.ExchangePattern; +import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultProducer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class JournalEndpoint extends DefaultEndpoint { + + private static final transient Log LOG = LogFactory.getLog(JournalEndpoint.class); + + private final File directory; + private final AtomicReference> consumer = new AtomicReference>(); + private final Object activationMutex = new Object(); + private int referenceCount; + private AsyncDataManager dataManager; + private Thread thread; + private Location lastReadLocation; + private long idleDelay = 1000; + private boolean syncProduce = true; + private boolean syncConsume; + + public JournalEndpoint(String uri, JournalComponent journalComponent, File directory) { + super(uri, journalComponent.getCamelContext()); + this.directory = directory; + } + + public boolean isSingleton() { + return true; + } + + public File getDirectory() { + return directory; + } + + public Consumer createConsumer(Processor processor) throws Exception { + return new DefaultConsumer(this, processor) { + @Override + public void start() throws Exception { + super.start(); + activateConsumer(this); + } + + @Override + public void stop() throws Exception { + deactivateConsumer(this); + super.stop(); + } + }; + } + + protected void decrementReference() throws IOException { + synchronized (activationMutex) { + referenceCount--; + if (referenceCount == 0) { + LOG.debug("Closing data manager: " + directory); + LOG.debug("Last mark at: " + lastReadLocation); + dataManager.close(); + dataManager = null; + } + } + } + + protected void incrementReference() throws IOException { + synchronized (activationMutex) { + referenceCount++; + if (referenceCount == 1) { + LOG.debug("Opening data manager: " + directory); + dataManager = new AsyncDataManager(); + dataManager.setDirectory(directory); + dataManager.start(); + + lastReadLocation = dataManager.getMark(); + LOG.debug("Last mark at: " + lastReadLocation); + } + } + } + + protected void deactivateConsumer(DefaultConsumer consumer) throws IOException { + synchronized (activationMutex) { + if (this.consumer.get() != consumer) { + throw new RuntimeCamelException("Consumer was not active."); + } + this.consumer.set(null); + try { + thread.join(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + decrementReference(); + } + } + + protected void activateConsumer(DefaultConsumer consumer) throws IOException { + synchronized (activationMutex) { + if (this.consumer.get() != null) { + throw new RuntimeCamelException("Consumer already active: journal endpoints only support 1 active consumer"); + } + incrementReference(); + this.consumer.set(consumer); + thread = new Thread() { + @Override + public void run() { + dispatchToConsumer(); + } + }; + thread.setName("Dipatch thread: " + getEndpointUri()); + thread.setDaemon(true); + thread.start(); + } + } + + protected void dispatchToConsumer() { + try { + DefaultConsumer consumer; + while ((consumer = this.consumer.get()) != null) { + // See if there is a new record to process + Location location = dataManager.getNextLocation(lastReadLocation); + if (location != null) { + + // Send it on. + ByteSequence read = dataManager.read(location); + Exchange exchange = createExchange(); + exchange.getIn().setBody(read); + exchange.getIn().setHeader("journal", getEndpointUri()); + exchange.getIn().setHeader("location", location); + consumer.getProcessor().process(exchange); + + // Setting the mark makes the data manager forget about + // everything + // before that record. + if (LOG.isDebugEnabled()) { + LOG.debug("Consumed record at: " + location); + } + dataManager.setMark(location, syncConsume); + lastReadLocation = location; + } else { + // Avoid a tight CPU loop if there is no new record to read. + LOG.debug("Sleeping due to no records being available."); + Thread.sleep(idleDelay); + } + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + + public Producer createProducer() throws Exception { + return new DefaultProducer(this) { + public void process(Exchange exchange) throws Exception { + incrementReference(); + try { + + ByteSequence body = exchange.getIn().getBody(ByteSequence.class); + if (body == null) { + byte[] bytes = exchange.getIn().getBody(byte[].class); + if (bytes != null) { + body = new ByteSequence(bytes); + } + } + if (body == null) { + throw new CamelExchangeException("In body message could not be converted to a ByteSequence or a byte array.", exchange); + } + dataManager.write(body, syncProduce); + + } finally { + decrementReference(); + } + } + }; + } + + public boolean isSyncConsume() { + return syncConsume; + } + + public void setSyncConsume(boolean syncConsume) { + this.syncConsume = syncConsume; + } + + public boolean isSyncProduce() { + return syncProduce; + } + + public void setSyncProduce(boolean syncProduce) { + this.syncProduce = syncProduce; + } + + boolean isOpen() { + synchronized (activationMutex) { + return referenceCount > 0; + } + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/camel/component/package.html b/activemq-core/src/main/java/org/apache/activemq/camel/component/package.html new file mode 100644 index 0000000000..1be6dbd70d --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/camel/component/package.html @@ -0,0 +1,25 @@ + + + + + + +Defines the ActiveMQ Component + + + diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/activemq-core/src/main/resources/META-INF/services/org/apache/camel/TypeConverter new file mode 100644 index 0000000000..9ef205b4ad --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/camel/TypeConverter @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.activemq.camel.component \ No newline at end of file diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq b/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq new file mode 100644 index 0000000000..7df5d454bd --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.activemq.camel.component.ActiveMQComponent diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq.journal b/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq.journal new file mode 100644 index 0000000000..bac5c46213 --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq.journal @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.activemq.camel.component.JournalComponent diff --git a/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQConfigureTest.java b/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQConfigureTest.java new file mode 100644 index 0000000000..a63b362a02 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQConfigureTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import org.apache.activemq.pool.PooledConnectionFactory; +import org.apache.activemq.spring.ActiveMQConnectionFactory; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.component.jms.JmsConsumer; +import org.apache.camel.component.jms.JmsEndpoint; +import org.apache.camel.component.jms.JmsProducer; +import org.apache.camel.processor.Logger; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.listener.AbstractMessageListenerContainer; + +/** + * @version $Revision$ + */ +public class ActiveMQConfigureTest extends ContextTestSupport { + + public void testJmsTemplateUsesPoolingConnectionFactory() throws Exception { + JmsEndpoint endpoint = resolveMandatoryEndpoint("activemq:test.foo"); + JmsProducer producer = endpoint.createProducer(); + + JmsTemplate template = assertIsInstanceOf(JmsTemplate.class, producer.getTemplate()); + assertIsInstanceOf(PooledConnectionFactory.class, template.getConnectionFactory()); + assertEquals("pubSubDomain", false, template.isPubSubDomain()); + } + + public void testListenerContainerUsesSpringConnectionFactory() throws Exception { + JmsEndpoint endpoint = resolveMandatoryEndpoint("activemq:topic:test.foo"); + JmsConsumer consumer = endpoint.createConsumer(new Logger()); + + AbstractMessageListenerContainer listenerContainer = consumer.getListenerContainer(); + assertIsInstanceOf(ActiveMQConnectionFactory.class, listenerContainer.getConnectionFactory()); + assertEquals("pubSubDomain", true, listenerContainer.isPubSubDomain()); + + } + + @Override + protected JmsEndpoint resolveMandatoryEndpoint(String uri) { + Endpoint endpoint = super.resolveMandatoryEndpoint(uri); + return assertIsInstanceOf(JmsEndpoint.class, endpoint); + } +} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQJmsHeaderRouteTest.java b/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQJmsHeaderRouteTest.java new file mode 100644 index 0000000000..d33417415d --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQJmsHeaderRouteTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent; +import org.apache.camel.component.jms.JmsExchange; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.mock.AssertionClause; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.Message; +import javax.jms.Destination; +import java.util.Date; +import java.util.List; + +/** + * @version $Revision$ + */ +public class ActiveMQJmsHeaderRouteTest extends ContextTestSupport { + private static final transient Log LOG = LogFactory.getLog(ActiveMQJmsHeaderRouteTest.class); + + protected Object expectedBody = ""; + protected ActiveMQQueue replyQueue = new ActiveMQQueue("test.reply.queue"); + protected String correlationID = "ABC-123"; + protected String messageType = getClass().getName(); + + public void testForwardingAMessageAcrossJMSKeepingCustomJMSHeaders() throws Exception { + MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); + + resultEndpoint.expectedBodiesReceived(expectedBody); + AssertionClause firstMessageExpectations = resultEndpoint.message(0); + firstMessageExpectations.header("cheese").isEqualTo(123); + firstMessageExpectations.header("JMSReplyTo").isEqualTo(replyQueue); + firstMessageExpectations.header("JMSCorrelationID").isEqualTo(correlationID); + firstMessageExpectations.header("JMSType").isEqualTo(messageType); + + template.sendBodyAndHeader("activemq:test.a", expectedBody, "cheese", 123); + + resultEndpoint.assertIsSatisfied(); + + List list = resultEndpoint.getReceivedExchanges(); + Exchange exchange = list.get(0); + Object replyTo = exchange.getIn().getHeader("JMSReplyTo"); + LOG.info("Reply to is: " + replyTo); + Destination destination = assertIsInstanceOf(Destination.class, replyTo); + assertEquals("ReplyTo", replyQueue.toString(), destination.toString()); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + // START SNIPPET: example + camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false")); + // END SNIPPET: example + + return camelContext; + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("activemq:test.a").process(new Processor() { + public void process(Exchange exchange) throws Exception { + // lets set the custom JMS headers using the JMS API + JmsExchange jmsExchange = assertIsInstanceOf(JmsExchange.class, exchange); + Message inMessage = jmsExchange.getInMessage(); + inMessage.setJMSReplyTo(replyQueue); + inMessage.setJMSCorrelationID(correlationID); + inMessage.setJMSType(messageType); + } + }).to("activemq:test.b"); + + from("activemq:test.b").to("mock:result"); + } + }; + } +} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQReplyToHeaderUsingConverterTest.java b/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQReplyToHeaderUsingConverterTest.java new file mode 100644 index 0000000000..710a56f642 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQReplyToHeaderUsingConverterTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.builder.RouteBuilder; +import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent; +import org.apache.camel.component.mock.AssertionClause; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.Destination; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @version $Revision$ + */ +public class ActiveMQReplyToHeaderUsingConverterTest extends ContextTestSupport { + private static final transient Log LOG = LogFactory.getLog(ActiveMQReplyToHeaderUsingConverterTest.class); + + protected Object expectedBody = ""; + protected String replyQueueName = "queue://test.my.reply.queue"; + protected String correlationID = "ABC-123"; + protected String groupID = "GROUP-XYZ"; + protected String messageType = getClass().getName(); + + public void testSendingAMessageFromCamelSetsCustomJmsHeaders() throws Exception { + MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); + + resultEndpoint.expectedBodiesReceived(expectedBody); + AssertionClause firstMessage = resultEndpoint.message(0); + firstMessage.header("cheese").isEqualTo(123); + firstMessage.header("JMSCorrelationID").isEqualTo(correlationID); + firstMessage.header("JMSReplyTo").isEqualTo(ActiveMQConverter.toDestination(replyQueueName)); + firstMessage.header("JMSType").isEqualTo(messageType); + firstMessage.header("JMSXGroupID").isEqualTo(groupID); + + Map headers = new HashMap(); + headers.put("cheese", 123); + headers.put("JMSReplyTo", replyQueueName); + headers.put("JMSCorrelationID", correlationID); + headers.put("JMSType", messageType); + headers.put("JMSXGroupID", groupID); + template.sendBodyAndHeaders("activemq:test.a", expectedBody, headers); + + resultEndpoint.assertIsSatisfied(); + + List list = resultEndpoint.getReceivedExchanges(); + Exchange exchange = list.get(0); + Message in = exchange.getIn(); + Object replyTo = in.getHeader("JMSReplyTo"); + LOG.info("Reply to is: " + replyTo); + Destination destination = assertIsInstanceOf(Destination.class, replyTo); + assertEquals("ReplyTo", replyQueueName, destination.toString()); + + assertMessageHeader(in, "cheese", 123); + assertMessageHeader(in, "JMSCorrelationID", correlationID); + assertMessageHeader(in, "JMSType", messageType); + assertMessageHeader(in, "JMSXGroupID", groupID); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + // START SNIPPET: example + camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false")); + // END SNIPPET: example + + return camelContext; + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("activemq:test.a").to("activemq:test.b"); + + from("activemq:test.b").to("mock:result"); + } + }; + } +} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java b/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java new file mode 100644 index 0000000000..e028e2654e --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent; +import org.apache.camel.component.jms.JmsEndpoint; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version $Revision$ + */ +public class ActiveMQRouteTest extends ContextTestSupport { + protected MockEndpoint resultEndpoint; + protected String startEndpointUri = "activemq:queue:test.a"; + + public void testJmsRouteWithTextMessage() throws Exception { + String expectedBody = "Hello there!"; + + resultEndpoint.expectedBodiesReceived(expectedBody); + resultEndpoint.message(0).header("cheese").isEqualTo(123); + + sendExchange(expectedBody); + + resultEndpoint.assertIsSatisfied(); + } + + protected void sendExchange(final Object expectedBody) { + template.sendBodyAndHeader(startEndpointUri, expectedBody, "cheese", 123); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + + resultEndpoint = (MockEndpoint) context.getEndpoint("mock:result"); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + // START SNIPPET: example + camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false")); + // END SNIPPET: example + + return camelContext; + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from(startEndpointUri).to("activemq:queue:test.b"); + from("activemq:queue:test.b").to("mock:result"); + + JmsEndpoint endpoint1 = (JmsEndpoint) endpoint("activemq:topic:quote.IONA"); + endpoint1.getConfiguration().setTransacted(true); + from(endpoint1).to("mock:transactedClient"); + + JmsEndpoint endpoint2 = (JmsEndpoint) endpoint("activemq:topic:quote.IONA"); + endpoint1.getConfiguration().setTransacted(true); + from(endpoint2).to("mock:nonTrasnactedClient"); + } + }; + } +} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalConfigureTest.java b/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalConfigureTest.java new file mode 100644 index 0000000000..6572953bfd --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalConfigureTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import java.io.File; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; + +/** + * @version $Revision$ + */ +public class JournalConfigureTest extends ContextTestSupport { + + public void testDefaltConfig() throws Exception { + JournalEndpoint endpoint = resolveMandatoryEndpoint("activemq.journal:target/test"); + assertEquals("directory", new File("target", "test"), endpoint.getDirectory()); + assertEquals("syncConsume", false, endpoint.isSyncConsume()); + assertEquals("syncProduce", true, endpoint.isSyncProduce()); + } + + public void testConfigViaOptions() throws Exception { + JournalEndpoint endpoint = resolveMandatoryEndpoint("activemq.journal:target/test?syncConsume=true&syncProduce=false"); + assertEquals("directory", new File("target", "test"), endpoint.getDirectory()); + assertEquals("syncConsume", true, endpoint.isSyncConsume()); + assertEquals("syncProduce", false, endpoint.isSyncProduce()); + } + + @Override + protected JournalEndpoint resolveMandatoryEndpoint(String uri) { + Endpoint endpoint = super.resolveMandatoryEndpoint(uri); + return assertIsInstanceOf(JournalEndpoint.class, endpoint); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRoutePerformance.java b/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRoutePerformance.java new file mode 100644 index 0000000000..68d337714e --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRoutePerformance.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +/** + * Used to get an idea of what kind of performance can be expected from + * the journal. + * + * @version $Revision$ + */ +public class JournalRoutePerformance extends ContextTestSupport { + + AtomicLong produceCounter = new AtomicLong(); + AtomicLong consumeCounter = new AtomicLong(); + AtomicBoolean running = new AtomicBoolean(true); + + public void testPerformance() throws Exception { + + int payLoadSize = 1024; + int concurrentProducers = 50; + long delayBetweenSample = 1000; + long perfTestDuration = 1000 * 60; // 1 min + + StringBuffer t = new StringBuffer(); + for (int i = 0; i < payLoadSize; i++) { + t.append('a' + (i % 26)); + } + final byte[] payload = t.toString().getBytes("UTF-8"); + + for (int i = 0; i < concurrentProducers; i++) { + Thread thread = new Thread("Producer: " + i) { + @Override + public void run() { + while (running.get()) { + template.sendBody("direct:in", payload); + produceCounter.incrementAndGet(); + } + } + }; + thread.start(); + } + + long produceTotal = 0; + long consumeTotal = 0; + long start = System.currentTimeMillis(); + long end = start + perfTestDuration; + while (System.currentTimeMillis() < end) { + Thread.sleep(delayBetweenSample); + long totalTime = System.currentTimeMillis() - start; + long p = produceCounter.getAndSet(0); + long c = consumeCounter.getAndSet(0); + produceTotal += p; + consumeTotal += c; + System.out.println("Interval Produced " + stat(p, delayBetweenSample) + " m/s, Consumed " + stat(c, delayBetweenSample) + " m/s"); + System.out.println("Total Produced " + stat(produceTotal, totalTime) + " m/s, Consumed " + stat(consumeTotal, totalTime) + " m/s"); + } + running.set(false); + + } + + private String stat(long pd, long delayBetweenSample) { + return "" + (1.0 * pd / delayBetweenSample) * 1000.0; + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("direct:in").to("activemq.journal:target/perf-test"); + from("activemq.journal:target/perf-test").process(new Processor() { + public void process(Exchange exchange) throws Exception { + consumeCounter.incrementAndGet(); + } + }); + } + }; + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java b/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java new file mode 100644 index 0000000000..19b7955689 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import java.util.List; + +import org.apache.activemq.util.ByteSequence; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.AssertionClause; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version $Revision$ + */ +public class JournalRouteTest extends ContextTestSupport { + + public void testSimpleJournalRoute() throws Exception { + + byte[] payload = "Hello World".getBytes(); + + + MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:out", MockEndpoint.class); + resultEndpoint.expectedMessageCount(1); + + AssertionClause firstMessageExpectations = resultEndpoint.message(0); + firstMessageExpectations.header("journal").isEqualTo("activemq.journal:target/test.a"); + firstMessageExpectations.header("location").isNotNull(); + firstMessageExpectations.body().isInstanceOf(ByteSequence.class); + + template.sendBody("direct:in", payload); + + resultEndpoint.assertIsSatisfied(); + + List list = resultEndpoint.getReceivedExchanges(); + Exchange exchange = list.get(0); + ByteSequence body = (ByteSequence)exchange.getIn().getBody(); + body.compact(); // trims the byte array to the actual size. + assertEquals("body", new String(payload), new String(body.data)); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("direct:in").to("activemq.journal:target/test.a"); + from("activemq.journal:target/test.a").to("mock:out"); + } + }; + } +} \ No newline at end of file diff --git a/assembly/pom.xml b/assembly/pom.xml index 6763b2b423..c7c2833090 100755 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -116,10 +116,6 @@ org.apache.camel camel-jms - - org.apache.camel - camel-activemq - javax.xml diff --git a/pom.xml b/pom.xml index 3835d0ca8c..ac202596fc 100755 --- a/pom.xml +++ b/pom.xml @@ -400,21 +400,8 @@ camel-jms ${camel-version} - - org.apache.camel - camel-activemq - ${camel-version} - - - - org.apache.activemq - activemq-core - - - - - + javax.xml jaxb-api