ARTEMIS-1611 Added support for 1.x transformer API

This commit is contained in:
Martyn Taylor 2018-01-15 16:36:01 +00:00 committed by Michael Pearce
parent c3ea288c62
commit 7576b9d586
8 changed files with 1084 additions and 2 deletions

View File

@ -23,4 +23,9 @@ package org.apache.activemq.artemis.utils;
@Deprecated
public class TypedProperties extends org.apache.activemq.artemis.utils.collections.TypedProperties {
public TypedProperties() { }
public TypedProperties(final org.apache.activemq.artemis.utils.collections.TypedProperties other) {
super(other);
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.message;
import java.nio.ByteBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@Deprecated
public interface BodyEncoder {
/**
* This method must not be called directly by ActiveMQ Artemis clients.
*/
void open() throws ActiveMQException;
/**
* This method must not be called directly by ActiveMQ Artemis clients.
*/
void close() throws ActiveMQException;
/**
* This method must not be called directly by ActiveMQ Artemis clients.
*/
int encode(ByteBuffer bufferRead) throws ActiveMQException;
/**
* This method must not be called directly by ActiveMQ Artemis clients.
*/
int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException;
/**
* This method must not be called directly by ActiveMQ Artemis clients.
*/
long getLargeBodySize();
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.message.impl;
import java.io.InputStream;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.BodyEncoder;
import org.apache.activemq.artemis.utils.TypedProperties;
@Deprecated
public interface MessageInternal extends Message {
void decodeFromBuffer(ActiveMQBuffer buffer);
int getEndOfMessagePosition();
int getEndOfBodyPosition();
void bodyChanged();
boolean isServerMessage();
ActiveMQBuffer getEncodedBuffer();
int getHeadersAndPropertiesEncodeSize();
ActiveMQBuffer getWholeBuffer();
void encodeHeadersAndProperties(ActiveMQBuffer buffer);
void decodeHeadersAndProperties(ActiveMQBuffer buffer);
BodyEncoder getBodyEncoder() throws ActiveMQException;
InputStream getBodyInputStream();
void setAddressTransient(SimpleString address);
TypedProperties getTypedProperties();
}

View File

@ -0,0 +1,698 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.message.impl;
import java.io.InputStream;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.BodyEncoder;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.utils.TypedProperties;
@Deprecated
public class MessageInternalImpl implements MessageInternal {
private CoreMessage message;
public MessageInternalImpl(ICoreMessage message) {
this.message = (CoreMessage) message;
}
@Override
public void decodeFromBuffer(ActiveMQBuffer buffer) {
throw new UnsupportedOperationException();
}
@Override
public int getEndOfMessagePosition() {
throw new UnsupportedOperationException();
}
@Override
public int getEndOfBodyPosition() {
throw new UnsupportedOperationException();
}
@Override
public void bodyChanged() {
throw new UnsupportedOperationException();
}
@Override
public boolean isServerMessage() {
return true;
}
@Override
public ActiveMQBuffer getEncodedBuffer() {
throw new UnsupportedOperationException();
}
@Override
public int getHeadersAndPropertiesEncodeSize() {
return message.getHeadersAndPropertiesEncodeSize();
}
@Override
public ActiveMQBuffer getWholeBuffer() {
throw new UnsupportedOperationException();
}
@Override
public void encodeHeadersAndProperties(ActiveMQBuffer buffer) {
throw new UnsupportedOperationException();
}
@Override
public void decodeHeadersAndProperties(ActiveMQBuffer buffer) {
throw new UnsupportedOperationException();
}
@Override
public BodyEncoder getBodyEncoder() throws ActiveMQException {
throw new UnsupportedOperationException();
}
@Override
public InputStream getBodyInputStream() {
return message.getBodyInputStream();
}
@Override
public void messageChanged() {
throw new UnsupportedOperationException();
}
/**
* Used to calculate what is the delivery time.
* Return null if not scheduled.
*/
@Override
public Long getScheduledDeliveryTime() {
return message.getScheduledDeliveryTime();
}
/**
* Context can be used by the application server to inject extra control, like a protocol specific on the server.
* There is only one per Object, use it wisely!
*
* Note: the intent of this was to replace PageStore reference on Message, but it will be later increased by adidn a ServerPojo
*/
@Override
public RefCountMessageListener getContext() {
throw new UnsupportedOperationException();
}
@Override
public SimpleString getReplyTo() {
return message.getReplyTo();
}
@Override
public Message setReplyTo(SimpleString address) {
message.setReplyTo(address);
return this;
}
@Override
public Message setContext(RefCountMessageListener context) {
throw new UnsupportedOperationException();
}
/**
* The buffer will belong to this message, until release is called.
*
* @param buffer
*/
@Override
public Message setBuffer(ByteBuf buffer) {
throw new UnsupportedOperationException();
}
@Override
public ByteBuf getBuffer() {
return message.getBuffer();
}
/**
* It will generate a new instance of the message encode, being a deep copy, new properties, new everything
*/
@Override
public Message copy() {
return message.copy();
}
/**
* It will generate a new instance of the message encode, being a deep copy, new properties, new everything
*
* @param newID
*/
@Override
public Message copy(long newID) {
return message.copy(newID);
}
/**
* Returns the messageID.
* <br>
* The messageID is set when the message is handled by the server.
*/
@Override
public long getMessageID() {
return message.getMessageID();
}
@Override
public Message setMessageID(long id) {
message.setMessageID(id);
return this;
}
/**
* Returns the expiration time of this message.
*/
@Override
public long getExpiration() {
return message.getExpiration();
}
/**
* Sets the expiration of this message.
*
* @param expiration expiration time
*/
@Override
public Message setExpiration(long expiration) {
message.setExpiration(expiration);
return this;
}
/**
* This represents historically the JMSMessageID.
* We had in the past used this for the MessageID that was sent on core messages...
*
* later on when we added AMQP this name clashed with AMQPMessage.getUserID();
*
* @return the user id
*/
@Override
public Object getUserID() {
return message.getUserID();
}
@Override
public Message setUserID(Object userID) {
message.setUserID(userID);
return this;
}
/**
* Returns whether this message is durable or not.
*/
@Override
public boolean isDurable() {
return message.isDurable();
}
/**
* Sets whether this message is durable or not.
*
* @param durable {@code true} to flag this message as durable, {@code false} else
*/
@Override
public Message setDurable(boolean durable) {
message.setDurable(durable);
return message;
}
@Override
public Persister<Message> getPersister() {
throw new UnsupportedOperationException();
}
@Override
public String getAddress() {
return message.getAddress();
}
@Override
public Message setAddress(String address) {
message.setAddress(address);
return this;
}
@Override
public SimpleString getAddressSimpleString() {
return message.getAddressSimpleString();
}
@Override
public Message setAddress(SimpleString address) {
message.setAddress(address);
return this;
}
@Override
public long getTimestamp() {
return message.getTimestamp();
}
@Override
public Message setTimestamp(long timestamp) {
message.setTimestamp(timestamp);
return this;
}
/**
* Returns the message priority.
* <p>
* Values range from 0 (less priority) to 9 (more priority) inclusive.
*/
@Override
public byte getPriority() {
return message.getPriority();
}
/**
* Sets the message priority.
* <p>
* Value must be between 0 and 9 inclusive.
*
* @param priority the new message priority
*/
@Override
public Message setPriority(byte priority) {
message.setPriority(priority);
return this;
}
/**
* Used to receive this message from an encoded medium buffer
*
* @param buffer
*/
@Override
public void receiveBuffer(ByteBuf buffer) {
throw new UnsupportedOperationException();
}
/**
* Used to send this message to an encoded medium buffer.
*
* @param buffer the buffer used.
* @param deliveryCount Some protocols (AMQP) will have this as part of the message.
*/
@Override
public void sendBuffer(ByteBuf buffer, int deliveryCount) {
throw new UnsupportedOperationException();
}
@Override
public int getPersistSize() {
return message.getPersistSize();
}
@Override
public void persist(ActiveMQBuffer targetRecord) {
message.persist(targetRecord);
}
@Override
public void reloadPersistence(ActiveMQBuffer record) {
throw new UnsupportedOperationException();
}
@Override
public Message putBooleanProperty(String key, boolean value) {
message.putBooleanProperty(key, value);
return this;
}
@Override
public Message putByteProperty(String key, byte value) {
message.putByteProperty(key, value);
return this;
}
@Override
public Message putBytesProperty(String key, byte[] value) {
message.putBytesProperty(key, value);
return this;
}
@Override
public Message putShortProperty(String key, short value) {
message.putShortProperty(key, value);
return this;
}
@Override
public Message putCharProperty(String key, char value) {
message.putCharProperty(key, value);
return this;
}
@Override
public Message putIntProperty(String key, int value) {
message.putIntProperty(key, value);
return this;
}
@Override
public Message putLongProperty(String key, long value) {
message.putLongProperty(key, value);
return this;
}
@Override
public Message putFloatProperty(String key, float value) {
message.putFloatProperty(key, value);
return this;
}
@Override
public Message putDoubleProperty(String key, double value) {
message.putDoubleProperty(key, value);
return this;
}
@Override
public Message putBooleanProperty(SimpleString key, boolean value) {
message.putBooleanProperty(key, value);
return this;
}
@Override
public Message putByteProperty(SimpleString key, byte value) {
message.putByteProperty(key, value);
return this;
}
@Override
public Message putBytesProperty(SimpleString key, byte[] value) {
message.putBytesProperty(key, value);
return this;
}
@Override
public Message putShortProperty(SimpleString key, short value) {
message.putShortProperty(key, value);
return this;
}
@Override
public Message putCharProperty(SimpleString key, char value) {
message.putCharProperty(key, value);
return this;
}
@Override
public Message putIntProperty(SimpleString key, int value) {
message.putIntProperty(key, value);
return this;
}
@Override
public Message putLongProperty(SimpleString key, long value) {
message.putLongProperty(key, value);
return this;
}
@Override
public Message putFloatProperty(SimpleString key, float value) {
message.putFloatProperty(key, value);
return this;
}
@Override
public Message putDoubleProperty(SimpleString key, double value) {
message.putDoubleProperty(key, value);
return this;
}
/**
* Puts a String property in this message.
*
* @param key property name
* @param value property value
*/
@Override
public Message putStringProperty(String key, String value) {
message.putStringProperty(key, value);
return this;
}
@Override
public Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException {
message.putObjectProperty(key, value);
return this;
}
@Override
public Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException {
message.putObjectProperty(key, value);
return this;
}
@Override
public Object removeProperty(String key) {
return message.removeProperty(key);
}
@Override
public boolean containsProperty(String key) {
return message.containsProperty(key);
}
@Override
public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
return message.getBooleanProperty(key);
}
@Override
public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
return message.getByteProperty(key);
}
@Override
public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
return message.getDoubleProperty(key);
}
@Override
public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
return message.getIntProperty(key);
}
@Override
public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
return message.getLongProperty(key);
}
@Override
public Object getObjectProperty(String key) {
return message.getObjectProperty(key);
}
@Override
public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
return message.getShortProperty(key);
}
@Override
public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
return message.getFloatProperty(key);
}
@Override
public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
return message.getStringProperty(key);
}
@Override
public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
return message.getSimpleStringProperty(key);
}
@Override
public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
return message.getBytesProperty(key);
}
@Override
public Object removeProperty(SimpleString key) {
return message.removeProperty(key);
}
@Override
public boolean containsProperty(SimpleString key) {
return message.containsProperty(key);
}
@Override
public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return message.getBooleanProperty(key);
}
@Override
public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return message.getByteProperty(key);
}
@Override
public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return message.getDoubleProperty(key);
}
@Override
public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return message.getIntProperty(key);
}
@Override
public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return message.getLongProperty(key);
}
@Override
public Object getObjectProperty(SimpleString key) {
return message.getObjectProperty(key);
}
@Override
public Object getAnnotation(SimpleString key) {
return message.getAnnotation(key);
}
@Override
public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return message.getShortProperty(key);
}
@Override
public Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return message.getFloatProperty(key);
}
@Override
public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return message.getStringProperty(key);
}
@Override
public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return message.getSimpleStringProperty(key);
}
@Override
public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return message.getBytesProperty(key);
}
@Override
public Message putStringProperty(SimpleString key, SimpleString value) {
return message.putStringProperty(key, value);
}
@Override
public Message putStringProperty(SimpleString key, String value) {
return message.putStringProperty(key, value);
}
/**
* Returns the size of the <em>encoded</em> message.
*/
@Override
public int getEncodeSize() {
return message.getEncodeSize();
}
/**
* Returns all the names of the properties for this message.
*/
@Override
public Set<SimpleString> getPropertyNames() {
return message.getPropertyNames();
}
@Override
public int getRefCount() {
throw new UnsupportedOperationException();
}
@Override
public int incrementRefCount() throws Exception {
throw new UnsupportedOperationException();
}
@Override
public int decrementRefCount() throws Exception {
throw new UnsupportedOperationException();
}
@Override
public int incrementDurableRefCount() {
throw new UnsupportedOperationException();
}
@Override
public int decrementDurableRefCount() {
throw new UnsupportedOperationException();
}
/**
* This should make you convert your message into Core format.
*/
@Override
public ICoreMessage toCore() {
return message.toCore();
}
/**
* This should make you convert your message into Core format.
*
* @param coreMessageObjectPools
*/
@Override
public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
return message.toCore();
}
@Override
public int getMemoryEstimate() {
return message.getMemoryEstimate();
}
@Override
public void setAddressTransient(SimpleString address) {
message.setAddress(address);
}
@Override
public TypedProperties getTypedProperties() {
return new TypedProperties(message.getTypedProperties());
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.paging.PagingStore;
@Deprecated
public interface ServerMessage extends MessageInternal {
ICoreMessage getICoreMessage();
MessageReference createReference(Queue queue);
/**
* This will force encoding of the address, and will re-check the buffer
* This is to avoid setMessageTransient which set the address without changing the buffer
*
* @param address
*/
void forceAddress(SimpleString address);
ServerMessage makeCopyForExpiryOrDLA(long newID,
MessageReference originalReference,
boolean expiry,
boolean copyOriginalHeaders) throws Exception;
void setOriginalHeaders(ServerMessage other, MessageReference originalReference, boolean expiry);
void setPagingStore(PagingStore store);
PagingStore getPagingStore();
// Is there any _AMQ_ property being used
boolean hasInternalProperties();
boolean storeIsPaging();
void encodeMessageIDToBuffer();
}

View File

@ -16,8 +16,23 @@
*/
package org.apache.activemq.artemis.core.server.cluster;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.transformer.ServerMessageImpl;
/**
* This is for back compatibility with package move.
*/
@Deprecated
public interface Transformer extends org.apache.activemq.artemis.core.server.transformer.Transformer {
@Override
default Message transform(Message message) {
return transform(new ServerMessageImpl(message)).getICoreMessage();
}
@Deprecated
default ServerMessage transform(ServerMessage m) {
return m;
}
}

View File

@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.transformer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.message.impl.MessageInternalImpl;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
/**
* Do not use this class. It is for backwards compatibility with Artemis 1.x only.
*/
@Deprecated
public class ServerMessageImpl extends MessageInternalImpl implements ServerMessage {
private CoreMessage message;
private boolean valid = false;
public boolean isValid() {
return false;
}
@Override
public ICoreMessage getICoreMessage() {
return message;
}
public ServerMessageImpl(Message message) {
super(message.toCore());
this.message = (CoreMessage) message.toCore();
}
@Override
public ServerMessage setMessageID(long id) {
message.setMessageID(id);
return this;
}
@Override
public MessageReference createReference(Queue queue) {
throw new UnsupportedOperationException();
}
/**
* This will force encoding of the address, and will re-check the buffer
* This is to avoid setMessageTransient which set the address without changing the buffer
*
* @param address
*/
@Override
public void forceAddress(SimpleString address) {
message.setAddress(address);
}
@Override
public int incrementRefCount() throws Exception {
throw new UnsupportedOperationException();
}
@Override
public int decrementRefCount() throws Exception {
throw new UnsupportedOperationException();
}
@Override
public int incrementDurableRefCount() {
throw new UnsupportedOperationException();
}
@Override
public int decrementDurableRefCount() {
throw new UnsupportedOperationException();
}
@Override
public ServerMessage copy(long newID) {
throw new UnsupportedOperationException();
}
@Override
public ServerMessage copy() {
throw new UnsupportedOperationException();
}
@Override
public int getMemoryEstimate() {
return message.getMemoryEstimate();
}
@Override
public int getRefCount() {
return message.getRefCount();
}
@Override
public ServerMessage makeCopyForExpiryOrDLA(long newID,
MessageReference originalReference,
boolean expiry,
boolean copyOriginalHeaders) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public void setOriginalHeaders(ServerMessage otherServerMessage, MessageReference originalReference, boolean expiry) {
ICoreMessage other = otherServerMessage.getICoreMessage();
SimpleString originalQueue = other.getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE);
if (originalQueue != null) {
message.putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
} else if (originalReference != null) {
message.putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalReference.getQueue().getName());
}
if (other.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) {
message.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS));
message.putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getLongProperty(Message.HDR_ORIG_MESSAGE_ID));
} else {
message.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, new SimpleString(other.getAddress()));
message.putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getMessageID());
}
// reset expiry
message.setExpiration(0);
if (expiry) {
long actualExpiryTime = System.currentTimeMillis();
message.putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
}
// TODO ASk clebert
//message.bufferValid = false;
}
@Override
public void setPagingStore(PagingStore store) {
throw new UnsupportedOperationException();
}
@Override
public PagingStore getPagingStore() {
throw new UnsupportedOperationException();
}
@Override
public boolean hasInternalProperties() {
return message.getTypedProperties().hasInternalProperties();
}
@Override
public boolean storeIsPaging() {
throw new UnsupportedOperationException();
}
@Override
public void encodeMessageIDToBuffer() {
throw new UnsupportedOperationException();
}
@Override
public byte[] getDuplicateIDBytes() {
return message.getDuplicateIDBytes();
}
@Override
public Object getDuplicateProperty() {
return message.getDuplicateProperty();
}
}

View File

@ -1994,7 +1994,12 @@ public class BridgeTest extends ActiveMQTestBase {
final String BRIDGE = "myBridge";
ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl();
Transformer transformer = (Message encode) -> null;
Transformer transformer = new Transformer() {
@Override
public Message transform(Message message) {
return null;
}
};
serviceRegistry.addBridgeTransformer(BRIDGE, transformer);
Configuration config = createDefaultInVMConfig().addConnectorConfiguration("in-vm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));