ARTEMIS-1297 Load balance or redistribution of AMQP Messages

This commit is contained in:
Clebert Suconic 2017-07-27 17:25:27 -04:00 committed by Justin Bertram
parent b19637a347
commit 6fda75a9fc
23 changed files with 515 additions and 71 deletions

View File

@ -50,7 +50,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -64,7 +63,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
public class PrintData extends OptionalLocking {
static {
MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
MessagePersister.registerPersister(CoreMessagePersister.getInstance());
}
@Override

View File

@ -21,6 +21,14 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
public interface Persister<T extends Object> {
/** This is to be used to store the protocol-id on Messages.
* Messages are stored on their bare format.
* The protocol manager will be responsible to code or decode messages.
* The caveat here is that the first short-sized bytes need to be this constant. */
default byte getID() {
return (byte)0;
}
int getEncodeSize(T record);
void encode(ActiveMQBuffer buffer, T record);

View File

@ -164,6 +164,9 @@ public interface Message {
byte STREAM_TYPE = 6;
/** The message will contain another message persisted through {@link org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil}*/
byte EMBEDDED_TYPE = 7;
default void cleanupInternalProperties() {
// only on core
}
@ -438,6 +441,19 @@ public interface Message {
}
}
default org.apache.activemq.artemis.api.core.Message putExtraBytesProperty(SimpleString key, byte[] value) {
return putBytesProperty(key, value);
}
default byte[] getExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return getBytesProperty(key);
}
default byte[] removeExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return (byte[])removeProperty(key);
}
default Object getDuplicateProperty() {
return null;
}

View File

