diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java index a2b609fbcb..b07e02ecb3 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java @@ -46,7 +46,7 @@ public interface ArtemisExecutor extends Executor { * @param onPendingTask it will be called for each pending task found * @return the number of pending tasks that won't be executed */ - default int shutdownNow(Consumer onPendingTask) { + default int shutdownNow(Consumer onPendingTask, int timeout, TimeUnit unit) { return 0; } @@ -73,7 +73,7 @@ public interface ArtemisExecutor extends Executor { */ default int shutdownNow() { return shutdownNow(t -> { - }); + }, 1, TimeUnit.SECONDS); } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java index ff6d9a1db5..67dcb5cdb3 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java @@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; import org.jboss.logging.Logger; @@ -99,45 +98,26 @@ public abstract class ProcessorBase extends HandlerBase { } } - /** - * It will wait the current execution (if there is one) to finish - * but will not complete any further executions - */ - public int shutdownNow(Consumer onPendingItem) { + /** It will shutdown the executor however it will not wait for finishing tasks*/ + public int shutdownNow(Consumer onPendingItem, int timeout, TimeUnit unit) { //alert anyone that has been requested (at least) an immediate shutdown requestedForcedShutdown = true; requestedShutdown = true; - if (inHandler()) { - stateUpdater.set(this, STATE_FORCED_SHUTDOWN); - } else { - //it could take a very long time depending on the current executing task - do { - //alert the ExecutorTask (if is running) to just drain the current backlog of tasks - final int startState = stateUpdater.get(this); - if (startState == STATE_FORCED_SHUTDOWN) { - //another thread has completed a forced shutdown: let it to manage the tasks cleanup - break; - } - if (startState == STATE_RUNNING) { - //wait 100 ms to avoid burning CPU while waiting and - //give other threads a chance to make progress - LockSupport.parkNanos(100_000_000L); - } - } - while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN)); - //this could happen just one time: the forced shutdown state is the last one and - //can be set by just one caller. - //As noted on the execute method there is a small chance that some tasks would be enqueued + if (!inHandler()) { + // We don't have an option where we could do an immediate timeout + // I just need to make one roundtrip to make sure there's no pending tasks on the loop + // for that I ellected one second + flush(timeout, unit); } + + stateUpdater.set(this, STATE_FORCED_SHUTDOWN); int pendingItems = 0; - //there is a small chance that execute() could race with this cleanup: the lock allow an all-or-nothing behaviour between them - synchronized (tasks) { - T item; - while ((item = tasks.poll()) != null) { - onPendingItem.accept(item); - pendingItems++; - } + + T item; + while ((item = tasks.poll()) != null) { + onPendingItem.accept(item); + pendingItems++; } return pendingItems; } @@ -184,6 +164,7 @@ public abstract class ProcessorBase extends HandlerBase { protected void task(T command) { if (requestedShutdown) { logAddOnShutdown(); + return; } //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks tasks.add(command); @@ -203,11 +184,6 @@ public abstract class ProcessorBase extends HandlerBase { if (state == STATE_NOT_RUNNING) { //startPoller could be deleted but is maintained because is inherited delegate.execute(task); - } else if (state == STATE_FORCED_SHUTDOWN) { - //help the GC by draining any task just submitted: it helps to cover the case of a shutdownNow finished before tasks.add - synchronized (tasks) { - tasks.clear(); - } } } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java index 345cbb5009..e0b8a10ca6 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java @@ -20,6 +20,7 @@ package org.apache.activemq.artemis.utils.actors; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -69,9 +70,7 @@ public class OrderedExecutorSanityTest { executor.shutdownNow(); Assert.assertEquals("There are no remaining tasks to be executed", 0, executor.remaining()); //from now on new tasks won't be executed - final CountDownLatch afterDeatchExecution = new CountDownLatch(1); - executor.execute(afterDeatchExecution::countDown); - Assert.assertFalse("After shutdownNow no new tasks can be executed", afterDeatchExecution.await(100, TimeUnit.MILLISECONDS)); + executor.execute(() -> System.out.println("this will never happen")); //to avoid memory leaks the executor must take care of the new submitted tasks immediatly Assert.assertEquals("Any new task submitted after death must be collected", 0, executor.remaining()); } finally { @@ -82,11 +81,11 @@ public class OrderedExecutorSanityTest { @Test - public void shutdownNowOnDelegateExecutor() throws InterruptedException { + public void shutdownNowOnDelegateExecutor() throws Exception { final ExecutorService executorService = Executors.newSingleThreadExecutor(); try { final OrderedExecutor executor = new OrderedExecutor(executorService); - final CountDownLatch latch = new CountDownLatch(1); + final CyclicBarrier latch = new CyclicBarrier(2); final AtomicInteger numberOfTasks = new AtomicInteger(0); final CountDownLatch ran = new CountDownLatch(1); @@ -105,7 +104,7 @@ public class OrderedExecutorSanityTest { executor.execute(() -> System.out.println("Dont worry, this will never happen")); } - latch.countDown(); + latch.await(); ran.await(1, TimeUnit.SECONDS); Assert.assertEquals(100, numberOfTasks.get()); @@ -116,6 +115,44 @@ public class OrderedExecutorSanityTest { } } + @Test + public void shutdownNowWithBlocked() throws Exception { + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + try { + final OrderedExecutor executor = new OrderedExecutor(executorService); + final CyclicBarrier latch = new CyclicBarrier(2); + final CyclicBarrier secondlatch = new CyclicBarrier(2); + final CountDownLatch ran = new CountDownLatch(1); + + executor.execute(() -> { + try { + latch.await(1, TimeUnit.MINUTES); + secondlatch.await(1, TimeUnit.MINUTES); + ran.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + + for (int i = 0; i < 100; i++) { + executor.execute(() -> System.out.println("Dont worry, this will never happen")); + } + + latch.await(); + try { + Assert.assertEquals(100, executor.shutdownNow()); + } finally { + secondlatch.await(); + } + + Assert.assertEquals(ProcessorBase.STATE_FORCED_SHUTDOWN, executor.status()); + Assert.assertEquals(0, executor.remaining()); + } finally { + executorService.shutdown(); + } + } + @Test public void testMeasure() throws InterruptedException { diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml index 859d5d0080..06758f5526 100644 --- a/artemis-distribution/pom.xml +++ b/artemis-distribution/pom.xml @@ -162,6 +162,12 @@ org.apache.activemq activemq-client + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + + org.apache.activemq diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 9d78b4a7ca..08f0794a8e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -61,7 +61,6 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.activemq.artemis.utils.RunnableEx; import org.apache.activemq.artemis.utils.SelectorTranslator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -142,7 +141,18 @@ public class AMQPSessionCallback implements SessionCallback { return transportConnection.isWritable(callback) && senderContext.getSender().getLocalState() != EndpointState.CLOSED; } - public void withinContext(RunnableEx run) throws Exception { + public void withinSessionExecutor(Runnable run) { + sessionExecutor.execute(() -> { + try { + withinContext(run); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + }); + + } + + public void withinContext(Runnable run) throws Exception { OperationContext context = recoverContext(); try { run.run(); @@ -438,18 +448,6 @@ public class AMQPSessionCallback implements SessionCallback { return new AMQPStandardMessage(delivery.getMessageFormat(), data, null, coreMessageObjectPools); } - public void serverSend(final ProtonServerReceiverContext context, - final Transaction transaction, - final Receiver receiver, - final Delivery delivery, - SimpleString address, - int messageFormat, - ReadableBuffer data, - RoutingContext routingContext) throws Exception { - AMQPStandardMessage message = new AMQPStandardMessage(messageFormat, data, null, coreMessageObjectPools); - serverSend(context, transaction, receiver, delivery, address, routingContext, message); - } - public void serverSend(ProtonServerReceiverContext context, Transaction transaction, Receiver receiver, @@ -491,7 +489,9 @@ public class AMQPSessionCallback implements SessionCallback { throw e; } } else { - serverSend(context, transaction, message, delivery, receiver, routingContext); + message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer()); + // We need to transfer IO execution to a different thread otherwise we may deadlock netty loop + sessionExecutor.execute(() -> inSessionSend(context, transaction, message, delivery, receiver, routingContext)); } } finally { resetContext(oldcontext); @@ -523,46 +523,58 @@ public class AMQPSessionCallback implements SessionCallback { } - private void serverSend(final ProtonServerReceiverContext context, + private void inSessionSend(final ProtonServerReceiverContext context, final Transaction transaction, final Message message, final Delivery delivery, final Receiver receiver, - final RoutingContext routingContext) throws Exception { - message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer()); - if (invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()) == null) { - serverSession.send(transaction, message, directDeliver, false, routingContext); + final RoutingContext routingContext) { + OperationContext oldContext = recoverContext(); + try { + if (invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()) == null) { + serverSession.send(transaction, message, directDeliver, false, routingContext); - afterIO(new IOCallback() { - @Override - public void done() { - connection.runLater(() -> { - if (delivery.getRemoteState() instanceof TransactionalState) { - TransactionalState txAccepted = new TransactionalState(); - txAccepted.setOutcome(Accepted.getInstance()); - txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId()); + afterIO(new IOCallback() { + @Override + public void done() { + connection.runLater(() -> { + if (delivery.getRemoteState() instanceof TransactionalState) { + TransactionalState txAccepted = new TransactionalState(); + txAccepted.setOutcome(Accepted.getInstance()); + txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId()); - delivery.disposition(txAccepted); - } else { - delivery.disposition(Accepted.getInstance()); - } - delivery.settle(); - context.flow(); - connection.flush(); - }); - } + delivery.disposition(txAccepted); + } else { + delivery.disposition(Accepted.getInstance()); + } + delivery.settle(); + context.flow(); + connection.instantFlush(); + }); + } - @Override - public void onError(int errorCode, String errorMessage) { - connection.runNow(() -> { - receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); - connection.flush(); - }); - } - }); - } else { - rejectMessage(delivery, Symbol.valueOf("failed"), "Interceptor rejected message"); + @Override + public void onError(int errorCode, String errorMessage) { + sendError(errorCode, errorMessage, receiver); + } + }); + } else { + rejectMessage(delivery, Symbol.valueOf("failed"), "Interceptor rejected message"); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + context.deliveryFailed(delivery, receiver, e); + } finally { + resetContext(oldContext); } + + } + + private void sendError(int errorCode, String errorMessage, Receiver receiver) { + connection.runNow(() -> { + receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); + connection.flush(); + }); } /** Will execute a Runnable on an Address when there's space in memory*/ diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index a6cfb5e01c..38aaeded9e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -360,17 +360,22 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements sessionSPI.serverSend(this, tx, receiver, delivery, address, routingContext, message); } catch (Exception e) { log.warn(e.getMessage(), e); + + deliveryFailed(delivery, receiver, e); + + } + } + + public void deliveryFailed(Delivery delivery, Receiver receiver, Exception e) { + connection.runNow(() -> { DeliveryState deliveryState = determineDeliveryState(((Source) receiver.getSource()), useModified, e); - connection.runLater(() -> { - delivery.disposition(deliveryState); - delivery.settle(); - flow(); - connection.flush(); - }); - - } + delivery.disposition(deliveryState); + delivery.settle(); + flow(); + connection.flush(); + }); } private DeliveryState determineDeliveryState(final Source source, final boolean useModified, final Exception e) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index bf425d809b..b9385bc3e8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -62,7 +62,6 @@ import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Modified; import org.apache.qpid.proton.amqp.messaging.Outcome; -import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; @@ -630,16 +629,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // this can happen in the twice ack mode, that is the receiver accepts and settles separately // acking again would show an exception but would have no negative effect but best to handle anyway. if (!delivery.isSettled()) { - // we have to individual ack as we can't guarantee we will get the delivery updates - // (including acks) in order from dealer, a performance hit but a must - try { - sessionSPI.ack(null, brokerConsumer, message); - } catch (Exception e) { - log.warn(e.toString(), e); - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); - } - - delivery.settle(); + inSessionACK(delivery, message); } } else { handleExtendedDeliveryOutcomes(message, delivery, remoteState); @@ -654,6 +644,37 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } + private void inSessionACK(Delivery delivery, Message message) throws ActiveMQAMQPIllegalStateException { + OperationContext oldContext = sessionSPI.recoverContext(); + try { + // we have to individual ack as we can't guarantee we will get the delivery updates + // (including acks) in order from dealer, a performance hit but a must + try { + sessionSPI.ack(null, brokerConsumer, message); + } catch (Exception e) { + log.warn(e.toString(), e); + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); + } + + sessionSPI.afterIO(new IOCallback() { + @Override + public void done() { + connection.runLater(() -> { + delivery.settle(); + connection.instantFlush(); + }); + } + + @Override + public void onError(int errorCode, String errorMessage) { + + } + }); + } finally { + sessionSPI.resetContext(oldContext); + } + } + private boolean handleExtendedDeliveryOutcomes(Message message, Delivery delivery, DeliveryState remoteState) throws ActiveMQAMQPException { boolean settleImmediate = true; boolean handled = true; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index 15803f497c..0bb61df19a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -145,26 +145,45 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { }; if (discharge.getFail()) { - sessionSPI.withinContext(() -> tx.rollback()); - sessionSPI.afterIO(ioAction); + sessionSPI.withinSessionExecutor(() -> { + try { + tx.rollback(); + sessionSPI.afterIO(ioAction); + } catch (Throwable e) { + txError(delivery, e); + } + }); } else { - sessionSPI.withinContext(() -> tx.commit()); - sessionSPI.afterIO(ioAction); + sessionSPI.withinSessionExecutor(() -> { + try { + tx.commit(); + sessionSPI.afterIO(ioAction); + } catch (Throwable e) { + txError(delivery, e); + } + }); } } } catch (ActiveMQAMQPException amqpE) { - log.warn(amqpE.getMessage(), amqpE); - delivery.settle(); - delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); - connection.flush(); + txError(delivery, amqpE); } catch (Throwable e) { - log.warn(e.getMessage(), e); - delivery.settle(); - delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); - connection.flush(); + txError(delivery, e); } } + private void txError(Delivery delivery, Throwable e) { + log.warn(e.getMessage(), e); + connection.runNow(() -> { + delivery.settle(); + if (e instanceof ActiveMQAMQPException) { + delivery.disposition(createRejected(((ActiveMQAMQPException) e).getAmqpError(), e.getMessage())); + } else { + delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); + } + connection.flush(); + }); + } + @Override public void onFlow(int credits, boolean drain) { } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java index 62858c8e1c..ad6bddd485 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java @@ -132,6 +132,12 @@ public class ProtonServerReceiverContextTest { runnable.run(); return null; }).when(mockConnContext).runLater(any(Runnable.class)); + + doAnswer((Answer) invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(mockConnContext).runNow(any(Runnable.class)); ProtonProtocolManager mockProtocolManager = mock(ProtonProtocolManager.class); when(mockProtocolManager.isUseModifiedForTransientDeliveryErrors()).thenReturn(true); when(mockConnContext.getProtocolManager()).thenReturn(mockProtocolManager); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 6f4db11e59..b0d6433074 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -363,7 +363,9 @@ public class PagingStoreImpl implements PagingStore { cursorProvider.stop(); final List pendingTasks = new ArrayList<>(); - final int pendingTasksWhileShuttingDown = executor.shutdownNow(pendingTasks::add); + + // TODO we could have a parameter to use this + final int pendingTasksWhileShuttingDown = executor.shutdownNow(pendingTasks::add, 30, TimeUnit.SECONDS); if (pendingTasksWhileShuttingDown > 0) { logger.tracef("Try executing %d pending tasks on stop", pendingTasksWhileShuttingDown); for (Runnable pendingTask : pendingTasks) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index f3f0c5d02b..446ae84056 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -886,8 +886,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } if (deleteData && addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) { - pagingManager.deletePageStore(binding.getAddress()); - deleteDuplicateCache(binding.getAddress()); } diff --git a/examples/features/perf/perf/pom.xml b/examples/features/perf/perf/pom.xml index f698af8cdc..aa39fe51c1 100644 --- a/examples/features/perf/perf/pom.xml +++ b/examples/features/perf/perf/pom.xml @@ -45,6 +45,12 @@ under the License. org.apache.activemq activemq-client ${activemq5-version} + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + + org.slf4j diff --git a/pom.xml b/pom.xml index a15ed0dd10..466c001361 100644 --- a/pom.xml +++ b/pom.xml @@ -580,6 +580,12 @@ org.apache.activemq activemq-client ${activemq5-version} + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + + diff --git a/tests/artemis-test-support/pom.xml b/tests/artemis-test-support/pom.xml index a1d7e7a11f..6bceaaf04a 100644 --- a/tests/artemis-test-support/pom.xml +++ b/tests/artemis-test-support/pom.xml @@ -66,6 +66,12 @@ org.apache.activemq activemq-client + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + + org.fusesource.hawtbuf diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml index 7406d159c5..5efbfcb7bf 100644 --- a/tests/smoke-tests/pom.xml +++ b/tests/smoke-tests/pom.xml @@ -116,6 +116,18 @@ org.eclipse.paho.client.mqttv3 RELEASE + + org.apache.activemq + activemq-client + ${activemq5-version} + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + + + test + diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml b/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml index 0e58838083..4a9eb47619 100644 --- a/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml @@ -18,7 +18,7 @@ specific language governing permissions and limitations under the License. --> - + @@ -65,13 +65,18 @@ under the License. - - - - - - - + + + + + + + + + + + + @@ -95,8 +100,8 @@ under the License. ExpiryQueue 0 - 200MB - 100MB + 10MB + 1MB 10 PAGE @@ -107,7 +112,11 @@ under the License. - + +
+ + +
diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml b/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml index e67613d879..58a4b9fd74 100644 --- a/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml @@ -18,7 +18,7 @@ specific language governing permissions and limitations under the License. --> - + @@ -67,16 +67,20 @@ under the License. - - - - - - - + + + + + + + + + + + + - @@ -98,8 +102,8 @@ under the License. ExpiryQueue 0 - 200MB - 100MB + 10MB + 1MB 10 PAGE @@ -110,7 +114,11 @@ under the License. - + +
+ + +
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.java index 084c2426d2..0dd579d028 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.java @@ -35,7 +35,6 @@ import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; public class ReplicationFlowControlTest extends SmokeTestBase { @@ -80,7 +79,6 @@ public class ReplicationFlowControlTest extends SmokeTestBase { internalTest(false); } - @Ignore // need to fix this before I can let it running @Test public void testPageWhileSyncFailover() throws Exception { internalTest(true); diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java new file mode 100644 index 0000000000..f72990da86 --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.replicationflow; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.utils.SpawnedVMSupport; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SoakPagingTest extends SmokeTestBase { + public static final String SERVER_NAME_0 = "replicated-static0"; + public static final String SERVER_NAME_1 = "replicated-static1"; + + static AtomicInteger produced = new AtomicInteger(0); + static AtomicInteger consumed = new AtomicInteger(0); + private static Process server0; + + private static Process server1; + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + cleanupData(SERVER_NAME_1); + + server0 = startServer(SERVER_NAME_0, 0, 30000); + server1 = startServer(SERVER_NAME_1, 0, 30000); + } + + final String destination = "exampleTopic"; + static final int consumer_threads = 20; + static final int producer_threads = 20; + static AtomicInteger j = new AtomicInteger(0); + + + public static void main(String[] arg) { + try { + final String host = "localhost"; + final int port = 61616; + + final ConnectionFactory factory = new org.apache.qpid.jms.JmsConnectionFactory("failover:(amqp://" + host + ":" + port + ")"); + + for (int i = 0; i < producer_threads; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + SoakPagingTest app = new SoakPagingTest(); + app.produce(factory); + } + }); + t.start(); + } + + Thread.sleep(1000); + + for (int i = 0; i < consumer_threads; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + SoakPagingTest app = new SoakPagingTest(); + app.consume(factory, j.getAndIncrement()); + } + }); + t.start(); + } + Thread.sleep(15000); + + System.exit(consumed.get()); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + @Test + public void testPagingReplication() throws Throwable { + for (int i = 0; i < 3; i++) { + Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName()); + Assert.assertTrue(process.waitFor() > 0); + } + + server1.destroy(); + + server1 = startServer(SERVER_NAME_1, 0, 30000); + + for (int i = 0; i < 2; i++) { + Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName()); + Assert.assertTrue(process.waitFor() > 0); + } + } + + public void produce(ConnectionFactory factory) { + try { + Connection connection = factory.createConnection("admin", "admin"); + + connection.start(); + final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + + Destination address = session.createTopic(destination); + MessageProducer messageProducer = session.createProducer(address); + + int i = 0; + while (true) { + Message message = session.createTextMessage("fkjdslkfjdskljf;lkdsjf;kdsajf;lkjdf;kdsajf;kjdsa;flkjdsa;lfkjdsa;flkj;dsakjf;dsajf;askjd;fkj;dsajflaskfja;fdlkajs;lfdkja;kfj;dsakfj;akdsjf;dsakjf;akfj;lakdsjf;lkasjdf;ksajf;kjdsa;fkj;adskjf;akdsjf;kja;sdkfj;akdsjf;akjdsf;adskjf;akdsjf;askfj;aksjfkdjafndmnfmdsnfjadshfjdsalkfjads;fkjdsa;kfja;skfj;akjfd;akjfd;ksaj;fkja;kfj;dsakjf;dsakjf;dksjf;akdsjf;kdsajf"); + + messageProducer.send(message); + produced.incrementAndGet(); + i++; + if (i % 100 == 0) + System.out.println("Published " + i + " messages"); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void consume(ConnectionFactory factory, int j) { + try { + Connection connection = factory.createConnection("admin", "admin"); + + final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + + Topic address = session.createTopic(destination); + String consumerId = "ss" + (j % 5); + MessageConsumer messageConsumer = session.createSharedConsumer(address, consumerId); + + Thread.sleep(5000); + connection.start(); + + int i = 0; + while (true) { + Message m = messageConsumer.receive(1000); + consumed.incrementAndGet(); + if (m == null) + System.out.println("receive() returned null"); + i++; + if (i % 100 == 0) + System.out.println("Consumed " + i + " messages"); + } + } catch (Exception e) { + e.printStackTrace(); + } + } +}