NO-JIRA Adding tests over JMSBridge statistics
This commit is contained in:
parent
6536b4238c
commit
824e0b6e7e
1
pom.xml
1
pom.xml
|
@ -1600,6 +1600,7 @@
|
|||
<exclude>**/.factorypath</exclude>
|
||||
<exclude>**/org.apache.activemq.artemis.cfg</exclude>
|
||||
<exclude>**/nb-configuration.xml</exclude>
|
||||
<exclude>**/nbactions-tests.xml</exclude>
|
||||
<!-- activemq5 unit tests exclude -->
|
||||
<exclude>**/*.data</exclude>
|
||||
<exclude>**/*.bin</exclude>
|
||||
|
|
|
@ -0,0 +1,172 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.extras.jms.bridge;
|
||||
|
||||
import com.arjuna.ats.arjuna.common.Uid;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import javax.transaction.HeuristicMixedException;
|
||||
import javax.transaction.HeuristicRollbackException;
|
||||
import javax.transaction.InvalidTransactionException;
|
||||
import javax.transaction.NotSupportedException;
|
||||
import javax.transaction.RollbackException;
|
||||
import javax.transaction.Synchronization;
|
||||
import javax.transaction.SystemException;
|
||||
import javax.transaction.Transaction;
|
||||
import javax.transaction.TransactionManager;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
|
||||
public class FailingTransactionManager implements TransactionManager {
|
||||
|
||||
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
||||
private final TransactionManager tm;
|
||||
private int calls;
|
||||
private final int limit;
|
||||
private final AtomicInteger failures = new AtomicInteger(0);
|
||||
private final Map<Uid, FailingTransaction> transactions = Collections.synchronizedMap(new HashMap<>(10));
|
||||
|
||||
public FailingTransactionManager(TransactionManager tm, int limit) {
|
||||
this.tm = tm;
|
||||
this.calls = 0;
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void begin() throws NotSupportedException, SystemException {
|
||||
tm.begin();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException {
|
||||
transactions.remove(((com.arjuna.ats.jta.transaction.Transaction) tm.getTransaction()).get_uid()).commit();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollback() throws IllegalStateException, SecurityException, SystemException {
|
||||
transactions.remove(((com.arjuna.ats.jta.transaction.Transaction) tm.getTransaction()).get_uid()).rollback();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRollbackOnly() throws IllegalStateException, SystemException {
|
||||
tm.setRollbackOnly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStatus() throws SystemException {
|
||||
return tm.getStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Transaction getTransaction() throws SystemException {
|
||||
com.arjuna.ats.jta.transaction.Transaction real = (com.arjuna.ats.jta.transaction.Transaction) tm.getTransaction();
|
||||
if (transactions.containsKey(real.get_uid())) {
|
||||
return transactions.get(real.get_uid());
|
||||
}
|
||||
FailingTransaction tx = new FailingTransaction(real, calls++);
|
||||
transactions.put(real.get_uid(), tx);
|
||||
return tx;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTransactionTimeout(int i) throws SystemException {
|
||||
tm.setTransactionTimeout(i);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Transaction suspend() throws SystemException {
|
||||
Transaction real = tm.suspend();
|
||||
if (real == null) {
|
||||
return null;
|
||||
}
|
||||
return transactions.get(((com.arjuna.ats.jta.transaction.Transaction) real).get_uid());
|
||||
}
|
||||
|
||||
public int getFailures() {
|
||||
return failures.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume(Transaction transaction) throws InvalidTransactionException, IllegalStateException, SystemException {
|
||||
tm.resume(((FailingTransaction)transaction).transaction);
|
||||
}
|
||||
|
||||
private final class FailingTransaction implements Transaction {
|
||||
|
||||
private final com.arjuna.ats.jta.transaction.Transaction transaction;
|
||||
private final int number;
|
||||
|
||||
private FailingTransaction(com.arjuna.ats.jta.transaction.Transaction transaction, int number) {
|
||||
this.transaction = transaction;
|
||||
this.number = number;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException {
|
||||
if (number < limit) {
|
||||
transaction.commit();
|
||||
transactions.remove(transaction.get_uid());
|
||||
} else {
|
||||
int fails = failures.incrementAndGet();
|
||||
RollbackException ex = new RollbackException("Expected rollback for test");
|
||||
log.tracef(ex, "We are about to fail commit for %s th time", fails);
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean delistResource(XAResource arg0, int arg1) throws IllegalStateException, SystemException {
|
||||
return transaction.delistResource(arg0, arg1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean enlistResource(XAResource arg0) throws RollbackException, IllegalStateException, SystemException {
|
||||
return transaction.enlistResource(arg0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStatus() throws SystemException {
|
||||
return transaction.getStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerSynchronization(Synchronization arg0) throws RollbackException, IllegalStateException, SystemException {
|
||||
transaction.registerSynchronization(arg0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollback() throws IllegalStateException, SystemException {
|
||||
transaction.rollback();
|
||||
transactions.remove(transaction.get_uid());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRollbackOnly() throws IllegalStateException, SystemException {
|
||||
transaction.setRollbackOnly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "FailingTransaction{" + "transaction=" + transaction.get_uid() + ", number=" + number + '}';
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -42,6 +42,7 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
|||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
|
||||
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import static org.apache.activemq.artemis.core.settings.impl.AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS;
|
||||
import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
|
||||
import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
|
||||
import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl;
|
||||
|
@ -791,6 +792,8 @@ public class JMSBridgeTest extends BridgeTestBase {
|
|||
|
||||
TransactionManager mgr = newTransactionManager();
|
||||
|
||||
final int NUM_MESSAGES = 10;
|
||||
|
||||
try {
|
||||
|
||||
toResume = mgr.suspend();
|
||||
|
@ -799,14 +802,14 @@ public class JMSBridgeTest extends BridgeTestBase {
|
|||
|
||||
started = mgr.getTransaction();
|
||||
|
||||
final int NUM_MESSAGES = 10;
|
||||
|
||||
bridge = new JMSBridgeImpl(cff0, cff1, sourceTopicFactory, targetQueueFactory, null, null, null, null, null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE, 1, -1, null, null, false).setBridgeName("test-bridge");
|
||||
bridge.start();
|
||||
|
||||
sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, false, largeMessage);
|
||||
|
||||
checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, largeMessage);
|
||||
Assert.assertEquals(0L, bridge.getAbortedMessageCount());
|
||||
Assert.assertEquals("We didn't get the correct number processed messages", NUM_MESSAGES, bridge.getMessageCount());
|
||||
} finally {
|
||||
if (started != null) {
|
||||
try {
|
||||
|
@ -829,6 +832,47 @@ public class JMSBridgeTest extends BridgeTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbortedMessages() throws Exception {
|
||||
JMSBridgeImpl bridge = null;
|
||||
|
||||
final int NUM_MESSAGES = 20;
|
||||
final int MAX_BATCH_SIZE = 1;
|
||||
final int RETRY = 2;
|
||||
final int LIMIT = 2;
|
||||
final int FAILURES = (NUM_MESSAGES - LIMIT) * DEFAULT_MAX_DELIVERY_ATTEMPTS;
|
||||
FailingTransactionManager transactionManager = new FailingTransactionManager(newTransactionManager(), LIMIT);
|
||||
try {
|
||||
bridge = new JMSBridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory, null, null, null, null, null, 5000, RETRY, QualityOfServiceMode.ONCE_AND_ONLY_ONCE, MAX_BATCH_SIZE, -1, null, null, false).setBridgeName("test-bridge");
|
||||
bridge.setTransactionManager(transactionManager);
|
||||
bridge.start();
|
||||
sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES, false, false);
|
||||
try (Connection conn = cf1.createConnection()) {
|
||||
conn.start();
|
||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer cons = sess.createConsumer(targetQueue);
|
||||
// Consume the messages
|
||||
for (int i = 0; i <= LIMIT; i++) {
|
||||
Message tm = cons.receive(3000);
|
||||
if (tm != null) {
|
||||
Assert.assertNotNull("Message " + i + " is null", tm);
|
||||
Assert.assertEquals("message" + i, ((TextMessage) tm).getText());
|
||||
}
|
||||
}
|
||||
}
|
||||
Assert.assertEquals("We didn't get the correct number failures", FAILURES, transactionManager.getFailures());
|
||||
Assert.assertEquals("We didn't get the correct number of aborted messages", FAILURES, bridge.getAbortedMessageCount());
|
||||
Assert.assertEquals("We didn't get the correct number of processed messages", FAILURES + LIMIT, bridge.getMessageCount());
|
||||
} finally {
|
||||
if (bridge != null) {
|
||||
bridge.stop();
|
||||
}
|
||||
}
|
||||
Assert.assertEquals("We didn't get the correct number failures", FAILURES, transactionManager.getFailures());
|
||||
Assert.assertEquals("We didn't get the correct number of aborted messages", FAILURES, bridge.getAbortedMessageCount());
|
||||
Assert.assertEquals("We didn't get the correct number of processed messages", FAILURES + LIMIT, bridge.getMessageCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonDurableSubscriberLargeMessage() throws Exception {
|
||||
internalTestNonDurableSubscriber(true, 1);
|
||||
|
@ -852,6 +896,8 @@ public class JMSBridgeTest extends BridgeTestBase {
|
|||
sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, false, largeMessage);
|
||||
|
||||
checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, largeMessage);
|
||||
Assert.assertEquals(0L, bridge.getAbortedMessageCount());
|
||||
Assert.assertEquals("We didn't get the correct number processed messages", NUM_MESSAGES, bridge.getMessageCount());
|
||||
} finally {
|
||||
if (bridge != null) {
|
||||
bridge.stop();
|
||||
|
@ -882,6 +928,8 @@ public class JMSBridgeTest extends BridgeTestBase {
|
|||
sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, true, largeMessage);
|
||||
|
||||
checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, largeMessage);
|
||||
Assert.assertEquals(0L, bridge.getAbortedMessageCount());
|
||||
Assert.assertEquals("We didn't get the correct number processed messages", NUM_MESSAGES, bridge.getMessageCount());
|
||||
} finally {
|
||||
if (bridge != null) {
|
||||
bridge.stop();
|
||||
|
|
Loading…
Reference in New Issue