ARTEMIS-281 - add channel receiver correctly/
We also need to add the receiver whn the refcount = 1 and the channel may already be connected. https://issues.apache.org/jira/browse/ARTEMIS-281
This commit is contained in:
parent
b34ee8717f
commit
00cac50a37
|
@ -160,6 +160,19 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
|
||||||
this.refCount = 1;
|
this.refCount = 1;
|
||||||
this.channelName = channelName;
|
this.channelName = channelName;
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
|
|
||||||
|
//we always add this for the first ref count
|
||||||
|
channel.setReceiver(new ReceiverAdapter() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void receive(org.jgroups.Message msg) {
|
||||||
|
synchronized (receivers) {
|
||||||
|
for (JGroupsReceiver r : receivers) {
|
||||||
|
r.receive(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void close(boolean closeWrappedChannel) {
|
public synchronized void close(boolean closeWrappedChannel) {
|
||||||
|
@ -171,6 +184,8 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
|
||||||
else {
|
else {
|
||||||
JChannelManager.removeChannel(this.channelName);
|
JChannelManager.removeChannel(this.channelName);
|
||||||
}
|
}
|
||||||
|
//we always remove the receiver as its no longer needed
|
||||||
|
channel.setReceiver(null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,17 +198,6 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
|
||||||
public synchronized void connect() throws Exception {
|
public synchronized void connect() throws Exception {
|
||||||
if (channel.isConnected())
|
if (channel.isConnected())
|
||||||
return;
|
return;
|
||||||
channel.setReceiver(new ReceiverAdapter() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void receive(org.jgroups.Message msg) {
|
|
||||||
synchronized (receivers) {
|
|
||||||
for (JGroupsReceiver r : receivers) {
|
|
||||||
r.receive(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
channel.connect(channelName);
|
channel.connect(channelName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue