From 4dd54080a1e91463ad4b6693fb1e62ad3a81124a Mon Sep 17 00:00:00 2001 From: Thiago Kronig Date: Fri, 12 Jun 2015 01:27:54 -0300 Subject: [PATCH] ARTEMIS-127 Fix Array.toString(), nonatomic update on volatiles --- .../broker/policy/AbortSlowConsumer1Test.java | 4 +- .../org/apache/activemq/bugs/AMQ2149Test.java | 112 ++++++++++-------- .../org/apache/activemq/bugs/AMQ3779Test.java | 2 +- .../command/ActiveMQTextMessageTest.java | 2 +- .../store/kahadb/KahaDBFastEnqueueTest.java | 4 +- .../store/kahadb/plist/PListTest.java | 8 +- .../activemq/transport/StubTransport.java | 7 +- .../DurableSubscriptionOffline4Test.java | 3 +- .../activemq/usecases/MemoryLimitTest.java | 4 +- 9 files changed, 78 insertions(+), 68 deletions(-) diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java index e17b362cf6..4368f7927a 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java @@ -82,7 +82,7 @@ public class AbortSlowConsumer1Test extends AbortSlowConsumerBase { consumer.close(); TimeUnit.SECONDS.sleep(5); - assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); + assertTrue("no exceptions : " + exceptions, exceptions.isEmpty()); } @Test(timeout = 60 * 1000) @@ -99,6 +99,6 @@ public class AbortSlowConsumer1Test extends AbortSlowConsumerBase { conn.close(); TimeUnit.SECONDS.sleep(5); - assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); + assertTrue("no exceptions : " + exceptions, exceptions.isEmpty()); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java index c28d3ad295..f172a919f3 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java @@ -30,6 +30,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.*; @@ -65,22 +66,22 @@ public class AMQ2149Test { private static final String BROKER_CONNECTOR = "tcp://localhost:61617"; private static final String DEFAULT_BROKER_URL = "failover:("+ BROKER_CONNECTOR +")?maxReconnectDelay=1000&useExponentialBackOff=false"; - + private final String SEQ_NUM_PROPERTY = "seqNum"; final int MESSAGE_LENGTH_BYTES = 75 * 1024; final long SLEEP_BETWEEN_SEND_MS = 25; final int NUM_SENDERS_AND_RECEIVERS = 10; final Object brokerLock = new Object(); - + private static final long DEFAULT_BROKER_STOP_PERIOD = 10 * 1000; private static final long DEFAULT_NUM_TO_SEND = 1400; - + long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD; long numtoSend = DEFAULT_NUM_TO_SEND; long sleepBetweenSend = SLEEP_BETWEEN_SEND_MS; String brokerURL = DEFAULT_BROKER_URL; - + int numBrokerRestarts = 0; final static int MAX_BROKER_RESTARTS = 4; BrokerService broker; @@ -88,15 +89,15 @@ public class AMQ2149Test { protected File dataDirFile; final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()}; - - + + public void createBroker(Configurer configurer) throws Exception { broker = new BrokerService(); configurePersistenceAdapter(broker); - + broker.getSystemUsage().getMemoryUsage().setLimit(MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS); - broker.addConnector(BROKER_CONNECTOR); + broker.addConnector(BROKER_CONNECTOR); broker.setBrokerName(testName.getMethodName()); broker.setDataDirectoryFile(dataDirFile); if (configurer != null) { @@ -104,7 +105,7 @@ public class AMQ2149Test { } broker.start(); } - + protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception { } @@ -135,7 +136,7 @@ public class AMQ2149Test { executor.shutdownNow(); exceptions.clear(); } - + private String buildLongString() { final StringBuilder stringBuilder = new StringBuilder( MESSAGE_LENGTH_BYTES); @@ -156,8 +157,8 @@ public class AMQ2149Test { private final MessageConsumer messageConsumer; - private volatile long nextExpectedSeqNum = 0; - + private AtomicLong nextExpectedSeqNum = new AtomicLong(); + private final boolean transactional; private String lastId = null; @@ -182,11 +183,11 @@ public class AMQ2149Test { public void close() throws JMSException { connection.close(); } - + public long getNextExpectedSeqNo() { - return nextExpectedSeqNum; + return nextExpectedSeqNum.get(); } - + final int TRANSACITON_BATCH = 500; boolean resumeOnNextOrPreviousIsOk = false; public void onMessage(Message message) { @@ -194,7 +195,7 @@ public class AMQ2149Test { final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); if ((seqNum % TRANSACITON_BATCH) == 0) { LOG.info(dest + " received " + seqNum); - + if (transactional) { LOG.info("committing.."); session.commit(); @@ -202,25 +203,26 @@ public class AMQ2149Test { } if (resumeOnNextOrPreviousIsOk) { // after an indoubt commit we need to accept what we get (within reason) - if (seqNum != nextExpectedSeqNum) { - if (seqNum == nextExpectedSeqNum - (TRANSACITON_BATCH -1)) { - nextExpectedSeqNum -= (TRANSACITON_BATCH -1); + if (seqNum != nextExpectedSeqNum.get()) { + final long l = nextExpectedSeqNum.get(); + if (seqNum == l - (TRANSACITON_BATCH -1)) { + nextExpectedSeqNum.compareAndSet(l, l - (TRANSACITON_BATCH -1) ); LOG.info("In doubt commit failed, getting replay at:" + nextExpectedSeqNum); } } resumeOnNextOrPreviousIsOk = false; } - if (seqNum != nextExpectedSeqNum) { + if (seqNum != nextExpectedSeqNum.get()) { LOG.warn(dest + " received " + seqNum + " in msg: " + message.getJMSMessageID() + " expected " + nextExpectedSeqNum - + ", lastId: " + lastId + + ", lastId: " + lastId + ", message:" + message); fail(dest + " received " + seqNum + " expected " + nextExpectedSeqNum); } - ++nextExpectedSeqNum; + nextExpectedSeqNum.incrementAndGet(); lastId = message.getJMSMessageID(); } catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) { LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery); @@ -228,12 +230,12 @@ public class AMQ2149Test { // in doubt - either commit command or reply missing // don't know if we will get a replay resumeOnNextOrPreviousIsOk = true; - nextExpectedSeqNum++; + nextExpectedSeqNum.incrementAndGet(); LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum); } else { resumeOnNextOrPreviousIsOk = false; // batch will be replayed - nextExpectedSeqNum -= (TRANSACITON_BATCH -1); + nextExpectedSeqNum.addAndGet(-(TRANSACITON_BATCH - 1)); } } catch (Throwable e) { @@ -255,6 +257,7 @@ public class AMQ2149Test { private final MessageProducer messageProducer; private volatile long nextSequenceNumber = 0; + private final Object guard = new Object(); public Sender(javax.jms.Destination dest) throws JMSException { this.dest = dest; @@ -269,15 +272,24 @@ public class AMQ2149Test { public void run() { final String longString = buildLongString(); + long nextSequenceNumber = this.nextSequenceNumber; while (nextSequenceNumber < numtoSend) { try { final Message message = session .createTextMessage(longString); message.setLongProperty(SEQ_NUM_PROPERTY, nextSequenceNumber); - ++nextSequenceNumber; - messageProducer.send(message); - + synchronized (guard) + { + if (nextSequenceNumber == this.nextSequenceNumber) + { + this.nextSequenceNumber = nextSequenceNumber + 1; + messageProducer.send(message); + } else { + continue; + } + } + if ((nextSequenceNumber % 500) == 0) { LOG.info(dest + " sent " + nextSequenceNumber); } @@ -353,13 +365,13 @@ public class AMQ2149Test { // no need to run this unless there are some issues with the others public void vanilaVerify_testOrder() throws Exception { - + createBroker(new Configurer() { public void configure(BrokerService broker) throws Exception { - broker.deleteAllMessages(); + broker.deleteAllMessages(); } }); - + verifyOrderedMessageReceipt(); verifyStats(false); } @@ -368,22 +380,22 @@ public class AMQ2149Test { public void testOrderWithRestart() throws Exception { createBroker(new Configurer() { public void configure(BrokerService broker) throws Exception { - broker.deleteAllMessages(); + broker.deleteAllMessages(); } }); - + final Timer timer = new Timer(); schedualRestartTask(timer, new Configurer() { - public void configure(BrokerService broker) throws Exception { + public void configure(BrokerService broker) throws Exception { } }); - + try { verifyOrderedMessageReceipt(); } finally { timer.cancel(); } - + verifyStats(true); } @@ -394,16 +406,16 @@ public class AMQ2149Test { broker.deleteAllMessages(); } }); - + final Timer timer = new Timer(); schedualRestartTask(timer, null); - + try { verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE); } finally { timer.cancel(); } - + verifyStats(true); } @@ -416,33 +428,33 @@ public class AMQ2149Test { public void testTopicTransactionalOrderWithRestart() throws Exception { doTestTransactionalOrderWithRestart(ActiveMQDestination.TOPIC_TYPE); } - + public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception { numtoSend = 10000; sleepBetweenSend = 3; brokerStopPeriod = 10 * 1000; - + createBroker(new Configurer() { public void configure(BrokerService broker) throws Exception { broker.deleteAllMessages(); } }); - + final Timer timer = new Timer(); schedualRestartTask(timer, null); - + try { verifyOrderedMessageReceipt(destinationType, 1, true); } finally { timer.cancel(); } - + verifyStats(true); } private void verifyStats(boolean brokerRestarts) throws Exception { RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); - + for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) { DestinationStatistics stats = dest.getDestinationStatistics(); if (brokerRestarts) { @@ -453,7 +465,7 @@ public class AMQ2149Test { + " " + stats.getEnqueues().getCount() + " <= " +stats.getDequeues().getCount()); } else { assertEquals("qneue/dequeue match for: " + dest.getName(), - stats.getEnqueues().getCount(), stats.getDequeues().getCount()); + stats.getEnqueues().getCount(), stats.getDequeues().getCount()); } } } @@ -496,20 +508,20 @@ public class AMQ2149Test { } return task; } - + private void verifyOrderedMessageReceipt(byte destinationType) throws Exception { verifyOrderedMessageReceipt(destinationType, NUM_SENDERS_AND_RECEIVERS, false); } - + private void verifyOrderedMessageReceipt() throws Exception { verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE, NUM_SENDERS_AND_RECEIVERS, false); } - + private void verifyOrderedMessageReceipt(byte destinationType, int concurrentPairs, boolean transactional) throws Exception { Vector threads = new Vector(); Vector receivers = new Vector(); - + for (int i = 0; i < concurrentPairs; ++i) { final javax.jms.Destination destination = ActiveMQDestination.createDestination("test.dest." + i, destinationType); @@ -518,7 +530,7 @@ public class AMQ2149Test { thread.start(); threads.add(thread); } - + final long expiry = System.currentTimeMillis() + 1000 * 60 * 4; while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) { Thread sendThread = threads.firstElement(); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java index 5a410e87a4..c7a486f60f 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java @@ -47,7 +47,7 @@ public class AMQ3779Test extends AutoFailTestSupport { } } }; - logger.getRootLogger().addAppender(appender); + Logger.getRootLogger().addAppender(appender); try { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java index 28fc307c5e..13c71847c5 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java @@ -49,7 +49,7 @@ public class ActiveMQTextMessageTest extends TestCase { String string = "str"; msg.setText(string); Message copy = msg.copy(); - assertTrue(msg.getText() == ((ActiveMQTextMessage) copy).getText()); + assertSame(msg.getText(), ((ActiveMQTextMessage) copy).getText()); } public void testSetText() { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java index 57530da7e2..4ccb51efe6 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java @@ -238,8 +238,8 @@ public class KahaDBFastEnqueueTest { public void testRollover() throws Exception { byte flip = 0x1; for (long i=0; i exceptions = new Vector(); ExecutorService executor; diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java index 8fb70ec966..a11d45a5c2 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport; import java.io.IOException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.util.ServiceStopper; @@ -29,7 +30,7 @@ import org.apache.activemq.util.ServiceStopper; public class StubTransport extends TransportSupport { private Queue queue = new ConcurrentLinkedQueue(); - private volatile int receiveCounter; + private AtomicInteger receiveCounter; protected void doStop(ServiceStopper stopper) throws Exception { } @@ -38,7 +39,7 @@ public class StubTransport extends TransportSupport { } public void oneway(Object command) throws IOException { - receiveCounter++; + receiveCounter.incrementAndGet(); queue.add(command); } @@ -51,7 +52,7 @@ public class StubTransport extends TransportSupport { } public int getReceiveCounter() { - return receiveCounter; + return receiveCounter.get(); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java index 09c50d0107..bc31444d06 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java @@ -30,6 +30,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -80,7 +81,7 @@ public class DurableSubscriptionOffline4Test extends DurableSubscriptionOfflineT MessageProducer producer = session.createProducer(null); final int toSend = 500; - final String payload = new byte[40*1024].toString(); + final String payload = Arrays.toString(new byte[40 * 1024]); int sent = 0; for (int i = sent; i < toSend; i++) { Message message = session.createTextMessage(payload); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java index 49026bd6a7..deb9cde774 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java @@ -168,7 +168,7 @@ public class MemoryLimitTest extends TestSupport { final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) { @Override protected Message createMessage(int i) throws Exception { - return sess.createTextMessage(payload + "::" + i); + return sess.createTextMessage(Arrays.toString(payload) + "::" + i); } }; producer.setMessageCount(1000); @@ -176,7 +176,7 @@ public class MemoryLimitTest extends TestSupport { final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) { @Override protected Message createMessage(int i) throws Exception { - return sess.createTextMessage(payload + "::" + i); + return sess.createTextMessage(Arrays.toString(payload) + "::" + i); } }; producer2.setMessageCount(1000);