This closes #463 ARTEMIS-474 and ARTEMIS-484

This commit is contained in:
Clebert Suconic 2016-04-14 21:14:17 -04:00
commit a3ae2c4ad2
32 changed files with 883 additions and 350 deletions

View File

@ -16,6 +16,11 @@
*/
package org.apache.activemq.artemis.api.core;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.jboss.logging.Logger;
import org.jgroups.JChannel;
/**
@ -25,11 +30,49 @@ import org.jgroups.JChannel;
*/
public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory {
private static final Logger logger = Logger.getLogger(ChannelBroadcastEndpointFactory.class);
private static final boolean isTrace = logger.isTraceEnabled();
private final JChannel channel;
private final String channelName;
private final JChannelManager manager;
private static final Map<JChannel, JChannelManager> managers = new ConcurrentHashMap<>();
private static final JChannelManager singletonManager = new JChannelManager();
// TODO: To implement this when JForkChannel from JGroups supports multiple channels properly
//
// private static JChannelManager recoverManager(JChannel channel) {
// JChannelManager manager = managers.get(channel);
// if (manager == null) {
// if (isTrace) {
// logger.trace("Creating a new JChannelManager for " + channel, new Exception("trace"));
// }
// manager = new JChannelManager();
// managers.put(channel, manager);
// }
// else {
// if (isTrace) {
// logger.trace("Recover an already existent channelManager for " + channel, new Exception("trace"));
// }
//
// }
//
// return manager;
// }
//
public ChannelBroadcastEndpointFactory(JChannel channel, String channelName) {
// TODO: use recoverManager(channel)
this(singletonManager, channel, channelName);
}
private ChannelBroadcastEndpointFactory(JChannelManager manager, JChannel channel, String channelName) {
if (isTrace) {
logger.trace("new ChannelBroadcastEndpointFactory(" + manager + ", " + channel + ", " + channelName, new Exception("trace"));
}
this.manager = manager;
this.channel = channel;
this.channelName = channelName;
}
@ -42,8 +85,17 @@ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory
return channelName;
}
@Override
public String toString() {
return "ChannelBroadcastEndpointFactory{" +
"channel=" + channel +
", channelName='" + channelName + '\'' +
", manager=" + manager +
'}';
}
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
return new JGroupsChannelBroadcastEndpoint(channel, channelName).initChannel();
return new JGroupsChannelBroadcastEndpoint(manager, channel, channelName).initChannel();
}
}

View File

