diff --git a/src/main/java/org/elasticsearch/common/network/MulticastChannel.java b/src/main/java/org/elasticsearch/common/network/MulticastChannel.java index 2d08cb9a781..1e7e9df640c 100644 --- a/src/main/java/org/elasticsearch/common/network/MulticastChannel.java +++ b/src/main/java/org/elasticsearch/common/network/MulticastChannel.java @@ -157,24 +157,25 @@ public abstract class MulticastChannel implements Closeable { * channel once their reference count has reached 0. It also handles de-registering relevant * listener from the shared list of listeners. */ - private static class Shared extends MulticastChannel { + private final static class Shared extends MulticastChannel { private static final Map sharedChannels = Maps.newHashMap(); private static final Object mutex = new Object(); // global mutex so we don't sync on static methods (.class) static MulticastChannel getSharedChannel(Listener listener, Config config) throws Exception { + synchronized (mutex) { Shared shared = sharedChannels.get(config); if (shared != null) { shared.incRef(); ((MultiListener) shared.listener).add(listener); - return new Delegate(listener, shared); + } else { + MultiListener multiListener = new MultiListener(); + multiListener.add(listener); + shared = new Shared(multiListener, new Plain(multiListener, "#shared#", config)); + sharedChannels.put(config, shared); } - MultiListener multiListener = new MultiListener(); - multiListener.add(listener); - shared = new Shared(multiListener, new Plain(multiListener, "#shared#", config)); - sharedChannels.put(config, shared); - return shared; + return new Delegate(listener, shared); } } @@ -184,6 +185,7 @@ public abstract class MulticastChannel implements Closeable { boolean removed = ((MultiListener) shared.listener).remove(listener); assert removed : "a listener should be removed"; if (shared.decRef() == 0) { + assert ((MultiListener) shared.listener).listeners.isEmpty(); sharedChannels.remove(shared.channel.getConfig()); shared.channel.close(); } @@ -213,6 +215,11 @@ public abstract class MulticastChannel implements Closeable { channel.send(data); } + @Override + public void close() { + assert false : "Shared references should never be closed directly, only via Delegate"; + } + @Override protected void close(Listener listener) { close(this, listener); @@ -223,7 +230,7 @@ public abstract class MulticastChannel implements Closeable { * A light weight delegate that wraps another channel, mainly to support delegating * the close method with the provided listener and not holding existing listener. */ - private static class Delegate extends MulticastChannel { + private final static class Delegate extends MulticastChannel { private final MulticastChannel channel;