mqtt improvement over retained queue

This commit is contained in:
Clebert Suconic 2015-07-25 09:59:30 +01:00
parent 414d4e24e8
commit d002da8506
5 changed files with 24 additions and 19 deletions

View File

@ -17,14 +17,14 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.Iterator;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
import java.util.Iterator;
public class MQTTRetainMessageManager
{
@ -46,11 +46,11 @@ public class MQTTRetainMessageManager
{
SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address));
if (!session.getServerSession().executeQueueQuery(retainAddress).isExists())
{
session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true);
}
Queue queue = session.getServer().locateQueue(retainAddress);
if (queue == null)
{
queue = session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true);
}
// Set the address of this message to the retained queue.
message.setAddress(retainAddress);

View File

@ -17,16 +17,16 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
public class MQTTSubscriptionManager
{
private MQTTSession session;
@ -134,6 +134,7 @@ public class MQTTSubscriptionManager
}
}
// FIXME: Do we need this synchronzied?
private synchronized void removeSubscription(String address) throws Exception
{
ServerConsumer consumer = consumers.get(address);

View File

@ -331,7 +331,7 @@ public class AMQServerSession extends ServerSessionImpl
}
@Override
public void createQueue(final SimpleString address,
public Queue createQueue(final SimpleString address,
final SimpleString name,
final SimpleString filterString,
final boolean temporary,
@ -339,11 +339,10 @@ public class AMQServerSession extends ServerSessionImpl
{
if (!this.internal)
{
super.createQueue(address, name, filterString, temporary, durable);
return;
return super.createQueue(address, name, filterString, temporary, durable);
}
server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
if (temporary)
{
@ -368,6 +367,7 @@ public class AMQServerSession extends ServerSessionImpl
temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
}
return queue;
}
@Override

View File

@ -87,7 +87,7 @@ public interface ServerSession
void stop();
void createQueue(SimpleString address,
Queue createQueue(SimpleString address,
SimpleString name,
SimpleString filterString,
boolean temporary,

View File

@ -543,7 +543,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
credits);
}
public void createQueue(final SimpleString address,
public Queue createQueue(final SimpleString address,
final SimpleString name,
final SimpleString filterString,
final boolean temporary,
@ -561,14 +561,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener
((ActiveMQServerImpl)server).checkQueueCreationLimit(getUsername());
Queue queue;
// any non-temporary JMS queue created via this method should be marked as auto-created
if (!temporary && address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name))
{
server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true);
queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true);
}
else
{
server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
}
if (temporary)
@ -594,6 +596,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener
temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
}
return queue;
}
@Override