Topology executor should be final
This commit is contained in:
parent
59e4f34415
commit
3c6e213f66
|
@ -34,10 +34,9 @@ import org.apache.activemq.artemis.spi.core.remoting.Connector;
|
||||||
public final class Topology
|
public final class Topology
|
||||||
{
|
{
|
||||||
|
|
||||||
|
private final Set<ClusterTopologyListener> topologyListeners;
|
||||||
|
|
||||||
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
|
private final Executor executor;
|
||||||
|
|
||||||
private final Executor executor = null;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to debug operations.
|
* Used to debug operations.
|
||||||
|
@ -54,12 +53,31 @@ public final class Topology
|
||||||
* keys are node IDs
|
* keys are node IDs
|
||||||
* values are a pair of live/backup transport configurations
|
* values are a pair of live/backup transport configurations
|
||||||
*/
|
*/
|
||||||
private final Map<String, TopologyMemberImpl> topology = new ConcurrentHashMap<String, TopologyMemberImpl>();
|
private final Map<String, TopologyMemberImpl> topology;
|
||||||
|
|
||||||
private Map<String, Long> mapDelete;
|
private Map<String, Long> mapDelete;
|
||||||
|
|
||||||
|
private static final class DirectExecutor implements Executor
|
||||||
|
{
|
||||||
|
public void execute(final Runnable runnable)
|
||||||
|
{
|
||||||
|
runnable.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
public Topology(final Object owner)
|
public Topology(final Object owner)
|
||||||
{
|
{
|
||||||
|
this(owner, new DirectExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Topology(final Object owner, final Executor executor)
|
||||||
|
{
|
||||||
|
this.topologyListeners = new HashSet<>();
|
||||||
|
this.topology = new ConcurrentHashMap<>();
|
||||||
|
if (executor == null)
|
||||||
|
{
|
||||||
|
throw new IllegalArgumentException("Executor is required");
|
||||||
|
}
|
||||||
|
this.executor = executor;
|
||||||
this.owner = owner;
|
this.owner = owner;
|
||||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
|
if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
|
||||||
{
|
{
|
||||||
|
@ -68,11 +86,6 @@ public final class Topology
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setExecutor(final Executor executor)
|
|
||||||
{
|
|
||||||
this.executor = executor;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* It will remove all elements as if it haven't received anyone from the server.
|
* It will remove all elements as if it haven't received anyone from the server.
|
||||||
*/
|
*/
|
||||||
|
@ -271,7 +284,7 @@ public final class Topology
|
||||||
|
|
||||||
if (copy.size() > 0)
|
if (copy.size() > 0)
|
||||||
{
|
{
|
||||||
execute(new Runnable()
|
executor.execute(new Runnable()
|
||||||
{
|
{
|
||||||
public void run()
|
public void run()
|
||||||
{
|
{
|
||||||
|
@ -351,7 +364,7 @@ public final class Topology
|
||||||
{
|
{
|
||||||
final ArrayList<ClusterTopologyListener> copy = copyListeners();
|
final ArrayList<ClusterTopologyListener> copy = copyListeners();
|
||||||
|
|
||||||
execute(new Runnable()
|
executor.execute(new Runnable()
|
||||||
{
|
{
|
||||||
public void run()
|
public void run()
|
||||||
{
|
{
|
||||||
|
@ -372,23 +385,10 @@ public final class Topology
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
return member != null;
|
return member != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void execute(final Runnable runnable)
|
|
||||||
{
|
|
||||||
if (executor != null)
|
|
||||||
{
|
|
||||||
executor.execute(runnable);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
runnable.run();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void sendTopology(final ClusterTopologyListener listener)
|
public synchronized void sendTopology(final ClusterTopologyListener listener)
|
||||||
{
|
{
|
||||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled())
|
if (ActiveMQClientLogger.LOGGER.isDebugEnabled())
|
||||||
|
@ -396,7 +396,7 @@ public final class Topology
|
||||||
ActiveMQClientLogger.LOGGER.debug(this + " is sending topology to " + listener);
|
ActiveMQClientLogger.LOGGER.debug(this + " is sending topology to " + listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
execute(new Runnable()
|
executor.execute(new Runnable()
|
||||||
{
|
{
|
||||||
public void run()
|
public void run()
|
||||||
{
|
{
|
||||||
|
|
|
@ -151,7 +151,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
||||||
|
|
||||||
// Stuff that used to be on the ClusterManager
|
// Stuff that used to be on the ClusterManager
|
||||||
|
|
||||||
private final Topology topology = new Topology(this);
|
private final Topology topology;
|
||||||
|
|
||||||
private volatile boolean stopping = false;
|
private volatile boolean stopping = false;
|
||||||
|
|
||||||
|
@ -228,7 +228,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
||||||
|
|
||||||
this.executor = executorFactory.getExecutor();
|
this.executor = executorFactory.getExecutor();
|
||||||
|
|
||||||
this.topology.setExecutor(executor);
|
this.topology = new Topology(this, executor);
|
||||||
|
|
||||||
this.server = server;
|
this.server = server;
|
||||||
|
|
||||||
|
@ -341,7 +341,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
||||||
|
|
||||||
this.executor = executorFactory.getExecutor();
|
this.executor = executorFactory.getExecutor();
|
||||||
|
|
||||||
this.topology.setExecutor(executor);
|
this.topology = new Topology(this, executor);
|
||||||
|
|
||||||
this.server = server;
|
this.server = server;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue