ARTEMIS-900 fix compatibility after address changes

This commit is contained in:
Clebert Suconic 2016-12-21 17:31:30 -05:00
parent a3490cad22
commit b9a7e152f7
10 changed files with 195 additions and 55 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.core.impl;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.DataConstants;
@ -24,6 +25,11 @@ import org.apache.activemq.artemis.utils.DataConstants;
public class PacketImpl implements Packet {
// Constants -------------------------------------------------------------------------
public static final int ADDRESSING_CHANGE_VERSION = 129;
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
// The minimal size for all the packets, Common data for all the packets (look at
// PacketImpl.encode)
public static final int PACKET_HEADERS_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE +
@ -267,6 +273,20 @@ public class PacketImpl implements Packet {
// Public --------------------------------------------------------
public SimpleString convertName(SimpleString name) {
if (name == null) {
return null;
}
if (name.startsWith(OLD_QUEUE_PREFIX)) {
return name.subSeq(OLD_QUEUE_PREFIX.length(), name.length());
} else if (name.startsWith(OLD_TOPIC_PREFIX)) {
return name.subSeq(OLD_TOPIC_PREFIX.length(), name.length());
} else {
return name;
}
}
@Override
public byte getType() {
return type;
@ -376,4 +396,6 @@ public class PacketImpl implements Packet {
protected int nullableStringEncodeSize(final String str) {
return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0);
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public abstract class QueueAbstractPacket extends PacketImpl {
protected SimpleString queueName;
protected SimpleString oldVersionQueueName;
protected SimpleString address;
protected SimpleString oldVersionAddresseName;
public SimpleString getQueueName(int clientVersion) {
if (clientVersion < ADDRESSING_CHANGE_VERSION) {
if (oldVersionQueueName == null) {
oldVersionQueueName = convertName(queueName);
}
return oldVersionQueueName;
} else {
return queueName;
}
}
public SimpleString getAddress(int clientVersion) {
if (clientVersion < ADDRESSING_CHANGE_VERSION) {
if (oldVersionAddresseName == null) {
oldVersionAddresseName = convertName(address);
}
return oldVersionAddresseName;
} else {
return address;
}
}
public QueueAbstractPacket(byte type) {
super(type);
}
}

View File

@ -18,11 +18,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class SessionBindingQueryMessage extends PacketImpl {
private SimpleString address;
public class SessionBindingQueryMessage extends QueueAbstractPacket {
public SessionBindingQueryMessage(final SimpleString address) {
super(SESS_BINDINGQUERY);
@ -34,10 +31,6 @@ public class SessionBindingQueryMessage extends PacketImpl {
super(SESS_BINDINGQUERY);
}
public SimpleString getAddress() {
return address;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(address);

View File

@ -18,14 +18,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class SessionCreateConsumerMessage extends PacketImpl {
public class SessionCreateConsumerMessage extends QueueAbstractPacket {
private long id;
private SimpleString queueName;
private SimpleString filterString;
private boolean browseOnly;
@ -66,10 +63,6 @@ public class SessionCreateConsumerMessage extends PacketImpl {
return id;
}
public SimpleString getQueueName() {
return queueName;
}
public SimpleString getFilterString() {
return filterString;
}

View File

@ -18,11 +18,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class SessionQueueQueryMessage extends PacketImpl {
private SimpleString queueName;
public class SessionQueueQueryMessage extends QueueAbstractPacket {
public SessionQueueQueryMessage(final SimpleString queueName) {
super(SESS_QUEUEQUERY);
@ -34,10 +31,6 @@ public class SessionQueueQueryMessage extends PacketImpl {
super(SESS_QUEUEQUERY);
}
public SimpleString getQueueName() {
return queueName;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(queueName);

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QueueAbstractPacket;
import org.junit.Assert;
import org.junit.Test;
public class QueueAbstractTest {
class MyTest extends QueueAbstractPacket {
MyTest(String name) {
super((byte)0);
this.queueName = SimpleString.toSimpleString(name);
}
}
@Test
public void testOldTopic() {
MyTest test = new MyTest("jms.topic.mytopic");
Assert.assertEquals("mytopic", test.getQueueName(127).toString());
}
@Test
public void testOldQueue() {
MyTest test = new MyTest("jms.queue.myQueue");
Assert.assertEquals("myQueue", test.getQueueName(127).toString());
}
}

View File

@ -367,22 +367,29 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
try {
ActiveMQQueue queue = lookupQueue(queueName, false);
if (queue == null) {
queue = lookupQueue(queueName, true);
}
if (queue == null) {
throw new JMSException("There is no queue with name " + queueName);
} else {
return queue;
}
return internalCreateQueue(queueName, false);
} catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
}
protected Queue internalCreateQueue(String queueName, final boolean retry) throws ActiveMQException, JMSException {
ActiveMQQueue queue = lookupQueue(queueName, false);
if (queue == null) {
queue = lookupQueue(queueName, true);
}
if (queue == null) {
if (!retry) {
return internalCreateQueue("jms.queue." + queueName, true);
}
throw new JMSException("There is no queue with name " + queueName);
} else {
return queue;
}
}
@Override
public Topic createTopic(final String topicName) throws JMSException {
// As per spec. section 4.11
@ -391,22 +398,29 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
try {
ActiveMQTopic topic = lookupTopic(topicName, false);
if (topic == null) {
topic = lookupTopic(topicName, true);
}
if (topic == null) {
throw new JMSException("There is no topic with name " + topicName);
} else {
return topic;
}
return internalCreateTopic(topicName, false);
} catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
}
protected Topic internalCreateTopic(String topicName, boolean retry) throws ActiveMQException, JMSException {
ActiveMQTopic topic = lookupTopic(topicName, false);
if (topic == null) {
topic = lookupTopic(topicName, true);
}
if (topic == null) {
if (!retry) {
return internalCreateTopic("jms.topic." + topicName, true);
}
throw new JMSException("There is no topic with name " + topicName);
} else {
return topic;
}
}
@Override
public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException {
return createDurableSubscriber(topic, name, null, false);

View File

@ -150,7 +150,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
this.remotingConnection = channel.getConnection();
//TODO think of a better way of doing this
Connection conn = remotingConnection.getTransportConnection();
if (conn instanceof NettyConnection) {
@ -215,11 +214,12 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_CREATECONSUMER: {
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
requiresResponse = request.isRequiresResponse();
session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.isBrowseOnly());
session.createConsumer(request.getID(), request.getQueueName(remotingConnection.getClientVersion()), request.getFilterString(), request.isBrowseOnly());
if (requiresResponse) {
// We send back queue information on the queue as a response- this allows the queue to
// be automatically recreated on failover
QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName());
QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion()));
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
response = new SessionQueueQueryResponseMessage_V3(queueQueryResult);
} else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
@ -287,7 +287,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_QUEUEQUERY: {
requiresResponse = true;
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion()));
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
response = new SessionQueueQueryResponseMessage_V3(result);
} else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
@ -300,7 +300,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_BINDINGQUERY: {
requiresResponse = true;
SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
BindingQueryResult result = session.executeBindingQuery(request.getAddress());
BindingQueryResult result = session.executeBindingQuery(request.getAddress(remotingConnection.getClientVersion()));
if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {
response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultDeleteOnNoConsumers(), result.getDefaultMaxConsumers());
} else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
@ -38,6 +41,7 @@ import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.version.Version;
import org.jboss.logging.Logger;
@ -153,7 +157,15 @@ public class ActiveMQPacketHandler implements ChannelHandler {
OperationContext sessionOperationContext = server.newOperationContext();
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, protocolManager.getPrefixes());
Map<SimpleString, RoutingType> routingTypeMap = protocolManager.getPrefixes();
if (connection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
routingTypeMap = new HashMap<>();
routingTypeMap.put(PacketImpl.OLD_QUEUE_PREFIX, RoutingType.ANYCAST);
routingTypeMap.put(PacketImpl.OLD_TOPIC_PREFIX, RoutingType.MULTICAST);
}
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
channel.setHandler(handler);

View File

@ -1330,6 +1330,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
SimpleString address = removePrefix(message.getAddress());
// In case the prefix was removed, we also need to update the message
if (address != message.getAddress()) {
message.setAddress(address);
}
if (defaultAddress == null && address != null) {
defaultAddress = address;
}
@ -1349,12 +1354,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
}
if (address == null) {
if (message.getAddress() == null) {
// This could happen with some tests that are ignoring messages
throw ActiveMQMessageBundle.BUNDLE.noAddress();
}
if (address.equals(managementAddress)) {
if (message.getAddress().equals(managementAddress)) {
// It's a management message
handleManagementMessage(tx, message, direct);
@ -1733,7 +1738,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public SimpleString removePrefix(SimpleString address) {
if (prefixEnabled) {
if (prefixEnabled && address != null) {
return PrefixUtil.getAddress(address, prefixes);
}
return address;