This closes #1493
This commit is contained in:
commit
56cbed7294
|
@ -50,7 +50,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId
|
|||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
|
@ -64,7 +63,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
|||
public class PrintData extends OptionalLocking {
|
||||
|
||||
static {
|
||||
MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
|
||||
MessagePersister.registerPersister(CoreMessagePersister.getInstance());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -21,6 +21,14 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|||
|
||||
public interface Persister<T extends Object> {
|
||||
|
||||
/** This is to be used to store the protocol-id on Messages.
|
||||
* Messages are stored on their bare format.
|
||||
* The protocol manager will be responsible to code or decode messages.
|
||||
* The caveat here is that the first short-sized bytes need to be this constant. */
|
||||
default byte getID() {
|
||||
return (byte)0;
|
||||
}
|
||||
|
||||
int getEncodeSize(T record);
|
||||
|
||||
void encode(ActiveMQBuffer buffer, T record);
|
||||
|
|
|
@ -164,6 +164,9 @@ public interface Message {
|
|||
|
||||
byte STREAM_TYPE = 6;
|
||||
|
||||
/** The message will contain another message persisted through {@link org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil}*/
|
||||
byte EMBEDDED_TYPE = 7;
|
||||
|
||||
default void cleanupInternalProperties() {
|
||||
// only on core
|
||||
}
|
||||
|
@ -438,6 +441,19 @@ public interface Message {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
default org.apache.activemq.artemis.api.core.Message putExtraBytesProperty(SimpleString key, byte[] value) {
|
||||
return putBytesProperty(key, value);
|
||||
}
|
||||
|
||||
default byte[] getExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
|
||||
return getBytesProperty(key);
|
||||
}
|
||||
|
||||
default byte[] removeExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
|
||||
return (byte[])removeProperty(key);
|
||||
}
|
||||
|
||||
default Object getDuplicateProperty() {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -24,16 +24,24 @@ import org.apache.activemq.artemis.core.persistence.Persister;
|
|||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class CoreMessagePersister implements Persister<Message> {
|
||||
public static final byte ID = 1;
|
||||
|
||||
public static CoreMessagePersister theInstance = new CoreMessagePersister();
|
||||
public static CoreMessagePersister theInstance;
|
||||
|
||||
public static CoreMessagePersister getInstance() {
|
||||
if (theInstance == null) {
|
||||
theInstance = new CoreMessagePersister();
|
||||
}
|
||||
return theInstance;
|
||||
}
|
||||
|
||||
protected CoreMessagePersister() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getID() {
|
||||
return ID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getEncodeSize(Message record) {
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
|
|||
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
|
||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.UnsignedByte;
|
||||
|
@ -94,6 +95,10 @@ public class AMQPMessage extends RefCountMessage {
|
|||
|
||||
Set<Object> rejectedConsumers;
|
||||
|
||||
/** These are properties set at the broker level..
|
||||
* these are properties created by the broker only */
|
||||
private volatile TypedProperties extraProperties;
|
||||
|
||||
public AMQPMessage(long messageFormat, byte[] data) {
|
||||
this.data = Unpooled.wrappedBuffer(data);
|
||||
this.messageFormat = messageFormat;
|
||||
|
@ -331,7 +336,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
|
||||
@Override
|
||||
public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() {
|
||||
return AMQPMessagePersister.getInstance();
|
||||
return AMQPMessagePersisterV2.getInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -483,7 +488,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
System.arraycopy(origin, messagePaylodStart, newData, headerEnds, data.array().length - messagePaylodStart);
|
||||
|
||||
AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
|
||||
newEncode.setDurable(isDurable());
|
||||
newEncode.setDurable(isDurable()).setMessageID(this.getMessageID());
|
||||
return newEncode;
|
||||
}
|
||||
|
||||
|
@ -698,6 +703,50 @@ public class AMQPMessage extends RefCountMessage {
|
|||
buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
|
||||
}
|
||||
|
||||
public TypedProperties createExtraProperties() {
|
||||
if (extraProperties == null) {
|
||||
extraProperties = new TypedProperties();
|
||||
}
|
||||
return extraProperties;
|
||||
}
|
||||
|
||||
public TypedProperties getExtraProperties() {
|
||||
return extraProperties;
|
||||
}
|
||||
|
||||
public AMQPMessage setExtraProperties(TypedProperties extraProperties) {
|
||||
this.extraProperties = extraProperties;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putExtraBytesProperty(SimpleString key, byte[] value) {
|
||||
createExtraProperties().putBytesProperty(key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public byte[] getExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
|
||||
if (extraProperties == null) {
|
||||
return null;
|
||||
} else {
|
||||
return extraProperties.getBytesProperty(key);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public byte[] removeExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
|
||||
if (extraProperties == null) {
|
||||
return null;
|
||||
} else {
|
||||
return (byte[])extraProperties.removeProperty(key);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
|
||||
getApplicationPropertiesMap().put(key, Boolean.valueOf(value));
|
||||
|
|
|
@ -25,18 +25,23 @@ import org.apache.activemq.artemis.utils.DataConstants;
|
|||
|
||||
public class AMQPMessagePersister extends MessagePersister {
|
||||
|
||||
public static AMQPMessagePersister theInstance = new AMQPMessagePersister();
|
||||
public static final byte ID = 2;
|
||||
|
||||
public static AMQPMessagePersister theInstance;
|
||||
|
||||
public static AMQPMessagePersister getInstance() {
|
||||
if (theInstance == null) {
|
||||
theInstance = new AMQPMessagePersister();
|
||||
}
|
||||
return theInstance;
|
||||
}
|
||||
|
||||
private AMQPMessagePersister() {
|
||||
protected AMQPMessagePersister() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected byte getID() {
|
||||
return ProtonProtocolManagerFactory.ID;
|
||||
public byte getID() {
|
||||
return ID;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.protocol.amqp.broker;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||
|
||||
public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
|
||||
public static final byte ID = 3;
|
||||
|
||||
public static AMQPMessagePersisterV2 theInstance;
|
||||
|
||||
public static AMQPMessagePersisterV2 getInstance() {
|
||||
if (theInstance == null) {
|
||||
theInstance = new AMQPMessagePersisterV2();
|
||||
}
|
||||
return theInstance;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getID() {
|
||||
return ID;
|
||||
}
|
||||
|
||||
public AMQPMessagePersisterV2() {
|
||||
super();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int getEncodeSize(Message record) {
|
||||
int encodeSize = super.getEncodeSize(record) + DataConstants.SIZE_INT;
|
||||
|
||||
TypedProperties properties = ((AMQPMessage)record).getExtraProperties();
|
||||
|
||||
return encodeSize + (properties != null ? properties.getEncodeSize() : 0);
|
||||
}
|
||||
|
||||
|
||||
/** Sub classes must add the first short as the protocol-id */
|
||||
@Override
|
||||
public void encode(ActiveMQBuffer buffer, Message record) {
|
||||
super.encode(buffer, record);
|
||||
|
||||
TypedProperties properties = ((AMQPMessage)record).getExtraProperties();
|
||||
if (properties == null) {
|
||||
buffer.writeInt(0);
|
||||
} else {
|
||||
buffer.writeInt(properties.getEncodeSize());
|
||||
properties.encode(buffer.byteBuf());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Message decode(ActiveMQBuffer buffer, Message record) {
|
||||
AMQPMessage message = (AMQPMessage)super.decode(buffer, record);
|
||||
int size = buffer.readInt();
|
||||
|
||||
if (size != 0) {
|
||||
TypedProperties properties = new TypedProperties();
|
||||
properties.decode(buffer.byteBuf());
|
||||
message.setExtraProperties(properties);
|
||||
}
|
||||
return message;
|
||||
}
|
||||
|
||||
}
|
|
@ -32,8 +32,6 @@ import org.osgi.service.component.annotations.Component;
|
|||
@Component(service = ProtocolManagerFactory.class)
|
||||
public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<AmqpInterceptor> {
|
||||
|
||||
public static final byte ID = 2;
|
||||
|
||||
public static final String AMQP_PROTOCOL_NAME = "AMQP";
|
||||
|
||||
private static final String MODULE_NAME = "artemis-amqp-protocol";
|
||||
|
@ -41,13 +39,10 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
|
|||
private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
|
||||
|
||||
@Override
|
||||
public byte getStoreID() {
|
||||
return ID;
|
||||
}
|
||||
public Persister<Message>[] getPersister() {
|
||||
|
||||
@Override
|
||||
public Persister<Message> getPersister() {
|
||||
return AMQPMessagePersister.getInstance();
|
||||
Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance()};
|
||||
return persisters;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -31,12 +31,14 @@ import java.util.Set;
|
|||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
|
||||
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
|
||||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.Decimal128;
|
||||
import org.apache.qpid.proton.amqp.Decimal32;
|
||||
|
@ -170,6 +172,13 @@ public class AmqpCoreConverter {
|
|||
throw new RuntimeException("Unexpected body type: " + body.getClass());
|
||||
}
|
||||
|
||||
TypedProperties properties = message.getExtraProperties();
|
||||
if (properties != null) {
|
||||
for (SimpleString str : properties.getPropertyNames()) {
|
||||
result.getInnerMessage().putBytesProperty(str, properties.getBytesProperty(str));
|
||||
}
|
||||
}
|
||||
|
||||
populateMessage(result, message.getProtonMessage());
|
||||
result.getInnerMessage().setReplyTo(message.getReplyTo());
|
||||
|
||||
|
|
|
@ -25,8 +25,15 @@ import static org.junit.Assert.assertTrue;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Date;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2;
|
||||
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.apache.commons.collections.map.HashedMap;
|
||||
import org.apache.qpid.proton.amqp.UnsignedInteger;
|
||||
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
||||
|
@ -34,6 +41,7 @@ import org.apache.qpid.proton.amqp.messaging.Header;
|
|||
import org.apache.qpid.proton.amqp.messaging.Properties;
|
||||
import org.apache.qpid.proton.message.Message;
|
||||
import org.apache.qpid.proton.message.impl.MessageImpl;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
@ -207,6 +215,42 @@ public class AMQPMessageTest {
|
|||
assertEquals(0L, decoded.getTimestamp());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtraProperty() {
|
||||
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
|
||||
|
||||
byte[] original = RandomUtil.randomBytes();
|
||||
SimpleString name = SimpleString.toSimpleString("myProperty");
|
||||
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
|
||||
decoded.setAddress("someAddress");
|
||||
decoded.setMessageID(33);
|
||||
decoded.putExtraBytesProperty(name, original);
|
||||
|
||||
ICoreMessage coreMessage = decoded.toCore();
|
||||
Assert.assertEquals(original, coreMessage.getBytesProperty(name));
|
||||
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.pooledBuffer(10 * 1024);
|
||||
try {
|
||||
decoded.getPersister().encode(buffer, decoded);
|
||||
Assert.assertEquals(AMQPMessagePersisterV2.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister
|
||||
AMQPMessage readMessage = (AMQPMessage)decoded.getPersister().decode(buffer, null);
|
||||
Assert.assertEquals(33, readMessage.getMessageID());
|
||||
Assert.assertEquals("someAddress", readMessage.getAddress());
|
||||
Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
|
||||
} finally {
|
||||
buffer.release();
|
||||
}
|
||||
|
||||
{
|
||||
ICoreMessage embeddedMessage = EmbedMessageUtil.embedAsCoreMessage(decoded);
|
||||
AMQPMessage readMessage = (AMQPMessage) EmbedMessageUtil.extractEmbedded(embeddedMessage);
|
||||
Assert.assertEquals(33, readMessage.getMessageID());
|
||||
Assert.assertEquals("someAddress", readMessage.getAddress());
|
||||
Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {
|
||||
ByteBuf nettyBuffer = Unpooled.buffer(1500);
|
||||
|
||||
|
|
|
@ -238,7 +238,7 @@ public final class BindingsImpl implements Bindings {
|
|||
/* This is a special treatment for scaled-down messages involving SnF queues.
|
||||
* See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
|
||||
*/
|
||||
byte[] ids = (byte[]) message.removeAnnotation(Message.HDR_SCALEDOWN_TO_IDS);
|
||||
byte[] ids = (byte[]) message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS);
|
||||
|
||||
if (ids != null) {
|
||||
ByteBuffer buffer = ByteBuffer.wrap(ids);
|
||||
|
@ -268,7 +268,7 @@ public final class BindingsImpl implements Bindings {
|
|||
|
||||
if (!routed) {
|
||||
// Remove the ids now, in order to avoid double check
|
||||
ids = (byte[]) message.removeAnnotation(Message.HDR_ROUTE_TO_IDS);
|
||||
ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
|
||||
|
||||
// Fetch the groupId now, in order to avoid double checking
|
||||
SimpleString groupId = message.getGroupID();
|
||||
|
|
|
@ -1227,7 +1227,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
AtomicBoolean startedTX) throws Exception {
|
||||
// Check the DuplicateCache for the Bridge first
|
||||
|
||||
Object bridgeDup = message.removeAnnotation(Message.HDR_BRIDGE_DUPLICATE_ID);
|
||||
Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
|
||||
if (bridgeDup != null) {
|
||||
// if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
|
||||
byte[] bridgeDupBytes = (byte[]) bridgeDup;
|
||||
|
|
|
@ -90,6 +90,7 @@ import org.apache.activemq.artemis.core.server.BindingQueryResult;
|
|||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.utils.SimpleFuture;
|
||||
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
|
||||
|
@ -686,7 +687,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
try {
|
||||
final SessionSendMessage message = (SessionSendMessage) packet;
|
||||
requiresResponse = message.isRequiresResponse();
|
||||
this.session.send(message.getMessage(), this.direct);
|
||||
this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct);
|
||||
if (requiresResponse) {
|
||||
response = new NullResponseMessage();
|
||||
}
|
||||
|
|
|
@ -32,19 +32,13 @@ import org.apache.activemq.artemis.utils.uri.BeanSupport;
|
|||
|
||||
public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
|
||||
|
||||
public static final byte ID = 1;
|
||||
private static String[] SUPPORTED_PROTOCOLS = {ActiveMQClient.DEFAULT_CORE_PROTOCOL};
|
||||
|
||||
private static final String MODULE_NAME = "artemis-server";
|
||||
|
||||
@Override
|
||||
public byte getStoreID() {
|
||||
return ID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Persister<Message> getPersister() {
|
||||
return CoreMessagePersister.getInstance();
|
||||
public Persister<Message>[] getPersister() {
|
||||
return new Persister[]{CoreMessagePersister.getInstance()};
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
|||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.core.server.management.NotificationService;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||
|
@ -515,7 +516,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
// We keep our own DuplicateID for the Bridge, so bouncing back and forth will work fine
|
||||
byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
|
||||
|
||||
message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, bytes);
|
||||
message.putExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, bytes);
|
||||
}
|
||||
|
||||
if (transformer != null) {
|
||||
|
@ -528,9 +529,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
" as transformedMessage");
|
||||
}
|
||||
}
|
||||
return transformedMessage;
|
||||
return EmbedMessageUtil.embedAsCoreMessage(transformedMessage);
|
||||
} else {
|
||||
return message;
|
||||
return EmbedMessageUtil.embedAsCoreMessage(message);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -575,7 +576,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
dest = forwardingAddress;
|
||||
} else {
|
||||
// Preserve the original address
|
||||
dest = message.getAddressSimpleString();
|
||||
dest = ref.getMessage().getAddressSimpleString();
|
||||
}
|
||||
|
||||
pendingAcks.countUp();
|
||||
|
|
|
@ -166,7 +166,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
|
|||
|
||||
Set<SimpleString> propNames = new HashSet<>(messageCopy.getPropertyNames());
|
||||
|
||||
byte[] queueIds = message.getBytesProperty(idsHeaderName);
|
||||
byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);
|
||||
|
||||
if (queueIds == null) {
|
||||
// Sanity check only
|
||||
|
@ -180,7 +180,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
|
|||
}
|
||||
}
|
||||
|
||||
messageCopy.putBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
|
||||
messageCopy.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
|
||||
|
||||
messageCopy = super.beforeForward(messageCopy);
|
||||
|
||||
|
|
|
@ -315,7 +315,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
|
|||
* @param message
|
||||
*/
|
||||
private void addRouteContextToMessage(final Message message) {
|
||||
byte[] ids = message.getBytesProperty(idsHeaderName);
|
||||
byte[] ids = message.getExtraBytesProperty(idsHeaderName);
|
||||
|
||||
if (ids == null) {
|
||||
ids = new byte[8];
|
||||
|
@ -331,7 +331,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
|
|||
|
||||
buff.putLong(remoteQueueID);
|
||||
|
||||
message.putBytesProperty(idsHeaderName, ids);
|
||||
message.putExtraBytesProperty(idsHeaderName, ids);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Adding remoteQueue ID = " + remoteQueueID + " into message=" + message + " store-forward-queue=" + storeAndForwardQueue);
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.spi.core.protocol;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
import org.apache.activemq.artemis.core.persistence.Persister;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public class EmbedMessageUtil {
|
||||
|
||||
private static final byte[] signature = new byte[]{(byte) 'E', (byte) 'M', (byte) 'B'};
|
||||
|
||||
private static final Logger logger = Logger.getLogger(EmbedMessageUtil.class);
|
||||
|
||||
public static ICoreMessage embedAsCoreMessage(Message source) {
|
||||
|
||||
if (source instanceof ICoreMessage) {
|
||||
return (ICoreMessage) source;
|
||||
} else {
|
||||
Persister persister = source.getPersister();
|
||||
|
||||
CoreMessage message = new CoreMessage(source.getMessageID(), persister.getEncodeSize(source) + signature.length + CoreMessage.BODY_OFFSET).setType(Message.EMBEDDED_TYPE);
|
||||
|
||||
ActiveMQBuffer buffer = message.getBodyBuffer();
|
||||
buffer.writeBytes(signature);
|
||||
persister.encode(buffer, source);
|
||||
return message;
|
||||
}
|
||||
}
|
||||
|
||||
public static Message extractEmbedded(ICoreMessage message) {
|
||||
if (message.getType() == Message.EMBEDDED_TYPE) {
|
||||
ActiveMQBuffer buffer = message.getReadOnlyBodyBuffer();
|
||||
|
||||
if (buffer.readableBytes() < signature.length || !checkSignature(buffer)) {
|
||||
if (!logger.isTraceEnabled()) {
|
||||
logger.trace("Message type " + Message.EMBEDDED_TYPE + " was used for something other than embed messages, ignoring content and treating as a regular message");
|
||||
}
|
||||
return message;
|
||||
}
|
||||
|
||||
try {
|
||||
return MessagePersister.getInstance().decode(buffer, null);
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
return message;
|
||||
}
|
||||
} else {
|
||||
return message;
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean checkSignature(final ActiveMQBuffer buffer) {
|
||||
return buffer.readByte() == signature[0] && buffer.readByte() == signature[1] && buffer.readByte() == signature[2];
|
||||
}
|
||||
}
|
|
@ -17,15 +17,12 @@
|
|||
|
||||
package org.apache.activemq.artemis.spi.core.protocol;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
|
||||
import org.apache.activemq.artemis.core.persistence.Persister;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public class MessagePersister implements Persister<Message> {
|
||||
|
@ -35,33 +32,47 @@ public class MessagePersister implements Persister<Message> {
|
|||
private static final MessagePersister theInstance = new MessagePersister();
|
||||
|
||||
/** This will be used for reading messages */
|
||||
private static Map<Byte, Persister<Message>> protocols = new ConcurrentHashMap<>();
|
||||
private static final int MAX_PERSISTERS = 3;
|
||||
private static final Persister<Message>[] persisters = new Persister[MAX_PERSISTERS];
|
||||
|
||||
static {
|
||||
MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
|
||||
CoreMessagePersister persister = CoreMessagePersister.getInstance();
|
||||
MessagePersister.registerPersister(persister);
|
||||
|
||||
Iterable<ProtocolManagerFactory> protocols = ServiceLoader.load(ProtocolManagerFactory.class, MessagePersister.class.getClassLoader());
|
||||
for (ProtocolManagerFactory next : protocols) {
|
||||
MessagePersister.registerPersister(next.getStoreID(), next.getPersister());
|
||||
registerProtocol(next);
|
||||
}
|
||||
}
|
||||
|
||||
public static void registerProtocol(ProtocolManagerFactory manager) {
|
||||
Persister<Message> messagePersister = manager.getPersister();
|
||||
if (messagePersister == null) {
|
||||
Persister<Message>[] messagePersisters = manager.getPersister();
|
||||
if (messagePersisters == null || messagePersisters.length == 0) {
|
||||
logger.debug("Cannot find persister for " + manager);
|
||||
} else {
|
||||
registerPersister(manager.getStoreID(), manager.getPersister());
|
||||
for (Persister p : messagePersisters) {
|
||||
registerPersister(p);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void clearPersisters() {
|
||||
protocols.clear();
|
||||
for (int i = 0; i < persisters.length; i++) {
|
||||
persisters[i] = null;
|
||||
}
|
||||
}
|
||||
|
||||
public static void registerPersister(byte recordType, Persister<Message> persister) {
|
||||
public static Persister getPersister(byte id) {
|
||||
if (id == 0 || id > MAX_PERSISTERS) {
|
||||
return null;
|
||||
}
|
||||
return persisters[id - 1];
|
||||
}
|
||||
|
||||
public static void registerPersister(Persister<Message> persister) {
|
||||
if (persister != null) {
|
||||
protocols.put(recordType, persister);
|
||||
assert persister.getID() <= MAX_PERSISTERS : "You must update MessagePersister::MAX_PERSISTERS to a higher number";
|
||||
persisters[persister.getID() - 1] = persister;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -73,10 +84,6 @@ public class MessagePersister implements Persister<Message> {
|
|||
protected MessagePersister() {
|
||||
}
|
||||
|
||||
protected byte getID() {
|
||||
return (byte)0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getEncodeSize(Message record) {
|
||||
return 0;
|
||||
|
@ -92,7 +99,7 @@ public class MessagePersister implements Persister<Message> {
|
|||
@Override
|
||||
public Message decode(ActiveMQBuffer buffer, Message record) {
|
||||
byte protocol = buffer.readByte();
|
||||
Persister<Message> persister = protocols.get(protocol);
|
||||
Persister<Message> persister = getPersister(protocol);
|
||||
if (persister == null) {
|
||||
throw new NullPointerException("couldn't find factory for type=" + protocol);
|
||||
}
|
||||
|
|
|
@ -26,16 +26,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|||
|
||||
public interface ProtocolManagerFactory<P extends BaseInterceptor> {
|
||||
|
||||
/** This is to be used to store the protocol-id on Messages.
|
||||
* Messages are stored on their bare format.
|
||||
* The protocol manager will be responsible to code or decode messages.
|
||||
* The caveat here is that the first short-sized bytes need to be this constant. */
|
||||
default byte getStoreID() {
|
||||
return (byte)0;
|
||||
}
|
||||
|
||||
default Persister<Message> getPersister() {
|
||||
return null;
|
||||
default Persister<Message>[] getPersister() {
|
||||
return new Persister[]{};
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AMQPMessageLoadBalancingTest extends ClusterTestBase {
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
start();
|
||||
}
|
||||
|
||||
private void start() throws Exception {
|
||||
setupServers();
|
||||
|
||||
setRedistributionDelay(0);
|
||||
}
|
||||
|
||||
protected boolean isNetty() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadBalanceAMQP() throws Exception {
|
||||
setupCluster(MessageLoadBalancingType.STRICT);
|
||||
|
||||
startServers(0, 1);
|
||||
|
||||
setupSessionFactory(0, isNetty());
|
||||
setupSessionFactory(1, isNetty());
|
||||
|
||||
createQueue(0, "queues.0", "queues.0", null, true, null, null, RoutingType.ANYCAST);
|
||||
createQueue(1, "queues.0", "queues.0", null, true, null, null, RoutingType.ANYCAST);
|
||||
|
||||
final int NUMBER_OF_MESSAGES = 100;
|
||||
|
||||
// sending AMQP Messages.. they should be load balanced
|
||||
{
|
||||
JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
MessageProducer producer = session.createProducer(session.createQueue("queues.0"));
|
||||
|
||||
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||
producer.send(session.createTextMessage("hello " + i));
|
||||
}
|
||||
session.commit();
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
receiveOnBothNodes(NUMBER_OF_MESSAGES);
|
||||
|
||||
// If a user used a message type = 7, for messages that are not embedded,
|
||||
// it should still be treated as a normal message
|
||||
{
|
||||
ClientSession sessionProducer = sfs[0].createSession();
|
||||
ClientProducer producer = sessionProducer.createProducer("queues.0");
|
||||
|
||||
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||
// The user is mistakenly using the same type we used for embedded messages. it should still work
|
||||
ClientMessage message = sessionProducer.createMessage(Message.EMBEDDED_TYPE, true).putIntProperty("i", i);
|
||||
message.getBodyBuffer().writeString("hello!");
|
||||
producer.send(message);
|
||||
|
||||
// will send 2 messages.. one with stuff, another empty
|
||||
message = sessionProducer.createMessage(Message.EMBEDDED_TYPE, true);
|
||||
producer.send(message);
|
||||
}
|
||||
receiveOnBothNodes(NUMBER_OF_MESSAGES * 2);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void receiveOnBothNodes(int NUMBER_OF_MESSAGES) throws ActiveMQException {
|
||||
for (int x = 0; x <= 1; x++) {
|
||||
ClientSession sessionX = sfs[x].createSession();
|
||||
ClientConsumer consumerX = sessionX.createConsumer("queues.0");
|
||||
sessionX.start();
|
||||
|
||||
for (int i = 0; i < NUMBER_OF_MESSAGES / 2; i++) {
|
||||
ClientMessage msg = consumerX.receive(5000);
|
||||
Assert.assertNotNull(msg);
|
||||
msg.acknowledge();
|
||||
}
|
||||
Assert.assertNull(consumerX.receiveImmediate());
|
||||
sessionX.commit();
|
||||
sessionX.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
|
||||
setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);
|
||||
|
||||
setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0);
|
||||
}
|
||||
|
||||
protected void setRedistributionDelay(final long delay) {
|
||||
AddressSettings as = new AddressSettings().setRedistributionDelay(delay);
|
||||
|
||||
getServer(0).getAddressSettingsRepository().addMatch("queues.*", as);
|
||||
getServer(1).getAddressSettingsRepository().addMatch("queues.*", as);
|
||||
}
|
||||
|
||||
protected void setupServers() throws Exception {
|
||||
setupServer(0, isFileStorage(), isNetty());
|
||||
setupServer(1, isFileStorage(), isNetty());
|
||||
|
||||
servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
|
||||
servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
|
||||
}
|
||||
|
||||
protected void stopServers() throws Exception {
|
||||
closeAllConsumers();
|
||||
|
||||
closeAllSessionFactories();
|
||||
|
||||
closeAllServerLocatorsFactories();
|
||||
|
||||
stopServers(0, 1);
|
||||
|
||||
clearServer(0, 1);
|
||||
}
|
||||
|
||||
}
|
|
@ -30,7 +30,6 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
|
|||
import org.apache.activemq.artemis.core.paging.impl.Page;
|
||||
import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersister;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
|
||||
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
|
||||
|
@ -52,8 +51,8 @@ public class PageTest extends ActiveMQTestBase {
|
|||
|
||||
@Before
|
||||
public void registerProtocols() {
|
||||
MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
|
||||
MessagePersister.registerPersister((byte)2, AMQPMessagePersister.getInstance());
|
||||
MessagePersister.registerPersister(CoreMessagePersister.getInstance());
|
||||
MessagePersister.registerPersister(AMQPMessagePersister.getInstance());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -51,14 +51,12 @@ import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
|
|||
import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
|
||||
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersister;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
|
||||
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.tests.unit.util.FakePagingManager;
|
||||
|
@ -75,8 +73,8 @@ import org.junit.Test;
|
|||
public class PagingStoreImplTest extends ActiveMQTestBase {
|
||||
|
||||
static {
|
||||
MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
|
||||
MessagePersister.registerPersister(ProtonProtocolManagerFactory.ID, AMQPMessagePersister.getInstance());
|
||||
MessagePersister.registerPersister(CoreMessagePersister.getInstance());
|
||||
MessagePersister.registerPersister(AMQPMessagePersister.getInstance());
|
||||
}
|
||||
|
||||
private static final SimpleString destinationTestName = new SimpleString("test");
|
||||
|
|
Loading…
Reference in New Issue