From 7576b9d5866e077d6eb6cba0ab035dbe09527ff4 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Mon, 15 Jan 2018 16:36:01 +0000 Subject: [PATCH] ARTEMIS-1611 Added support for 1.x transformer API --- .../artemis/utils/TypedProperties.java | 5 + .../artemis/core/message/BodyEncoder.java | 51 ++ .../core/message/impl/MessageInternal.java | 58 ++ .../message/impl/MessageInternalImpl.java | 698 ++++++++++++++++++ .../artemis/core/server/ServerMessage.java | 57 ++ .../core/server/cluster/Transformer.java | 17 +- .../server/transformer/ServerMessageImpl.java | 193 +++++ .../cluster/bridge/BridgeTest.java | 7 +- 8 files changed, 1084 insertions(+), 2 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/transformer/ServerMessageImpl.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java index 29cf8348e9..05b401b6d9 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java @@ -23,4 +23,9 @@ package org.apache.activemq.artemis.utils; @Deprecated public class TypedProperties extends org.apache.activemq.artemis.utils.collections.TypedProperties { + public TypedProperties() { } + + public TypedProperties(final org.apache.activemq.artemis.utils.collections.TypedProperties other) { + super(other); + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java new file mode 100644 index 0000000000..b6c4092de4 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.message; + +import java.nio.ByteBuffer; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; + +@Deprecated +public interface BodyEncoder { + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + void open() throws ActiveMQException; + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + void close() throws ActiveMQException; + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + int encode(ByteBuffer bufferRead) throws ActiveMQException; + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException; + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + long getLargeBodySize(); +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java new file mode 100644 index 0000000000..dc0bb104b1 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.message.impl; + +import java.io.InputStream; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.BodyEncoder; +import org.apache.activemq.artemis.utils.TypedProperties; + +@Deprecated +public interface MessageInternal extends Message { + + void decodeFromBuffer(ActiveMQBuffer buffer); + + int getEndOfMessagePosition(); + + int getEndOfBodyPosition(); + + void bodyChanged(); + + boolean isServerMessage(); + + ActiveMQBuffer getEncodedBuffer(); + + int getHeadersAndPropertiesEncodeSize(); + + ActiveMQBuffer getWholeBuffer(); + + void encodeHeadersAndProperties(ActiveMQBuffer buffer); + + void decodeHeadersAndProperties(ActiveMQBuffer buffer); + + BodyEncoder getBodyEncoder() throws ActiveMQException; + + InputStream getBodyInputStream(); + + void setAddressTransient(SimpleString address); + + TypedProperties getTypedProperties(); +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java new file mode 100644 index 0000000000..56ff816935 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java @@ -0,0 +1,698 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.message.impl; + +import java.io.InputStream; +import java.util.Set; + +import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RefCountMessageListener; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.BodyEncoder; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.utils.TypedProperties; + +@Deprecated +public class MessageInternalImpl implements MessageInternal { + + private CoreMessage message; + + public MessageInternalImpl(ICoreMessage message) { + this.message = (CoreMessage) message; + } + + @Override + public void decodeFromBuffer(ActiveMQBuffer buffer) { + throw new UnsupportedOperationException(); + } + + @Override + public int getEndOfMessagePosition() { + throw new UnsupportedOperationException(); + } + + @Override + public int getEndOfBodyPosition() { + throw new UnsupportedOperationException(); + } + + @Override + public void bodyChanged() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isServerMessage() { + return true; + } + + @Override + public ActiveMQBuffer getEncodedBuffer() { + throw new UnsupportedOperationException(); + } + + @Override + public int getHeadersAndPropertiesEncodeSize() { + return message.getHeadersAndPropertiesEncodeSize(); + } + + @Override + public ActiveMQBuffer getWholeBuffer() { + throw new UnsupportedOperationException(); + } + + @Override + public void encodeHeadersAndProperties(ActiveMQBuffer buffer) { + throw new UnsupportedOperationException(); + } + + @Override + public void decodeHeadersAndProperties(ActiveMQBuffer buffer) { + throw new UnsupportedOperationException(); + } + + @Override + public BodyEncoder getBodyEncoder() throws ActiveMQException { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream getBodyInputStream() { + return message.getBodyInputStream(); + } + + @Override + public void messageChanged() { + throw new UnsupportedOperationException(); + } + + /** + * Used to calculate what is the delivery time. + * Return null if not scheduled. + */ + @Override + public Long getScheduledDeliveryTime() { + return message.getScheduledDeliveryTime(); + } + + /** + * Context can be used by the application server to inject extra control, like a protocol specific on the server. + * There is only one per Object, use it wisely! + * + * Note: the intent of this was to replace PageStore reference on Message, but it will be later increased by adidn a ServerPojo + */ + @Override + public RefCountMessageListener getContext() { + throw new UnsupportedOperationException(); + } + + @Override + public SimpleString getReplyTo() { + return message.getReplyTo(); + } + + @Override + public Message setReplyTo(SimpleString address) { + message.setReplyTo(address); + return this; + } + + @Override + public Message setContext(RefCountMessageListener context) { + throw new UnsupportedOperationException(); + } + + /** + * The buffer will belong to this message, until release is called. + * + * @param buffer + */ + @Override + public Message setBuffer(ByteBuf buffer) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBuffer() { + return message.getBuffer(); + } + + /** + * It will generate a new instance of the message encode, being a deep copy, new properties, new everything + */ + @Override + public Message copy() { + return message.copy(); + } + + /** + * It will generate a new instance of the message encode, being a deep copy, new properties, new everything + * + * @param newID + */ + @Override + public Message copy(long newID) { + return message.copy(newID); + } + + /** + * Returns the messageID. + *
+ * The messageID is set when the message is handled by the server. + */ + @Override + public long getMessageID() { + return message.getMessageID(); + } + + @Override + public Message setMessageID(long id) { + message.setMessageID(id); + return this; + } + + /** + * Returns the expiration time of this message. + */ + @Override + public long getExpiration() { + return message.getExpiration(); + } + + /** + * Sets the expiration of this message. + * + * @param expiration expiration time + */ + @Override + public Message setExpiration(long expiration) { + message.setExpiration(expiration); + return this; + } + + /** + * This represents historically the JMSMessageID. + * We had in the past used this for the MessageID that was sent on core messages... + * + * later on when we added AMQP this name clashed with AMQPMessage.getUserID(); + * + * @return the user id + */ + @Override + public Object getUserID() { + return message.getUserID(); + } + + @Override + public Message setUserID(Object userID) { + message.setUserID(userID); + return this; + } + + /** + * Returns whether this message is durable or not. + */ + @Override + public boolean isDurable() { + return message.isDurable(); + } + + /** + * Sets whether this message is durable or not. + * + * @param durable {@code true} to flag this message as durable, {@code false} else + */ + @Override + public Message setDurable(boolean durable) { + message.setDurable(durable); + return message; + } + + @Override + public Persister getPersister() { + throw new UnsupportedOperationException(); + } + + @Override + public String getAddress() { + return message.getAddress(); + } + + @Override + public Message setAddress(String address) { + message.setAddress(address); + return this; + } + + @Override + public SimpleString getAddressSimpleString() { + return message.getAddressSimpleString(); + } + + @Override + public Message setAddress(SimpleString address) { + message.setAddress(address); + return this; + } + + @Override + public long getTimestamp() { + return message.getTimestamp(); + } + + @Override + public Message setTimestamp(long timestamp) { + message.setTimestamp(timestamp); + return this; + } + + /** + * Returns the message priority. + *

+ * Values range from 0 (less priority) to 9 (more priority) inclusive. + */ + @Override + public byte getPriority() { + return message.getPriority(); + } + + /** + * Sets the message priority. + *

+ * Value must be between 0 and 9 inclusive. + * + * @param priority the new message priority + */ + @Override + public Message setPriority(byte priority) { + message.setPriority(priority); + return this; + } + + /** + * Used to receive this message from an encoded medium buffer + * + * @param buffer + */ + @Override + public void receiveBuffer(ByteBuf buffer) { + throw new UnsupportedOperationException(); + } + + /** + * Used to send this message to an encoded medium buffer. + * + * @param buffer the buffer used. + * @param deliveryCount Some protocols (AMQP) will have this as part of the message. + */ + @Override + public void sendBuffer(ByteBuf buffer, int deliveryCount) { + throw new UnsupportedOperationException(); + } + + @Override + public int getPersistSize() { + return message.getPersistSize(); + } + + @Override + public void persist(ActiveMQBuffer targetRecord) { + message.persist(targetRecord); + } + + @Override + public void reloadPersistence(ActiveMQBuffer record) { + throw new UnsupportedOperationException(); + } + + @Override + public Message putBooleanProperty(String key, boolean value) { + message.putBooleanProperty(key, value); + return this; + } + + @Override + public Message putByteProperty(String key, byte value) { + message.putByteProperty(key, value); + return this; + } + + @Override + public Message putBytesProperty(String key, byte[] value) { + message.putBytesProperty(key, value); + return this; + } + + @Override + public Message putShortProperty(String key, short value) { + message.putShortProperty(key, value); + return this; + } + + @Override + public Message putCharProperty(String key, char value) { + message.putCharProperty(key, value); + return this; + } + + @Override + public Message putIntProperty(String key, int value) { + message.putIntProperty(key, value); + return this; + } + + @Override + public Message putLongProperty(String key, long value) { + message.putLongProperty(key, value); + return this; + } + + @Override + public Message putFloatProperty(String key, float value) { + message.putFloatProperty(key, value); + return this; + } + + @Override + public Message putDoubleProperty(String key, double value) { + message.putDoubleProperty(key, value); + return this; + } + + @Override + public Message putBooleanProperty(SimpleString key, boolean value) { + message.putBooleanProperty(key, value); + return this; + } + + @Override + public Message putByteProperty(SimpleString key, byte value) { + message.putByteProperty(key, value); + return this; + } + + @Override + public Message putBytesProperty(SimpleString key, byte[] value) { + message.putBytesProperty(key, value); + return this; + } + + @Override + public Message putShortProperty(SimpleString key, short value) { + message.putShortProperty(key, value); + return this; + } + + @Override + public Message putCharProperty(SimpleString key, char value) { + message.putCharProperty(key, value); + return this; + } + + @Override + public Message putIntProperty(SimpleString key, int value) { + message.putIntProperty(key, value); + return this; + } + + @Override + public Message putLongProperty(SimpleString key, long value) { + message.putLongProperty(key, value); + return this; + } + + @Override + public Message putFloatProperty(SimpleString key, float value) { + message.putFloatProperty(key, value); + return this; + } + + @Override + public Message putDoubleProperty(SimpleString key, double value) { + message.putDoubleProperty(key, value); + return this; + } + + /** + * Puts a String property in this message. + * + * @param key property name + * @param value property value + */ + @Override + public Message putStringProperty(String key, String value) { + message.putStringProperty(key, value); + return this; + } + + @Override + public Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException { + message.putObjectProperty(key, value); + return this; + } + + @Override + public Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException { + message.putObjectProperty(key, value); + return this; + } + + @Override + public Object removeProperty(String key) { + return message.removeProperty(key); + } + + @Override + public boolean containsProperty(String key) { + return message.containsProperty(key); + } + + @Override + public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException { + return message.getBooleanProperty(key); + } + + @Override + public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException { + return message.getByteProperty(key); + } + + @Override + public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException { + return message.getDoubleProperty(key); + } + + @Override + public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException { + return message.getIntProperty(key); + } + + @Override + public Long getLongProperty(String key) throws ActiveMQPropertyConversionException { + return message.getLongProperty(key); + } + + @Override + public Object getObjectProperty(String key) { + return message.getObjectProperty(key); + } + + @Override + public Short getShortProperty(String key) throws ActiveMQPropertyConversionException { + return message.getShortProperty(key); + } + + @Override + public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException { + return message.getFloatProperty(key); + } + + @Override + public String getStringProperty(String key) throws ActiveMQPropertyConversionException { + return message.getStringProperty(key); + } + + @Override + public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { + return message.getSimpleStringProperty(key); + } + + @Override + public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException { + return message.getBytesProperty(key); + } + + @Override + public Object removeProperty(SimpleString key) { + return message.removeProperty(key); + } + + @Override + public boolean containsProperty(SimpleString key) { + return message.containsProperty(key); + } + + @Override + public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return message.getBooleanProperty(key); + } + + @Override + public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return message.getByteProperty(key); + } + + @Override + public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return message.getDoubleProperty(key); + } + + @Override + public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return message.getIntProperty(key); + } + + @Override + public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return message.getLongProperty(key); + } + + @Override + public Object getObjectProperty(SimpleString key) { + return message.getObjectProperty(key); + } + + @Override + public Object getAnnotation(SimpleString key) { + return message.getAnnotation(key); + } + + @Override + public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return message.getShortProperty(key); + } + + @Override + public Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return message.getFloatProperty(key); + } + + @Override + public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return message.getStringProperty(key); + } + + @Override + public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return message.getSimpleStringProperty(key); + } + + @Override + public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return message.getBytesProperty(key); + } + + @Override + public Message putStringProperty(SimpleString key, SimpleString value) { + return message.putStringProperty(key, value); + } + + @Override + public Message putStringProperty(SimpleString key, String value) { + return message.putStringProperty(key, value); + } + + /** + * Returns the size of the encoded message. + */ + @Override + public int getEncodeSize() { + return message.getEncodeSize(); + } + + /** + * Returns all the names of the properties for this message. + */ + @Override + public Set getPropertyNames() { + return message.getPropertyNames(); + } + + @Override + public int getRefCount() { + throw new UnsupportedOperationException(); + } + + @Override + public int incrementRefCount() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public int decrementRefCount() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public int incrementDurableRefCount() { + throw new UnsupportedOperationException(); + } + + @Override + public int decrementDurableRefCount() { + throw new UnsupportedOperationException(); + } + + /** + * This should make you convert your message into Core format. + */ + @Override + public ICoreMessage toCore() { + return message.toCore(); + } + + /** + * This should make you convert your message into Core format. + * + * @param coreMessageObjectPools + */ + @Override + public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { + return message.toCore(); + } + + @Override + public int getMemoryEstimate() { + return message.getMemoryEstimate(); + } + + @Override + public void setAddressTransient(SimpleString address) { + message.setAddress(address); + } + + @Override + public TypedProperties getTypedProperties() { + return new TypedProperties(message.getTypedProperties()); + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java new file mode 100644 index 0000000000..60e8035221 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server; + +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.paging.PagingStore; + +@Deprecated +public interface ServerMessage extends MessageInternal { + + ICoreMessage getICoreMessage(); + + MessageReference createReference(Queue queue); + + /** + * This will force encoding of the address, and will re-check the buffer + * This is to avoid setMessageTransient which set the address without changing the buffer + * + * @param address + */ + void forceAddress(SimpleString address); + + ServerMessage makeCopyForExpiryOrDLA(long newID, + MessageReference originalReference, + boolean expiry, + boolean copyOriginalHeaders) throws Exception; + + void setOriginalHeaders(ServerMessage other, MessageReference originalReference, boolean expiry); + + void setPagingStore(PagingStore store); + + PagingStore getPagingStore(); + + // Is there any _AMQ_ property being used + boolean hasInternalProperties(); + + boolean storeIsPaging(); + + void encodeMessageIDToBuffer(); + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java index 7d965e8828..e426d39a63 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java @@ -16,8 +16,23 @@ */ package org.apache.activemq.artemis.core.server.cluster; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.transformer.ServerMessageImpl; + /** * This is for back compatibility with package move. */ +@Deprecated public interface Transformer extends org.apache.activemq.artemis.core.server.transformer.Transformer { -} + + @Override + default Message transform(Message message) { + return transform(new ServerMessageImpl(message)).getICoreMessage(); + } + + @Deprecated + default ServerMessage transform(ServerMessage m) { + return m; + } +} \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/transformer/ServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/transformer/ServerMessageImpl.java new file mode 100644 index 0000000000..fe8e9ec7e6 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/transformer/ServerMessageImpl.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.transformer; + +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.message.impl.MessageInternalImpl; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.ServerMessage; + +/** + * Do not use this class. It is for backwards compatibility with Artemis 1.x only. + */ +@Deprecated +public class ServerMessageImpl extends MessageInternalImpl implements ServerMessage { + + private CoreMessage message; + + private boolean valid = false; + + public boolean isValid() { + return false; + } + + @Override + public ICoreMessage getICoreMessage() { + return message; + } + + public ServerMessageImpl(Message message) { + super(message.toCore()); + this.message = (CoreMessage) message.toCore(); + } + + @Override + public ServerMessage setMessageID(long id) { + message.setMessageID(id); + return this; + } + + @Override + public MessageReference createReference(Queue queue) { + throw new UnsupportedOperationException(); + } + + /** + * This will force encoding of the address, and will re-check the buffer + * This is to avoid setMessageTransient which set the address without changing the buffer + * + * @param address + */ + @Override + public void forceAddress(SimpleString address) { + message.setAddress(address); + } + + @Override + public int incrementRefCount() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public int decrementRefCount() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public int incrementDurableRefCount() { + throw new UnsupportedOperationException(); + } + + @Override + public int decrementDurableRefCount() { + throw new UnsupportedOperationException(); + } + + @Override + public ServerMessage copy(long newID) { + throw new UnsupportedOperationException(); + } + + @Override + public ServerMessage copy() { + throw new UnsupportedOperationException(); + } + + @Override + public int getMemoryEstimate() { + return message.getMemoryEstimate(); + } + + @Override + public int getRefCount() { + return message.getRefCount(); + } + + @Override + public ServerMessage makeCopyForExpiryOrDLA(long newID, + MessageReference originalReference, + boolean expiry, + boolean copyOriginalHeaders) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public void setOriginalHeaders(ServerMessage otherServerMessage, MessageReference originalReference, boolean expiry) { + + ICoreMessage other = otherServerMessage.getICoreMessage(); + + SimpleString originalQueue = other.getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE); + + if (originalQueue != null) { + message.putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue); + } else if (originalReference != null) { + message.putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalReference.getQueue().getName()); + } + + if (other.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) { + message.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS)); + + message.putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getLongProperty(Message.HDR_ORIG_MESSAGE_ID)); + } else { + message.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, new SimpleString(other.getAddress())); + + message.putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getMessageID()); + } + + // reset expiry + message.setExpiration(0); + + if (expiry) { + long actualExpiryTime = System.currentTimeMillis(); + + message.putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime); + } + + // TODO ASk clebert + //message.bufferValid = false; + } + + @Override + public void setPagingStore(PagingStore store) { + throw new UnsupportedOperationException(); + } + + @Override + public PagingStore getPagingStore() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasInternalProperties() { + return message.getTypedProperties().hasInternalProperties(); + } + + @Override + public boolean storeIsPaging() { + throw new UnsupportedOperationException(); + } + + @Override + public void encodeMessageIDToBuffer() { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getDuplicateIDBytes() { + return message.getDuplicateIDBytes(); + } + + @Override + public Object getDuplicateProperty() { + return message.getDuplicateProperty(); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index 3f7d1c65a1..219aa15698 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -1994,7 +1994,12 @@ public class BridgeTest extends ActiveMQTestBase { final String BRIDGE = "myBridge"; ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl(); - Transformer transformer = (Message encode) -> null; + Transformer transformer = new Transformer() { + @Override + public Message transform(Message message) { + return null; + } + }; serviceRegistry.addBridgeTransformer(BRIDGE, transformer); Configuration config = createDefaultInVMConfig().addConnectorConfiguration("in-vm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));