This commit is contained in:
Justin Bertram 2018-02-14 10:07:41 -06:00
commit 646e555148
7 changed files with 58 additions and 55 deletions

View File

@ -67,6 +67,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores;
import org.jboss.logging.Logger;
@ -199,6 +200,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private Executor startExecutor;
private Actor<Long> updateArrayActor;
private AfterConnectInternalListener afterConnectListener;
private String groupID;
@ -251,6 +254,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
}
this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray);
}
@Override
@ -534,6 +539,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private TransportConfiguration selectConnector() {
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
flushTopology();
synchronized (topologyArrayGuard) {
usedTopology = topologyArray;
}
@ -743,6 +750,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
initialise();
flushTopology();
if (this.getNumInitialConnectors() == 0 && discoveryGroup != null) {
// Wait for an initial broadcast to give us at least one node in the cluster
long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
@ -812,6 +821,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return factory;
}
public void flushTopology() {
if (updateArrayActor != null) {
updateArrayActor.flush(10, TimeUnit.SECONDS);
}
}
@Override
public boolean isHA() {
return ha;
@ -1426,14 +1441,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
topology.removeMember(eventTime, nodeID);
if (clusterConnection) {
updateArraysAndPairs();
updateArraysAndPairs(eventTime);
} else {
if (topology.isEmpty()) {
// Resetting the topology to its original condition as it was brand new
receivedTopology = false;
topologyArray = null;
} else {
updateArraysAndPairs();
updateArraysAndPairs(eventTime);
if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null) {
// Resetting the topology to its original condition as it was brand new
@ -1472,7 +1487,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
}
updateArraysAndPairs();
updateArraysAndPairs(uniqueEventID);
if (last) {
receivedTopology = true;
@ -1496,7 +1511,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
@SuppressWarnings("unchecked")
private void updateArraysAndPairs() {
private void updateArraysAndPairs(long time) {
if (updateArrayActor == null) {
// if for some reason we don't have an actor, just go straight
internalUpdateArray(time);
} else {
updateArrayActor.act(time);
}
}
private void internalUpdateArray(long time) {
synchronized (topologyArrayGuard) {
Collection<TopologyMemberImpl> membersCopy = topology.getMembers();
@ -1506,7 +1530,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
for (TopologyMemberImpl pair : membersCopy) {
Pair<TransportConfiguration, TransportConfiguration> transportConfigs = pair.getConnector();
topologyArrayLocal[count++] = new Pair<>(protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getA()),
protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB()));
protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB()));
}
this.topologyArray = topologyArrayLocal;

View File

@ -369,6 +369,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public Message copy() {
checkProperties();
checkEncode();
return new CoreMessage(this);
}
@ -936,8 +937,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public CoreMessage putObjectProperty(final SimpleString key,
final Object value) throws ActiveMQPropertyConversionException {
messageChanged();
checkProperties();
messageChanged();
TypedProperties.setObjectProperty(key, value, properties);
return this;
}

View File

@ -18,8 +18,8 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -47,9 +47,9 @@ public class MQTTConnection implements RemotingConnection {
private String clientID;
private final List<FailureListener> failureListeners = Collections.synchronizedList(new ArrayList<FailureListener>());
private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<>();
private final List<CloseListener> closeListeners = Collections.synchronizedList(new ArrayList<CloseListener>());
private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();
public MQTTConnection(Connection transportConnection) throws Exception {
this.transportConnection = transportConnection;
@ -100,15 +100,14 @@ public class MQTTConnection implements RemotingConnection {
@Override
public List<CloseListener> removeCloseListeners() {
synchronized (closeListeners) {
List<CloseListener> deletedCloseListeners = new ArrayList<>(closeListeners);
closeListeners.clear();
return deletedCloseListeners;
}
List<CloseListener> deletedCloseListeners = copyCloseListeners();
closeListeners.clear();
return deletedCloseListeners;
}
@Override
public void setCloseListeners(List<CloseListener> listeners) {
closeListeners.clear();
closeListeners.addAll(listeners);
}
@ -119,19 +118,15 @@ public class MQTTConnection implements RemotingConnection {
@Override
public List<FailureListener> removeFailureListeners() {
synchronized (failureListeners) {
List<FailureListener> deletedFailureListeners = new ArrayList<>(failureListeners);
failureListeners.clear();
return deletedFailureListeners;
}
List<FailureListener> deletedFailureListeners = copyFailureListeners();
failureListeners.clear();
return deletedFailureListeners;
}
@Override
public void setFailureListeners(List<FailureListener> listeners) {
synchronized (failureListeners) {
failureListeners.clear();
failureListeners.addAll(listeners);
}
failureListeners.clear();
failureListeners.addAll(listeners);
}
@Override
@ -141,13 +136,20 @@ public class MQTTConnection implements RemotingConnection {
@Override
public void fail(ActiveMQException me) {
synchronized (failureListeners) {
for (FailureListener listener : failureListeners) {
listener.connectionFailed(me, false);
}
List<FailureListener> copy = copyFailureListeners();
for (FailureListener listener : copy) {
listener.connectionFailed(me, false);
}
}
private List<FailureListener> copyFailureListeners() {
return new ArrayList<>(failureListeners);
}
private List<CloseListener> copyCloseListeners() {
return new ArrayList<>(closeListeners);
}
@Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
synchronized (failureListeners) {

View File

@ -362,12 +362,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
}
}
@Override
protected void finalize() throws Throwable {
releaseResources();
super.finalize();
}
// Private -------------------------------------------------------
public synchronized void validateFile() throws ActiveMQException {

View File

@ -679,17 +679,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
@Override
protected final void finalize() throws Throwable {
if (state != SERVER_STATE.STOPPED) {
ActiveMQServerLogger.LOGGER.serverFinalisedWIthoutBeingSTopped();
stop();
}
super.finalize();
}
@Override
public void setState(SERVER_STATE state) {
this.state = state;

View File

@ -435,6 +435,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
coreAddressConfiguration.setName(TOPIC);
CoreQueueConfiguration coreQueueConfiguration = new CoreQueueConfiguration();
coreQueueConfiguration.setName(TOPIC);
coreQueueConfiguration.setAddress(TOPIC);
coreQueueConfiguration.setRoutingType(RoutingType.ANYCAST);
coreAddressConfiguration.addQueueConfiguration(coreQueueConfiguration);
return coreAddressConfiguration;

View File

@ -835,7 +835,7 @@ public class PagingTest extends ActiveMQTestBase {
ClientMessage message = session.createMessage(true);
if (i < 1000) {
message.setExpiration(System.currentTimeMillis() + 1000);
message.setExpiration(System.currentTimeMillis() + 100);
}
message.putIntProperty("tst-count", i);
@ -852,12 +852,7 @@ public class PagingTest extends ActiveMQTestBase {
session.commit();
producer.close();
for (long timeout = System.currentTimeMillis() + 60000; timeout > System.currentTimeMillis() && getMessageCount(qEXP) < 1000; ) {
System.out.println("count = " + getMessageCount(qEXP));
Thread.sleep(100);
}
assertEquals(1000, getMessageCount(qEXP));
Wait.assertEquals(1000, qEXP::getMessageCount);
session.start();
@ -874,10 +869,7 @@ public class PagingTest extends ActiveMQTestBase {
assertNull(consumer.receiveImmediate());
for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(queue1) != 0; ) {
Thread.sleep(100);
}
assertEquals(0, getMessageCount(queue1));
Wait.assertEquals(0, queue1::getMessageCount);
consumer.close();