Commit 489e6b38e0 by Clebert Suconic, 2020-03-23 20:03:00 -04:00
19 changed files with 448 additions and 153 deletions


@@ -46,7 +46,7 @@ public interface ArtemisExecutor extends Executor {
    * @param onPendingTask it will be called for each pending task found
    * @return the number of pending tasks that won't be executed
    */
-   default int shutdownNow(Consumer<? super Runnable> onPendingTask) {
+   default int shutdownNow(Consumer<? super Runnable> onPendingTask, int timeout, TimeUnit unit) {
       return 0;
    }

@@ -73,7 +73,7 @@ public interface ArtemisExecutor extends Executor {
    */
   default int shutdownNow() {
      return shutdownNow(t -> {
-      });
+      }, 1, TimeUnit.SECONDS);
   }
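A minimal sketch of how a caller might use the widened shutdownNow signature. The helper class below is illustrative only (the 30-second drain mirrors the PagingStoreImpl call further down in this commit), and the package path is assumed from the utils.actors package shown in the test diff:

   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.TimeUnit;
   import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;

   public class ShutdownNowSketch {
      // Hypothetical helper: ask the executor to stop, giving in-flight work up to
      // 30 seconds to drain, and collect whatever will never run.
      static void stop(ArtemisExecutor executor) {
         List<Runnable> pending = new ArrayList<>();
         int dropped = executor.shutdownNow(pending::add, 30, TimeUnit.SECONDS);
         if (dropped > 0) {
            // The caller decides what to do with the leftovers, e.g. run them inline.
            pending.forEach(Runnable::run);
         }
      }
   }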


@@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.locks.LockSupport;
 import java.util.function.Consumer;
 import org.jboss.logging.Logger;

@@ -99,46 +98,27 @@ public abstract class ProcessorBase<T> extends HandlerBase {
       }
    }

-   /**
-    * It will wait the current execution (if there is one) to finish
-    * but will not complete any further executions
-    */
-   public int shutdownNow(Consumer<? super T> onPendingItem) {
+   /** It will shutdown the executor however it will not wait for finishing tasks*/
+   public int shutdownNow(Consumer<? super T> onPendingItem, int timeout, TimeUnit unit) {
       //alert anyone that has been requested (at least) an immediate shutdown
       requestedForcedShutdown = true;
       requestedShutdown = true;

-      if (inHandler()) {
+      if (!inHandler()) {
+         // We don't have an option where we could do an immediate timeout
+         // I just need to make one roundtrip to make sure there's no pending tasks on the loop
+         // for that I ellected one second
+         flush(timeout, unit);
+      }
       stateUpdater.set(this, STATE_FORCED_SHUTDOWN);
-      } else {
-         //it could take a very long time depending on the current executing task
-         do {
-            //alert the ExecutorTask (if is running) to just drain the current backlog of tasks
-            final int startState = stateUpdater.get(this);
-            if (startState == STATE_FORCED_SHUTDOWN) {
-               //another thread has completed a forced shutdown: let it to manage the tasks cleanup
-               break;
-            }
-            if (startState == STATE_RUNNING) {
-               //wait 100 ms to avoid burning CPU while waiting and
-               //give other threads a chance to make progress
-               LockSupport.parkNanos(100_000_000L);
-            }
-         }
-         while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN));
-         //this could happen just one time: the forced shutdown state is the last one and
-         //can be set by just one caller.
-         //As noted on the execute method there is a small chance that some tasks would be enqueued
-      }
       int pendingItems = 0;
-      //there is a small chance that execute() could race with this cleanup: the lock allow an all-or-nothing behaviour between them
-      synchronized (tasks) {
       T item;
       while ((item = tasks.poll()) != null) {
          onPendingItem.accept(item);
         pendingItems++;
       }
-      }
       return pendingItems;
    }

@@ -184,6 +164,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    protected void task(T command) {
       if (requestedShutdown) {
          logAddOnShutdown();
+         return;
       }
       //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
       tasks.add(command);

@@ -203,11 +184,6 @@ public abstract class ProcessorBase<T> extends HandlerBase {
          if (state == STATE_NOT_RUNNING) {
            //startPoller could be deleted but is maintained because is inherited
            delegate.execute(task);
-         } else if (state == STATE_FORCED_SHUTDOWN) {
-            //help the GC by draining any task just submitted: it helps to cover the case of a shutdownNow finished before tasks.add
-            synchronized (tasks) {
-               tasks.clear();
-            }
          }
       }
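The "one roundtrip" comment is the key idea of the new shutdown path: flush(timeout, unit) is assumed here to push a marker through the loop and wait for it, so anything queued before the marker has run by the time the forced-shutdown state is set. A hedged, self-contained sketch of that idea, not the actual ProcessorBase.flush implementation:

   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.Executor;
   import java.util.concurrent.TimeUnit;

   final class FlushSketch {
      // Illustrative only: a marker task proves the backlog queued ahead of it has drained.
      static boolean flush(Executor loop, long timeout, TimeUnit unit) throws InterruptedException {
         CountDownLatch marker = new CountDownLatch(1);
         loop.execute(marker::countDown);    // runs only after earlier queued tasks
         return marker.await(timeout, unit); // false if the loop is still busy past the timeout
      }
   }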


@@ -20,6 +20,7 @@ package org.apache.activemq.artemis.utils.actors;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;

@@ -69,9 +70,7 @@ public class OrderedExecutorSanityTest {
          executor.shutdownNow();
          Assert.assertEquals("There are no remaining tasks to be executed", 0, executor.remaining());
          //from now on new tasks won't be executed
-         final CountDownLatch afterDeatchExecution = new CountDownLatch(1);
-         executor.execute(afterDeatchExecution::countDown);
-         Assert.assertFalse("After shutdownNow no new tasks can be executed", afterDeatchExecution.await(100, TimeUnit.MILLISECONDS));
+         executor.execute(() -> System.out.println("this will never happen"));
          //to avoid memory leaks the executor must take care of the new submitted tasks immediatly
          Assert.assertEquals("Any new task submitted after death must be collected", 0, executor.remaining());
       } finally {

@@ -82,11 +81,11 @@
    @Test
-   public void shutdownNowOnDelegateExecutor() throws InterruptedException {
+   public void shutdownNowOnDelegateExecutor() throws Exception {
       final ExecutorService executorService = Executors.newSingleThreadExecutor();
       try {
          final OrderedExecutor executor = new OrderedExecutor(executorService);
-         final CountDownLatch latch = new CountDownLatch(1);
+         final CyclicBarrier latch = new CyclicBarrier(2);
          final AtomicInteger numberOfTasks = new AtomicInteger(0);
          final CountDownLatch ran = new CountDownLatch(1);

@@ -105,7 +104,7 @@ public class OrderedExecutorSanityTest {
            executor.execute(() -> System.out.println("Dont worry, this will never happen"));
         }

-        latch.countDown();
+        latch.await();
         ran.await(1, TimeUnit.SECONDS);
         Assert.assertEquals(100, numberOfTasks.get());

@@ -116,6 +115,44 @@ public class OrderedExecutorSanityTest {
      }
   }

+   @Test
+   public void shutdownNowWithBlocked() throws Exception {
+      final ExecutorService executorService = Executors.newSingleThreadExecutor();
+      try {
+         final OrderedExecutor executor = new OrderedExecutor(executorService);
+         final CyclicBarrier latch = new CyclicBarrier(2);
+         final CyclicBarrier secondlatch = new CyclicBarrier(2);
+         final CountDownLatch ran = new CountDownLatch(1);
+
+         executor.execute(() -> {
+            try {
+               latch.await(1, TimeUnit.MINUTES);
+               secondlatch.await(1, TimeUnit.MINUTES);
+               ran.countDown();
+            } catch (Exception e) {
+               e.printStackTrace();
+            }
+         });
+
+         for (int i = 0; i < 100; i++) {
+            executor.execute(() -> System.out.println("Dont worry, this will never happen"));
+         }
+
+         latch.await();
+
+         try {
+            Assert.assertEquals(100, executor.shutdownNow());
+         } finally {
+            secondlatch.await();
+         }
+
+         Assert.assertEquals(ProcessorBase.STATE_FORCED_SHUTDOWN, executor.status());
+         Assert.assertEquals(0, executor.remaining());
+      } finally {
+         executorService.shutdown();
+      }
+   }
+
    @Test
    public void testMeasure() throws InterruptedException {


@@ -162,6 +162,12 @@
       <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-client</artifactId>
+         <exclusions>
+            <exclusion>
+               <groupId>org.apache.geronimo.specs</groupId>
+               <artifactId>geronimo-jms_1.1_spec</artifactId>
+            </exclusion>
+         </exclusions>
       </dependency>
       <dependency>
          <groupId>org.apache.activemq</groupId>


@@ -61,7 +61,6 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.RunnableEx;
 import org.apache.activemq.artemis.utils.SelectorTranslator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.apache.activemq.artemis.utils.UUIDGenerator;

@@ -142,7 +141,18 @@ public class AMQPSessionCallback implements SessionCallback {
       return transportConnection.isWritable(callback) && senderContext.getSender().getLocalState() != EndpointState.CLOSED;
    }

-   public void withinContext(RunnableEx run) throws Exception {
+   public void withinSessionExecutor(Runnable run) {
+      sessionExecutor.execute(() -> {
+         try {
+            withinContext(run);
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         }
+      });
+   }
+
+   public void withinContext(Runnable run) throws Exception {
       OperationContext context = recoverContext();
       try {
          run.run();

@@ -438,18 +448,6 @@ public class AMQPSessionCallback implements SessionCallback {
       return new AMQPStandardMessage(delivery.getMessageFormat(), data, null, coreMessageObjectPools);
    }

-   public void serverSend(final ProtonServerReceiverContext context,
-                          final Transaction transaction,
-                          final Receiver receiver,
-                          final Delivery delivery,
-                          SimpleString address,
-                          int messageFormat,
-                          ReadableBuffer data,
-                          RoutingContext routingContext) throws Exception {
-      AMQPStandardMessage message = new AMQPStandardMessage(messageFormat, data, null, coreMessageObjectPools);
-      serverSend(context, transaction, receiver, delivery, address, routingContext, message);
-   }
-
    public void serverSend(ProtonServerReceiverContext context,
                           Transaction transaction,
                           Receiver receiver,

@@ -491,7 +489,9 @@ public class AMQPSessionCallback implements SessionCallback {
                throw e;
             }
          } else {
-            serverSend(context, transaction, message, delivery, receiver, routingContext);
+            message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
+            // We need to transfer IO execution to a different thread otherwise we may deadlock netty loop
+            sessionExecutor.execute(() -> inSessionSend(context, transaction, message, delivery, receiver, routingContext));
          }
       } finally {
          resetContext(oldcontext);

@@ -523,13 +523,14 @@ public class AMQPSessionCallback implements SessionCallback {
    }

-   private void serverSend(final ProtonServerReceiverContext context,
-                           final Transaction transaction,
-                           final Message message,
-                           final Delivery delivery,
-                           final Receiver receiver,
-                           final RoutingContext routingContext) throws Exception {
-      message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
+   private void inSessionSend(final ProtonServerReceiverContext context,
+                              final Transaction transaction,
+                              final Message message,
+                              final Delivery delivery,
+                              final Receiver receiver,
+                              final RoutingContext routingContext) {
+      OperationContext oldContext = recoverContext();
+      try {
       if (invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()) == null) {
          serverSession.send(transaction, message, directDeliver, false, routingContext);

@@ -548,21 +549,32 @@ public class AMQPSessionCallback implements SessionCallback {
                   }
                   delivery.settle();
                   context.flow();
-                  connection.flush();
+                  connection.instantFlush();
                });
             }

             @Override
             public void onError(int errorCode, String errorMessage) {
-               connection.runNow(() -> {
-                  receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
-                  connection.flush();
-               });
+               sendError(errorCode, errorMessage, receiver);
             }
          });
       } else {
          rejectMessage(delivery, Symbol.valueOf("failed"), "Interceptor rejected message");
       }
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         context.deliveryFailed(delivery, receiver, e);
+      } finally {
+         resetContext(oldContext);
+      }
+   }
+
+   private void sendError(int errorCode, String errorMessage, Receiver receiver) {
+      connection.runNow(() -> {
+         receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
+         connection.flush();
+      });
    }

    /** Will execute a Runnable on an Address when there's space in memory*/
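The new code path hands potentially blocking broker work (send, settle, error handling) to the ordered session executor instead of running it on the Netty I/O thread, which is what the "deadlock netty loop" comment refers to. A stripped-down sketch of that hand-off pattern, using hypothetical names rather than the broker's actual classes:

   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   final class IoHandoffSketch {
      private final ExecutorService sessionExecutor = Executors.newSingleThreadExecutor();

      // Called from the network event loop: must never block.
      void onFrameReceived(Runnable brokerWork) {
         sessionExecutor.execute(() -> {
            try {
               brokerWork.run();      // potentially blocking broker call
            } catch (Exception e) {
               e.printStackTrace();   // the real code logs and fails the delivery instead
            }
         });
      }
   }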


@@ -360,17 +360,22 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          sessionSPI.serverSend(this, tx, receiver, delivery, address, routingContext, message);
       } catch (Exception e) {
          log.warn(e.getMessage(), e);
+         deliveryFailed(delivery, receiver, e);
+      }
+   }
+
+   public void deliveryFailed(Delivery delivery, Receiver receiver, Exception e) {
+      connection.runNow(() -> {
          DeliveryState deliveryState = determineDeliveryState(((Source) receiver.getSource()),
                                                               useModified,
                                                               e);
-         connection.runLater(() -> {
            delivery.disposition(deliveryState);
            delivery.settle();
            flow();
            connection.flush();
         });
+   }
    }

    private DeliveryState determineDeliveryState(final Source source, final boolean useModified, final Exception e) {


@@ -62,7 +62,6 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Outcome;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;

@@ -630,16 +629,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             // this can happen in the twice ack mode, that is the receiver accepts and settles separately
             // acking again would show an exception but would have no negative effect but best to handle anyway.
             if (!delivery.isSettled()) {
-               // we have to individual ack as we can't guarantee we will get the delivery updates
-               // (including acks) in order from dealer, a performance hit but a must
-               try {
-                  sessionSPI.ack(null, brokerConsumer, message);
-               } catch (Exception e) {
-                  log.warn(e.toString(), e);
-                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
-               }
-               delivery.settle();
+               inSessionACK(delivery, message);
             }
          } else {
            handleExtendedDeliveryOutcomes(message, delivery, remoteState);

@@ -654,6 +644,37 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       }
    }

+   private void inSessionACK(Delivery delivery, Message message) throws ActiveMQAMQPIllegalStateException {
+      OperationContext oldContext = sessionSPI.recoverContext();
+      try {
+         // we have to individual ack as we can't guarantee we will get the delivery updates
+         // (including acks) in order from dealer, a performance hit but a must
+         try {
+            sessionSPI.ack(null, brokerConsumer, message);
+         } catch (Exception e) {
+            log.warn(e.toString(), e);
+            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+         }
+
+         sessionSPI.afterIO(new IOCallback() {
+            @Override
+            public void done() {
+               connection.runLater(() -> {
+                  delivery.settle();
+                  connection.instantFlush();
+               });
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      } finally {
+         sessionSPI.resetContext(oldContext);
+      }
+   }
+
    private boolean handleExtendedDeliveryOutcomes(Message message, Delivery delivery, DeliveryState remoteState) throws ActiveMQAMQPException {
       boolean settleImmediate = true;
       boolean handled = true;


@@ -145,26 +145,45 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             };

             if (discharge.getFail()) {
-               sessionSPI.withinContext(() -> tx.rollback());
+               sessionSPI.withinSessionExecutor(() -> {
+                  try {
+                     tx.rollback();
                      sessionSPI.afterIO(ioAction);
+                  } catch (Throwable e) {
+                     txError(delivery, e);
+                  }
+               });
             } else {
-               sessionSPI.withinContext(() -> tx.commit());
+               sessionSPI.withinSessionExecutor(() -> {
+                  try {
+                     tx.commit();
                      sessionSPI.afterIO(ioAction);
+                  } catch (Throwable e) {
+                     txError(delivery, e);
+                  }
+               });
             }
          }
      } catch (ActiveMQAMQPException amqpE) {
-        log.warn(amqpE.getMessage(), amqpE);
-        delivery.settle();
-        delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
-        connection.flush();
+        txError(delivery, amqpE);
      } catch (Throwable e) {
-        log.warn(e.getMessage(), e);
-        delivery.settle();
-        delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
-        connection.flush();
+        txError(delivery, e);
      }
   }

+   private void txError(Delivery delivery, Throwable e) {
+      log.warn(e.getMessage(), e);
+      connection.runNow(() -> {
+         delivery.settle();
+         if (e instanceof ActiveMQAMQPException) {
+            delivery.disposition(createRejected(((ActiveMQAMQPException) e).getAmqpError(), e.getMessage()));
+         } else {
+            delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
+         }
+         connection.flush();
+      });
+   }
+
   @Override
   public void onFlow(int credits, boolean drain) {
   }


@@ -132,6 +132,12 @@ public class ProtonServerReceiverContextTest {
          runnable.run();
          return null;
       }).when(mockConnContext).runLater(any(Runnable.class));

+      doAnswer((Answer<Void>) invocation -> {
+         Runnable runnable = invocation.getArgument(0);
+         runnable.run();
+         return null;
+      }).when(mockConnContext).runNow(any(Runnable.class));
+
       ProtonProtocolManager mockProtocolManager = mock(ProtonProtocolManager.class);
       when(mockProtocolManager.isUseModifiedForTransientDeliveryErrors()).thenReturn(true);
       when(mockConnContext.getProtocolManager()).thenReturn(mockProtocolManager);


@@ -363,7 +363,9 @@ public class PagingStoreImpl implements PagingStore {
          cursorProvider.stop();

          final List<Runnable> pendingTasks = new ArrayList<>();
-         final int pendingTasksWhileShuttingDown = executor.shutdownNow(pendingTasks::add);
+
+         // TODO we could have a parameter to use this
+         final int pendingTasksWhileShuttingDown = executor.shutdownNow(pendingTasks::add, 30, TimeUnit.SECONDS);
          if (pendingTasksWhileShuttingDown > 0) {
             logger.tracef("Try executing %d pending tasks on stop", pendingTasksWhileShuttingDown);
             for (Runnable pendingTask : pendingTasks) {


@@ -886,8 +886,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       }

       if (deleteData && addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) {
-         pagingManager.deletePageStore(binding.getAddress());
-
          deleteDuplicateCache(binding.getAddress());
       }


@@ -45,6 +45,12 @@ under the License.
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-client</artifactId>
       <version>${activemq5-version}</version>
+      <exclusions>
+         <exclusion>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jms_1.1_spec</artifactId>
+         </exclusion>
+      </exclusions>
    </dependency>
    <dependency>
       <groupId>org.slf4j</groupId>


@@ -580,6 +580,12 @@
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-client</artifactId>
          <version>${activemq5-version}</version>
+         <exclusions>
+            <exclusion>
+               <groupId>org.apache.geronimo.specs</groupId>
+               <artifactId>geronimo-jms_1.1_spec</artifactId>
+            </exclusion>
+         </exclusions>
          <!-- License: Apache 2.0 -->
       </dependency>
       <dependency>


@@ -66,6 +66,12 @@
       <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-client</artifactId>
+         <exclusions>
+            <exclusion>
+               <groupId>org.apache.geronimo.specs</groupId>
+               <artifactId>geronimo-jms_1.1_spec</artifactId>
+            </exclusion>
+         </exclusions>
       </dependency>
       <dependency>
          <groupId>org.fusesource.hawtbuf</groupId>


@@ -116,6 +116,18 @@
          <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
          <version>RELEASE</version>
       </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-client</artifactId>
+         <version>${activemq5-version}</version>
+         <exclusions>
+            <exclusion>
+               <groupId>org.apache.geronimo.specs</groupId>
+               <artifactId>geronimo-jms_1.1_spec</artifactId>
+            </exclusion>
+         </exclusions>
+         <scope>test</scope>
+      </dependency>
    </dependencies>

    <build>


@@ -65,13 +65,18 @@ under the License.
      <security-settings>
         <!--security for example queue-->
-        <security-setting match="exampleQueue">
-           <permission roles="guest" type="createDurableQueue"/>
-           <permission roles="guest" type="deleteDurableQueue"/>
-           <permission roles="guest" type="createNonDurableQueue"/>
-           <permission roles="guest" type="deleteNonDurableQueue"/>
-           <permission roles="guest" type="consume"/>
-           <permission roles="guest" type="send"/>
+        <security-setting match="#">
+           <permission type="createNonDurableQueue" roles="amq, guest"/>
+           <permission type="deleteNonDurableQueue" roles="amq, guest"/>
+           <permission type="createDurableQueue" roles="amq, guest"/>
+           <permission type="deleteDurableQueue" roles="amq, guest"/>
+           <permission type="createAddress" roles="amq, guest"/>
+           <permission type="deleteAddress" roles="amq, guest"/>
+           <permission type="consume" roles="amq, guest"/>
+           <permission type="browse" roles="amq, guest"/>
+           <permission type="send" roles="amq, guest"/>
+           <!-- we need this otherwise ./artemis data imp wouldn't work -->
+           <permission type="manage" roles="amq"/>
        </security-setting>
     </security-settings>

     <address-settings>

@@ -95,8 +100,8 @@ under the License.
        <expiry-address>ExpiryQueue</expiry-address>
        <redelivery-delay>0</redelivery-delay>
        <!-- with -1 only the global-max-size is in use for limiting -->
-       <max-size-bytes>200MB</max-size-bytes>
-       <page-size-bytes>100MB</page-size-bytes>
+       <max-size-bytes>10MB</max-size-bytes>
+       <page-size-bytes>1MB</page-size-bytes>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
        <address-full-policy>PAGE</address-full-policy>

@@ -108,6 +113,10 @@ under the License.
     </address-settings>

     <addresses>
+       <address name="exampleTopic">
+          <multicast>
+          </multicast>
+       </address>
        <address name="exampleQueue">
           <anycast>
              <queue name="exampleQueue"/>


@@ -67,16 +67,20 @@ under the License.
      <security-settings>
         <!--security for example queue-->
-        <security-setting match="exampleQueue">
-           <permission roles="guest" type="createDurableQueue"/>
-           <permission roles="guest" type="deleteDurableQueue"/>
-           <permission roles="guest" type="createNonDurableQueue"/>
-           <permission roles="guest" type="deleteNonDurableQueue"/>
-           <permission roles="guest" type="consume"/>
-           <permission roles="guest" type="send"/>
+        <security-setting match="#">
+           <permission type="createNonDurableQueue" roles="amq, guest"/>
+           <permission type="deleteNonDurableQueue" roles="amq, guest"/>
+           <permission type="createDurableQueue" roles="amq, guest"/>
+           <permission type="deleteDurableQueue" roles="amq, guest"/>
+           <permission type="createAddress" roles="amq, guest"/>
+           <permission type="deleteAddress" roles="amq, guest"/>
+           <permission type="consume" roles="amq, guest"/>
+           <permission type="browse" roles="amq, guest"/>
+           <permission type="send" roles="amq, guest"/>
+           <!-- we need this otherwise ./artemis data imp wouldn't work -->
+           <permission type="manage" roles="amq"/>
        </security-setting>
     </security-settings>

     <address-settings>
        <!-- if you define auto-create on certain queues, management has to be auto-create -->
        <address-setting match="activemq.management#">

@@ -98,8 +102,8 @@ under the License.
        <expiry-address>ExpiryQueue</expiry-address>
        <redelivery-delay>0</redelivery-delay>
        <!-- with -1 only the global-max-size is in use for limiting -->
-       <max-size-bytes>200MB</max-size-bytes>
-       <page-size-bytes>100MB</page-size-bytes>
+       <max-size-bytes>10MB</max-size-bytes>
+       <page-size-bytes>1MB</page-size-bytes>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
        <address-full-policy>PAGE</address-full-policy>

@@ -111,6 +115,10 @@ under the License.
     </address-settings>

     <addresses>
+       <address name="exampleTopic">
+          <multicast>
+          </multicast>
+       </address>
        <address name="exampleQueue">
           <anycast>
              <queue name="exampleQueue"/>


@@ -35,7 +35,6 @@ import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;

 public class ReplicationFlowControlTest extends SmokeTestBase {

@@ -80,7 +79,6 @@ public class ReplicationFlowControlTest extends SmokeTestBase {
       internalTest(false);
    }

-   @Ignore // need to fix this before I can let it running
    @Test
    public void testPageWhileSyncFailover() throws Exception {
       internalTest(true);


@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.replicationflow;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class SoakPagingTest extends SmokeTestBase {
public static final String SERVER_NAME_0 = "replicated-static0";
public static final String SERVER_NAME_1 = "replicated-static1";
static AtomicInteger produced = new AtomicInteger(0);
static AtomicInteger consumed = new AtomicInteger(0);
private static Process server0;
private static Process server1;
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
cleanupData(SERVER_NAME_1);
server0 = startServer(SERVER_NAME_0, 0, 30000);
server1 = startServer(SERVER_NAME_1, 0, 30000);
}
final String destination = "exampleTopic";
static final int consumer_threads = 20;
static final int producer_threads = 20;
static AtomicInteger j = new AtomicInteger(0);
public static void main(String[] arg) {
try {
final String host = "localhost";
final int port = 61616;
final ConnectionFactory factory = new org.apache.qpid.jms.JmsConnectionFactory("failover:(amqp://" + host + ":" + port + ")");
for (int i = 0; i < producer_threads; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
SoakPagingTest app = new SoakPagingTest();
app.produce(factory);
}
});
t.start();
}
Thread.sleep(1000);
for (int i = 0; i < consumer_threads; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
SoakPagingTest app = new SoakPagingTest();
app.consume(factory, j.getAndIncrement());
}
});
t.start();
}
Thread.sleep(15000);
System.exit(consumed.get());
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testPagingReplication() throws Throwable {
for (int i = 0; i < 3; i++) {
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName());
Assert.assertTrue(process.waitFor() > 0);
}
server1.destroy();
server1 = startServer(SERVER_NAME_1, 0, 30000);
for (int i = 0; i < 2; i++) {
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName());
Assert.assertTrue(process.waitFor() > 0);
}
}
public void produce(ConnectionFactory factory) {
try {
Connection connection = factory.createConnection("admin", "admin");
connection.start();
final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Destination address = session.createTopic(destination);
MessageProducer messageProducer = session.createProducer(address);
int i = 0;
while (true) {
Message message = session.createTextMessage("fkjdslkfjdskljf;lkdsjf;kdsajf;lkjdf;kdsajf;kjdsa;flkjdsa;lfkjdsa;flkj;dsakjf;dsajf;askjd;fkj;dsajflaskfja;fdlkajs;lfdkja;kfj;dsakfj;akdsjf;dsakjf;akfj;lakdsjf;lkasjdf;ksajf;kjdsa;fkj;adskjf;akdsjf;kja;sdkfj;akdsjf;akjdsf;adskjf;akdsjf;askfj;aksjfkdjafndmnfmdsnfjadshfjdsalkfjads;fkjdsa;kfja;skfj;akjfd;akjfd;ksaj;fkja;kfj;dsakjf;dsakjf;dksjf;akdsjf;kdsajf");
messageProducer.send(message);
produced.incrementAndGet();
i++;
if (i % 100 == 0)
System.out.println("Published " + i + " messages");
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void consume(ConnectionFactory factory, int j) {
try {
Connection connection = factory.createConnection("admin", "admin");
final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Topic address = session.createTopic(destination);
String consumerId = "ss" + (j % 5);
MessageConsumer messageConsumer = session.createSharedConsumer(address, consumerId);
Thread.sleep(5000);
connection.start();
int i = 0;
while (true) {
Message m = messageConsumer.receive(1000);
consumed.incrementAndGet();
if (m == null)
System.out.println("receive() returned null");
i++;
if (i % 100 == 0)
System.out.println("Consumed " + i + " messages");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}