diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java index be5e04c4be..d7086a52ec 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java @@ -16,6 +16,11 @@ */ package org.apache.activemq.artemis.api.core; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; +import org.jboss.logging.Logger; import org.jgroups.JChannel; /** @@ -25,11 +30,49 @@ import org.jgroups.JChannel; */ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory { + private static final Logger logger = Logger.getLogger(ChannelBroadcastEndpointFactory.class); + private static final boolean isTrace = logger.isTraceEnabled(); + private final JChannel channel; private final String channelName; + private final JChannelManager manager; + + private static final Map managers = new ConcurrentHashMap<>(); + + private static final JChannelManager singletonManager = new JChannelManager(); +// TODO: To implement this when JForkChannel from JGroups supports multiple channels properly +// +// private static JChannelManager recoverManager(JChannel channel) { +// JChannelManager manager = managers.get(channel); +// if (manager == null) { +// if (isTrace) { +// logger.trace("Creating a new JChannelManager for " + channel, new Exception("trace")); +// } +// manager = new JChannelManager(); +// managers.put(channel, manager); +// } +// else { +// if (isTrace) { +// logger.trace("Recover an already existent channelManager for " + channel, new Exception("trace")); +// } +// +// } +// +// return manager; +// } +// public ChannelBroadcastEndpointFactory(JChannel channel, String channelName) { + // TODO: use recoverManager(channel) + this(singletonManager, channel, channelName); + } + + private ChannelBroadcastEndpointFactory(JChannelManager manager, JChannel channel, String channelName) { + if (isTrace) { + logger.trace("new ChannelBroadcastEndpointFactory(" + manager + ", " + channel + ", " + channelName, new Exception("trace")); + } + this.manager = manager; this.channel = channel; this.channelName = channelName; } @@ -42,8 +85,17 @@ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory return channelName; } + @Override + public String toString() { + return "ChannelBroadcastEndpointFactory{" + + "channel=" + channel + + ", channelName='" + channelName + '\'' + + ", manager=" + manager + + '}'; + } + @Override public BroadcastEndpoint createBroadcastEndpoint() throws Exception { - return new JGroupsChannelBroadcastEndpoint(channel, channelName).initChannel(); + return new JGroupsChannelBroadcastEndpoint(manager, channel, channelName).initChannel(); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java index 5bcddbc548..7657b0bfcb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java @@ -16,15 +16,11 @@ */ package org.apache.activemq.artemis.api.core; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; +import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper; +import org.apache.activemq.artemis.api.core.jgroups.JGroupsReceiver; +import org.jboss.logging.Logger; import org.jgroups.JChannel; -import org.jgroups.ReceiverAdapter; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; /** @@ -32,6 +28,9 @@ import java.util.concurrent.TimeUnit; */ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint { + private static final Logger logger = Logger.getLogger(JGroupsBroadcastEndpoint.class); + + private static final boolean isTrace = logger.isTraceEnabled(); private final String channelName; private boolean clientOpened; @@ -42,12 +41,16 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint { private JGroupsReceiver receiver; - public JGroupsBroadcastEndpoint(String channelName) { + private JChannelManager manager; + + public JGroupsBroadcastEndpoint(JChannelManager manager, String channelName) { + this.manager = manager; this.channelName = channelName; } @Override public void broadcast(final byte[] data) throws Exception { + if (isTrace) logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen()); if (broadcastOpened) { org.jgroups.Message msg = new org.jgroups.Message(); @@ -59,6 +62,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint { @Override public byte[] receiveBroadcast() throws Exception { + if (isTrace) logger.trace("Receiving Broadcast: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen()); if (clientOpened) { return receiver.receiveBroadcast(); } @@ -69,6 +73,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint { @Override public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception { + if (isTrace) logger.trace("Receiving Broadcast2: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen()); if (clientOpened) { return receiver.receiveBroadcast(time, unit); } @@ -99,7 +104,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint { public abstract JChannel createChannel() throws Exception; public JGroupsBroadcastEndpoint initChannel() throws Exception { - this.channel = JChannelManager.getJChannel(channelName, this); + this.channel = manager.getJChannel(channelName, this); return this; } @@ -128,146 +133,4 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint { channel.close(true); } - /** - * This class is used to receive messages from a JGroups channel. - * Incoming messages are put into a queue. - */ - private static final class JGroupsReceiver extends ReceiverAdapter { - - private final BlockingQueue dequeue = new LinkedBlockingDeque<>(); - - @Override - public void receive(org.jgroups.Message msg) { - dequeue.add(msg.getBuffer()); - } - - public byte[] receiveBroadcast() throws Exception { - return dequeue.take(); - } - - public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception { - return dequeue.poll(time, unit); - } - } - - /** - * This class wraps a JChannel with a reference counter. The reference counter - * controls the life of the JChannel. When reference count is zero, the channel - * will be disconnected. - */ - protected static class JChannelWrapper { - - int refCount = 1; - JChannel channel; - String channelName; - final List receivers = new ArrayList<>(); - - public JChannelWrapper(String channelName, JChannel channel) throws Exception { - this.refCount = 1; - this.channelName = channelName; - this.channel = channel; - - //we always add this for the first ref count - channel.setReceiver(new ReceiverAdapter() { - - @Override - public void receive(org.jgroups.Message msg) { - synchronized (receivers) { - for (JGroupsReceiver r : receivers) { - r.receive(msg); - } - } - } - }); - } - - public synchronized void close(boolean closeWrappedChannel) { - refCount--; - if (refCount == 0) { - if (closeWrappedChannel) { - JChannelManager.closeChannel(this.channelName, channel); - } - else { - JChannelManager.removeChannel(this.channelName); - } - //we always remove the receiver as its no longer needed - channel.setReceiver(null); - } - } - - public void removeReceiver(JGroupsReceiver receiver) { - synchronized (receivers) { - receivers.remove(receiver); - } - } - - public synchronized void connect() throws Exception { - if (channel.isConnected()) - return; - channel.connect(channelName); - } - - public void addReceiver(JGroupsReceiver jGroupsReceiver) { - synchronized (receivers) { - receivers.add(jGroupsReceiver); - } - } - - public void send(org.jgroups.Message msg) throws Exception { - channel.send(msg); - } - - public JChannelWrapper addRef() { - this.refCount++; - return this; - } - - @Override - public String toString() { - return "JChannelWrapper of [" + channel + "] " + refCount + " " + channelName; - } - } - - /** - * This class maintain a global Map of JChannels wrapped in JChannelWrapper for - * the purpose of reference counting. - * - * Wherever a JChannel is needed it should only get it by calling the getChannel() - * method of this class. The real disconnect of channels are also done here only. - */ - protected static class JChannelManager { - - private static Map channels; - - public static synchronized JChannelWrapper getJChannel(String channelName, - JGroupsBroadcastEndpoint endpoint) throws Exception { - if (channels == null) { - channels = new HashMap<>(); - } - JChannelWrapper wrapper = channels.get(channelName); - if (wrapper == null) { - wrapper = new JChannelWrapper(channelName, endpoint.createChannel()); - channels.put(channelName, wrapper); - return wrapper; - } - return wrapper.addRef(); - } - - public static synchronized void closeChannel(String channelName, JChannel channel) { - channel.setReceiver(null); - channel.disconnect(); - channel.close(); - JChannelWrapper wrapper = channels.remove(channelName); - if (wrapper == null) { - throw new IllegalStateException("Did not find channel " + channelName); - } - } - - public static void removeChannel(String channelName) { - JChannelWrapper wrapper = channels.remove(channelName); - if (wrapper == null) { - throw new IllegalStateException("Did not find channel " + channelName); - } - } - } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java index 4fbb24c2ad..96cfee64c7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.api.core; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; +import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper; import org.jgroups.JChannel; /** @@ -27,8 +29,8 @@ public class JGroupsChannelBroadcastEndpoint extends JGroupsBroadcastEndpoint { private final JChannel jChannel; - public JGroupsChannelBroadcastEndpoint(JChannel jChannel, final String channelName) throws Exception { - super(channelName); + public JGroupsChannelBroadcastEndpoint(JChannelManager manager, JChannel jChannel, final String channelName) { + super(manager, channelName); this.jChannel = jChannel; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java index 702cb5abab..be903d37df 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.api.core; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; import org.jgroups.JChannel; import java.net.URL; @@ -27,8 +28,8 @@ public final class JGroupsFileBroadcastEndpoint extends JGroupsBroadcastEndpoint private String file; - public JGroupsFileBroadcastEndpoint(final String file, final String channelName) throws Exception { - super(channelName); + public JGroupsFileBroadcastEndpoint(final JChannelManager manager, final String file, final String channelName) throws Exception { + super(manager, channelName); this.file = file; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java index 132ac3ca9c..9f783e7813 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java @@ -16,15 +16,20 @@ */ package org.apache.activemq.artemis.api.core; + +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; + public class JGroupsFileBroadcastEndpointFactory implements BroadcastEndpointFactory { private String file; private String channelName; + private final JChannelManager manager = new JChannelManager(); + @Override public BroadcastEndpoint createBroadcastEndpoint() throws Exception { - return new JGroupsFileBroadcastEndpoint(file, channelName).initChannel(); + return new JGroupsFileBroadcastEndpoint(manager, file, channelName).initChannel(); } public String getFile() { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java index 25cefc3789..d10400a476 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java @@ -16,18 +16,19 @@ */ package org.apache.activemq.artemis.api.core; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; import org.jgroups.JChannel; import org.jgroups.conf.PlainConfigurator; /** * This class is the implementation of ActiveMQ Artemis members discovery that will use JGroups. */ -public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEndpoint { +public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEndpoint { private String properties; - public JGroupsPropertiesBroadcastEndpoint(final String properties, final String channelName) throws Exception { - super(channelName); + public JGroupsPropertiesBroadcastEndpoint(final JChannelManager manager, final String properties, final String channelName) throws Exception { + super(manager, channelName); this.properties = properties; } @@ -37,3 +38,4 @@ public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEn return new JChannel(configurator); } } + diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java index 4d804354fa..8ed03ab7d0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java @@ -16,15 +16,19 @@ */ package org.apache.activemq.artemis.api.core; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; + public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpointFactory { private String properties; private String channelName; + private final JChannelManager manager = new JChannelManager(); + @Override public BroadcastEndpoint createBroadcastEndpoint() throws Exception { - return new JGroupsPropertiesBroadcastEndpoint(properties, channelName).initChannel(); + return new JGroupsPropertiesBroadcastEndpoint(manager, properties, channelName).initChannel(); } public String getProperties() { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java new file mode 100644 index 0000000000..296dc8a934 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core.jgroups; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.JGroupsBroadcastEndpoint; +import org.jboss.logging.Logger; + +/** + * This class maintain a global Map of JChannels wrapped in JChannelWrapper for + * the purpose of reference counting. + * + * Wherever a JChannel is needed it should only get it by calling the getChannel() + * method of this class. The real disconnect of channels are also done here only. + */ +public class JChannelManager { + + private static final Logger logger = Logger.getLogger(JChannelManager.class); + private static final boolean isTrace = logger.isTraceEnabled(); + + private Map channels; + + public synchronized JChannelWrapper getJChannel(String channelName, + JGroupsBroadcastEndpoint endpoint) throws Exception { + if (channels == null) { + channels = new HashMap<>(); + } + JChannelWrapper wrapper = channels.get(channelName); + if (wrapper == null) { + wrapper = new JChannelWrapper(this, channelName, endpoint.createChannel()); + channels.put(channelName, wrapper); + if (isTrace) + logger.trace("Put Channel " + channelName); + return wrapper; + } + if (isTrace) + logger.trace("Add Ref Count " + channelName); + return wrapper.addRef(); + } + + public synchronized void removeChannel(String channelName) { + channels.remove(channelName); + } + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java new file mode 100644 index 0000000000..08a8ff8377 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core.jgroups; + +import java.util.ArrayList; +import java.util.List; + +import org.jboss.logging.Logger; +import org.jgroups.JChannel; +import org.jgroups.ReceiverAdapter; + +/** + * This class wraps a JChannel with a reference counter. The reference counter + * controls the life of the JChannel. When reference count is zero, the channel + * will be disconnected. + */ +public class JChannelWrapper { + private static final Logger logger = Logger.getLogger(JChannelWrapper.class); + private static final boolean isTrace = logger.isTraceEnabled(); + + private boolean connected = false; + int refCount = 1; + final JChannel channel; + final String channelName; + final List receivers = new ArrayList<>(); + private final JChannelManager manager; + + public JChannelWrapper(JChannelManager manager, final String channelName, JChannel channel) throws Exception { + this.refCount = 1; + this.channelName = channelName; + this.channel = channel; + this.manager = manager; + + + if (channel.getReceiver() != null) { + logger.warn("The channel already had a receiver previously!!!!", new Exception("trace")); + } + + //we always add this for the first ref count + channel.setReceiver(new ReceiverAdapter() { + + @Override + public void receive(org.jgroups.Message msg) { + if (isTrace) { + logger.trace(this + ":: Wrapper received " + msg + " on channel " + channelName); + } + synchronized (receivers) { + for (JGroupsReceiver r : receivers) { + r.receive(msg); + } + } + } + }); + } + + public JChannel getChannel() { + return channel; + } + + public String getChannelName() { + return channelName; + } + + public synchronized void close(boolean closeWrappedChannel) { + refCount--; + if (isTrace) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName, new Exception("Trace")); + if (refCount == 0) { + if (closeWrappedChannel) { + connected = false; + channel.setReceiver(null); + logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace")); + channel.close(); + } + manager.removeChannel(channelName); + } + } + + public void removeReceiver(JGroupsReceiver receiver) { + if (isTrace) logger.trace(this + "::removeReceiver: " + receiver + " on " + channelName, new Exception("Trace")); + synchronized (receivers) { + receivers.remove(receiver); + } + } + + public synchronized void connect() throws Exception { + if (isTrace) { + logger.trace(this + ":: Connecting " + channelName, new Exception("Trace")); + } + + // It is important to check this otherwise we could reconnect an already connected channel + if (connected) { + return; + } + + connected = true; + + if (!channel.isConnected()) { + channel.connect(channelName); + } + } + + public void addReceiver(JGroupsReceiver jGroupsReceiver) { + synchronized (receivers) { + if (isTrace) logger.trace(this + "::Add Receiver: " + jGroupsReceiver + " on " + channelName); + receivers.add(jGroupsReceiver); + } + } + + public void send(org.jgroups.Message msg) throws Exception { + if (isTrace) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg); + channel.send(msg); + } + + public JChannelWrapper addRef() { + this.refCount++; + if (isTrace) logger.trace(this + "::RefCount++ = " + refCount + " on channel " + channelName); + return this; + } + + @Override + public String toString() { + return super.toString() + + "{refCount=" + refCount + + ", channel=" + channel + + ", channelName='" + channelName + '\'' + + ", connected=" + connected + + '}'; + } +} + diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java new file mode 100644 index 0000000000..c931661557 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core.jgroups; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import org.jboss.logging.Logger; +import org.jgroups.ReceiverAdapter; + +/** + * This class is used to receive messages from a JGroups channel. + * Incoming messages are put into a queue. + */ +public class JGroupsReceiver extends ReceiverAdapter { + + private static final Logger logger = Logger.getLogger(JGroupsReceiver.class); + private static final boolean isTrace = logger.isTraceEnabled(); + + private final BlockingQueue dequeue = new LinkedBlockingDeque<>(); + + @Override + public void receive(org.jgroups.Message msg) { + if (isTrace) logger.trace("sending message " + msg); + dequeue.add(msg.getBuffer()); + } + + public byte[] receiveBroadcast() throws Exception { + byte[] bytes = dequeue.take(); + if (isTrace) { + logBytes("receiveBroadcast()", bytes); + } + + return bytes; + } + + private void logBytes(String methodName, byte[] bytes) { + if (bytes != null) { + logger.trace(methodName + "::" + bytes.length + " bytes"); + } + else { + logger.trace(methodName + ":: no bytes"); + } + } + + public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception { + byte[] bytes = dequeue.poll(time, unit); + + if (isTrace) { + logBytes("receiveBroadcast(long time, TimeUnit unit)", bytes); + } + + return bytes; + } +} + diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index e7cc55ad57..53ba9df862 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -809,7 +809,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery // how the sendSubscription happens. // in case this ever changes. if (topology != null && !factory.waitForTopology(callTimeout, TimeUnit.MILLISECONDS)) { - factory.cleanup(); + if (factory != null) { + factory.cleanup(); + } throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index b84164f8fb..175ca9961c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -408,8 +408,9 @@ public class ClusterController implements ActiveMQComponent { } } catch (ActiveMQException e) { - if (!started) + if (!started) { return; + } server.getScheduledPool().schedule(this, serverLocator.getRetryInterval(), TimeUnit.MILLISECONDS); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index 12e92980d5..d9a5c78c7b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBacku import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.utils.ReusableLatch; +import org.jboss.logging.Logger; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -53,6 +54,10 @@ import static org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothi public final class SharedNothingBackupActivation extends Activation { + + private static final Logger logger = Logger.getLogger(SharedNothingBackupActivation.class); + private static final boolean isTrace = logger.isTraceEnabled(); + //this is how we act when we start as a backup private ReplicaPolicy replicaPolicy; @@ -129,43 +134,86 @@ public final class SharedNothingBackupActivation extends Activation { } ClusterController clusterController = activeMQServer.getClusterManager().getClusterController(); clusterController.addClusterTopologyListenerForReplication(nodeLocator); + + if (isTrace) { + logger.trace("Waiting on cluster connection"); + } //todo do we actually need to wait? clusterController.awaitConnectionToReplicationCluster(); + if (isTrace) { + logger.trace("Cluster Connected"); + } clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator)); // nodeManager.startBackup(); - + if (isTrace) { + logger.trace("Starting backup manager"); + } activeMQServer.getBackupManager().start(); + if (isTrace) { + logger.trace("Set backup Quorum"); + } replicationEndpoint.setBackupQuorum(backupQuorum); + replicationEndpoint.setExecutor(activeMQServer.getExecutorFactory().getExecutor()); EndpointConnector endpointConnector = new EndpointConnector(); + if (isTrace) { + logger.trace("Starting Backup Server"); + } + ActiveMQServerLogger.LOGGER.backupServerStarted(activeMQServer.getVersion().getFullVersion(), activeMQServer.getNodeManager().getNodeId()); activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED); + if (isTrace) logger.trace("Setting server state as started"); + SharedNothingBackupQuorum.BACKUP_ACTIVATION signal; do { - //locate the first live server to try to replicate - nodeLocator.locateNode(); + + if (closed) { + if (isTrace) { + logger.trace("Activation is closed, so giving up"); + } return; } + + + if (isTrace) { + logger.trace("looking up the node through nodeLocator.locateNode()"); + } + //locate the first live server to try to replicate + nodeLocator.locateNode(); Pair possibleLive = nodeLocator.getLiveConfiguration(); nodeID = nodeLocator.getNodeID(); + + if (isTrace) { + logger.trace("nodeID = " + nodeID); + } //in a normal (non failback) scenario if we couldn't find our live server we should fail if (!attemptFailBack) { + if (isTrace) { + logger.trace("attemptFailback=false, nodeID=" + nodeID); + } + //this shouldn't happen - if (nodeID == null) + if (nodeID == null) { + logger.debug("Throwing a RuntimeException as nodeID==null ant attemptFailback=false"); throw new RuntimeException("Could not establish the connection"); + } activeMQServer.getNodeManager().setNodeID(nodeID); } try { + if (isTrace) { + logger.trace("Calling clusterController.connectToNodeInReplicatedCluster(" + possibleLive.getA() + ")"); + } clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getA()); } catch (Exception e) { + logger.debug(e.getMessage(), e); if (possibleLive.getB() != null) { try { clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getB()); @@ -176,6 +224,10 @@ public final class SharedNothingBackupActivation extends Activation { } } if (clusterControl == null) { + + if (isTrace) { + logger.trace("sleeping " + clusterController.getRetryIntervalForReplicatedCluster() + " it should retry"); + } //its ok to retry here since we haven't started replication yet //it may just be the server has gone since discovery Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster()); @@ -190,23 +242,43 @@ public final class SharedNothingBackupActivation extends Activation { * process again on the next live server. All the action happens inside {@link BackupQuorum} */ signal = backupQuorum.waitForStatusChange(); + + if (isTrace) { + logger.trace("Got a signal " + signal + " through backupQuorum.waitForStatusChange()"); + } + /** * replicationEndpoint will be holding lots of open files. Make sure they get * closed/sync'ed. */ ActiveMQServerImpl.stopComponent(replicationEndpoint); // time to give up - if (!activeMQServer.isStarted() || signal == STOP) + if (!activeMQServer.isStarted() || signal == STOP) { + if (isTrace) { + logger.trace("giving up on the activation:: activemqServer.isStarted=" + activeMQServer.isStarted() + " while signal = " + signal); + } return; + } // time to fail over - else if (signal == FAIL_OVER) + else if (signal == FAIL_OVER) { + if (isTrace) { + logger.trace("signal == FAIL_OVER, breaking the loop"); + } break; + } // something has gone badly run restart from scratch else if (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING) { + if (isTrace) { + logger.trace("Starting a new thread to stop the server!"); + } + Thread startThread = new Thread(new Runnable() { @Override public void run() { try { + if (isTrace) { + logger.trace("Calling activeMQServer.stop()"); + } activeMQServer.stop(); } catch (Exception e) { @@ -227,17 +299,30 @@ public final class SharedNothingBackupActivation extends Activation { } } while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING); + if (isTrace) { + logger.trace("Activation loop finished, current signal = " + signal); + } + activeMQServer.getClusterManager().getQuorumManager().unRegisterQuorum(backupQuorum); if (!isRemoteBackupUpToDate()) { + logger.debug("throwing exception for !isRemoteBackupUptoDate"); throw ActiveMQMessageBundle.BUNDLE.backupServerNotInSync(); } + + if (isTrace) { + logger.trace("setReplicaPolicy::" + replicaPolicy); + } + replicaPolicy.getReplicatedPolicy().setReplicaPolicy(replicaPolicy); activeMQServer.setHAPolicy(replicaPolicy.getReplicatedPolicy()); + synchronized (activeMQServer) { - if (!activeMQServer.isStarted()) + if (!activeMQServer.isStarted()) { + logger.trace("Server is stopped, giving up right before becomingLive"); return; + } ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer); activeMQServer.getNodeManager().stopBackup(); activeMQServer.getStorageManager().start(); @@ -262,6 +347,9 @@ public final class SharedNothingBackupActivation extends Activation { } } catch (Exception e) { + if (isTrace) { + logger.trace(e.getMessage() + ", serverStarted=" + activeMQServer.isStarted(), e); + } if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !activeMQServer.isStarted()) // do not log these errors if the server is being stopped. return; @@ -374,8 +462,10 @@ public final class SharedNothingBackupActivation extends Activation { * @throws ActiveMQException */ public void remoteFailOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) throws ActiveMQException { - ActiveMQServerLogger.LOGGER.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate=" + - backupUpToDate); + if (isTrace) { + logger.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate=" + + backupUpToDate); + } if (!activeMQServer.getHAPolicy().isBackup() || activeMQServer.getHAPolicy().isSharedStore()) { throw new ActiveMQInternalErrorException(); }