@ -16,15 +16,11 @@
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper;
import org.apache.activemq.artemis.api.core.jgroups.JGroupsReceiver;
import org.jboss.logging.Logger;
import org.jgroups.JChannel;
import org.jgroups.ReceiverAdapter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/**
@ -32,6 +28,9 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
private static final Logger logger = Logger.getLogger(JGroupsBroadcastEndpoint.class);
private static final boolean isTrace = logger.isTraceEnabled();
private final String channelName;
private boolean clientOpened;
@ -42,12 +41,16 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
private JGroupsReceiver receiver;
public JGroupsBroadcastEndpoint(String channelName) {
private JChannelManager manager;
public JGroupsBroadcastEndpoint(JChannelManager manager, String channelName) {
this.manager = manager;
this.channelName = channelName;
}
@Override
public void broadcast(final byte[] data) throws Exception {
if (isTrace) logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen());
if (broadcastOpened) {
org.jgroups.Message msg = new org.jgroups.Message();
@ -59,6 +62,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
@Override
public byte[] receiveBroadcast() throws Exception {
if (isTrace) logger.trace("Receiving Broadcast: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
if (clientOpened) {
return receiver.receiveBroadcast();
}
@ -69,6 +73,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
@Override
public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
if (isTrace) logger.trace("Receiving Broadcast2: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
if (clientOpened) {
return receiver.receiveBroadcast(time, unit);
}
@ -99,7 +104,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
public abstract JChannel createChannel() throws Exception;
public JGroupsBroadcastEndpoint initChannel() throws Exception {
this.channel = JChannelManager.getJChannel(channelName, this);
this.channel = manager.getJChannel(channelName, this);
return this;
}
@ -128,146 +133,4 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
channel.close(true);
}
/**
* This class is used to receive messages from a JGroups channel.
* Incoming messages are put into a queue.
*/
private static final class JGroupsReceiver extends ReceiverAdapter {
private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<>();
@Override
public void receive(org.jgroups.Message msg) {
dequeue.add(msg.getBuffer());
}
public byte[] receiveBroadcast() throws Exception {
return dequeue.take();
}
public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
return dequeue.poll(time, unit);
}
}
/**
* This class wraps a JChannel with a reference counter. The reference counter
* controls the life of the JChannel. When reference count is zero, the channel
* will be disconnected.
*/
protected static class JChannelWrapper {
int refCount = 1;
JChannel channel;
String channelName;
final List<JGroupsReceiver> receivers = new ArrayList<>();
public JChannelWrapper(String channelName, JChannel channel) throws Exception {
this.refCount = 1;
this.channelName = channelName;
this.channel = channel;
//we always add this for the first ref count
channel.setReceiver(new ReceiverAdapter() {
@Override
public void receive(org.jgroups.Message msg) {
synchronized (receivers) {
for (JGroupsReceiver r : receivers) {
r.receive(msg);
}
}
}
});
}
public synchronized void close(boolean closeWrappedChannel) {
refCount--;
if (refCount == 0) {
if (closeWrappedChannel) {
JChannelManager.closeChannel(this.channelName, channel);
}
else {
JChannelManager.removeChannel(this.channelName);
}
//we always remove the receiver as its no longer needed
channel.setReceiver(null);
}
}
public void removeReceiver(JGroupsReceiver receiver) {
synchronized (receivers) {
receivers.remove(receiver);
}
}
public synchronized void connect() throws Exception {
if (channel.isConnected())
return;
channel.connect(channelName);
}
public void addReceiver(JGroupsReceiver jGroupsReceiver) {
synchronized (receivers) {
receivers.add(jGroupsReceiver);
}
}
public void send(org.jgroups.Message msg) throws Exception {
channel.send(msg);
}
public JChannelWrapper addRef() {
this.refCount++;
return this;
}
@Override
public String toString() {
return "JChannelWrapper of [" + channel + "] " + refCount + " " + channelName;
}
}
/**
* This class maintain a global Map of JChannels wrapped in JChannelWrapper for
* the purpose of reference counting.
*
* Wherever a JChannel is needed it should only get it by calling the getChannel()
* method of this class. The real disconnect of channels are also done here only.
*/
protected static class JChannelManager {
private static Map<String, JChannelWrapper> channels;
public static synchronized JChannelWrapper getJChannel(String channelName,
JGroupsBroadcastEndpoint endpoint) throws Exception {
if (channels == null) {
channels = new HashMap<>();
}
JChannelWrapper wrapper = channels.get(channelName);
if (wrapper == null) {
wrapper = new JChannelWrapper(channelName, endpoint.createChannel());
channels.put(channelName, wrapper);
return wrapper;
}
return wrapper.addRef();
}
public static synchronized void closeChannel(String channelName, JChannel channel) {
channel.setReceiver(null);
channel.disconnect();
channel.close();
JChannelWrapper wrapper = channels.remove(channelName);
if (wrapper == null) {
throw new IllegalStateException("Did not find channel " + channelName);
}
}
public static void removeChannel(String channelName) {
JChannelWrapper wrapper = channels.remove(channelName);
if (wrapper == null) {
throw new IllegalStateException("Did not find channel " + channelName);
}
}
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper;
import org.jgroups.JChannel;
/**
@ -27,8 +29,8 @@ public class JGroupsChannelBroadcastEndpoint extends JGroupsBroadcastEndpoint {
private final JChannel jChannel;
public JGroupsChannelBroadcastEndpoint(JChannel jChannel, final String channelName) throws Exception {
super(channelName);
public JGroupsChannelBroadcastEndpoint(JChannelManager manager, JChannel jChannel, final String channelName) {
super(manager, channelName);
this.jChannel = jChannel;
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.jgroups.JChannel;
import java.net.URL;
@ -27,8 +28,8 @@ public final class JGroupsFileBroadcastEndpoint extends JGroupsBroadcastEndpoint
private String file;
public JGroupsFileBroadcastEndpoint(final String file, final String channelName) throws Exception {
super(channelName);
public JGroupsFileBroadcastEndpoint(final JChannelManager manager, final String file, final String channelName) throws Exception {
super(manager, channelName);
this.file = file;
}

View File

@ -16,15 +16,20 @@
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
public class JGroupsFileBroadcastEndpointFactory implements BroadcastEndpointFactory {
private String file;
private String channelName;
private final JChannelManager manager = new JChannelManager();
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
return new JGroupsFileBroadcastEndpoint(file, channelName).initChannel();
return new JGroupsFileBroadcastEndpoint(manager, file, channelName).initChannel();
}
public String getFile() {

View File

@ -16,18 +16,19 @@
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.jgroups.JChannel;
import org.jgroups.conf.PlainConfigurator;
/**
* This class is the implementation of ActiveMQ Artemis members discovery that will use JGroups.
*/
public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEndpoint {
public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEndpoint {
private String properties;
public JGroupsPropertiesBroadcastEndpoint(final String properties, final String channelName) throws Exception {
super(channelName);
public JGroupsPropertiesBroadcastEndpoint(final JChannelManager manager, final String properties, final String channelName) throws Exception {
super(manager, channelName);
this.properties = properties;
}
@ -37,3 +38,4 @@ public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEn
return new JChannel(configurator);
}
}

View File

@ -16,15 +16,19 @@
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpointFactory {
private String properties;
private String channelName;
private final JChannelManager manager = new JChannelManager();
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
return new JGroupsPropertiesBroadcastEndpoint(properties, channelName).initChannel();
return new JGroupsPropertiesBroadcastEndpoint(manager, properties, channelName).initChannel();
}
public String getProperties() {

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.jgroups;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.JGroupsBroadcastEndpoint;
import org.jboss.logging.Logger;
/**
* This class maintain a global Map of JChannels wrapped in JChannelWrapper for
* the purpose of reference counting.
*
* Wherever a JChannel is needed it should only get it by calling the getChannel()
* method of this class. The real disconnect of channels are also done here only.
*/
public class JChannelManager {
private static final Logger logger = Logger.getLogger(JChannelManager.class);
private static final boolean isTrace = logger.isTraceEnabled();
private Map<String, JChannelWrapper> channels;
public synchronized JChannelWrapper getJChannel(String channelName,
JGroupsBroadcastEndpoint endpoint) throws Exception {
if (channels == null) {
channels = new HashMap<>();
}
JChannelWrapper wrapper = channels.get(channelName);
if (wrapper == null) {
wrapper = new JChannelWrapper(this, channelName, endpoint.createChannel());
channels.put(channelName, wrapper);
if (isTrace)
logger.trace("Put Channel " + channelName);
return wrapper;
}
if (isTrace)
logger.trace("Add Ref Count " + channelName);
return wrapper.addRef();
}
public synchronized void removeChannel(String channelName) {
channels.remove(channelName);
}
}

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.jgroups;
import java.util.ArrayList;
import java.util.List;
import org.jboss.logging.Logger;
import org.jgroups.JChannel;
import org.jgroups.ReceiverAdapter;
/**
* This class wraps a JChannel with a reference counter. The reference counter
* controls the life of the JChannel. When reference count is zero, the channel
* will be disconnected.
*/
public class JChannelWrapper {
private static final Logger logger = Logger.getLogger(JChannelWrapper.class);
private static final boolean isTrace = logger.isTraceEnabled();
private boolean connected = false;
int refCount = 1;
final JChannel channel;
final String channelName;
final List<JGroupsReceiver> receivers = new ArrayList<>();
private final JChannelManager manager;
public JChannelWrapper(JChannelManager manager, final String channelName, JChannel channel) throws Exception {
this.refCount = 1;
this.channelName = channelName;
this.channel = channel;
this.manager = manager;
if (channel.getReceiver() != null) {
logger.warn("The channel already had a receiver previously!!!!", new Exception("trace"));
}
//we always add this for the first ref count
channel.setReceiver(new ReceiverAdapter() {
@Override
public void receive(org.jgroups.Message msg) {
if (isTrace) {
logger.trace(this + ":: Wrapper received " + msg + " on channel " + channelName);
}
synchronized (receivers) {
for (JGroupsReceiver r : receivers) {
r.receive(msg);
}
}
}
});
}
public JChannel getChannel() {
return channel;
}
public String getChannelName() {
return channelName;
}
public synchronized void close(boolean closeWrappedChannel) {
refCount--;
if (isTrace) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName, new Exception("Trace"));
if (refCount == 0) {
if (closeWrappedChannel) {
connected = false;
channel.setReceiver(null);
logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace"));
channel.close();
}
manager.removeChannel(channelName);
}
}
public void removeReceiver(JGroupsReceiver receiver) {
if (isTrace) logger.trace(this + "::removeReceiver: " + receiver + " on " + channelName, new Exception("Trace"));
synchronized (receivers) {
receivers.remove(receiver);
}
}
public synchronized void connect() throws Exception {
if (isTrace) {
logger.trace(this + ":: Connecting " + channelName, new Exception("Trace"));
}
// It is important to check this otherwise we could reconnect an already connected channel
if (connected) {
return;
}
connected = true;
if (!channel.isConnected()) {
channel.connect(channelName);
}
}
public void addReceiver(JGroupsReceiver jGroupsReceiver) {
synchronized (receivers) {
if (isTrace) logger.trace(this + "::Add Receiver: " + jGroupsReceiver + " on " + channelName);
receivers.add(jGroupsReceiver);
}
}
public void send(org.jgroups.Message msg) throws Exception {
if (isTrace) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg);
channel.send(msg);
}
public JChannelWrapper addRef() {
this.refCount++;
if (isTrace) logger.trace(this + "::RefCount++ = " + refCount + " on channel " + channelName);
return this;
}
@Override
public String toString() {
return super.toString() +
"{refCount=" + refCount +
", channel=" + channel +
", channelName='" + channelName + '\'' +
", connected=" + connected +
'}';
}
}

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.jgroups;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.jboss.logging.Logger;
import org.jgroups.ReceiverAdapter;
/**
* This class is used to receive messages from a JGroups channel.
* Incoming messages are put into a queue.
*/
public class JGroupsReceiver extends ReceiverAdapter {
private static final Logger logger = Logger.getLogger(JGroupsReceiver.class);
private static final boolean isTrace = logger.isTraceEnabled();
private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<>();
@Override
public void receive(org.jgroups.Message msg) {
if (isTrace) logger.trace("sending message " + msg);
dequeue.add(msg.getBuffer());
}
public byte[] receiveBroadcast() throws Exception {
byte[] bytes = dequeue.take();
if (isTrace) {
logBytes("receiveBroadcast()", bytes);
}
return bytes;
}
private void logBytes(String methodName, byte[] bytes) {
if (bytes != null) {
logger.trace(methodName + "::" + bytes.length + " bytes");
}
else {
logger.trace(methodName + ":: no bytes");
}
}
public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
byte[] bytes = dequeue.poll(time, unit);
if (isTrace) {
logBytes("receiveBroadcast(long time, TimeUnit unit)", bytes);
}
return bytes;
}
}

