NO-JIRA Test fixes

- LargeServerMessageImpl.finalize is eventually causing deadlocks
- CoreMessage needs to check properties before decoding
- PagingTest tweaks
- ServerLocatorImpl can deadlock eventually, avoiding a lock and using actors
- ActiveMQServerImpl.finalize is also evil and can cause deadlocks on the testsuite
- MqttClusterRemoteSubscribeTest needs to setup the Address now on the setup
This commit is contained in:
Clebert Suconic 2018-02-14 10:15:01 -05:00
parent f91432eecc
commit b5bf5afde7
6 changed files with 35 additions and 34 deletions

View File

@ -67,6 +67,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores;
import org.jboss.logging.Logger;
@ -199,6 +200,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private Executor startExecutor;
private Actor<Long> updateArrayActor;
private AfterConnectInternalListener afterConnectListener;
private String groupID;
@ -251,6 +254,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
}
this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray);
}
@Override
@ -534,6 +539,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private TransportConfiguration selectConnector() {
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
flushTopology();
synchronized (topologyArrayGuard) {
usedTopology = topologyArray;
}
@ -743,6 +750,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
initialise();
flushTopology();
if (this.getNumInitialConnectors() == 0 && discoveryGroup != null) {
// Wait for an initial broadcast to give us at least one node in the cluster
long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
@ -812,6 +821,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return factory;
}
public void flushTopology() {
if (updateArrayActor != null) {
updateArrayActor.flush(10, TimeUnit.SECONDS);
}
}
@Override
public boolean isHA() {
return ha;
@ -1426,14 +1441,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
topology.removeMember(eventTime, nodeID);
if (clusterConnection) {
updateArraysAndPairs();
updateArraysAndPairs(eventTime);
} else {
if (topology.isEmpty()) {
// Resetting the topology to its original condition as it was brand new
receivedTopology = false;
topologyArray = null;
} else {
updateArraysAndPairs();
updateArraysAndPairs(eventTime);
if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null) {
// Resetting the topology to its original condition as it was brand new
@ -1472,7 +1487,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
}
updateArraysAndPairs();
updateArraysAndPairs(uniqueEventID);
if (last) {
receivedTopology = true;
@ -1496,7 +1511,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
@SuppressWarnings("unchecked")
private void updateArraysAndPairs() {
private void updateArraysAndPairs(long time) {
if (updateArrayActor == null) {
// if for some reason we don't have an actor, just go straight
internalUpdateArray(time);
} else {
updateArrayActor.act(time);
}
}
private void internalUpdateArray(long time) {
synchronized (topologyArrayGuard) {
Collection<TopologyMemberImpl> membersCopy = topology.getMembers();
@ -1506,7 +1530,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
for (TopologyMemberImpl pair : membersCopy) {
Pair<TransportConfiguration, TransportConfiguration> transportConfigs = pair.getConnector();
topologyArrayLocal[count++] = new Pair<>(protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getA()),
protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB()));
protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB()));
}
this.topologyArray = topologyArrayLocal;

View File

@ -369,6 +369,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public Message copy() {
checkProperties();
checkEncode();
return new CoreMessage(this);
}
@ -936,8 +937,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public CoreMessage putObjectProperty(final SimpleString key,
final Object value) throws ActiveMQPropertyConversionException {
messageChanged();
checkProperties();
messageChanged();
TypedProperties.setObjectProperty(key, value, properties);
return this;
}

View File

@ -362,12 +362,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
}
}
@Override
protected void finalize() throws Throwable {
releaseResources();
super.finalize();
}
// Private -------------------------------------------------------
public synchronized void validateFile() throws ActiveMQException {

View File

@ -679,17 +679,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
@Override
protected final void finalize() throws Throwable {
if (state != SERVER_STATE.STOPPED) {
ActiveMQServerLogger.LOGGER.serverFinalisedWIthoutBeingSTopped();
stop();
}
super.finalize();
}
@Override
public void setState(SERVER_STATE state) {
this.state = state;

View File

@ -435,6 +435,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
coreAddressConfiguration.setName(TOPIC);
CoreQueueConfiguration coreQueueConfiguration = new CoreQueueConfiguration();
coreQueueConfiguration.setName(TOPIC);
coreQueueConfiguration.setAddress(TOPIC);
coreQueueConfiguration.setRoutingType(RoutingType.ANYCAST);
coreAddressConfiguration.addQueueConfiguration(coreQueueConfiguration);
return coreAddressConfiguration;

View File

@ -835,7 +835,7 @@ public class PagingTest extends ActiveMQTestBase {
ClientMessage message = session.createMessage(true);
if (i < 1000) {
message.setExpiration(System.currentTimeMillis() + 1000);
message.setExpiration(System.currentTimeMillis() + 100);
}
message.putIntProperty("tst-count", i);
@ -852,12 +852,7 @@ public class PagingTest extends ActiveMQTestBase {
session.commit();
producer.close();
for (long timeout = System.currentTimeMillis() + 60000; timeout > System.currentTimeMillis() && getMessageCount(qEXP) < 1000; ) {
System.out.println("count = " + getMessageCount(qEXP));
Thread.sleep(100);
}
assertEquals(1000, getMessageCount(qEXP));
Wait.assertEquals(1000, qEXP::getMessageCount);
session.start();
@ -874,10 +869,7 @@ public class PagingTest extends ActiveMQTestBase {
assertNull(consumer.receiveImmediate());
for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(queue1) != 0; ) {
Thread.sleep(100);
}
assertEquals(0, getMessageCount(queue1));
Wait.assertEquals(0, queue1::getMessageCount);
consumer.close();