Removed the journal component as its dependent on the now deprecated activemq-amq-store code.  

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1431492 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-10 16:43:21 +00:00
parent 20d3bdb4eb
commit 6e36321669
6 changed files with 5 additions and 499 deletions

View File

@ -70,10 +70,6 @@
<groupId>${project.groupId}</groupId>
<artifactId>activemq-spring</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-amq-store</artifactId>
</dependency>
<!-- to use AMQ with Camel efficient we would need to use pooling -->
<dependency>
@ -194,7 +190,6 @@
<optional>true</optional>
<scope>test</scope>
</dependency>
</dependencies>
<build>
@ -207,7 +202,6 @@
<childDelegation>false</childDelegation>
<useFile>true</useFile>
<argLine>-Xmx512M</argLine>
<systemProperties>
<property>
<name>org.apache.activemq.default.directory.prefix</name>

View File

@ -1,39 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultComponent;
import java.io.File;
import java.util.Map;
/**
* The <a href="http://camel.apache.org/activemq-journal.html">ActiveMQ Journal Component</a>
*
*
*/
public class JournalComponent extends DefaultComponent {
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception {
JournalEndpoint endpoint = new JournalEndpoint(uri, this, new File(remaining));
setProperties(endpoint, parameters);
return endpoint;
}
}

View File