@ -24,16 +24,24 @@ import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.utils.DataConstants;
public class CoreMessagePersister implements Persister<Message> {
public static final byte ID = 1;
public static CoreMessagePersister theInstance = new CoreMessagePersister();
public static CoreMessagePersister theInstance;
public static CoreMessagePersister getInstance() {
if (theInstance == null) {
theInstance = new CoreMessagePersister();
}
return theInstance;
}
protected CoreMessagePersister() {
}
@Override
public byte getID() {
return ID;
}
@Override
public int getEncodeSize(Message record) {

View File

@ -38,6 +38,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
@ -94,6 +95,10 @@ public class AMQPMessage extends RefCountMessage {
Set<Object> rejectedConsumers;
/** These are properties set at the broker level..
* these are properties created by the broker only */
private volatile TypedProperties extraProperties;
public AMQPMessage(long messageFormat, byte[] data) {
this.data = Unpooled.wrappedBuffer(data);
this.messageFormat = messageFormat;
@ -331,7 +336,7 @@ public class AMQPMessage extends RefCountMessage {
@Override
public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() {
return AMQPMessagePersister.getInstance();
return AMQPMessagePersisterV2.getInstance();
}
@Override
@ -483,7 +488,7 @@ public class AMQPMessage extends RefCountMessage {
System.arraycopy(origin, messagePaylodStart, newData, headerEnds, data.array().length - messagePaylodStart);
AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
newEncode.setDurable(isDurable());
newEncode.setDurable(isDurable()).setMessageID(this.getMessageID());
return newEncode;
}
@ -698,6 +703,50 @@ public class AMQPMessage extends RefCountMessage {
buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
}
public TypedProperties createExtraProperties() {
if (extraProperties == null) {
extraProperties = new TypedProperties();
}
return extraProperties;
}
public TypedProperties getExtraProperties() {
return extraProperties;
}
public AMQPMessage setExtraProperties(TypedProperties extraProperties) {
this.extraProperties = extraProperties;
return this;
}
@Override
public org.apache.activemq.artemis.api.core.Message putExtraBytesProperty(SimpleString key, byte[] value) {
createExtraProperties().putBytesProperty(key, value);
return this;
}
@Override
public byte[] getExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
if (extraProperties == null) {
return null;
} else {
return extraProperties.getBytesProperty(key);
}
}
@Override
public byte[] removeExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
if (extraProperties == null) {
return null;
} else {
return (byte[])extraProperties.removeProperty(key);
}
}
@Override
public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
getApplicationPropertiesMap().put(key, Boolean.valueOf(value));

View File

@ -25,18 +25,23 @@ import org.apache.activemq.artemis.utils.DataConstants;
public class AMQPMessagePersister extends MessagePersister {
public static AMQPMessagePersister theInstance = new AMQPMessagePersister();
public static final byte ID = 2;
public static AMQPMessagePersister theInstance;
public static AMQPMessagePersister getInstance() {
if (theInstance == null) {
theInstance = new AMQPMessagePersister();
}
return theInstance;
}
private AMQPMessagePersister() {
protected AMQPMessagePersister() {
}
@Override
protected byte getID() {
return ProtonProtocolManagerFactory.ID;
public byte getID() {
return ID;
}
@Override

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
public static final byte ID = 3;
public static AMQPMessagePersisterV2 theInstance;
public static AMQPMessagePersisterV2 getInstance() {
if (theInstance == null) {
theInstance = new AMQPMessagePersisterV2();
}
return theInstance;
}
@Override
public byte getID() {
return ID;
}
public AMQPMessagePersisterV2() {
super();
}
@Override
public int getEncodeSize(Message record) {
int encodeSize = super.getEncodeSize(record) + DataConstants.SIZE_INT;
TypedProperties properties = ((AMQPMessage)record).getExtraProperties();
return encodeSize + (properties != null ? properties.getEncodeSize() : 0);
}
/** Sub classes must add the first short as the protocol-id */
@Override
public void encode(ActiveMQBuffer buffer, Message record) {
super.encode(buffer, record);
TypedProperties properties = ((AMQPMessage)record).getExtraProperties();
if (properties == null) {
buffer.writeInt(0);
} else {
buffer.writeInt(properties.getEncodeSize());
properties.encode(buffer.byteBuf());
}
}
@Override
public Message decode(ActiveMQBuffer buffer, Message record) {
AMQPMessage message = (AMQPMessage)super.decode(buffer, record);
int size = buffer.readInt();
if (size != 0) {
TypedProperties properties = new TypedProperties();
properties.decode(buffer.byteBuf());
message.setExtraProperties(properties);
}
return message;
}
}

View File

@ -32,8 +32,6 @@ import org.osgi.service.component.annotations.Component;
@Component(service = ProtocolManagerFactory.class)
public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<AmqpInterceptor> {
public static final byte ID = 2;
public static final String AMQP_PROTOCOL_NAME = "AMQP";
private static final String MODULE_NAME = "artemis-amqp-protocol";
@ -41,13 +39,10 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
@Override
public byte getStoreID() {
return ID;
}
public Persister<Message>[] getPersister() {
@Override
public Persister<Message> getPersister() {
return AMQPMessagePersister.getInstance();
Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance()};
return persisters;
}
@Override

View File

@ -31,12 +31,14 @@ import java.util.Set;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Decimal128;
import org.apache.qpid.proton.amqp.Decimal32;
@ -170,6 +172,13 @@ public class AmqpCoreConverter {
throw new RuntimeException("Unexpected body type: " + body.getClass());
}
TypedProperties properties = message.getExtraProperties();
if (properties != null) {
for (SimpleString str : properties.getPropertyNames()) {
result.getInnerMessage().putBytesProperty(str, properties.getBytesProperty(str));
}
}
populateMessage(result, message.getProtonMessage());
result.getInnerMessage().setReplyTo(message.getReplyTo());

View File

@ -25,8 +25,15 @@ import static org.junit.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.commons.collections.map.HashedMap;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
@ -34,6 +41,7 @@ import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
@ -207,6 +215,42 @@ public class AMQPMessageTest {
assertEquals(0L, decoded.getTimestamp());
}
@Test
public void testExtraProperty() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
byte[] original = RandomUtil.randomBytes();
SimpleString name = SimpleString.toSimpleString("myProperty");
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
decoded.setAddress("someAddress");
decoded.setMessageID(33);
decoded.putExtraBytesProperty(name, original);
ICoreMessage coreMessage = decoded.toCore();
Assert.assertEquals(original, coreMessage.getBytesProperty(name));
ActiveMQBuffer buffer = ActiveMQBuffers.pooledBuffer(10 * 1024);
try {
decoded.getPersister().encode(buffer, decoded);
Assert.assertEquals(AMQPMessagePersisterV2.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister
AMQPMessage readMessage = (AMQPMessage)decoded.getPersister().decode(buffer, null);
Assert.assertEquals(33, readMessage.getMessageID());
Assert.assertEquals("someAddress", readMessage.getAddress());
Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
} finally {
buffer.release();
}
{
ICoreMessage embeddedMessage = EmbedMessageUtil.embedAsCoreMessage(decoded);
AMQPMessage readMessage = (AMQPMessage) EmbedMessageUtil.extractEmbedded(embeddedMessage);
Assert.assertEquals(33, readMessage.getMessageID());
Assert.assertEquals("someAddress", readMessage.getAddress());
Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
}
}
private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {
ByteBuf nettyBuffer = Unpooled.buffer(1500);

View File

@ -238,7 +238,7 @@ public final class BindingsImpl implements Bindings {
/* This is a special treatment for scaled-down messages involving SnF queues.
* See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
*/
byte[] ids = (byte[]) message.removeAnnotation(Message.HDR_SCALEDOWN_TO_IDS);
byte[] ids = (byte[]) message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS);
if (ids != null) {
ByteBuffer buffer = ByteBuffer.wrap(ids);
@ -268,7 +268,7 @@ public final class BindingsImpl implements Bindings {
if (!routed) {
// Remove the ids now, in order to avoid double check
ids = (byte[]) message.removeAnnotation(Message.HDR_ROUTE_TO_IDS);
ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
// Fetch the groupId now, in order to avoid double checking
SimpleString groupId = message.getGroupID();

View File

@ -1227,7 +1227,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
AtomicBoolean startedTX) throws Exception {
// Check the DuplicateCache for the Bridge first
Object bridgeDup = message.removeAnnotation(Message.HDR_BRIDGE_DUPLICATE_ID);
Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
if (bridgeDup != null) {
// if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
byte[] bridgeDupBytes = (byte[]) bridgeDup;

View File

@ -90,6 +90,7 @@ import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.SimpleFuture;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
@ -686,7 +687,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
try {
final SessionSendMessage message = (SessionSendMessage) packet;
requiresResponse = message.isRequiresResponse();
this.session.send(message.getMessage(), this.direct);
this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct);
if (requiresResponse) {
response = new NullResponseMessage();
}

View File

@ -32,19 +32,13 @@ import org.apache.activemq.artemis.utils.uri.BeanSupport;
public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
public static final byte ID = 1;
private static String[] SUPPORTED_PROTOCOLS = {ActiveMQClient.DEFAULT_CORE_PROTOCOL};
private static final String MODULE_NAME = "artemis-server";
@Override
public byte getStoreID() {
return ID;
}
@Override
public Persister<Message> getPersister() {
return CoreMessagePersister.getInstance();
public Persister<Message>[] getPersister() {
return new Persister[]{CoreMessagePersister.getInstance()};
}
/**

View File

@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.FutureLatch;
@ -515,7 +516,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
// We keep our own DuplicateID for the Bridge, so bouncing back and forth will work fine
byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, bytes);
message.putExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, bytes);
}
if (transformer != null) {
@ -528,9 +529,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
" as transformedMessage");
}
}
return transformedMessage;
return EmbedMessageUtil.embedAsCoreMessage(transformedMessage);
} else {
return message;
return EmbedMessageUtil.embedAsCoreMessage(message);
}
}
@ -575,7 +576,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
dest = forwardingAddress;
} else {
// Preserve the original address
dest = message.getAddressSimpleString();
dest = ref.getMessage().getAddressSimpleString();
}
pendingAcks.countUp();

View File

@ -166,7 +166,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
Set<SimpleString> propNames = new HashSet<>(messageCopy.getPropertyNames());
byte[] queueIds = message.getBytesProperty(idsHeaderName);
byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);
if (queueIds == null) {
// Sanity check only
@ -180,7 +180,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
}
}
messageCopy.putBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
messageCopy.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
messageCopy = super.beforeForward(messageCopy);

View File

@ -315,7 +315,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
* @param message
*/
private void addRouteContextToMessage(final Message message) {
byte[] ids = message.getBytesProperty(idsHeaderName);
byte[] ids = message.getExtraBytesProperty(idsHeaderName);
if (ids == null) {
ids = new byte[8];
@ -331,7 +331,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
buff.putLong(remoteQueueID);
message.putBytesProperty(idsHeaderName, ids);
message.putExtraBytesProperty(idsHeaderName, ids);
if (logger.isTraceEnabled()) {
logger.trace("Adding remoteQueue ID = " + remoteQueueID + " into message=" + message + " store-forward-queue=" + storeAndForwardQueue);

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.spi.core.protocol;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.jboss.logging.Logger;
public class EmbedMessageUtil {
private static final byte[] signature = new byte[]{(byte) 'E', (byte) 'M', (byte) 'B'};
private static final Logger logger = Logger.getLogger(EmbedMessageUtil.class);
public static ICoreMessage embedAsCoreMessage(Message source) {
if (source instanceof ICoreMessage) {
return (ICoreMessage) source;
} else {
Persister persister = source.getPersister();
CoreMessage message = new CoreMessage(source.getMessageID(), persister.getEncodeSize(source) + signature.length + CoreMessage.BODY_OFFSET).setType(Message.EMBEDDED_TYPE);
ActiveMQBuffer buffer = message.getBodyBuffer();
buffer.writeBytes(signature);
persister.encode(buffer, source);
return message;
}
}
public static Message extractEmbedded(ICoreMessage message) {
if (message.getType() == Message.EMBEDDED_TYPE) {
ActiveMQBuffer buffer = message.getReadOnlyBodyBuffer();
if (buffer.readableBytes() < signature.length || !checkSignature(buffer)) {
if (!logger.isTraceEnabled()) {
logger.trace("Message type " + Message.EMBEDDED_TYPE + " was used for something other than embed messages, ignoring content and treating as a regular message");
}
return message;
}
try {
return MessagePersister.getInstance().decode(buffer, null);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return message;
}
} else {
return message;
}
}
private static boolean checkSignature(final ActiveMQBuffer buffer) {
return buffer.readByte() == signature[0] && buffer.readByte() == signature[1] && buffer.readByte() == signature[2];
}
}

View File

@ -17,15 +17,12 @@
package org.apache.activemq.artemis.spi.core.protocol;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.jboss.logging.Logger;
public class MessagePersister implements Persister<Message> {
@ -35,33 +32,47 @@ public class MessagePersister implements Persister<Message> {
private static final MessagePersister theInstance = new MessagePersister();
/** This will be used for reading messages */
private static Map<Byte, Persister<Message>> protocols = new ConcurrentHashMap<>();
private static final int MAX_PERSISTERS = 3;
private static final Persister<Message>[] persisters = new Persister[MAX_PERSISTERS];
static {
MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
CoreMessagePersister persister = CoreMessagePersister.getInstance();
MessagePersister.registerPersister(persister);
Iterable<ProtocolManagerFactory> protocols = ServiceLoader.load(ProtocolManagerFactory.class, MessagePersister.class.getClassLoader());
for (ProtocolManagerFactory next : protocols) {
MessagePersister.registerPersister(next.getStoreID(), next.getPersister());
registerProtocol(next);
}
}
public static void registerProtocol(ProtocolManagerFactory manager) {
Persister<Message> messagePersister = manager.getPersister();
if (messagePersister == null) {
Persister<Message>[] messagePersisters = manager.getPersister();
if (messagePersisters == null || messagePersisters.length == 0) {
logger.debug("Cannot find persister for " + manager);
} else {
registerPersister(manager.getStoreID(), manager.getPersister());
for (Persister p : messagePersisters) {
registerPersister(p);
}
}
}
public static void clearPersisters() {
protocols.clear();
for (int i = 0; i < persisters.length; i++) {
persisters[i] = null;
}
}
public static void registerPersister(byte recordType, Persister<Message> persister) {
public static Persister getPersister(byte id) {
if (id == 0 || id > MAX_PERSISTERS) {
return null;
}
return persisters[id - 1];
}
public static void registerPersister(Persister<Message> persister) {
if (persister != null) {
protocols.put(recordType, persister);
assert persister.getID() <= MAX_PERSISTERS : "You must update MessagePersister::MAX_PERSISTERS to a higher number";
persisters[persister.getID() - 1] = persister;
}
}
@ -73,10 +84,6 @@ public class MessagePersister implements Persister<Message> {
protected MessagePersister() {
}
protected byte getID() {
return (byte)0;
}
@Override
public int getEncodeSize(Message record) {
return 0;
@ -92,7 +99,7 @@ public class MessagePersister implements Persister<Message> {
@Override
public Message decode(ActiveMQBuffer buffer, Message record) {
byte protocol = buffer.readByte();
Persister<Message> persister = protocols.get(protocol);
Persister<Message> persister = getPersister(protocol);
if (persister == null) {
throw new NullPointerException("couldn't find factory for type=" + protocol);
}

View File

@ -26,16 +26,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
public interface ProtocolManagerFactory<P extends BaseInterceptor> {
/** This is to be used to store the protocol-id on Messages.
* Messages are stored on their bare format.
* The protocol manager will be responsible to code or decode messages.
* The caveat here is that the first short-sized bytes need to be this constant. */
default byte getStoreID() {
return (byte)0;
}
default Persister<Message> getPersister() {
return null;
default Persister<Message>[] getPersister() {
return new Persister[]{};
}

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class AMQPMessageLoadBalancingTest extends ClusterTestBase {
@Override
@Before
public void setUp() throws Exception {
super.setUp();
start();
}
private void start() throws Exception {
setupServers();
setRedistributionDelay(0);
}
protected boolean isNetty() {
return true;
}
@Test
public void testLoadBalanceAMQP() throws Exception {
setupCluster(MessageLoadBalancingType.STRICT);
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues.0", "queues.0", null, true, null, null, RoutingType.ANYCAST);
createQueue(1, "queues.0", "queues.0", null, true, null, null, RoutingType.ANYCAST);
final int NUMBER_OF_MESSAGES = 100;
// sending AMQP Messages.. they should be load balanced
{
JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue("queues.0"));
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage("hello " + i));
}
session.commit();
connection.close();
}
receiveOnBothNodes(NUMBER_OF_MESSAGES);
// If a user used a message type = 7, for messages that are not embedded,
// it should still be treated as a normal message
{
ClientSession sessionProducer = sfs[0].createSession();
ClientProducer producer = sessionProducer.createProducer("queues.0");
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
// The user is mistakenly using the same type we used for embedded messages. it should still work
ClientMessage message = sessionProducer.createMessage(Message.EMBEDDED_TYPE, true).putIntProperty("i", i);
message.getBodyBuffer().writeString("hello!");
producer.send(message);
// will send 2 messages.. one with stuff, another empty
message = sessionProducer.createMessage(Message.EMBEDDED_TYPE, true);
producer.send(message);
}
receiveOnBothNodes(NUMBER_OF_MESSAGES * 2);
}
}
private void receiveOnBothNodes(int NUMBER_OF_MESSAGES) throws ActiveMQException {
for (int x = 0; x <= 1; x++) {
ClientSession sessionX = sfs[x].createSession();
ClientConsumer consumerX = sessionX.createConsumer("queues.0");
sessionX.start();
for (int i = 0; i < NUMBER_OF_MESSAGES / 2; i++) {
ClientMessage msg = consumerX.receive(5000);
Assert.assertNotNull(msg);
msg.acknowledge();
}
Assert.assertNull(consumerX.receiveImmediate());
sessionX.commit();
sessionX.close();
}
}
protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0);
}
protected void setRedistributionDelay(final long delay) {
AddressSettings as = new AddressSettings().setRedistributionDelay(delay);
getServer(0).getAddressSettingsRepository().addMatch("queues.*", as);
getServer(1).getAddressSettingsRepository().addMatch("queues.*", as);
}
protected void setupServers() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
}
protected void stopServers() throws Exception {
closeAllConsumers();
closeAllSessionFactories();
closeAllServerLocatorsFactories();
stopServers(0, 1);
clearServer(0, 1);
}
}

View File

@ -30,7 +30,6 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersister;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
@ -52,8 +51,8 @@ public class PageTest extends ActiveMQTestBase {
@Before
public void registerProtocols() {
MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
MessagePersister.registerPersister((byte)2, AMQPMessagePersister.getInstance());
MessagePersister.registerPersister(CoreMessagePersister.getInstance());
MessagePersister.registerPersister(AMQPMessagePersister.getInstance());
}
@Test

View File

@ -51,14 +51,12 @@ import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersister;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.apache.activemq.artemis.tests.unit.util.FakePagingManager;
@ -75,8 +73,8 @@ import org.junit.Test;
public class PagingStoreImplTest extends ActiveMQTestBase {
static {
MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
MessagePersister.registerPersister(ProtonProtocolManagerFactory.ID, AMQPMessagePersister.getInstance());
MessagePersister.registerPersister(CoreMessagePersister.getInstance());
MessagePersister.registerPersister(AMQPMessagePersister.getInstance());
}
private static final SimpleString destinationTestName = new SimpleString("test");