CallerBufferingDataFileAppender, fix rollover of cached buffers

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1222635 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-12-23 10:48:38 +00:00
parent bb4a2f73f4
commit 384388fe62
2 changed files with 19 additions and 7 deletions

View File

@ -50,11 +50,13 @@ public class KahaDBFastEnqueueTest {
private Destination destination = new ActiveMQQueue("Test"); private Destination destination = new ActiveMQQueue("Test");
private String payloadString = new String(new byte[6*1024]); private String payloadString = new String(new byte[6*1024]);
private boolean useBytesMessage= true; private boolean useBytesMessage= true;
private final int parallelProducer = 2; private final int parallelProducer = 20;
private Vector<Exception> exceptions = new Vector<Exception>(); private Vector<Exception> exceptions = new Vector<Exception>();
final long toSend = 1000;//500000; final long toSend = 500000;
@Ignore("not ready yet, exploring getting broker disk bound") @Ignore("too slow, exploring getting broker disk bound")
// use with:
// -Xmx4g -Dorg.apache.kahadb.journal.appender.WRITE_STAT_WINDOW=10000 -Dorg.apache.kahadb.journal.CALLER_BUFFER_APPENDER=true
public void testPublishNoConsumer() throws Exception { public void testPublishNoConsumer() throws Exception {
startBroker(true); startBroker(true);
@ -116,8 +118,10 @@ public class KahaDBFastEnqueueTest {
@After @After
public void stopBroker() throws Exception { public void stopBroker() throws Exception {
broker.stop(); if (broker != null) {
broker.waitUntilStopped(); broker.stop();
broker.waitUntilStopped();
}
} }
final double sampleRate = 100000; final double sampleRate = 100000;
@ -174,4 +178,12 @@ public class KahaDBFastEnqueueTest {
String options = "?jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192"; String options = "?jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192";
connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options); connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options);
} }
public void testRollover() throws Exception {
byte flip = 0x1;
for (long i=0; i<Short.MAX_VALUE; i++) {
assertEquals("0 @:" + i, 0, flip ^= 1);
assertEquals("1 @:" + i, 1, flip ^= 1);
}
}
} }

View File

@ -57,10 +57,10 @@ class CallerBufferingDataFileAppender implements FileAppender {
new DataByteArrayOutputStream(maxWriteBatchSize), new DataByteArrayOutputStream(maxWriteBatchSize),
new DataByteArrayOutputStream(maxWriteBatchSize) new DataByteArrayOutputStream(maxWriteBatchSize)
}; };
AtomicInteger writeBatchInstanceCount = new AtomicInteger(); volatile byte flip = 0x1;
public class WriteBatch { public class WriteBatch {
DataByteArrayOutputStream buff = cachedBuffers[writeBatchInstanceCount.getAndIncrement()%2]; DataByteArrayOutputStream buff = cachedBuffers[flip ^= 1];
public final DataFile dataFile; public final DataFile dataFile;
public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>(); public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();