@ -1,229 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.util.ByteSequence;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JournalEndpoint extends DefaultEndpoint {
private static final transient Logger LOG = LoggerFactory.getLogger(JournalEndpoint.class);
private final File directory;
private final AtomicReference<DefaultConsumer> consumer = new AtomicReference<DefaultConsumer>();
private final Object activationMutex = new Object();
private int referenceCount;
private AsyncDataManager dataManager;
private Thread thread;
private Location lastReadLocation;
private long idleDelay = 1000;
private boolean syncProduce = true;
private boolean syncConsume;
public JournalEndpoint(String uri, JournalComponent journalComponent, File directory) {
super(uri, journalComponent.getCamelContext());
this.directory = directory;
}
public JournalEndpoint(String endpointUri, File directory) {
super(endpointUri);
this.directory = directory;
}
public boolean isSingleton() {
return true;
}
public File getDirectory() {
return directory;
}
public Consumer createConsumer(Processor processor) throws Exception {
return new DefaultConsumer(this, processor) {
@Override
public void start() throws Exception {
super.start();
activateConsumer(this);
}
@Override
public void stop() throws Exception {
deactivateConsumer(this);
super.stop();
}
};
}
protected void decrementReference() throws IOException {
synchronized (activationMutex) {
referenceCount--;
if (referenceCount == 0) {
LOG.debug("Closing data manager: " + directory);
LOG.debug("Last mark at: " + lastReadLocation);
dataManager.close();
dataManager = null;
}
}
}
protected void incrementReference() throws IOException {
synchronized (activationMutex) {
referenceCount++;
if (referenceCount == 1) {
LOG.debug("Opening data manager: " + directory);
dataManager = new AsyncDataManager();
dataManager.setDirectory(directory);
dataManager.start();
lastReadLocation = dataManager.getMark();
LOG.debug("Last mark at: " + lastReadLocation);
}
}
}
protected void deactivateConsumer(DefaultConsumer consumer) throws IOException {
synchronized (activationMutex) {
if (this.consumer.get() != consumer) {
throw new RuntimeCamelException("Consumer was not active.");
}
this.consumer.set(null);
try {
thread.join();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
decrementReference();
}
}
protected void activateConsumer(DefaultConsumer consumer) throws IOException {
synchronized (activationMutex) {
if (this.consumer.get() != null) {
throw new RuntimeCamelException("Consumer already active: journal endpoints only support 1 active consumer");
}
incrementReference();
this.consumer.set(consumer);
thread = new Thread() {
@Override
public void run() {
dispatchToConsumer();
}
};
thread.setName("Dipatch thread: " + getEndpointUri());
thread.setDaemon(true);
thread.start();
}
}
protected void dispatchToConsumer() {
try {
DefaultConsumer consumer;
while ((consumer = this.consumer.get()) != null) {
// See if there is a new record to process
Location location = dataManager.getNextLocation(lastReadLocation);
if (location != null) {
// Send it on.
ByteSequence read = dataManager.read(location);
Exchange exchange = createExchange();
exchange.getIn().setBody(read);
exchange.getIn().setHeader("journal", getEndpointUri());
exchange.getIn().setHeader("location", location);
consumer.getProcessor().process(exchange);
// Setting the mark makes the data manager forget about
// everything
// before that record.
if (LOG.isDebugEnabled()) {
LOG.debug("Consumed record at: " + location);
}
dataManager.setMark(location, syncConsume);
lastReadLocation = location;
} else {
// Avoid a tight CPU loop if there is no new record to read.
LOG.debug("Sleeping due to no records being available.");
Thread.sleep(idleDelay);
}
}
} catch (Throwable e) {
e.printStackTrace();
}
}
public Producer createProducer() throws Exception {
return new DefaultProducer(this) {
public void process(Exchange exchange) throws Exception {
incrementReference();
try {
ByteSequence body = exchange.getIn().getBody(ByteSequence.class);
if (body == null) {
byte[] bytes = exchange.getIn().getBody(byte[].class);
if (bytes != null) {
body = new ByteSequence(bytes);
}
}
if (body == null) {
throw new CamelExchangeException("In body message could not be converted to a ByteSequence or a byte array.", exchange);
}
dataManager.write(body, syncProduce);
} finally {
decrementReference();
}
}
};
}
public boolean isSyncConsume() {
return syncConsume;
}
public void setSyncConsume(boolean syncConsume) {
this.syncConsume = syncConsume;
}
public boolean isSyncProduce() {
return syncProduce;
}
public void setSyncProduce(boolean syncProduce) {
this.syncProduce = syncProduce;
}
boolean isOpen() {
synchronized (activationMutex) {
return referenceCount > 0;
}
}
}

View File

@ -1,52 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import java.io.File;
import org.apache.camel.Endpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
/**
*
*/
public class JournalConfigureTest extends CamelTestSupport {
@Test
public void testDefaltConfig() throws Exception {
JournalEndpoint endpoint = resolveMandatoryEndpoint("activemq.journal:target/test");
assertEquals("directory", new File("target", "test"), endpoint.getDirectory());
assertEquals("syncConsume", false, endpoint.isSyncConsume());
assertEquals("syncProduce", true, endpoint.isSyncProduce());
}
@Test
public void testConfigViaOptions() throws Exception {
JournalEndpoint endpoint = resolveMandatoryEndpoint("activemq.journal:target/test?syncConsume=true&syncProduce=false");
assertEquals("directory", new File("target", "test"), endpoint.getDirectory());
assertEquals("syncConsume", true, endpoint.isSyncConsume());
assertEquals("syncProduce", false, endpoint.isSyncProduce());
}
@Override
protected JournalEndpoint resolveMandatoryEndpoint(String uri) {
Endpoint endpoint = super.resolveMandatoryEndpoint(uri);
return assertIsInstanceOf(JournalEndpoint.class, endpoint);
}
}

View File

@ -1,101 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
/**
* Used to get an idea of what kind of performance can be expected from
* the journal.
*
*
*/
public class JournalRoutePerformance extends CamelTestSupport {
AtomicLong produceCounter = new AtomicLong();
AtomicLong consumeCounter = new AtomicLong();
AtomicBoolean running = new AtomicBoolean(true);
@Test
public void testPerformance() throws Exception {
int payLoadSize = 1024;
int concurrentProducers = 50;
long delayBetweenSample = 1000;
long perfTestDuration = 1000 * 60; // 1 min
StringBuffer t = new StringBuffer();
for (int i = 0; i < payLoadSize; i++) {
t.append('a' + (i % 26));
}
final byte[] payload = t.toString().getBytes("UTF-8");
for (int i = 0; i < concurrentProducers; i++) {
Thread thread = new Thread("Producer: " + i) {
@Override
public void run() {
while (running.get()) {
template.sendBody("direct:in", payload);
produceCounter.incrementAndGet();
}
}
};
thread.start();
}
long produceTotal = 0;
long consumeTotal = 0;
long start = System.currentTimeMillis();
long end = start + perfTestDuration;
while (System.currentTimeMillis() < end) {
Thread.sleep(delayBetweenSample);
long totalTime = System.currentTimeMillis() - start;
long p = produceCounter.getAndSet(0);
long c = consumeCounter.getAndSet(0);
produceTotal += p;
consumeTotal += c;
System.out.println("Interval Produced " + stat(p, delayBetweenSample) + " m/s, Consumed " + stat(c, delayBetweenSample) + " m/s");
System.out.println("Total Produced " + stat(produceTotal, totalTime) + " m/s, Consumed " + stat(consumeTotal, totalTime) + " m/s");
}
running.set(false);
}
private String stat(long pd, long delayBetweenSample) {
return "" + (1.0 * pd / delayBetweenSample) * 1000.0;
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
from("direct:in").to("activemq.journal:target/perf-test");
from("activemq.journal:target/perf-test").process(new Processor() {
public void process(Exchange exchange) throws Exception {
consumeCounter.incrementAndGet();
}
});
}
};
}
}

View File

@ -1,67 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import java.util.List;
import org.apache.activemq.util.ByteSequence;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.AssertionClause;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
/**
*
*/
public class JournalRouteTest extends CamelTestSupport {
@Test
public void testSimpleJournalRoute() throws Exception {
byte[] payload = "Hello World".getBytes();
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:out", MockEndpoint.class);
resultEndpoint.expectedMessageCount(1);
AssertionClause firstMessageExpectations = resultEndpoint.message(0);
firstMessageExpectations.header("journal").isEqualTo("activemq.journal://target/test.a");
firstMessageExpectations.header("location").isNotNull();
firstMessageExpectations.body().isInstanceOf(ByteSequence.class);
template.sendBody("direct:in", payload);
resultEndpoint.assertIsSatisfied();
List<Exchange> list = resultEndpoint.getReceivedExchanges();
Exchange exchange = list.get(0);
ByteSequence body = (ByteSequence)exchange.getIn().getBody();
body.compact(); // trims the byte array to the actual size.
assertEquals("body", new String(payload), new String(body.data));
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
from("direct:in").to("activemq.journal:target/test.a");
from("activemq.journal:target/test.a").to("mock:out");
}
};
}
}