mirror of https://github.com/apache/activemq.git
applying a modified version of that patch attached to https://issues.apache.org/activemq/browse/AMQ-1689
- This reduces the the number of disk syncs that the broker does on some workloads which increases performance - Fixed and issue where the last write to disk migh get lost on broker shutdown. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@651637 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8bf2d78cb2
commit
be47d0b392
|
@ -36,7 +36,7 @@ import org.apache.activemq.util.LinkedNode;
|
|||
class DataFileAppender {
|
||||
|
||||
protected static final byte[] RESERVED_SPACE = new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE];
|
||||
protected static final String SHUTDOWN_COMMAND = "SHUTDOWN";
|
||||
protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
|
||||
|
||||
protected final AsyncDataManager dataManager;
|
||||
protected final Map<WriteKey, WriteCommand> inflightWrites;
|
||||
|
@ -46,7 +46,7 @@ class DataFileAppender {
|
|||
protected boolean shutdown;
|
||||
protected IOException firstAsyncException;
|
||||
protected final CountDownLatch shutdownDone = new CountDownLatch(1);
|
||||
protected int maxWriteBatchSize = 1024 * 1024 * 4;
|
||||
protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
|
||||
|
||||
private boolean running;
|
||||
private Thread thread;
|
||||
|
@ -311,24 +311,19 @@ class DataFileAppender {
|
|||
// Block till we get a command.
|
||||
synchronized (enqueueMutex) {
|
||||
while (true) {
|
||||
if (shutdown) {
|
||||
o = SHUTDOWN_COMMAND;
|
||||
break;
|
||||
}
|
||||
if (nextWriteBatch != null) {
|
||||
o = nextWriteBatch;
|
||||
nextWriteBatch = null;
|
||||
break;
|
||||
}
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
enqueueMutex.wait();
|
||||
}
|
||||
enqueueMutex.notify();
|
||||
}
|
||||
|
||||
if (o == SHUTDOWN_COMMAND) {
|
||||
break;
|
||||
}
|
||||
|
||||
WriteBatch wb = (WriteBatch)o;
|
||||
if (dataFile != wb.dataFile) {
|
||||
if (file != null) {
|
||||
|
@ -345,9 +340,13 @@ class DataFileAppender {
|
|||
// are in sequence.
|
||||
file.seek(write.location.getOffset());
|
||||
|
||||
|
||||
boolean forceToDisk=false;
|
||||
|
||||
//
|
||||
// is it just 1 big write?
|
||||
if (wb.size == write.location.getSize()) {
|
||||
forceToDisk = write.sync | write.onComplete!=null;
|
||||
|
||||
// Just write it directly..
|
||||
file.writeInt(write.location.getSize());
|
||||
|
@ -361,6 +360,7 @@ class DataFileAppender {
|
|||
|
||||
// Combine the smaller writes into 1 big buffer
|
||||
while (write != null) {
|
||||
forceToDisk |= write.sync | write.onComplete!=null;
|
||||
|
||||
buff.writeInt(write.location.getSize());
|
||||
buff.writeByte(write.location.getType());
|
||||
|
@ -378,14 +378,13 @@ class DataFileAppender {
|
|||
buff.reset();
|
||||
}
|
||||
|
||||
file.getFD().sync();
|
||||
if( forceToDisk ) {
|
||||
file.getFD().sync();
|
||||
}
|
||||
|
||||
WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
|
||||
dataManager.setLastAppendLocation(lastWrite.location);
|
||||
|
||||
// Signal any waiting threads that the write is on disk.
|
||||
wb.latch.countDown();
|
||||
|
||||
// Now that the data is on disk, remove the writes from the in
|
||||
// flight
|
||||
// cache.
|
||||
|
@ -403,8 +402,10 @@ class DataFileAppender {
|
|||
}
|
||||
write = (WriteCommand)write.getNext();
|
||||
}
|
||||
|
||||
// Signal any waiting threads that the write is on disk.
|
||||
wb.latch.countDown();
|
||||
}
|
||||
buff.close();
|
||||
} catch (IOException e) {
|
||||
synchronized (enqueueMutex) {
|
||||
firstAsyncException = e;
|
||||
|
|
|
@ -68,24 +68,19 @@ class NIODataFileAppender extends DataFileAppender {
|
|||
// Block till we get a command.
|
||||
synchronized (enqueueMutex) {
|
||||
while (true) {
|
||||
if (shutdown) {
|
||||
o = SHUTDOWN_COMMAND;
|
||||
break;
|
||||
}
|
||||
if (nextWriteBatch != null) {
|
||||
o = nextWriteBatch;
|
||||
nextWriteBatch = null;
|
||||
break;
|
||||
}
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
enqueueMutex.wait();
|
||||
}
|
||||
enqueueMutex.notify();
|
||||
}
|
||||
|
||||
if (o == SHUTDOWN_COMMAND) {
|
||||
break;
|
||||
}
|
||||
|
||||
WriteBatch wb = (WriteBatch)o;
|
||||
if (dataFile != wb.dataFile) {
|
||||
if (file != null) {
|
||||
|
@ -103,9 +98,13 @@ class NIODataFileAppender extends DataFileAppender {
|
|||
// are in sequence.
|
||||
file.seek(write.location.getOffset());
|
||||
|
||||
|
||||
boolean forceToDisk=false;
|
||||
|
||||
//
|
||||
// is it just 1 big write?
|
||||
if (wb.size == write.location.getSize()) {
|
||||
forceToDisk = write.sync | write.onComplete!=null;
|
||||
|
||||
header.clear();
|
||||
header.putInt(write.location.getSize());
|
||||
|
@ -122,6 +121,7 @@ class NIODataFileAppender extends DataFileAppender {
|
|||
|
||||
// Combine the smaller writes into 1 big buffer
|
||||
while (write != null) {
|
||||
forceToDisk |= write.sync | write.onComplete!=null;
|
||||
|
||||
header.clear();
|
||||
header.putInt(write.location.getSize());
|
||||
|
@ -148,16 +148,13 @@ class NIODataFileAppender extends DataFileAppender {
|
|||
buffer.clear();
|
||||
}
|
||||
|
||||
file.getChannel().force(false);
|
||||
if( forceToDisk ) {
|
||||
file.getChannel().force(false);
|
||||
}
|
||||
|
||||
WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
|
||||
dataManager.setLastAppendLocation(lastWrite.location);
|
||||
|
||||
// Signal any waiting threads that the write is on disk.
|
||||
if (wb.latch != null) {
|
||||
wb.latch.countDown();
|
||||
}
|
||||
|
||||
// Now that the data is on disk, remove the writes from the in
|
||||
// flight
|
||||
// cache.
|
||||
|
@ -175,6 +172,9 @@ class NIODataFileAppender extends DataFileAppender {
|
|||
}
|
||||
write = (WriteCommand)write.getNext();
|
||||
}
|
||||
|
||||
// Signal any waiting threads that the write is on disk.
|
||||
wb.latch.countDown();
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -0,0 +1,150 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha.impl.async;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
|
||||
public class DataFileAppenderTest extends TestCase {
|
||||
AsyncDataManager dataManager;
|
||||
File dir;
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
dir = new File("target/tests/DataFileAppenderTest");
|
||||
dir.mkdirs();
|
||||
dataManager = new AsyncDataManager();
|
||||
dataManager.setDirectory(dir);
|
||||
configure(dataManager);
|
||||
dataManager.start();
|
||||
}
|
||||
|
||||
protected void configure(AsyncDataManager dataManager) {
|
||||
dataManager.setUseNio(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
dataManager.close();
|
||||
deleteFilesInDirectory(dir);
|
||||
dir.delete();
|
||||
}
|
||||
|
||||
private void deleteFilesInDirectory(File directory) {
|
||||
File[] files = directory.listFiles();
|
||||
for (int i=0; i<files.length; i++) {
|
||||
File f = files[i];
|
||||
if (f.isDirectory()) {
|
||||
deleteFilesInDirectory(f);
|
||||
}
|
||||
f.delete();
|
||||
}
|
||||
}
|
||||
|
||||
public void testBatchWriteCompleteAfterTimeout() throws Exception {
|
||||
ByteSequence data = new ByteSequence("DATA".getBytes());
|
||||
final int iterations = 10;
|
||||
for (int i=0; i<iterations; i++) {
|
||||
dataManager.write(data, false);
|
||||
}
|
||||
assertTrue("writes are queued up", dataManager.getInflightWrites().size() >= iterations);
|
||||
Thread.sleep(1000);
|
||||
assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
|
||||
}
|
||||
|
||||
|
||||
public void testBatchWriteCallbackCompleteAfterTimeout() throws Exception {
|
||||
final int iterations = 10;
|
||||
final CountDownLatch latch = new CountDownLatch(iterations);
|
||||
ByteSequence data = new ByteSequence("DATA".getBytes());
|
||||
for (int i=0; i<iterations; i++) {
|
||||
dataManager.write(data, new Runnable() {
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
assertTrue("writes are queued up", dataManager.getInflightWrites().size() >= iterations);
|
||||
assertEquals("none written", iterations, latch.getCount());
|
||||
Thread.sleep(1000);
|
||||
assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
|
||||
assertEquals("none written", 0, latch.getCount());
|
||||
}
|
||||
|
||||
public void testBatchWriteCallbackCompleteAfterClose() throws Exception {
|
||||
final int iterations = 10;
|
||||
final CountDownLatch latch = new CountDownLatch(iterations);
|
||||
ByteSequence data = new ByteSequence("DATA".getBytes());
|
||||
for (int i=0; i<iterations; i++) {
|
||||
dataManager.write(data, new Runnable() {
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
assertTrue("writes are queued up", dataManager.getInflightWrites().size() >= iterations);
|
||||
assertEquals("none written", iterations, latch.getCount());
|
||||
dataManager.close();
|
||||
assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
|
||||
assertEquals("none written", 0, latch.getCount());
|
||||
}
|
||||
|
||||
public void testBatchWriteCompleteAfterClose() throws Exception {
|
||||
ByteSequence data = new ByteSequence("DATA".getBytes());
|
||||
final int iterations = 10;
|
||||
for (int i=0; i<iterations; i++) {
|
||||
dataManager.write(data, false);
|
||||
}
|
||||
dataManager.close();
|
||||
assertTrue("queued data is written:" + dataManager.getInflightWrites().size(), dataManager.getInflightWrites().isEmpty());
|
||||
}
|
||||
|
||||
public void testBatchWriteToMaxMessageSize() throws Exception {
|
||||
final int iterations = 4;
|
||||
final CountDownLatch latch = new CountDownLatch(iterations);
|
||||
Runnable done = new Runnable() {
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
int messageSize = DataFileAppender.DEFAULT_MAX_BATCH_SIZE / iterations;
|
||||
byte[] message = new byte[messageSize];
|
||||
ByteSequence data = new ByteSequence(message);
|
||||
|
||||
for (int i=0; i< iterations - 1; i++) {
|
||||
dataManager.write(data, done);
|
||||
}
|
||||
assertEquals("all writes are queued", iterations, latch.getCount());
|
||||
dataManager.write(data, done);
|
||||
latch.await(10, TimeUnit.SECONDS); // write may take some time
|
||||
assertEquals("all callbacks complete", 0, latch.getCount());
|
||||
}
|
||||
|
||||
public void testNoBatchWriteWithSync() throws Exception {
|
||||
ByteSequence data = new ByteSequence("DATA".getBytes());
|
||||
final int iterations = 10;
|
||||
for (int i=0; i<iterations; i++) {
|
||||
dataManager.write(data, true);
|
||||
assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha.impl.async;
|
||||
|
||||
public class NioDataFileAppenderTest extends DataFileAppenderTest {
|
||||
|
||||
@Override
|
||||
protected void configure(AsyncDataManager dataManager) {
|
||||
dataManager.setUseNio(true);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,310 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.perf;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import junit.textui.TestRunner;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.network.NetworkConnector;
|
||||
import org.apache.activemq.xbean.BrokerFactoryBean;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
|
||||
|
||||
public class NetworkedSyncTest extends TestCase {
|
||||
|
||||
// constants
|
||||
public static final int MESSAGE_COUNT = 10000; //100000;
|
||||
public final static String config = "org/apache/activemq/perf/networkSync.xml";
|
||||
public final static String broker1URL = "tcp://localhost:61616";
|
||||
public final static String broker2URL = "tcp://localhost:62616";
|
||||
private final String networkConnectorURL = "static://(" + broker2URL + ")";
|
||||
private static final Log LOG = LogFactory.getLog(NetworkedSyncTest.class);
|
||||
BrokerService broker1 = null;
|
||||
BrokerService broker2 = null;
|
||||
NetworkConnector connector = null;
|
||||
|
||||
/**
|
||||
* @param name
|
||||
*/
|
||||
public NetworkedSyncTest(String name) {
|
||||
super(name);
|
||||
LOG.info("Testcase started.");
|
||||
}
|
||||
|
||||
public static void main(String args[]) {
|
||||
TestRunner.run(NetworkedSyncTest.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
protected void setUp() throws Exception {
|
||||
LOG.info("setUp() called.");
|
||||
ClassPathXmlApplicationContext context1 = null;
|
||||
BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(config));
|
||||
|
||||
/* start up first broker instance */
|
||||
try {
|
||||
// resolve broker1
|
||||
Thread.currentThread().setContextClassLoader(
|
||||
NetworkedSyncTest.class.getClassLoader());
|
||||
context1 = new ClassPathXmlApplicationContext(config);
|
||||
broker1 = (BrokerService) context1.getBean("broker1");
|
||||
|
||||
// start the broker
|
||||
if (!broker1.isStarted()) {
|
||||
LOG.info("Broker broker1 not yet started. Kicking it off now.");
|
||||
broker1.start();
|
||||
} else {
|
||||
LOG.info("Broker broker1 already started. Not kicking it off a second time.");
|
||||
broker1.waitUntilStopped();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.fatal("Error: " + e.getMessage());
|
||||
throw e;
|
||||
// brokerService.stop();
|
||||
}
|
||||
|
||||
/* start up second broker instance */
|
||||
try {
|
||||
Thread.currentThread().setContextClassLoader(
|
||||
NetworkedSyncTest.class.getClassLoader());
|
||||
context1 = new ClassPathXmlApplicationContext(config);
|
||||
broker2 = (BrokerService) context1.getBean("broker2");
|
||||
|
||||
// start the broker
|
||||
if (!broker2.isStarted()) {
|
||||
LOG.info("Broker broker2 not yet started. Kicking it off now.");
|
||||
broker2.start();
|
||||
} else {
|
||||
LOG.info("Broker broker2 already started. Not kicking it off a second time.");
|
||||
broker2.waitUntilStopped();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.fatal("Error: " + e.getMessage());
|
||||
throw e;
|
||||
}
|
||||
|
||||
// setup network connector from broker1 to broker2
|
||||
connector = broker1.addNetworkConnector(networkConnectorURL);
|
||||
connector.setBrokerName(broker1.getBrokerName());
|
||||
connector.setDuplex(true);
|
||||
connector.start();
|
||||
LOG.info("Network connector created.");
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
protected void tearDown() throws Exception {
|
||||
|
||||
LOG.info("tearDown() called.");
|
||||
|
||||
if (broker1 != null && broker1.isStarted()) {
|
||||
LOG.info("Broker1 still running, stopping it now.");
|
||||
broker1.stop();
|
||||
} else {
|
||||
LOG.info("Broker1 not running, nothing to shutdown.");
|
||||
}
|
||||
if (broker2 != null && broker2.isStarted()) {
|
||||
LOG.info("Broker2 still running, stopping it now.");
|
||||
broker2.stop();
|
||||
} else {
|
||||
LOG.info("Broker2 not running, nothing to shutdown.");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void testMessageExchange() throws Exception {
|
||||
LOG.info("testMessageExchange() called.");
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
// create producer and consumer threads
|
||||
Thread producer = new Thread(new Producer());
|
||||
Thread consumer = new Thread(new Consumer());
|
||||
// start threads
|
||||
consumer.start();
|
||||
Thread.sleep(2000);
|
||||
producer.start();
|
||||
|
||||
|
||||
// wait for threads to finish
|
||||
producer.join();
|
||||
consumer.join();
|
||||
long end = System.currentTimeMillis();
|
||||
|
||||
System.out.println("Duration: "+(end-start));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Message producer running as a separate thread, connecting to broker1
|
||||
*
|
||||
* @author tmielke
|
||||
*
|
||||
*/
|
||||
class Producer implements Runnable {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(Producer.class);
|
||||
|
||||
/**
|
||||
* connect to broker and constantly send messages
|
||||
*/
|
||||
public void run() {
|
||||
|
||||
Connection connection = null;
|
||||
Session session = null;
|
||||
MessageProducer producer = null;
|
||||
|
||||
try {
|
||||
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
|
||||
NetworkedSyncTest.broker1URL);
|
||||
connection = amq.createConnection();
|
||||
|
||||
connection.setExceptionListener(new javax.jms.ExceptionListener() {
|
||||
public void onException(javax.jms.JMSException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
|
||||
connection.start();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic destination = session.createTopic("TEST.FOO");
|
||||
|
||||
producer = session.createProducer(destination);
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
|
||||
long counter = 0;
|
||||
|
||||
// Create and send message
|
||||
for (int i = 0; i < NetworkedSyncTest.MESSAGE_COUNT; i++) {
|
||||
|
||||
String text = "Hello world! From: "
|
||||
+ Thread.currentThread().getName() + " : "
|
||||
+ this.hashCode() + ":" + counter;
|
||||
TextMessage message = session.createTextMessage(text);
|
||||
producer.send(message);
|
||||
counter++;
|
||||
|
||||
if ((counter % 1000) == 0)
|
||||
LOG.info("sent " + counter + " messages");
|
||||
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.error(ex);
|
||||
return;
|
||||
} finally {
|
||||
try {
|
||||
if (producer != null)
|
||||
producer.close();
|
||||
if (session != null)
|
||||
session.close();
|
||||
if (connection != null)
|
||||
connection.close();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Problem closing down JMS objects: " + e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* * Message consumer running as a separate thread, connecting to broker2
|
||||
* @author tmielke
|
||||
*
|
||||
*/
|
||||
class Consumer implements Runnable {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(Consumer.class);;
|
||||
|
||||
|
||||
/**
|
||||
* connect to broker and receive messages
|
||||
*/
|
||||
public void run() {
|
||||
Connection connection = null;
|
||||
Session session = null;
|
||||
MessageConsumer consumer = null;
|
||||
|
||||
try {
|
||||
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
|
||||
NetworkedSyncTest.broker2URL);
|
||||
connection = amq.createConnection();
|
||||
// need to set clientID when using durable subscription.
|
||||
connection.setClientID("tmielke");
|
||||
|
||||
connection.setExceptionListener(new javax.jms.ExceptionListener() {
|
||||
public void onException(javax.jms.JMSException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
|
||||
connection.start();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createTopic("TEST.FOO");
|
||||
consumer = session.createDurableSubscriber((Topic) destination,"tmielke");
|
||||
|
||||
long counter = 0;
|
||||
// Wait for a message
|
||||
for (int i = 0; i < NetworkedSyncTest.MESSAGE_COUNT; i++) {
|
||||
Message message2 = consumer.receive();
|
||||
if (message2 instanceof TextMessage) {
|
||||
TextMessage textMessage = (TextMessage) message2;
|
||||
String text = textMessage.getText();
|
||||
// logger.info("Received: " + text);
|
||||
} else {
|
||||
LOG.error("Received message of unsupported type. Expecting TextMessage. "+ message2);
|
||||
}
|
||||
counter++;
|
||||
if ((counter % 1000) == 0)
|
||||
LOG.info("received " + counter + " messages");
|
||||
|
||||
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error in Consumer: " + e);
|
||||
return;
|
||||
} finally {
|
||||
try {
|
||||
if (consumer != null)
|
||||
consumer.close();
|
||||
if (session != null)
|
||||
session.close();
|
||||
if (connection != null)
|
||||
connection.close();
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Error closing down JMS objects: " + ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<beans
|
||||
xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:amq="http://activemq.org/config/1.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
|
||||
http://activemq.org/config/1.0 http://activemq.apache.org/snapshot-schema/activemq-core-5.0-SNAPSHOT.xsd">
|
||||
|
||||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
|
||||
</bean>
|
||||
|
||||
<!-- Broker1 -->
|
||||
<broker brokerName="broker1" id="broker1" useJmx="true" persistent="true" deleteAllMessagesOnStartup="true" start="false" xmlns="http://activemq.org/config/1.0">
|
||||
<transportConnectors>
|
||||
<transportConnector uri="tcp://localhost:61616" />
|
||||
</transportConnectors>
|
||||
|
||||
<persistenceAdapter>
|
||||
<amqPersistenceAdapter directory="target/Broker1-data/activemq-data" syncOnWrite="true" indexPageSize="16kb" indexBinSize="100" maxReferenceFileLength="8192"/>
|
||||
</persistenceAdapter>
|
||||
</broker>
|
||||
|
||||
|
||||
<!-- Broker2 -->
|
||||
<broker brokerName="broker2" id="broker2" useJmx="true" persistent="false" deleteAllMessagesOnStartup="true" start="false" xmlns="http://activemq.org/config/1.0">
|
||||
<transportConnectors>
|
||||
<transportConnector uri="tcp://localhost:62616" />
|
||||
</transportConnectors>
|
||||
<persistenceAdapter>
|
||||
<amqPersistenceAdapter directory="target/Broker2-data/activemq-data" syncOnWrite="true" indexPageSize="16kb" indexBinSize="100" maxReferenceFileLength="8192"/>
|
||||
</persistenceAdapter>
|
||||
</broker>
|
||||
</beans>
|
Loading…
Reference in New Issue