View File

@ -141,7 +141,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
@Override
public String toString() {
return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]";
return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]";
}
@Override

View File

@ -809,7 +809,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
// how the sendSubscription happens.
// in case this ever changes.
if (topology != null && !factory.waitForTopology(callTimeout, TimeUnit.MILLISECONDS)) {
factory.cleanup();
if (factory != null) {
factory.cleanup();
}
throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
}

View File

@ -106,25 +106,41 @@ public class NettyConnection implements Connection {
}
@Override
public synchronized boolean isWritable(ReadyListener callback) {
if (!ready) {
readyListeners.push(callback);
}
public boolean isWritable(ReadyListener callback) {
synchronized (readyListeners) {
if (!ready) {
readyListeners.push(callback);
}
return ready;
return ready;
}
}
@Override
public synchronized void fireReady(final boolean ready) {
this.ready = ready;
public void fireReady(final boolean ready) {
LinkedList<ReadyListener> readyToCall = null;
synchronized (readyListeners) {
this.ready = ready;
if (ready) {
for (;;) {
ReadyListener readyListener = readyListeners.poll();
if (readyListener == null) {
return;
if (ready) {
for (;;) {
ReadyListener readyListener = readyListeners.poll();
if (readyListener == null) {
break;
}
if (readyToCall == null) {
readyToCall = new LinkedList<>();
}
readyToCall.add(readyListener);
}
}
}
if (readyToCall != null) {
for (ReadyListener readyListener : readyToCall) {
try {
readyListener.readyForWriting();
}

View File

@ -58,12 +58,12 @@ public interface ActiveMQJMSClientLogger extends BasicLogger {
void errorCallingExcListener(@Cause Exception e);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 124002, value = "Queue Browser failed to create message", format = Message.Format.MESSAGE_FORMAT)
void errorCreatingMessage(@Cause Throwable e);
@Message(id = 124002, value = "Queue Browser failed to create message {0}", format = Message.Format.MESSAGE_FORMAT)
void errorCreatingMessage(String messageToString, @Cause Throwable e);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 124003, value = "Message Listener failed to prepare message for receipt", format = Message.Format.MESSAGE_FORMAT)
void errorPreparingMessageForReceipt(@Cause Throwable e);
@Message(id = 124003, value = "Message Listener failed to prepare message for receipt, message={0}", format = Message.Format.MESSAGE_FORMAT)
void errorPreparingMessageForReceipt(String messagetoString, @Cause Throwable e);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 124004, value = "Message Listener failed to process message", format = Message.Format.MESSAGE_FORMAT)

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
/**
* ActiveMQ Artemis implementation of a JMS MessageConsumer.
@ -211,7 +212,18 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE;
jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? session.getCoreSession() : null);
jmsMsg.doBeforeReceive();
try {
jmsMsg.doBeforeReceive();
}
catch (IndexOutOfBoundsException ioob) {
// In case this exception happen you will need to know where it happened.
// it has been a bug here in the past, and this was used to debug it.
// nothing better than keep it for future investigations in case it happened again
IndexOutOfBoundsException newIOOB = new IndexOutOfBoundsException(ioob.getMessage() + "@" + jmsMsg.getCoreMessage());
newIOOB.initCause(ioob);
ActiveMQClientLogger.LOGGER.warn(newIOOB.getMessage(), newIOOB);
throw ioob;
}
// We Do the ack after doBeforeRecive, as in the case of large messages, this may fail so we don't want messages redelivered
// https://issues.jboss.org/browse/JBPAPP-6110

View File

@ -141,7 +141,7 @@ public final class ActiveMQQueueBrowser implements QueueBrowser {
msg.doBeforeReceive();
}
catch (Exception e) {
ActiveMQJMSClientLogger.LOGGER.errorCreatingMessage(e);
ActiveMQJMSClientLogger.LOGGER.errorCreatingMessage(msg.getCoreMessage().toString(), e);
return null;
}

View File

@ -73,7 +73,7 @@ public class JMSMessageListenerWrapper implements MessageHandler {
msg.doBeforeReceive();
}
catch (Exception e) {
ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(e);
ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(), e);
return;
}

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.jlibaio.LibaioFile;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ReusableLatch;
public class AIOSequentialFile extends AbstractSequentialFile {
@ -202,7 +203,14 @@ public class AIOSequentialFile extends AbstractSequentialFile {
*/
@Override
public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) {
checkOpened();
try {
checkOpened();
}
catch (Exception e) {
ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
callback.onError(-1, e.getMessage());
return;
}
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io.util;
import java.nio.ByteBuffer;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.jboss.logging.Logger;
public class FileIOUtil {
private static final Logger logger = Logger.getLogger(Logger.class);
private static final boolean isTrace = logger.isTraceEnabled();
public static void copyData(SequentialFile from, SequentialFile to, ByteBuffer buffer) throws Exception {
boolean fromIsOpen = from.isOpen();
boolean toIsOpen = to.isOpen();
from.close();
from.open();
if (!toIsOpen) {
to.open();
}
to.position(to.size());
from.position(0);
try {
for (;;) {
// The buffer is reused...
// We need to make sure we clear the limits and the buffer before reusing it
buffer.clear();
int bytesRead = from.read(buffer);
if (isTrace) {
logger.trace("appending " + bytesRead + " bytes on " + to.getFileName());
}
if (bytesRead > 0) {
to.writeDirect(buffer, false);
}
if (bytesRead < buffer.capacity()) {
logger.trace("Interrupting reading as the whole thing was sent on " + to.getFileName());
break;
}
}
}
finally {
if (!fromIsOpen) {
from.close();
}
else {
from.position(from.size());
}
if (!toIsOpen) {
to.close();
}
else {
to.position(to.size());
}
}
}
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io.aio;
import java.io.File;
import java.nio.ByteBuffer;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.io.util.FileIOUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class FileIOUtilTest {
@Rule
public TemporaryFolder temporaryFolder;
public FileIOUtilTest() {
File parent = new File("./target");
parent.mkdirs();
temporaryFolder = new TemporaryFolder(parent);
}
@Test
public void testCopy() throws Exception {
System.out.println("Data at " + temporaryFolder.getRoot());
SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
SequentialFile file = factory.createSequentialFile("file1.bin");
file.open();
ByteBuffer buffer = ByteBuffer.allocate(204800);
buffer.put(new byte[204800]);
buffer.rewind();
file.writeDirect(buffer, true);
buffer = ByteBuffer.allocate(409605);
buffer.put(new byte[409605]);
buffer.rewind();
SequentialFile file2 = factory.createSequentialFile("file2.bin");
file2.open();
file2.writeDirect(buffer, true);
// This is allocating a reusable buffer to perform the copy, just like it's used within LargeMessageInSync
buffer = ByteBuffer.allocate(4 * 1024);
SequentialFile newFile = factory.createSequentialFile("file1.cop");
FileIOUtil.copyData(file, newFile, buffer);
SequentialFile newFile2 = factory.createSequentialFile("file2.cop");
FileIOUtil.copyData(file2, newFile2, buffer);
Assert.assertEquals(file.size(), newFile.size());
Assert.assertEquals(file2.size(), newFile2.size());
newFile.close();
newFile2.close();
file.close();
file2.close();
System.out.println("Test result::");
}
}

View File

@ -174,7 +174,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
finally {
try {
if (page != null) {
page.close();
page.close(false);
}
}
catch (Throwable ignored) {
@ -448,14 +448,14 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
finally {
try {
depagedPage.close();
depagedPage.close(false);
}
catch (Exception e) {
}
storageManager.afterPageRead();
}
depagedPage.close();
depagedPage.close(false);
pgdMessages = pgdMessagesList.toArray(new PagedMessage[pgdMessagesList.size()]);
}
else {

View File

@ -216,8 +216,14 @@ public final class Page implements Comparable<Page> {
file.position(0);
}
public synchronized void close() throws Exception {
if (storageManager != null) {
public void close() throws Exception {
close(false);
}
/** sendEvent means it's a close happening from a major event such moveNext.
* While reading the cache we don't need (and shouldn't inform the backup */
public synchronized void close(boolean sendEvent) throws Exception {
if (sendEvent && storageManager != null) {
storageManager.pageClosed(storeName, pageId);
}
if (pageCache != null) {

View File

@ -348,7 +348,7 @@ public class PagingStoreImpl implements PagingStore {
flushExecutors();
if (currentPage != null) {
currentPage.close();
currentPage.close(false);
currentPage = null;
}
}
@ -390,7 +390,7 @@ public class PagingStoreImpl implements PagingStore {
currentPageId = 0;
if (currentPage != null) {
currentPage.close();
currentPage.close(false);
}
currentPage = null;
@ -589,7 +589,7 @@ public class PagingStoreImpl implements PagingStore {
}
returnPage = currentPage;
returnPage.close();
returnPage.close(false);
currentPage = null;
// The current page is empty... which means we reached the end of the pages
@ -1021,7 +1021,7 @@ public class PagingStoreImpl implements PagingStore {
int tmpCurrentPageId = currentPageId + 1;
if (currentPage != null) {
currentPage.close();
currentPage.close(true);
}
currentPage = createPage(tmpCurrentPageId);

View File

@ -21,14 +21,19 @@ import java.nio.ByteBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.util.FileIOUtil;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.StorageManager.LargeMessageExtension;
import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.jboss.logging.Logger;
public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
private static final Logger logger = Logger.getLogger(LargeServerMessageInSync.class);
private static final boolean isTrace = logger.isTraceEnabled();
private final LargeServerMessage mainLM;
private final StorageManager storageManager;
private SequentialFile appendFile;
@ -50,20 +55,33 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
if (!mainSeqFile.isOpen()) {
mainSeqFile.open();
}
if (appendFile != null) {
appendFile.close();
appendFile.open();
for (;;) {
buffer.rewind();
int bytesRead = appendFile.read(buffer);
if (bytesRead > 0)
mainSeqFile.writeDirect(buffer, false);
if (bytesRead < buffer.capacity()) {
break;
try {
if (appendFile != null) {
if (isTrace) {
logger.trace("joinSyncedData on " + mainLM + ", currentSize on mainMessage=" + mainSeqFile.size() + ", appendFile size = " + appendFile.size());
}
FileIOUtil.copyData(appendFile, mainSeqFile, buffer);
deleteAppendFile();
}
else {
if (isTrace) {
logger.trace("joinSyncedData, appendFile is null, ignoring joinSyncedData on " + mainLM);
}
}
deleteAppendFile();
}
catch (Throwable e) {
logger.warn("Error while sincing data on largeMessageInSync::" + mainLM);
}
if (isTrace) {
logger.trace("joinedSyncData on " + mainLM + " finished with " + mainSeqFile.size());
}
syncDone = true;
}
@ -85,6 +103,9 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
@Override
public synchronized void releaseResources() {
if (isTrace) {
logger.warn("release resources called on " + mainLM, new Exception("trace"));
}
mainLM.releaseResources();
if (appendFile != null && appendFile.isOpen()) {
try {
@ -122,11 +143,19 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
public synchronized void addBytes(byte[] bytes) throws Exception {
if (deleted)
return;
if (syncDone) {
if (isTrace) {
logger.trace("Adding " + bytes.length + " towards sync message::" + mainLM);
}
mainLM.addBytes(bytes);
return;
}
if (isTrace) {
logger.trace("addBytes(bytes.length=" + bytes.length + ") on message=" + mainLM);
}
if (appendFile == null) {
appendFile = storageManager.createFileForLargeMessage(mainLM.getMessageID(), LargeMessageExtension.SYNC);
}

View File

@ -57,6 +57,13 @@ public class ReplicationLargeMessageBeginMessage extends PacketImpl {
return result;
}
@Override
public String toString() {
return "ReplicationLargeMessageBeginMessage{" +
"messageId=" + messageId +
'}';
}
@Override
public boolean equals(Object obj) {
if (this == obj)

View File

@ -57,6 +57,13 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
return result;
}
@Override
public String toString() {
return "ReplicationLargeMessageEndMessage{" +
"messageId=" + messageId +
'}';
}
@Override
public boolean equals(Object obj) {
if (this == obj)

View File

@ -80,6 +80,14 @@ public final class ReplicationLargeMessageWriteMessage extends PacketImpl {
return result;
}
@Override
public String toString() {
return "ReplicationLargeMessageWriteMessage{" +
"messageId=" + messageId +
", body.size=" + body.length +
'}';
}
@Override
public boolean equals(Object obj) {
if (this == obj)

View File

@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.jboss.logging.Logger;
/**
* Handles all the synchronization necessary for replication on the backup side (that is the
@ -87,7 +88,8 @@ import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivatio
*/
public final class ReplicationEndpoint implements ChannelHandler, ActiveMQComponent {
private static final boolean trace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
private static final Logger logger = Logger.getLogger(ReplicationEndpoint.class);
private static final boolean isTrace = logger.isTraceEnabled();
private final IOCriticalErrorListener criticalErrorListener;
private final ActiveMQServerImpl server;
@ -153,11 +155,18 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
@Override
public void handlePacket(final Packet packet) {
if (isTrace) {
logger.trace("handlePacket::handling " + packet);
}
PacketImpl response = new ReplicationResponseMessage();
final byte type = packet.getType();
try {
if (!started) {
if (isTrace) {
logger.trace("handlePacket::ignoring " + packet);
}
return;
}
@ -306,7 +315,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
for (Page page : map.values()) {
try {
page.sync();
page.close();
page.close(false);
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorClosingPageOnReplication(e);
@ -340,56 +349,10 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
this.channel = channel;
}
public void compareJournalInformation(final JournalLoadInformation[] journalInformation) throws ActiveMQException {
if (!activation.isRemoteBackupUpToDate()) {
throw ActiveMQMessageBundle.BUNDLE.journalsNotInSync();
}
if (journalLoadInformation == null || journalLoadInformation.length != journalInformation.length) {
throw ActiveMQMessageBundle.BUNDLE.replicationTooManyJournals();
}
for (int i = 0; i < journalInformation.length; i++) {
if (!journalInformation[i].equals(journalLoadInformation[i])) {
ActiveMQServerLogger.LOGGER.journalcomparisonMismatch(journalParametersToString(journalInformation));
throw ActiveMQMessageBundle.BUNDLE.replicationTooManyJournals();
}
}
}
/**
* Used on tests only. To simulate missing page deletes
*/
public void setDeletePages(final boolean deletePages) {
this.deletePages = deletePages;
}
/**
* @param journalInformation
*/
private String journalParametersToString(final JournalLoadInformation[] journalInformation) {
return "**********************************************************\n" + "parameters:\n" +
"BindingsImpl = " +
journalInformation[0] +
"\n" +
"Messaging = " +
journalInformation[1] +
"\n" +
"**********************************************************" +
"\n" +
"Expected:" +
"\n" +
"BindingsImpl = " +
journalLoadInformation[0] +
"\n" +
"Messaging = " +
journalLoadInformation[1] +
"\n" +
"**********************************************************";
}
private void finishSynchronization(String liveID) throws Exception {
if (isTrace) {
logger.trace("finishSynchronization::" + liveID);
}
for (JournalContent jc : EnumSet.allOf(JournalContent.class)) {
Journal journal = journalsHolder.remove(jc);
journal.synchronizationLock();
@ -427,7 +390,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
* @param msg
* @throws Exception
*/
private synchronized void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception {
private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception {
Long id = Long.valueOf(msg.getId());
byte[] data = msg.getData();
SequentialFile channel1;
@ -462,7 +425,6 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
}
if (data == null) {
channel1.close();
return;
}
@ -477,69 +439,73 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
* {@link FileWrapperJournal} in place to store messages while synchronization is going on.
*
* @param packet
* @throws Exception
* @return if the incoming packet indicates the synchronization is finished then return an acknowledgement otherwise
* return an empty response
* return an empty response
* @throws Exception
*/
private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception {
if (isTrace) {
logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
}
ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
if (activation.isRemoteBackupUpToDate()) {
throw ActiveMQMessageBundle.BUNDLE.replicationBackupUpToDate();
if (!started)
return replicationResponseMessage;
if (packet.isSynchronizationFinished()) {
finishSynchronization(packet.getNodeID());
replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
return replicationResponseMessage;
}
synchronized (this) {
if (!started)
return replicationResponseMessage;
switch (packet.getDataType()) {
case LargeMessages:
for (long msgID : packet.getFileIds()) {
createLargeMessage(msgID, true);
}
break;
case JournalBindings:
case JournalMessages:
if (wantedFailBack && !packet.isServerToFailBack()) {
ActiveMQServerLogger.LOGGER.autoFailBackDenied();
}
if (packet.isSynchronizationFinished()) {
finishSynchronization(packet.getNodeID());
replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
return replicationResponseMessage;
}
final JournalContent journalContent = SyncDataType.getJournalContentType(packet.getDataType());
final Journal journal = journalsHolder.get(journalContent);
switch (packet.getDataType()) {
case LargeMessages:
for (long msgID : packet.getFileIds()) {
createLargeMessage(msgID, true);
}
break;
case JournalBindings:
case JournalMessages:
if (wantedFailBack && !packet.isServerToFailBack()) {
ActiveMQServerLogger.LOGGER.autoFailBackDenied();
}
if (packet.getNodeID() != null) {
// At the start of replication, we still do not know which is the nodeID that the live uses.
// This is the point where the backup gets this information.
backupQuorum.liveIDSet(packet.getNodeID());
}
Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(journalContent);
final JournalContent journalContent = SyncDataType.getJournalContentType(packet.getDataType());
final Journal journal = journalsHolder.get(journalContent);
if (packet.getNodeID() != null) {
// At the start of replication, we still do not know which is the nodeID that the live uses.
// This is the point where the backup gets this information.
backupQuorum.liveIDSet(packet.getNodeID());
}
Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(journalContent);
for (Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) {
mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
}
FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
registerJournal(journalContent.typeByte, syncJournal);
break;
default:
throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType();
}
for (Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) {
mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
}
FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
registerJournal(journalContent.typeByte, syncJournal);
break;
default:
throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType();
}
return replicationResponseMessage;
}
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) {
if (isTrace) {
logger.trace("handleLargeMessageEnd on " + packet.getMessageId());
}
final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
if (message != null) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
if (isTrace) {
logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
}
message.deleteFile();
}
catch (Exception e) {
@ -560,7 +526,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
}
}
private ReplicatedLargeMessage lookupLargeMessage(final long messageId, final boolean delete, final boolean createIfNotExists) {
private ReplicatedLargeMessage lookupLargeMessage(final long messageId,
final boolean delete,
final boolean createIfNotExists) {
ReplicatedLargeMessage message;
if (delete) {
@ -590,7 +558,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private void handleLargeMessageBegin(final ReplicationLargeMessageBeginMessage packet) {
final long id = packet.getMessageId();
createLargeMessage(id, false);
ActiveMQServerLogger.LOGGER.trace("Receiving Large Message " + id + " on backup");
if (isTrace) {
logger.trace("Receiving Large Message Begin " + id + " on backup");
}
}
private void createLargeMessage(final long id, boolean liveToBackupSync) {
@ -666,14 +636,14 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private void handleAppendAddRecord(final ReplicationAddMessage packet) throws Exception {
Journal journalToUse = getJournal(packet.getJournalID());
if (packet.getRecord() == ADD_OPERATION_TYPE.UPDATE) {
if (ReplicationEndpoint.trace) {
ActiveMQServerLogger.LOGGER.trace("Endpoint appendUpdate id = " + packet.getId());
if (isTrace) {
logger.trace("Endpoint appendUpdate id = " + packet.getId());
}
journalToUse.appendUpdateRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync);
}
else {
if (ReplicationEndpoint.trace) {
ActiveMQServerLogger.LOGGER.trace("Endpoint append id = " + packet.getId());
if (isTrace) {
logger.trace("Endpoint append id = " + packet.getId());
}
journalToUse.appendAddRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync);
}
@ -698,7 +668,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
}
}
else {
page.close();
page.close(false);
}
}
@ -807,7 +777,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
*
* @param backupQuorum
*/
public synchronized void setBackupQuorum(SharedNothingBackupQuorum backupQuorum) {
public void setBackupQuorum(SharedNothingBackupQuorum backupQuorum) {
this.backupQuorum = backupQuorum;
}

View File

@ -70,6 +70,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;
/**
* Manages replication tasks on the live server (that is the live server side of a "remote backup"
@ -81,6 +82,10 @@ import org.apache.activemq.artemis.utils.ReusableLatch;
*/
public final class ReplicationManager implements ActiveMQComponent, ReadyListener {
Logger logger = Logger.getLogger(ReplicationManager.class);
final boolean isTrace = logger.isTraceEnabled();
public enum ADD_OPERATION_TYPE {
UPDATE {
@Override
@ -330,7 +335,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
return sendReplicatePacket(packet, true);
}
private synchronized OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
if (!enabled)
return null;
boolean runItNow = false;
@ -578,6 +583,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
*/
public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout) {
if (enabled) {
if (isTrace) {
logger.trace("sendSynchronizationDone ::" + nodeID + ", " + initialReplicationSyncTimeout);
}
synchronizationIsFinishedAcknowledgement.countUp();
sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
try {

View File

@ -408,8 +408,9 @@ public class ClusterController implements ActiveMQComponent {
}
}
catch (ActiveMQException e) {
if (!started)
if (!started) {
return;
}
server.getScheduledPool().schedule(this, serverLocator.getRetryInterval(), TimeUnit.MILLISECONDS);
}
}

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBacku
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -53,6 +54,10 @@ import static org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothi
public final class SharedNothingBackupActivation extends Activation {
private static final Logger logger = Logger.getLogger(SharedNothingBackupActivation.class);
private static final boolean isTrace = logger.isTraceEnabled();
//this is how we act when we start as a backup
private ReplicaPolicy replicaPolicy;
@ -129,43 +134,86 @@ public final class SharedNothingBackupActivation extends Activation {
}
ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
clusterController.addClusterTopologyListenerForReplication(nodeLocator);
if (isTrace) {
logger.trace("Waiting on cluster connection");
}
//todo do we actually need to wait?
clusterController.awaitConnectionToReplicationCluster();
if (isTrace) {
logger.trace("Cluster Connected");
}
clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator));
// nodeManager.startBackup();
if (isTrace) {
logger.trace("Starting backup manager");
}
activeMQServer.getBackupManager().start();
if (isTrace) {
logger.trace("Set backup Quorum");
}
replicationEndpoint.setBackupQuorum(backupQuorum);
replicationEndpoint.setExecutor(activeMQServer.getExecutorFactory().getExecutor());
EndpointConnector endpointConnector = new EndpointConnector();
if (isTrace) {
logger.trace("Starting Backup Server");
}
ActiveMQServerLogger.LOGGER.backupServerStarted(activeMQServer.getVersion().getFullVersion(), activeMQServer.getNodeManager().getNodeId());
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
if (isTrace) logger.trace("Setting server state as started");
SharedNothingBackupQuorum.BACKUP_ACTIVATION signal;
do {
//locate the first live server to try to replicate
nodeLocator.locateNode();
if (closed) {
if (isTrace) {
logger.trace("Activation is closed, so giving up");
}
return;
}
if (isTrace) {
logger.trace("looking up the node through nodeLocator.locateNode()");
}
//locate the first live server to try to replicate
nodeLocator.locateNode();
Pair<TransportConfiguration, TransportConfiguration> possibleLive = nodeLocator.getLiveConfiguration();
nodeID = nodeLocator.getNodeID();
if (isTrace) {
logger.trace("nodeID = " + nodeID);
}
//in a normal (non failback) scenario if we couldn't find our live server we should fail
if (!attemptFailBack) {
if (isTrace) {
logger.trace("attemptFailback=false, nodeID=" + nodeID);
}
//this shouldn't happen
if (nodeID == null)
if (nodeID == null) {
logger.debug("Throwing a RuntimeException as nodeID==null ant attemptFailback=false");
throw new RuntimeException("Could not establish the connection");
}
activeMQServer.getNodeManager().setNodeID(nodeID);
}
try {
if (isTrace) {
logger.trace("Calling clusterController.connectToNodeInReplicatedCluster(" + possibleLive.getA() + ")");
}
clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getA());
}
catch (Exception e) {
logger.debug(e.getMessage(), e);
if (possibleLive.getB() != null) {
try {
clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getB());
@ -176,6 +224,10 @@ public final class SharedNothingBackupActivation extends Activation {
}
}
if (clusterControl == null) {
if (isTrace) {
logger.trace("sleeping " + clusterController.getRetryIntervalForReplicatedCluster() + " it should retry");
}
//its ok to retry here since we haven't started replication yet
//it may just be the server has gone since discovery
Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
@ -190,23 +242,43 @@ public final class SharedNothingBackupActivation extends Activation {
* process again on the next live server. All the action happens inside {@link BackupQuorum}
*/
signal = backupQuorum.waitForStatusChange();
if (isTrace) {
logger.trace("Got a signal " + signal + " through backupQuorum.waitForStatusChange()");
}
/**
* replicationEndpoint will be holding lots of open files. Make sure they get
* closed/sync'ed.
*/
ActiveMQServerImpl.stopComponent(replicationEndpoint);
// time to give up
if (!activeMQServer.isStarted() || signal == STOP)
if (!activeMQServer.isStarted() || signal == STOP) {
if (isTrace) {
logger.trace("giving up on the activation:: activemqServer.isStarted=" + activeMQServer.isStarted() + " while signal = " + signal);
}
return;
}
// time to fail over
else if (signal == FAIL_OVER)
else if (signal == FAIL_OVER) {
if (isTrace) {
logger.trace("signal == FAIL_OVER, breaking the loop");
}
break;
}
// something has gone badly run restart from scratch
else if (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING) {
if (isTrace) {
logger.trace("Starting a new thread to stop the server!");
}
Thread startThread = new Thread(new Runnable() {
@Override
public void run() {
try {
if (isTrace) {
logger.trace("Calling activeMQServer.stop()");
}
activeMQServer.stop();
}
catch (Exception e) {
@ -227,17 +299,30 @@ public final class SharedNothingBackupActivation extends Activation {
}
} while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING);
if (isTrace) {
logger.trace("Activation loop finished, current signal = " + signal);
}
activeMQServer.getClusterManager().getQuorumManager().unRegisterQuorum(backupQuorum);
if (!isRemoteBackupUpToDate()) {
logger.debug("throwing exception for !isRemoteBackupUptoDate");
throw ActiveMQMessageBundle.BUNDLE.backupServerNotInSync();
}
if (isTrace) {
logger.trace("setReplicaPolicy::" + replicaPolicy);
}
replicaPolicy.getReplicatedPolicy().setReplicaPolicy(replicaPolicy);
activeMQServer.setHAPolicy(replicaPolicy.getReplicatedPolicy());
synchronized (activeMQServer) {
if (!activeMQServer.isStarted())
if (!activeMQServer.isStarted()) {
logger.trace("Server is stopped, giving up right before becomingLive");
return;
}
ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
activeMQServer.getNodeManager().stopBackup();
activeMQServer.getStorageManager().start();
@ -262,6 +347,9 @@ public final class SharedNothingBackupActivation extends Activation {
}
}
catch (Exception e) {
if (isTrace) {
logger.trace(e.getMessage() + ", serverStarted=" + activeMQServer.isStarted(), e);
}
if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !activeMQServer.isStarted())
// do not log these errors if the server is being stopped.
return;
@ -374,8 +462,10 @@ public final class SharedNothingBackupActivation extends Activation {
* @throws ActiveMQException
*/
public void remoteFailOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) throws ActiveMQException {
ActiveMQServerLogger.LOGGER.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate=" +
backupUpToDate);
if (isTrace) {
logger.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate=" +
backupUpToDate);
}
if (!activeMQServer.getHAPolicy().isBackup() || activeMQServer.getHAPolicy().isSharedStore()) {
throw new ActiveMQInternalErrorException();
}

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -53,7 +54,6 @@ public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase
public static int messageChunkCount = 0;
private static final ReusableLatch ruleFired = new ReusableLatch(1);
private static ActiveMQServer backupServer;
private static ActiveMQServer liveServer;
@ -68,16 +68,12 @@ public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase
Configuration liveConfig;
// To inform the main thread the condition is met
static final ReusableLatch flagArrived = new ReusableLatch(1);
static final ReusableLatch flagChunkEntered = new ReusableLatch(1);
// To wait while the condition is worked out
static final ReusableLatch flagWait = new ReusableLatch(1);
static final ReusableLatch flag15Arrived = new ReusableLatch(1);
// To wait while the condition is worked out
static final ReusableLatch flag15Wait = new ReusableLatch(1);
static final ReusableLatch flagChunkWait = new ReusableLatch(1);
// To inform the main thread the condition is met
static final ReusableLatch flagSyncArrived = new ReusableLatch(1);
static final ReusableLatch flagSyncEntered = new ReusableLatch(1);
// To wait while the condition is worked out
static final ReusableLatch flagSyncWait = new ReusableLatch(1);
@ -88,13 +84,12 @@ public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase
System.out.println("Tmp::" + getTemporaryDir());
flagArrived.setCount(1);
flagWait.setCount(1);
flagChunkEntered.setCount(1);
flagChunkWait.setCount(1);
flag15Arrived.setCount(1);
flag15Wait.setCount(1);
flagSyncEntered.setCount(1);
flagSyncWait.setCount(1);
ruleFired.setCount(1);
messageChunkCount = 0;
TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
@ -188,7 +183,6 @@ public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase
final MapMessage message = createLargeMessage();
t = new Thread() {
@Override
public void run() {
try {
producer.send(message);
@ -206,26 +200,24 @@ public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase
// I'm trying to simulate the following race here:
// The message is syncing while the client is already sending the body of the message
Assert.assertTrue(flagArrived.await(10, TimeUnit.SECONDS));
Assert.assertTrue(flagChunkEntered.await(10, TimeUnit.SECONDS));
startBackup();
Assert.assertTrue(flagSyncArrived.await(10, TimeUnit.SECONDS));
Assert.assertTrue(flagSyncEntered.await(10, TimeUnit.SECONDS));
flagWait.countDown();
Assert.assertTrue(flag15Arrived.await(10, TimeUnit.SECONDS));
flag15Wait.countDown();
flagChunkWait.countDown();
t.join(5000);
flagSyncWait.countDown();
System.out.println("Thread joined");
Assert.assertFalse(t.isAlive());
flagSyncWait.countDown();
Assert.assertTrue(((SharedNothingBackupActivation)backupServer.getActivation()).waitForBackupSync(10, TimeUnit.SECONDS));
waitForRemoteBackup(connection.getSessionFactory(), 30);
@ -253,8 +245,8 @@ public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase
public static void syncLargeMessage() {
try {
flagSyncArrived.countDown();
flagSyncWait.await(10, TimeUnit.SECONDS);
flagSyncEntered.countDown();
flagSyncWait.await(100, TimeUnit.SECONDS);
}
catch (Exception e) {
e.printStackTrace();
@ -266,13 +258,9 @@ public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase
messageChunkCount++;
try {
if (messageChunkCount == 10) {
flagArrived.countDown();
flagWait.await(10, TimeUnit.SECONDS);
}
if (messageChunkCount == 15) {
flag15Arrived.countDown();
flag15Wait.await(10, TimeUnit.SECONDS);
if (messageChunkCount == 1) {
flagChunkEntered.countDown();
flagChunkWait.await(10, TimeUnit.SECONDS);
}
}
catch (Exception e) {