MulticastChannel returned wrong channel in shared mode

If the shared channel is used a wrong refrence was returned and
close calls couldn't find the listener since it go never registered
in that instance.
This commit is contained in:
Simon Willnauer 2014-03-16 18:19:12 +01:00
parent 43617cf5dc
commit ba8e34755f
1 changed files with 15 additions and 8 deletions

View File

@ -157,24 +157,25 @@ public abstract class MulticastChannel implements Closeable {
* channel once their reference count has reached 0. It also handles de-registering relevant * channel once their reference count has reached 0. It also handles de-registering relevant
* listener from the shared list of listeners. * listener from the shared list of listeners.
*/ */
private static class Shared extends MulticastChannel { private final static class Shared extends MulticastChannel {
private static final Map<Config, Shared> sharedChannels = Maps.newHashMap(); private static final Map<Config, Shared> sharedChannels = Maps.newHashMap();
private static final Object mutex = new Object(); // global mutex so we don't sync on static methods (.class) private static final Object mutex = new Object(); // global mutex so we don't sync on static methods (.class)
static MulticastChannel getSharedChannel(Listener listener, Config config) throws Exception { static MulticastChannel getSharedChannel(Listener listener, Config config) throws Exception {
synchronized (mutex) { synchronized (mutex) {
Shared shared = sharedChannels.get(config); Shared shared = sharedChannels.get(config);
if (shared != null) { if (shared != null) {
shared.incRef(); shared.incRef();
((MultiListener) shared.listener).add(listener); ((MultiListener) shared.listener).add(listener);
return new Delegate(listener, shared); } else {
MultiListener multiListener = new MultiListener();
multiListener.add(listener);
shared = new Shared(multiListener, new Plain(multiListener, "#shared#", config));
sharedChannels.put(config, shared);
} }
MultiListener multiListener = new MultiListener(); return new Delegate(listener, shared);
multiListener.add(listener);
shared = new Shared(multiListener, new Plain(multiListener, "#shared#", config));
sharedChannels.put(config, shared);
return shared;
} }
} }
@ -184,6 +185,7 @@ public abstract class MulticastChannel implements Closeable {
boolean removed = ((MultiListener) shared.listener).remove(listener); boolean removed = ((MultiListener) shared.listener).remove(listener);
assert removed : "a listener should be removed"; assert removed : "a listener should be removed";
if (shared.decRef() == 0) { if (shared.decRef() == 0) {
assert ((MultiListener) shared.listener).listeners.isEmpty();
sharedChannels.remove(shared.channel.getConfig()); sharedChannels.remove(shared.channel.getConfig());
shared.channel.close(); shared.channel.close();
} }
@ -213,6 +215,11 @@ public abstract class MulticastChannel implements Closeable {
channel.send(data); channel.send(data);
} }
@Override
public void close() {
assert false : "Shared references should never be closed directly, only via Delegate";
}
@Override @Override
protected void close(Listener listener) { protected void close(Listener listener) {
close(this, listener); close(this, listener);
@ -223,7 +230,7 @@ public abstract class MulticastChannel implements Closeable {
* A light weight delegate that wraps another channel, mainly to support delegating * A light weight delegate that wraps another channel, mainly to support delegating
* the close method with the provided listener and not holding existing listener. * the close method with the provided listener and not holding existing listener.
*/ */
private static class Delegate extends MulticastChannel { private final static class Delegate extends MulticastChannel {
private final MulticastChannel channel; private final MulticastChannel channel;