Add missing @Overrides
This commit is contained in:
parent
e285c2f30c
commit
e6e1311f8d
|
@ -139,6 +139,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void nodeDown(long eventUID, String nodeID) {
|
public void nodeDown(long eventUID, String nodeID) {
|
||||||
if (topologyMap.remove(nodeID) != null) {
|
if (topologyMap.remove(nodeID) != null) {
|
||||||
updateClientClusterInfo();
|
updateClientClusterInfo();
|
||||||
|
|
|
@ -278,6 +278,7 @@ public class AMQSession implements SessionCallback {
|
||||||
|
|
||||||
if (sendProducerAck) {
|
if (sendProducerAck) {
|
||||||
runnable = new Runnable() {
|
runnable = new Runnable() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
|
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
|
||||||
|
@ -302,6 +303,7 @@ public class AMQSession implements SessionCallback {
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
runnable = new Runnable() {
|
runnable = new Runnable() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
transportConnection.setAutoRead(true);
|
transportConnection.setAutoRead(true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1317,6 +1317,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
return tcConfigs;
|
return tcConfigs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String debugConnectors() {
|
public String debugConnectors() {
|
||||||
StringWriter stringWriter = new StringWriter();
|
StringWriter stringWriter = new StringWriter();
|
||||||
PrintWriter writer = new PrintWriter(stringWriter);
|
PrintWriter writer = new PrintWriter(stringWriter);
|
||||||
|
|
|
@ -1063,6 +1063,7 @@ public class PagingStoreImpl implements PagingStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
// To be used on isDropMessagesWhenFull
|
// To be used on isDropMessagesWhenFull
|
||||||
|
@Override
|
||||||
public boolean isFull() {
|
public boolean isFull() {
|
||||||
return maxSize > 0 && getAddressSize() > maxSize;
|
return maxSize > 0 && getAddressSize() > maxSize;
|
||||||
}
|
}
|
||||||
|
|
|
@ -190,6 +190,7 @@ public class MessageReferenceImpl implements MessageReference {
|
||||||
this.acknowledge(null);
|
this.acknowledge(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void acknowledge(Transaction tx) throws Exception {
|
public void acknowledge(Transaction tx) throws Exception {
|
||||||
if (tx == null) {
|
if (tx == null) {
|
||||||
getQueue().acknowledge(this);
|
getQueue().acknowledge(this);
|
||||||
|
|
|
@ -2337,6 +2337,7 @@ public class QueueImpl implements Queue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
|
public void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
|
||||||
sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
|
sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
|
||||||
}
|
}
|
||||||
|
|
|
@ -736,6 +736,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
* This method will fetch the delivering references, remove them from the delivering list and return a list.
|
* This method will fetch the delivering references, remove them from the delivering list and return a list.
|
||||||
*
|
*
|
||||||
* This will be useful for other protocols that will need this such as openWire or MQTT. */
|
* This will be useful for other protocols that will need this such as openWire or MQTT. */
|
||||||
|
@Override
|
||||||
public List<MessageReference> getDeliveringReferencesBasedOnProtocol(boolean remove, Object protocolDataStart, Object protocolDataEnd) {
|
public List<MessageReference> getDeliveringReferencesBasedOnProtocol(boolean remove, Object protocolDataStart, Object protocolDataEnd) {
|
||||||
LinkedList<MessageReference> retReferences = new LinkedList<>();
|
LinkedList<MessageReference> retReferences = new LinkedList<>();
|
||||||
boolean hit = false;
|
boolean hit = false;
|
||||||
|
|
|
@ -253,6 +253,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
this.securityEnabled = false;
|
this.securityEnabled = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isClosed() {
|
public boolean isClosed() {
|
||||||
return closed;
|
return closed;
|
||||||
}
|
}
|
||||||
|
@ -395,6 +396,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
return this.createConsumer(consumerID, queueName, filterString, browseOnly, true, null);
|
return this.createConsumer(consumerID, queueName, filterString, browseOnly, true, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ServerConsumer createConsumer(final long consumerID,
|
public ServerConsumer createConsumer(final long consumerID,
|
||||||
final SimpleString queueName,
|
final SimpleString queueName,
|
||||||
final SimpleString filterString,
|
final SimpleString filterString,
|
||||||
|
@ -458,6 +460,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
/** Some protocols may chose to hold their transactions outside of the ServerSession.
|
/** Some protocols may chose to hold their transactions outside of the ServerSession.
|
||||||
* This can be used to replace the transaction.
|
* This can be used to replace the transaction.
|
||||||
* Notice that we set autoCommitACK and autoCommitSends to true if tx == null */
|
* Notice that we set autoCommitACK and autoCommitSends to true if tx == null */
|
||||||
|
@Override
|
||||||
public void resetTX(Transaction transaction) {
|
public void resetTX(Transaction transaction) {
|
||||||
this.tx = transaction;
|
this.tx = transaction;
|
||||||
this.autoCommitAcks = transaction == null;
|
this.autoCommitAcks = transaction == null;
|
||||||
|
@ -648,6 +651,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ServerConsumer locateConsumer(long consumerID) {
|
public ServerConsumer locateConsumer(long consumerID) {
|
||||||
return consumers.get(consumerID);
|
return consumers.get(consumerID);
|
||||||
}
|
}
|
||||||
|
@ -756,6 +760,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
/**
|
/**
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public Transaction newTransaction() {
|
public Transaction newTransaction() {
|
||||||
return new TransactionImpl(null, storageManager, timeoutSeconds);
|
return new TransactionImpl(null, storageManager, timeoutSeconds);
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,6 +44,7 @@ public abstract class AutoFailTestSupport extends TestCase {
|
||||||
private boolean useAutoFail; // Disable auto fail by default
|
private boolean useAutoFail; // Disable auto fail by default
|
||||||
private AtomicBoolean isTestSuccess;
|
private AtomicBoolean isTestSuccess;
|
||||||
|
|
||||||
|
@Override
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
// Runs the auto fail thread before performing any setup
|
// Runs the auto fail thread before performing any setup
|
||||||
if (isAutoFail()) {
|
if (isAutoFail()) {
|
||||||
|
@ -52,6 +53,7 @@ public abstract class AutoFailTestSupport extends TestCase {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected void tearDown() throws Exception {
|
protected void tearDown() throws Exception {
|
||||||
super.tearDown();
|
super.tearDown();
|
||||||
|
|
||||||
|
@ -69,6 +71,7 @@ public abstract class AutoFailTestSupport extends TestCase {
|
||||||
setAutoFail(true);
|
setAutoFail(true);
|
||||||
isTestSuccess = new AtomicBoolean(false);
|
isTestSuccess = new AtomicBoolean(false);
|
||||||
autoFailThread = new Thread(new Runnable() {
|
autoFailThread = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
// Wait for test to finish succesfully
|
// Wait for test to finish succesfully
|
||||||
|
|
|
@ -52,6 +52,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
|
||||||
|
|
||||||
public String CLUSTER_PASSWORD = "OPENWIRECLUSTER";
|
public String CLUSTER_PASSWORD = "OPENWIRECLUSTER";
|
||||||
|
|
||||||
|
@Override
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
BrokerService.disableWrapper = disableWrapper;
|
BrokerService.disableWrapper = disableWrapper;
|
||||||
File tmpRoot = new File("./target/tmp");
|
File tmpRoot = new File("./target/tmp");
|
||||||
|
|
|
@ -40,6 +40,7 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
|
||||||
/**
|
/**
|
||||||
* @see org.apache.activemq.JmsTransactionTestSupport#getJmsResourceProvider()
|
* @see org.apache.activemq.JmsTransactionTestSupport#getJmsResourceProvider()
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
protected JmsResourceProvider getJmsResourceProvider() {
|
protected JmsResourceProvider getJmsResourceProvider() {
|
||||||
JmsResourceProvider p = new JmsResourceProvider();
|
JmsResourceProvider p = new JmsResourceProvider();
|
||||||
p.setTopic(false);
|
p.setTopic(false);
|
||||||
|
|
|
@ -139,6 +139,7 @@ public class TopicClusterTest extends OpenwireArtemisBaseTest implements Message
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void onMessage(Message msg) {
|
public void onMessage(Message msg) {
|
||||||
// log.info("GOT: " + msg);
|
// log.info("GOT: " + msg);
|
||||||
receivedMessageCount.incrementAndGet();
|
receivedMessageCount.incrementAndGet();
|
||||||
|
|
|
@ -75,6 +75,7 @@ public class AMQ1925Test extends OpenwireArtemisBaseTest implements ExceptionLis
|
||||||
final CountDownLatch starter = new CountDownLatch(1);
|
final CountDownLatch starter = new CountDownLatch(1);
|
||||||
final AtomicBoolean restarted = new AtomicBoolean();
|
final AtomicBoolean restarted = new AtomicBoolean();
|
||||||
new Thread(new Runnable() {
|
new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
starter.await();
|
starter.await();
|
||||||
|
@ -130,6 +131,7 @@ public class AMQ1925Test extends OpenwireArtemisBaseTest implements ExceptionLis
|
||||||
final CountDownLatch starter = new CountDownLatch(1);
|
final CountDownLatch starter = new CountDownLatch(1);
|
||||||
final AtomicBoolean restarted = new AtomicBoolean();
|
final AtomicBoolean restarted = new AtomicBoolean();
|
||||||
new Thread(new Runnable() {
|
new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
starter.await();
|
starter.await();
|
||||||
|
@ -375,6 +377,7 @@ public class AMQ1925Test extends OpenwireArtemisBaseTest implements ExceptionLis
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void onException(JMSException exception) {
|
public void onException(JMSException exception) {
|
||||||
this.exception = exception;
|
this.exception = exception;
|
||||||
}
|
}
|
||||||
|
|
|
@ -117,6 +117,7 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
|
||||||
doByteman.set(true);
|
doByteman.set(true);
|
||||||
testConsumer.setMessageListener(new MessageListener() {
|
testConsumer.setMessageListener(new MessageListener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
public void onMessage(Message message) {
|
public void onMessage(Message message) {
|
||||||
LOG.info("consume one and commit");
|
LOG.info("consume one and commit");
|
||||||
|
|
||||||
|
@ -136,6 +137,7 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
|
||||||
|
|
||||||
// may block if broker shutodwn happens quickly
|
// may block if broker shutodwn happens quickly
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("producer started");
|
LOG.info("producer started");
|
||||||
try {
|
try {
|
||||||
|
@ -231,6 +233,7 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
|
||||||
doByteman.set(true);
|
doByteman.set(true);
|
||||||
testConsumer.setMessageListener(new MessageListener() {
|
testConsumer.setMessageListener(new MessageListener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
public void onMessage(Message message) {
|
public void onMessage(Message message) {
|
||||||
LOG.info("consume one: " + message);
|
LOG.info("consume one: " + message);
|
||||||
assertNotNull("got message", message);
|
assertNotNull("got message", message);
|
||||||
|
@ -253,6 +256,7 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
|
||||||
|
|
||||||
// may block if broker shutdown happens quickly
|
// may block if broker shutdown happens quickly
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("producer started");
|
LOG.info("producer started");
|
||||||
try {
|
try {
|
||||||
|
@ -364,6 +368,7 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
|
||||||
public static void stopServerInTransaction() {
|
public static void stopServerInTransaction() {
|
||||||
if (doByteman.get()) {
|
if (doByteman.get()) {
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("Stopping broker in transaction...");
|
LOG.info("Stopping broker in transaction...");
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -167,6 +167,7 @@ public class FailoverConsumerUnconsumedTest extends OpenwireArtemisBaseTest {
|
||||||
produceMessage(consumerSession, destination, maxConsumers * prefetch);
|
produceMessage(consumerSession, destination, maxConsumers * prefetch);
|
||||||
|
|
||||||
assertTrue("add messages are delivered", Wait.waitFor(new Wait.Condition() {
|
assertTrue("add messages are delivered", Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
int totalDelivered = 0;
|
int totalDelivered = 0;
|
||||||
for (TestConsumer testConsumer : testConsumers) {
|
for (TestConsumer testConsumer : testConsumers) {
|
||||||
|
@ -181,6 +182,7 @@ public class FailoverConsumerUnconsumedTest extends OpenwireArtemisBaseTest {
|
||||||
final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1);
|
final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1);
|
||||||
|
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
LOG.info("add last consumer...");
|
LOG.info("add last consumer...");
|
||||||
|
@ -216,6 +218,7 @@ public class FailoverConsumerUnconsumedTest extends OpenwireArtemisBaseTest {
|
||||||
|
|
||||||
// each should again get prefetch messages - all unacked deliveries should be rolledback
|
// each should again get prefetch messages - all unacked deliveries should be rolledback
|
||||||
assertTrue("after restart all messages are re dispatched", Wait.waitFor(new Wait.Condition() {
|
assertTrue("after restart all messages are re dispatched", Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
int totalDelivered = 0;
|
int totalDelivered = 0;
|
||||||
for (TestConsumer testConsumer : testConsumers) {
|
for (TestConsumer testConsumer : testConsumers) {
|
||||||
|
@ -259,6 +262,7 @@ public class FailoverConsumerUnconsumedTest extends OpenwireArtemisBaseTest {
|
||||||
produceMessage(consumerSession, destination, maxConsumers * prefetch);
|
produceMessage(consumerSession, destination, maxConsumers * prefetch);
|
||||||
|
|
||||||
assertTrue("add messages are dispatched", Wait.waitFor(new Wait.Condition() {
|
assertTrue("add messages are dispatched", Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
int totalUnconsumed = 0;
|
int totalUnconsumed = 0;
|
||||||
for (TestConsumer testConsumer : testConsumers) {
|
for (TestConsumer testConsumer : testConsumers) {
|
||||||
|
@ -273,6 +277,7 @@ public class FailoverConsumerUnconsumedTest extends OpenwireArtemisBaseTest {
|
||||||
final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1);
|
final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1);
|
||||||
|
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
LOG.info("add last consumer...");
|
LOG.info("add last consumer...");
|
||||||
|
@ -288,6 +293,7 @@ public class FailoverConsumerUnconsumedTest extends OpenwireArtemisBaseTest {
|
||||||
|
|
||||||
// verify interrupt
|
// verify interrupt
|
||||||
assertTrue("add messages dispatched and unconsumed are cleaned up", Wait.waitFor(new Wait.Condition() {
|
assertTrue("add messages dispatched and unconsumed are cleaned up", Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
int totalUnconsumed = 0;
|
int totalUnconsumed = 0;
|
||||||
for (TestConsumer testConsumer : testConsumers) {
|
for (TestConsumer testConsumer : testConsumers) {
|
||||||
|
@ -309,6 +315,7 @@ public class FailoverConsumerUnconsumedTest extends OpenwireArtemisBaseTest {
|
||||||
|
|
||||||
// each should again get prefetch messages - all unconsumed deliveries should be rolledback
|
// each should again get prefetch messages - all unconsumed deliveries should be rolledback
|
||||||
assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition() {
|
assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
int totalUnconsumed = 0;
|
int totalUnconsumed = 0;
|
||||||
for (TestConsumer testConsumer : testConsumers) {
|
for (TestConsumer testConsumer : testConsumers) {
|
||||||
|
@ -365,6 +372,7 @@ public class FailoverConsumerUnconsumedTest extends OpenwireArtemisBaseTest {
|
||||||
if (consumerCount.incrementAndGet() == maxConsumers) {
|
if (consumerCount.incrementAndGet() == maxConsumers) {
|
||||||
context.getContext().setDontSendReponse(true);
|
context.getContext().setDontSendReponse(true);
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
broker.stop();
|
broker.stop();
|
||||||
|
@ -384,6 +392,7 @@ public class FailoverConsumerUnconsumedTest extends OpenwireArtemisBaseTest {
|
||||||
if (consumerCount.incrementAndGet() == maxConsumers + (watchTopicAdvisories.get() ? 1 : 0)) {
|
if (consumerCount.incrementAndGet() == maxConsumers + (watchTopicAdvisories.get() ? 1 : 0)) {
|
||||||
context.getContext().setDontSendReponse(true);
|
context.getContext().setDontSendReponse(true);
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
broker.stop();
|
broker.stop();
|
||||||
|
|
|
@ -102,6 +102,7 @@ public class FailoverPrefetchZeroTest extends OpenwireArtemisBaseTest {
|
||||||
final CountDownLatch receiveDone = new CountDownLatch(1);
|
final CountDownLatch receiveDone = new CountDownLatch(1);
|
||||||
final Vector<Message> received = new Vector<>();
|
final Vector<Message> received = new Vector<>();
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
LOG.info("receive one...");
|
LOG.info("receive one...");
|
||||||
|
@ -147,6 +148,7 @@ public class FailoverPrefetchZeroTest extends OpenwireArtemisBaseTest {
|
||||||
context.getContext().setDontSendReponse(true);
|
context.getContext().setDontSendReponse(true);
|
||||||
pullDone.countDown();
|
pullDone.countDown();
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
broker.stop();
|
broker.stop();
|
||||||
|
|
|
@ -165,6 +165,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
||||||
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
|
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
|
||||||
// broker will die on commit reply so this will hang till restart
|
// broker will die on commit reply so this will hang till restart
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("doing async commit...");
|
LOG.info("doing async commit...");
|
||||||
try {
|
try {
|
||||||
|
@ -259,6 +260,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
||||||
final CountDownLatch sendDoneLatch = new CountDownLatch(1);
|
final CountDownLatch sendDoneLatch = new CountDownLatch(1);
|
||||||
// broker will die on send reply so this will hang till restart
|
// broker will die on send reply so this will hang till restart
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("doing async send...");
|
LOG.info("doing async send...");
|
||||||
try {
|
try {
|
||||||
|
@ -348,6 +350,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
||||||
final CountDownLatch sendDoneLatch = new CountDownLatch(1);
|
final CountDownLatch sendDoneLatch = new CountDownLatch(1);
|
||||||
// proxy connection will die on send reply so this will hang on failover reconnect till open
|
// proxy connection will die on send reply so this will hang on failover reconnect till open
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("doing async send...");
|
LOG.info("doing async send...");
|
||||||
try {
|
try {
|
||||||
|
@ -476,12 +479,15 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
||||||
|
|
||||||
final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
|
connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
|
||||||
|
@Override
|
||||||
public ServerSession getServerSession() throws JMSException {
|
public ServerSession getServerSession() throws JMSException {
|
||||||
return new ServerSession() {
|
return new ServerSession() {
|
||||||
|
@Override
|
||||||
public Session getSession() throws JMSException {
|
public Session getSession() throws JMSException {
|
||||||
return poolSession;
|
return poolSession;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void start() throws JMSException {
|
public void start() throws JMSException {
|
||||||
connectionConsumerGotOne.countDown();
|
connectionConsumerGotOne.countDown();
|
||||||
poolSession.run();
|
poolSession.run();
|
||||||
|
@ -584,6 +590,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
||||||
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
|
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
|
||||||
final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
|
final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
|
||||||
Thread t = new Thread("doTestFailoverConsumerAckLost(" + pauseSeconds + ")") {
|
Thread t = new Thread("doTestFailoverConsumerAckLost(" + pauseSeconds + ")") {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("doing async commit after consume...");
|
LOG.info("doing async commit after consume...");
|
||||||
try {
|
try {
|
||||||
|
@ -774,6 +781,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
||||||
for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) {
|
for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) {
|
||||||
|
|
||||||
executorService.execute(new Runnable() {
|
executorService.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
MessageConsumer localConsumer = null;
|
MessageConsumer localConsumer = null;
|
||||||
try {
|
try {
|
||||||
|
@ -915,6 +923,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
||||||
final CountDownLatch commitDone = new CountDownLatch(1);
|
final CountDownLatch commitDone = new CountDownLatch(1);
|
||||||
// will block pending re-deliveries
|
// will block pending re-deliveries
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("doing async commit...");
|
LOG.info("doing async commit...");
|
||||||
try {
|
try {
|
||||||
|
@ -970,6 +979,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
||||||
|
|
||||||
// commit may fail if other consumer gets the message on restart
|
// commit may fail if other consumer gets the message on restart
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("doing async commit...");
|
LOG.info("doing async commit...");
|
||||||
try {
|
try {
|
||||||
|
@ -1017,6 +1027,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
||||||
if (doByteman.get()) {
|
if (doByteman.get()) {
|
||||||
context.getContext().setDontSendReponse(true);
|
context.getContext().setDontSendReponse(true);
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("Stopping broker post commit...");
|
LOG.info("Stopping broker post commit...");
|
||||||
try {
|
try {
|
||||||
|
@ -1040,6 +1051,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
||||||
firstSend = false;
|
firstSend = false;
|
||||||
context.getContext().setDontSendReponse(true);
|
context.getContext().setDontSendReponse(true);
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("Stopping connection post send...");
|
LOG.info("Stopping connection post send...");
|
||||||
try {
|
try {
|
||||||
|
@ -1060,6 +1072,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
|
||||||
if (count++ == 1) {
|
if (count++ == 1) {
|
||||||
LOG.info("ok stop broker...");
|
LOG.info("ok stop broker...");
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
if (broker != null) {
|
if (broker != null) {
|
||||||
|
|
|
@ -219,6 +219,7 @@ public class FanoutTransportBrokerTest extends OpenwireArtemisBaseTest {
|
||||||
// Slip in a new transport filter after the MockTransport
|
// Slip in a new transport filter after the MockTransport
|
||||||
MockTransport mt = (MockTransport) connection3.getTransport().narrow(MockTransport.class);
|
MockTransport mt = (MockTransport) connection3.getTransport().narrow(MockTransport.class);
|
||||||
mt.install(new TransportFilter(mt.getNext()) {
|
mt.install(new TransportFilter(mt.getNext()) {
|
||||||
|
@Override
|
||||||
public void oneway(Object command) throws IOException {
|
public void oneway(Object command) throws IOException {
|
||||||
LOG.info("Dropping: " + command);
|
LOG.info("Dropping: " + command);
|
||||||
// just eat it! to simulate a recent failure.
|
// just eat it! to simulate a recent failure.
|
||||||
|
@ -227,6 +228,7 @@ public class FanoutTransportBrokerTest extends OpenwireArtemisBaseTest {
|
||||||
|
|
||||||
// Send a message (async) as this will block
|
// Send a message (async) as this will block
|
||||||
new Thread() {
|
new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
// Send the message using the fail over publisher.
|
// Send the message using the fail over publisher.
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -305,6 +305,7 @@ public class SocketProxy {
|
||||||
pause.get().countDown();
|
pause.get().countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
byte[] buf = new byte[1024];
|
byte[] buf = new byte[1024];
|
||||||
try {
|
try {
|
||||||
|
@ -360,6 +361,7 @@ public class SocketProxy {
|
||||||
pause.get().countDown();
|
pause.get().countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
while(!socket.isClosed()) {
|
while(!socket.isClosed()) {
|
||||||
|
|
|
@ -188,6 +188,7 @@ public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase
|
||||||
final MapMessage message = createLargeMessage();
|
final MapMessage message = createLargeMessage();
|
||||||
|
|
||||||
t = new Thread() {
|
t = new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
|
|
@ -170,6 +170,7 @@ public class RaceOnSyncLargeMessageOverReplicationTest extends ActiveMQTestBase
|
||||||
final MapMessage message = createLargeMessage();
|
final MapMessage message = createLargeMessage();
|
||||||
|
|
||||||
t = new Thread() {
|
t = new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
|
|
@ -301,6 +301,7 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isFull() {
|
public boolean isFull() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue