ARTEMIS-127 Fix Array.toString(), nonatomic update on volatiles

This commit is contained in:
Thiago Kronig 2015-06-12 01:27:54 -03:00
parent ae6a2b87ea
commit 4dd54080a1
9 changed files with 78 additions and 68 deletions

View File

@ -82,7 +82,7 @@ public class AbortSlowConsumer1Test extends AbortSlowConsumerBase {
consumer.close(); consumer.close();
TimeUnit.SECONDS.sleep(5); TimeUnit.SECONDS.sleep(5);
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); assertTrue("no exceptions : " + exceptions, exceptions.isEmpty());
} }
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)
@ -99,6 +99,6 @@ public class AbortSlowConsumer1Test extends AbortSlowConsumerBase {
conn.close(); conn.close();
TimeUnit.SECONDS.sleep(5); TimeUnit.SECONDS.sleep(5);
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); assertTrue("no exceptions : " + exceptions, exceptions.isEmpty());
} }
} }

View File

@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.*; import javax.jms.*;
@ -156,7 +157,7 @@ public class AMQ2149Test {
private final MessageConsumer messageConsumer; private final MessageConsumer messageConsumer;
private volatile long nextExpectedSeqNum = 0; private AtomicLong nextExpectedSeqNum = new AtomicLong();
private final boolean transactional; private final boolean transactional;
@ -184,7 +185,7 @@ public class AMQ2149Test {
} }
public long getNextExpectedSeqNo() { public long getNextExpectedSeqNo() {
return nextExpectedSeqNum; return nextExpectedSeqNum.get();
} }
final int TRANSACITON_BATCH = 500; final int TRANSACITON_BATCH = 500;
@ -202,15 +203,16 @@ public class AMQ2149Test {
} }
if (resumeOnNextOrPreviousIsOk) { if (resumeOnNextOrPreviousIsOk) {
// after an indoubt commit we need to accept what we get (within reason) // after an indoubt commit we need to accept what we get (within reason)
if (seqNum != nextExpectedSeqNum) { if (seqNum != nextExpectedSeqNum.get()) {
if (seqNum == nextExpectedSeqNum - (TRANSACITON_BATCH -1)) { final long l = nextExpectedSeqNum.get();
nextExpectedSeqNum -= (TRANSACITON_BATCH -1); if (seqNum == l - (TRANSACITON_BATCH -1)) {
nextExpectedSeqNum.compareAndSet(l, l - (TRANSACITON_BATCH -1) );
LOG.info("In doubt commit failed, getting replay at:" + nextExpectedSeqNum); LOG.info("In doubt commit failed, getting replay at:" + nextExpectedSeqNum);
} }
} }
resumeOnNextOrPreviousIsOk = false; resumeOnNextOrPreviousIsOk = false;
} }
if (seqNum != nextExpectedSeqNum) { if (seqNum != nextExpectedSeqNum.get()) {
LOG.warn(dest + " received " + seqNum LOG.warn(dest + " received " + seqNum
+ " in msg: " + message.getJMSMessageID() + " in msg: " + message.getJMSMessageID()
+ " expected " + " expected "
@ -220,7 +222,7 @@ public class AMQ2149Test {
fail(dest + " received " + seqNum + " expected " fail(dest + " received " + seqNum + " expected "
+ nextExpectedSeqNum); + nextExpectedSeqNum);
} }
++nextExpectedSeqNum; nextExpectedSeqNum.incrementAndGet();
lastId = message.getJMSMessageID(); lastId = message.getJMSMessageID();
} catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) { } catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) {
LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery); LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery);
@ -228,12 +230,12 @@ public class AMQ2149Test {
// in doubt - either commit command or reply missing // in doubt - either commit command or reply missing
// don't know if we will get a replay // don't know if we will get a replay
resumeOnNextOrPreviousIsOk = true; resumeOnNextOrPreviousIsOk = true;
nextExpectedSeqNum++; nextExpectedSeqNum.incrementAndGet();
LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum); LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum);
} else { } else {
resumeOnNextOrPreviousIsOk = false; resumeOnNextOrPreviousIsOk = false;
// batch will be replayed // batch will be replayed
nextExpectedSeqNum -= (TRANSACITON_BATCH -1); nextExpectedSeqNum.addAndGet(-(TRANSACITON_BATCH - 1));
} }
} catch (Throwable e) { } catch (Throwable e) {
@ -255,6 +257,7 @@ public class AMQ2149Test {
private final MessageProducer messageProducer; private final MessageProducer messageProducer;
private volatile long nextSequenceNumber = 0; private volatile long nextSequenceNumber = 0;
private final Object guard = new Object();
public Sender(javax.jms.Destination dest) throws JMSException { public Sender(javax.jms.Destination dest) throws JMSException {
this.dest = dest; this.dest = dest;
@ -269,14 +272,23 @@ public class AMQ2149Test {
public void run() { public void run() {
final String longString = buildLongString(); final String longString = buildLongString();
long nextSequenceNumber = this.nextSequenceNumber;
while (nextSequenceNumber < numtoSend) { while (nextSequenceNumber < numtoSend) {
try { try {
final Message message = session final Message message = session
.createTextMessage(longString); .createTextMessage(longString);
message.setLongProperty(SEQ_NUM_PROPERTY, message.setLongProperty(SEQ_NUM_PROPERTY,
nextSequenceNumber); nextSequenceNumber);
++nextSequenceNumber; synchronized (guard)
messageProducer.send(message); {
if (nextSequenceNumber == this.nextSequenceNumber)
{
this.nextSequenceNumber = nextSequenceNumber + 1;
messageProducer.send(message);
} else {
continue;
}
}
if ((nextSequenceNumber % 500) == 0) { if ((nextSequenceNumber % 500) == 0) {
LOG.info(dest + " sent " + nextSequenceNumber); LOG.info(dest + " sent " + nextSequenceNumber);

View File

@ -47,7 +47,7 @@ public class AMQ3779Test extends AutoFailTestSupport {
} }
} }
}; };
logger.getRootLogger().addAppender(appender); Logger.getRootLogger().addAppender(appender);
try { try {

View File

@ -49,7 +49,7 @@ public class ActiveMQTextMessageTest extends TestCase {
String string = "str"; String string = "str";
msg.setText(string); msg.setText(string);
Message copy = msg.copy(); Message copy = msg.copy();
assertTrue(msg.getText() == ((ActiveMQTextMessage) copy).getText()); assertSame(msg.getText(), ((ActiveMQTextMessage) copy).getText());
} }
public void testSetText() { public void testSetText() {

View File

@ -238,8 +238,8 @@ public class KahaDBFastEnqueueTest {
public void testRollover() throws Exception { public void testRollover() throws Exception {
byte flip = 0x1; byte flip = 0x1;
for (long i=0; i<Short.MAX_VALUE; i++) { for (long i=0; i<Short.MAX_VALUE; i++) {
assertEquals("0 @:" + i, 0, flip ^= 1); assertEquals("0 @:" + i, 0, flip ^= (byte) 1);
assertEquals("1 @:" + i, 1, flip ^= 1); assertEquals("1 @:" + i, 1, flip ^= (byte) 1);
} }
} }
} }

View File

@ -22,11 +22,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.*;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -46,7 +42,7 @@ public class PListTest {
private PListStoreImpl store; private PListStoreImpl store;
private PListImpl plist; private PListImpl plist;
final ByteSequence payload = new ByteSequence(new byte[400]); final ByteSequence payload = new ByteSequence(new byte[400]);
final String idSeed = new String("Seed" + new byte[1024]); final String idSeed = new String("Seed" + Arrays.toString(new byte[1024]));
final Vector<Throwable> exceptions = new Vector<Throwable>(); final Vector<Throwable> exceptions = new Vector<Throwable>();
ExecutorService executor; ExecutorService executor;

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport;
import java.io.IOException; import java.io.IOException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
@ -29,7 +30,7 @@ import org.apache.activemq.util.ServiceStopper;
public class StubTransport extends TransportSupport { public class StubTransport extends TransportSupport {
private Queue<Object> queue = new ConcurrentLinkedQueue<Object>(); private Queue<Object> queue = new ConcurrentLinkedQueue<Object>();
private volatile int receiveCounter; private AtomicInteger receiveCounter;
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
} }
@ -38,7 +39,7 @@ public class StubTransport extends TransportSupport {
} }
public void oneway(Object command) throws IOException { public void oneway(Object command) throws IOException {
receiveCounter++; receiveCounter.incrementAndGet();
queue.add(command); queue.add(command);
} }
@ -51,7 +52,7 @@ public class StubTransport extends TransportSupport {
} }
public int getReceiveCounter() { public int getReceiveCounter() {
return receiveCounter; return receiveCounter.get();
} }
} }

View File

@ -30,6 +30,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -80,7 +81,7 @@ public class DurableSubscriptionOffline4Test extends DurableSubscriptionOfflineT
MessageProducer producer = session.createProducer(null); MessageProducer producer = session.createProducer(null);
final int toSend = 500; final int toSend = 500;
final String payload = new byte[40*1024].toString(); final String payload = Arrays.toString(new byte[40 * 1024]);
int sent = 0; int sent = 0;
for (int i = sent; i < toSend; i++) { for (int i = sent; i < toSend; i++) {
Message message = session.createTextMessage(payload); Message message = session.createTextMessage(payload);

View File

@ -168,7 +168,7 @@ public class MemoryLimitTest extends TestSupport {
final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) { final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) {
@Override @Override
protected Message createMessage(int i) throws Exception { protected Message createMessage(int i) throws Exception {
return sess.createTextMessage(payload + "::" + i); return sess.createTextMessage(Arrays.toString(payload) + "::" + i);
} }
}; };
producer.setMessageCount(1000); producer.setMessageCount(1000);
@ -176,7 +176,7 @@ public class MemoryLimitTest extends TestSupport {
final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) { final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) {
@Override @Override
protected Message createMessage(int i) throws Exception { protected Message createMessage(int i) throws Exception {
return sess.createTextMessage(payload + "::" + i); return sess.createTextMessage(Arrays.toString(payload) + "::" + i);
} }
}; };
producer2.setMessageCount(1000); producer2.setMessageCount(1000);