This commit is contained in:
Clebert Suconic 2018-02-27 20:23:36 -05:00
commit f7dbf7b6d0
2 changed files with 31 additions and 34 deletions

View File

@ -86,7 +86,6 @@ import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
@ -859,16 +858,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
private boolean internalFlushExecutor(long timeout, boolean log) { private boolean internalFlushExecutor(long timeout, boolean log) {
FutureLatch future = new FutureLatch();
getExecutor().execute(future); if (!getExecutor().flush(timeout, TimeUnit.MILLISECONDS)) {
if (log) {
boolean result = future.await(timeout); ActiveMQServerLogger.LOGGER.queueBusy(this.name.toString(), timeout);
}
if (log && !result) { return false;
ActiveMQServerLogger.LOGGER.queueBusy(this.name.toString(), timeout); } else {
return true;
} }
return result;
} }
@Override @Override

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.timing.core.server.impl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -46,7 +47,7 @@ public class QueueImplTest extends ActiveMQTestBase {
private ScheduledExecutorService scheduledExecutor; private ScheduledExecutorService scheduledExecutor;
// private ExecutorService executor; private ExecutorService executor;
@Override @Override
@Before @Before
@ -54,12 +55,14 @@ public class QueueImplTest extends ActiveMQTestBase {
super.setUp(); super.setUp();
scheduledExecutor = new ScheduledThreadPoolExecutor(1); scheduledExecutor = new ScheduledThreadPoolExecutor(1);
executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
} }
@Override @Override
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
scheduledExecutor.shutdownNow(); scheduledExecutor.shutdownNow();
executor.shutdown();
super.tearDown(); super.tearDown();
} }
@ -68,17 +71,14 @@ public class QueueImplTest extends ActiveMQTestBase {
@Test @Test
public void testScheduledNoConsumer() throws Exception { public void testScheduledNoConsumer() throws Exception {
QueueImpl queue = QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, ArtemisExecutor.delegate(executor), null, null);
new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true,
false, scheduledExecutor, null, null, null,
ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null);
// Send one scheduled // Send one scheduled
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
MessageReference ref1 = generateReference(queue, 1); MessageReference ref1 = generateReference(queue, 1);
ref1.setScheduledDeliveryTime(now + 7000); ref1.setScheduledDeliveryTime(now + 700);
queue.addTail(ref1); queue.addTail(ref1);
// Send some non scheduled messages // Send some non scheduled messages
@ -93,19 +93,19 @@ public class QueueImplTest extends ActiveMQTestBase {
// Now send some more scheduled messages // Now send some more scheduled messages
MessageReference ref5 = generateReference(queue, 5); MessageReference ref5 = generateReference(queue, 5);
ref5.setScheduledDeliveryTime(now + 5000); ref5.setScheduledDeliveryTime(now + 500);
queue.addTail(ref5); queue.addTail(ref5);
MessageReference ref6 = generateReference(queue, 6); MessageReference ref6 = generateReference(queue, 6);
ref6.setScheduledDeliveryTime(now + 4000); ref6.setScheduledDeliveryTime(now + 400);
queue.addTail(ref6); queue.addTail(ref6);
MessageReference ref7 = generateReference(queue, 7); MessageReference ref7 = generateReference(queue, 7);
ref7.setScheduledDeliveryTime(now + 3000); ref7.setScheduledDeliveryTime(now + 300);
queue.addTail(ref7); queue.addTail(ref7);
MessageReference ref8 = generateReference(queue, 8); MessageReference ref8 = generateReference(queue, 8);
ref8.setScheduledDeliveryTime(now + 6000); ref8.setScheduledDeliveryTime(now + 600);
queue.addTail(ref8); queue.addTail(ref8);
List<MessageReference> refs = new ArrayList<>(); List<MessageReference> refs = new ArrayList<>();
@ -123,7 +123,7 @@ public class QueueImplTest extends ActiveMQTestBase {
refs.add(ref3); refs.add(ref3);
refs.add(ref4); refs.add(ref4);
Thread.sleep(7500); Thread.sleep(750);
FakeConsumer consumer = new FakeConsumer(); FakeConsumer consumer = new FakeConsumer();
@ -136,8 +136,7 @@ public class QueueImplTest extends ActiveMQTestBase {
@Test @Test
public void testScheduled() throws Exception { public void testScheduled() throws Exception {
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, ArtemisExecutor.delegate(executor), null, null);
ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null);
FakeConsumer consumer = null; FakeConsumer consumer = null;
@ -146,7 +145,7 @@ public class QueueImplTest extends ActiveMQTestBase {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
MessageReference ref1 = generateReference(queue, 1); MessageReference ref1 = generateReference(queue, 1);
ref1.setScheduledDeliveryTime(now + 7000); ref1.setScheduledDeliveryTime(now + 700);
queue.addTail(ref1); queue.addTail(ref1);
// Send some non scheduled messages // Send some non scheduled messages
@ -161,19 +160,19 @@ public class QueueImplTest extends ActiveMQTestBase {
// Now send some more scheduled messages // Now send some more scheduled messages
MessageReference ref5 = generateReference(queue, 5); MessageReference ref5 = generateReference(queue, 5);
ref5.setScheduledDeliveryTime(now + 5000); ref5.setScheduledDeliveryTime(now + 500);
queue.addTail(ref5); queue.addTail(ref5);
MessageReference ref6 = generateReference(queue, 6); MessageReference ref6 = generateReference(queue, 6);
ref6.setScheduledDeliveryTime(now + 4000); ref6.setScheduledDeliveryTime(now + 400);
queue.addTail(ref6); queue.addTail(ref6);
MessageReference ref7 = generateReference(queue, 7); MessageReference ref7 = generateReference(queue, 7);
ref7.setScheduledDeliveryTime(now + 3000); ref7.setScheduledDeliveryTime(now + 300);
queue.addTail(ref7); queue.addTail(ref7);
MessageReference ref8 = generateReference(queue, 8); MessageReference ref8 = generateReference(queue, 8);
ref8.setScheduledDeliveryTime(now + 6000); ref8.setScheduledDeliveryTime(now + 600);
queue.addTail(ref8); queue.addTail(ref8);
consumer = new FakeConsumer(); consumer = new FakeConsumer();
@ -196,27 +195,27 @@ public class QueueImplTest extends ActiveMQTestBase {
MessageReference ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT); MessageReference ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT);
Assert.assertEquals(ref7, ref); Assert.assertEquals(ref7, ref);
long now2 = System.currentTimeMillis(); long now2 = System.currentTimeMillis();
Assert.assertTrue(now2 - now >= 3000); Assert.assertTrue(now2 - now >= 300);
ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT); ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT);
Assert.assertEquals(ref6, ref); Assert.assertEquals(ref6, ref);
now2 = System.currentTimeMillis(); now2 = System.currentTimeMillis();
Assert.assertTrue(now2 - now >= 4000); Assert.assertTrue(now2 - now >= 400);
ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT); ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT);
Assert.assertEquals(ref5, ref); Assert.assertEquals(ref5, ref);
now2 = System.currentTimeMillis(); now2 = System.currentTimeMillis();
Assert.assertTrue(now2 - now >= 5000); Assert.assertTrue(now2 - now >= 500);
ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT); ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT);
Assert.assertEquals(ref8, ref); Assert.assertEquals(ref8, ref);
now2 = System.currentTimeMillis(); now2 = System.currentTimeMillis();
Assert.assertTrue(now2 - now >= 6000); Assert.assertTrue(now2 - now >= 600);
ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT); ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT);
Assert.assertEquals(ref1, ref); Assert.assertEquals(ref1, ref);
now2 = System.currentTimeMillis(); now2 = System.currentTimeMillis();
Assert.assertTrue(now2 - now >= 7000); Assert.assertTrue(now2 - now >= 700);
Assert.assertTrue(consumer.getReferences().isEmpty()); Assert.assertTrue(consumer.getReferences().isEmpty());
} }
@ -236,10 +235,10 @@ public class QueueImplTest extends ActiveMQTestBase {
} }
}; };
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null,
ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null); ArtemisExecutor.delegate(executor), null, null);
MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference = generateReference(queue, 1);
queue.addConsumer(consumer); queue.addConsumer(consumer);
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000); messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 200);
queue.addHead(messageReference, false); queue.addHead(messageReference, false);
boolean gotLatch = countDownLatch.await(3000, TimeUnit.MILLISECONDS); boolean gotLatch = countDownLatch.await(3000, TimeUnit.MILLISECONDS);