diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 13b7b8693b..96859bc2d6 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -343,7 +343,7 @@ public final class StompConnection implements RemotingConnection { StompFrame frame = frameHandler.createStompFrame(Stomp.Responses.ERROR); frame.addHeader(Stomp.Headers.Error.MESSAGE, me.getMessage()); - sendFrame(frame); + sendFrame(frame, null); destroyed = true; } @@ -552,7 +552,7 @@ public final class StompConnection implements RemotingConnection { } if (reply != null) { - sendFrame(reply); + sendFrame(reply, null); } if (Stomp.Commands.DISCONNECT.equals(cmd)) { @@ -560,8 +560,8 @@ public final class StompConnection implements RemotingConnection { } } - public void sendFrame(StompFrame frame) { - manager.sendReply(this, frame); + public void sendFrame(StompFrame frame, StompPostReceiptFunction function) { + manager.sendReply(this, frame, function); } public boolean validateUser(final String login, final String pass, final RemotingConnection connection) { @@ -660,7 +660,7 @@ public final class StompConnection implements RemotingConnection { } } - void subscribe(String destination, + StompPostReceiptFunction subscribe(String destination, String selector, String ack, String id, @@ -694,7 +694,7 @@ public final class StompConnection implements RemotingConnection { } try { - manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal); + return manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal); } catch (ActiveMQStompException e) { throw e; } catch (Exception e) { @@ -743,7 +743,7 @@ public final class StompConnection implements RemotingConnection { //send a ping stomp frame public void ping(StompFrame pingFrame) { - manager.sendReply(this, pingFrame); + manager.sendReply(this, pingFrame, null); } public void physicalSend(StompFrame frame) throws Exception { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompPostReceiptFunction.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompPostReceiptFunction.java new file mode 100644 index 0000000000..381b0f0e7b --- /dev/null +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompPostReceiptFunction.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.stomp; + +public interface StompPostReceiptFunction { + void afterReceipt(); +} diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 84c78c292a..888674c852 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -33,9 +33,9 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.remoting.CertificateUtil; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; -import org.apache.activemq.artemis.core.remoting.CertificateUtil; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ServerSession; @@ -281,7 +281,7 @@ public class StompProtocolManager extends AbstractProtocolManager routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(SimpleString.toSimpleString(destination))).getRoutingTypes(); - if (routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST)) { + boolean topic = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST); + if (topic) { // subscribes to a topic pubSub = true; if (durableSubscriptionName != null) { @@ -308,15 +306,12 @@ public class StompSession implements SessionCallback { queueName = UUIDGenerator.getInstance().generateSimpleStringUUID(); session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector), true, false); } - session.createConsumer(consumerID, queueName, null, false, false, receiveCredits); - } else { - session.createConsumer(consumerID, queueName, SimpleString.toSimpleString(selector), false, false, receiveCredits); } - + final ServerConsumer consumer = topic ? session.createConsumer(consumerID, queueName, null, false, false, 0) : session.createConsumer(consumerID, queueName, SimpleString.toSimpleString(selector), false, false, 0); StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, pubSub); subscriptions.put(consumerID, subscription); - session.start(); + return () -> consumer.receiveCredits(receiveCredits); } public boolean unsubscribe(String id, String durableSubscriptionName, String clientID) throws Exception { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index df6d9b0877..bdae6fc0be 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -96,7 +96,7 @@ public abstract class VersionedStompFrameHandler { } else if (Stomp.Commands.ABORT.equals(request.getCommand())) { response = onAbort(request); } else if (Stomp.Commands.SUBSCRIBE.equals(request.getCommand())) { - response = onSubscribe(request); + return handleSubscribe(request); } else if (Stomp.Commands.UNSUBSCRIBE.equals(request.getCommand())) { response = onUnsubscribe(request); } else if (Stomp.Commands.CONNECT.equals(request.getCommand())) { @@ -120,6 +120,21 @@ public abstract class VersionedStompFrameHandler { return response; } + private StompFrame handleSubscribe(StompFrame request) { + StompFrame response = null; + try { + StompPostReceiptFunction postProcessFunction = onSubscribe(request); + response = postprocess(request); + if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED)) { + response.addHeader(Stomp.Headers.Response.RECEIPT_ID, request.getHeader(Stomp.Headers.RECEIPT_REQUESTED)); + } + connection.sendFrame(response, postProcessFunction); + return null; + } catch (ActiveMQStompException e) { + return e.getFrame(); + } + + } public abstract StompFrame onConnect(StompFrame frame); public abstract StompFrame onDisconnect(StompFrame frame); @@ -240,31 +255,22 @@ public abstract class VersionedStompFrameHandler { return response; } - public StompFrame onSubscribe(StompFrame frame) { - StompFrame response = null; - try { - String destination = getDestination(frame); + public StompPostReceiptFunction onSubscribe(StompFrame frame) throws ActiveMQStompException { + String destination = getDestination(frame); - String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR); - String ack = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE); - String id = frame.getHeader(Stomp.Headers.Subscribe.ID); - String durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME); - if (durableSubscriptionName == null) { - durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME); - } - RoutingType routingType = getRoutingType(frame.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), frame.getHeader(Headers.Subscribe.DESTINATION)); - boolean noLocal = false; - - if (frame.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) { - noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL)); - } - - connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType); - } catch (ActiveMQStompException e) { - response = e.getFrame(); + String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR); + String ack = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE); + String id = frame.getHeader(Stomp.Headers.Subscribe.ID); + String durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME); + if (durableSubscriptionName == null) { + durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME); } - - return response; + RoutingType routingType = getRoutingType(frame.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), frame.getHeader(Headers.Subscribe.DESTINATION)); + boolean noLocal = false; + if (frame.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) { + noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL)); + } + return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType); } public String getDestination(StompFrame request) { @@ -334,7 +340,7 @@ public abstract class VersionedStompFrameHandler { //sends an ERROR frame back to client if possible then close the connection public void onError(ActiveMQStompException e) { - this.connection.sendFrame(e.getFrame()); + this.connection.sendFrame(e.getFrame(), null); connection.destroy(); }