Fix test failure in CI

Has race condition on the ArrayList it uses to track subs and 
Fix the unreliable sleep used to track locked messages in subs
Ensure Broker is shut down on test completion.
This commit is contained in:
Timothy Bish 2016-06-16 12:26:50 -04:00
parent 5ba8679083
commit 9ac5f83473
1 changed files with 67 additions and 74 deletions

View File

@ -1,20 +1,4 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -32,14 +16,19 @@
*/
package org.apache.activemq.broker.region;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
@ -61,28 +50,31 @@ import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import junit.framework.TestCase;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class SubscriptionAddRemoveQueueTest extends TestCase {
public class SubscriptionAddRemoveQueueTest {
Queue queue;
private BrokerService brokerService;
private Queue queue;
private ConsumerInfo info = new ConsumerInfo();
private List<SimpleImmediateDispatchSubscription> subs = new ArrayList<SimpleImmediateDispatchSubscription>();
private ConnectionContext context = new ConnectionContext();
private ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
private ProducerInfo producerInfo = new ProducerInfo();
private ProducerState producerState = new ProducerState(producerInfo);
private ActiveMQDestination destination = new ActiveMQQueue("TEST");
private int numSubscriptions = 1000;
private boolean working = true;
private int senders = 20;
ConsumerInfo info = new ConsumerInfo();
List<SimpleImmediateDispatchSubscription> subs = new ArrayList<SimpleImmediateDispatchSubscription>();
ConnectionContext context = new ConnectionContext();
ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
ProducerInfo producerInfo = new ProducerInfo();
ProducerState producerState = new ProducerState(producerInfo);
ActiveMQDestination destination = new ActiveMQQueue("TEST");
int numSubscriptions = 1000;
boolean working = true;
int senders = 20;
@Override
@Before
public void setUp() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService = new BrokerService();
brokerService.start();
DestinationStatistics parentStats = new DestinationStatistics();
parentStats.setEnabled(true);
@ -99,6 +91,15 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
queue.initialize();
}
@After
public void tearDown() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
}
}
@Test(timeout = 120000)
public void testNoDispatchToRemovedConsumers() throws Exception {
final AtomicInteger producerId = new AtomicInteger();
Runnable sender = new Runnable() {
@ -134,21 +135,34 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
}
};
for (int i=0;i<numSubscriptions; i++) {
for (int i = 0; i < numSubscriptions; i++) {
SimpleImmediateDispatchSubscription sub = new SimpleImmediateDispatchSubscription();
subs.add(sub);
queue.addSubscription(context, sub);
}
assertEquals("there are X subscriptions", numSubscriptions, queue.getDestinationStatistics().getConsumers().getCount());
ExecutorService executor = Executors.newCachedThreadPool();
for (int i=0; i<senders ; i++) {
for (int i = 0; i < senders; i++) {
executor.submit(sender);
}
Thread.sleep(1000);
for (SimpleImmediateDispatchSubscription sub : subs) {
assertTrue("There are some locked messages in the subscription", hasSomeLocks(sub.dispatched));
}
assertTrue("All subs should have some locks", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
boolean allHaveLocks = true;
for (SimpleImmediateDispatchSubscription sub : subs) {
if (!hasSomeLocks(sub.dispatched)) {
allHaveLocks = false;
break;
}
}
return allHaveLocks;
}
}));
Future<?> result = executor.submit(subRemover);
result.get();
@ -158,12 +172,11 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
for (SimpleImmediateDispatchSubscription sub : subs) {
assertTrue("There are no locked messages in any removed subscriptions", !hasSomeLocks(sub.dispatched));
}
}
private boolean hasSomeLocks(List<MessageReference> dispatched) {
boolean hasLock = false;
for (MessageReference mr: dispatched) {
for (MessageReference mr : dispatched) {
QueueMessageReference qmr = (QueueMessageReference) mr;
if (qmr.getLockOwner() != null) {
hasLock = true;
@ -173,21 +186,19 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
return hasLock;
}
public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {
private class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {
private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
List<MessageReference> dispatched =
Collections.synchronizedList(new ArrayList<MessageReference>());
List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
@Override
public void acknowledge(ConnectionContext context, MessageAck ack)
throws Exception {
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
}
@Override
public void add(MessageReference node) throws Exception {
// immediate dispatch
QueueMessageReference qmr = (QueueMessageReference)node;
QueueMessageReference qmr = (QueueMessageReference) node;
qmr.lock(this);
dispatched.add(qmr);
}
@ -234,8 +245,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
}
@Override
public void add(ConnectionContext context, Destination destination)
throws Exception {
public void add(ConnectionContext context, Destination destination) throws Exception {
}
@Override
@ -331,13 +341,8 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
return false;
}
public boolean isSlave() {
return false;
}
@Override
public boolean matches(MessageReference node,
MessageEvaluationContext context) throws IOException {
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
return true;
}
@ -347,13 +352,11 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
}
@Override
public void processMessageDispatchNotification(
MessageDispatchNotification mdn) throws Exception {
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
}
@Override
public Response pullMessage(ConnectionContext context, MessagePull pull)
throws Exception {
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
return null;
}
@ -363,8 +366,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
}
@Override
public List<MessageReference> remove(ConnectionContext context,
Destination destination) throws Exception {
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
return new ArrayList<MessageReference>(dispatched);
}
@ -373,8 +375,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
}
@Override
public void setSelector(String selector)
throws InvalidSelectorException, UnsupportedOperationException {
public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
}
@Override
@ -382,8 +383,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
}
@Override
public boolean addRecoveredMessage(ConnectionContext context,
MessageReference message) throws Exception {
public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
return false;
}
@ -402,12 +402,6 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
return false;
}
public void addDestination(Destination destination) {
}
public void removeDestination(Destination destination) {
}
@Override
public int countBeforeFull() {
return 10;
@ -422,6 +416,5 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
public long getInFlightMessageSize() {
return subscriptionStatistics.getInflightMessageSize().getTotalSize();
}
}
}