This closes #262 on serialization changes over topology
This commit is contained in:
commit
fe093fcce3
|
@ -16,7 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.client.impl;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
|
@ -32,14 +31,12 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
|
|||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connector;
|
||||
|
||||
public final class Topology implements Serializable
|
||||
public final class Topology
|
||||
{
|
||||
|
||||
private static final long serialVersionUID = -9037171688692471371L;
|
||||
private final Set<ClusterTopologyListener> topologyListeners;
|
||||
|
||||
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
|
||||
|
||||
private transient Executor executor = null;
|
||||
private final Executor executor;
|
||||
|
||||
/**
|
||||
* Used to debug operations.
|
||||
|
@ -56,12 +53,31 @@ public final class Topology implements Serializable
|
|||
* keys are node IDs
|
||||
* values are a pair of live/backup transport configurations
|
||||
*/
|
||||
private final Map<String, TopologyMemberImpl> topology = new ConcurrentHashMap<String, TopologyMemberImpl>();
|
||||
private final Map<String, TopologyMemberImpl> topology;
|
||||
|
||||
private transient Map<String, Long> mapDelete;
|
||||
private Map<String, Long> mapDelete;
|
||||
|
||||
private static final class DirectExecutor implements Executor
|
||||
{
|
||||
public void execute(final Runnable runnable)
|
||||
{
|
||||
runnable.run();
|
||||
}
|
||||
}
|
||||
public Topology(final Object owner)
|
||||
{
|
||||
this(owner, new DirectExecutor());
|
||||
}
|
||||
|
||||
public Topology(final Object owner, final Executor executor)
|
||||
{
|
||||
this.topologyListeners = new HashSet<>();
|
||||
this.topology = new ConcurrentHashMap<>();
|
||||
if (executor == null)
|
||||
{
|
||||
throw new IllegalArgumentException("Executor is required");
|
||||
}
|
||||
this.executor = executor;
|
||||
this.owner = owner;
|
||||
if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
|
||||
{
|
||||
|
@ -70,11 +86,6 @@ public final class Topology implements Serializable
|
|||
}
|
||||
}
|
||||
|
||||
public void setExecutor(final Executor executor)
|
||||
{
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* It will remove all elements as if it haven't received anyone from the server.
|
||||
*/
|
||||
|
@ -247,7 +258,7 @@ public final class Topology implements Serializable
|
|||
return true;
|
||||
}
|
||||
/*
|
||||
* always add the backup, better to try to reconnect to something thats not there then to
|
||||
* always add the backup, better to try to reconnect to something that's not there then to
|
||||
* not know about it at all
|
||||
*/
|
||||
if (currentMember.getBackup() == null && memberInput.getBackup() != null)
|
||||
|
@ -273,7 +284,7 @@ public final class Topology implements Serializable
|
|||
|
||||
if (copy.size() > 0)
|
||||
{
|
||||
execute(new Runnable()
|
||||
executor.execute(new Runnable()
|
||||
{
|
||||
public void run()
|
||||
{
|
||||
|
@ -311,7 +322,7 @@ public final class Topology implements Serializable
|
|||
ArrayList<ClusterTopologyListener> listenersCopy;
|
||||
synchronized (topologyListeners)
|
||||
{
|
||||
listenersCopy = new ArrayList<ClusterTopologyListener>(topologyListeners);
|
||||
listenersCopy = new ArrayList<>(topologyListeners);
|
||||
}
|
||||
return listenersCopy;
|
||||
}
|
||||
|
@ -353,7 +364,7 @@ public final class Topology implements Serializable
|
|||
{
|
||||
final ArrayList<ClusterTopologyListener> copy = copyListeners();
|
||||
|
||||
execute(new Runnable()
|
||||
executor.execute(new Runnable()
|
||||
{
|
||||
public void run()
|
||||
{
|
||||
|
@ -374,23 +385,10 @@ public final class Topology implements Serializable
|
|||
}
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
return member != null;
|
||||
}
|
||||
|
||||
private void execute(final Runnable runnable)
|
||||
{
|
||||
if (executor != null)
|
||||
{
|
||||
executor.execute(runnable);
|
||||
}
|
||||
else
|
||||
{
|
||||
runnable.run();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void sendTopology(final ClusterTopologyListener listener)
|
||||
{
|
||||
if (ActiveMQClientLogger.LOGGER.isDebugEnabled())
|
||||
|
@ -398,7 +396,7 @@ public final class Topology implements Serializable
|
|||
ActiveMQClientLogger.LOGGER.debug(this + " is sending topology to " + listener);
|
||||
}
|
||||
|
||||
execute(new Runnable()
|
||||
executor.execute(new Runnable()
|
||||
{
|
||||
public void run()
|
||||
{
|
||||
|
@ -456,7 +454,7 @@ public final class Topology implements Serializable
|
|||
ArrayList<TopologyMemberImpl> members;
|
||||
synchronized (this)
|
||||
{
|
||||
members = new ArrayList<TopologyMemberImpl>(topology.values());
|
||||
members = new ArrayList<>(topology.values());
|
||||
}
|
||||
return members;
|
||||
}
|
||||
|
@ -486,11 +484,11 @@ public final class Topology implements Serializable
|
|||
private synchronized String describe(final String text)
|
||||
{
|
||||
StringBuilder desc = new StringBuilder(text + "topology on " + this + ":\n");
|
||||
for (Entry<String, TopologyMemberImpl> entry : new HashMap<String, TopologyMemberImpl>(topology).entrySet())
|
||||
for (Entry<String, TopologyMemberImpl> entry : new HashMap<>(topology).entrySet())
|
||||
{
|
||||
desc.append("\t" + entry.getKey() + " => " + entry.getValue() + "\n");
|
||||
desc.append("\t").append(entry.getKey()).append(" => ").append(entry.getValue()).append("\n");
|
||||
}
|
||||
desc.append("\t" + "nodes=" + nodes() + "\t" + "members=" + members());
|
||||
desc.append("\t" + "nodes=").append(nodes()).append("\t").append("members=").append(members());
|
||||
if (topology.isEmpty())
|
||||
{
|
||||
desc.append("\tEmpty");
|
||||
|
@ -537,7 +535,7 @@ public final class Topology implements Serializable
|
|||
{
|
||||
if (mapDelete == null)
|
||||
{
|
||||
mapDelete = new ConcurrentHashMap<String, Long>();
|
||||
mapDelete = new ConcurrentHashMap<>();
|
||||
}
|
||||
return mapDelete;
|
||||
}
|
||||
|
|
|
@ -151,7 +151,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
|
||||
// Stuff that used to be on the ClusterManager
|
||||
|
||||
private final Topology topology = new Topology(this);
|
||||
private final Topology topology;
|
||||
|
||||
private volatile boolean stopping = false;
|
||||
|
||||
|
@ -228,7 +228,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
|
||||
this.executor = executorFactory.getExecutor();
|
||||
|
||||
this.topology.setExecutor(executor);
|
||||
this.topology = new Topology(this, executor);
|
||||
|
||||
this.server = server;
|
||||
|
||||
|
@ -341,7 +341,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
|
||||
this.executor = executorFactory.getExecutor();
|
||||
|
||||
this.topology.setExecutor(executor);
|
||||
this.topology = new Topology(this, executor);
|
||||
|
||||
this.server = server;
|
||||
|
||||
|
|
Loading…
Reference in New Issue