This closes #490

This commit is contained in:
Clebert Suconic 2016-04-26 10:02:01 -04:00
commit 586372603f
6 changed files with 183 additions and 4 deletions

View File

@ -25,6 +25,7 @@ import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSObjectMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage;
@ -67,7 +68,7 @@ public class ActiveMQJMSVendor implements JMSVendor {
@Override
public ObjectMessage createObjectMessage() {
return null;
return new ServerJMSObjectMessage(newMessage(org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE), 0);
}
@Override
@ -110,6 +111,8 @@ public class ActiveMQJMSVendor implements JMSVendor {
return new ServerJMSMapMessage(wrapped, deliveryCount);
case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
return new ServerJMSTextMessage(wrapped, deliveryCount);
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
return new ServerJMSObjectMessage(wrapped, deliveryCount);
default:
return new ServerJMSMessage(wrapped, deliveryCount);
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.proton.converter;
import org.apache.activemq.transport.amqp.message.JMSVendor;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Properties;
import javax.jms.Message;
class JMSMappingInboundTransformer extends org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer {
JMSMappingInboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
super.populateMessage(jms, amqp);
final Properties properties = amqp.getProperties();
if (properties != null) {
if (properties.getMessageId() != null) {
if (properties.getMessageId() instanceof Long) {
jms.setLongProperty(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID", (Long) properties.getMessageId());
}
else if (properties.getMessageId() instanceof UnsignedLong) {
jms.setLongProperty(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID", ((UnsignedLong) properties.getMessageId()).longValue());
}
else {
jms.setStringProperty(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID", properties.getMessageId().toString());
}
}
}
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.proton.converter;
import org.apache.activemq.transport.amqp.message.JMSVendor;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.message.ProtonJMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import java.io.UnsupportedEncodingException;
import java.util.Map;
class JMSMappingOutboundTransformer extends org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer {
JMSMappingOutboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
ProtonJMessage protonJMessage = super.convert(msg);
Map properties = protonJMessage.getApplicationProperties().getValue();
if (properties.containsKey(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID")) {
Long id = (Long) properties.remove(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID");
protonJMessage.setMessageId(id);
}
else if (properties.containsKey(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID")) {
Long id = (Long) properties.remove(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID");
protonJMessage.setMessageId(new UnsignedLong(id));
}
else if (properties.containsKey(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID")) {
String id = (String) properties.remove(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID");
protonJMessage.setMessageId(id);
}
return protonJMessage;
}
}

View File

@ -19,8 +19,6 @@ package org.apache.activemq.artemis.core.protocol.proton.converter;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.transport.amqp.message.EncodedMessage;
import org.apache.activemq.transport.amqp.message.InboundTransformer;
import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
import org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;

View File

@ -18,12 +18,20 @@ package org.apache.activemq.artemis.core.protocol.proton.converter.jms;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import javax.jms.JMSException;
import javax.jms.Queue;
/**
* This is just here to avoid all the client checks we ned with valid JMS destinations, protocol convertors don't need to
* adhere to the jms. semantics.
*/
public class ServerDestination extends ActiveMQDestination {
public class ServerDestination extends ActiveMQDestination implements Queue {
public ServerDestination(String name) {
super(name, name, false, false, null);
}
@Override
public String getQueueName() throws JMSException {
return getName();
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.proton.converter.jms;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.util.ByteArrayOutputStream;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMessage {
public static final byte TYPE = Message.STREAM_TYPE;
private Serializable object;
public ServerJMSObjectMessage(MessageInternal message, int deliveryCount) {
super(message, deliveryCount);
}
@Override
public void setObject(Serializable object) throws JMSException {
this.object = object;
}
@Override
public Serializable getObject() throws JMSException {
return object;
}
@Override
public void encode() throws Exception {
super.encode();
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream ous = new ObjectOutputStream(out);
ous.writeObject(object);
getInnerMessage().getBodyBuffer().writeBytes(out.toByteArray());
}
@Override
public void decode() throws Exception {
super.decode();
int size = getInnerMessage().getBodyBuffer().readableBytes();
byte[] bytes = new byte[size];
getInnerMessage().getBodyBuffer().readBytes(bytes);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
object = (Serializable) ois.readObject();
}
}