ARTEMIS-1512 Fix race condition with Subscribe receipt

This commit is contained in:
Martyn Taylor 2017-11-10 12:35:46 +00:00 committed by Clebert Suconic
parent a5c443afb0
commit 120fc190c6
5 changed files with 77 additions and 49 deletions

View File

@ -343,7 +343,7 @@ public final class StompConnection implements RemotingConnection {
StompFrame frame = frameHandler.createStompFrame(Stomp.Responses.ERROR);
frame.addHeader(Stomp.Headers.Error.MESSAGE, me.getMessage());
sendFrame(frame);
sendFrame(frame, null);
destroyed = true;
}
@ -552,7 +552,7 @@ public final class StompConnection implements RemotingConnection {
}
if (reply != null) {
sendFrame(reply);
sendFrame(reply, null);
}
if (Stomp.Commands.DISCONNECT.equals(cmd)) {
@ -560,8 +560,8 @@ public final class StompConnection implements RemotingConnection {
}
}
public void sendFrame(StompFrame frame) {
manager.sendReply(this, frame);
public void sendFrame(StompFrame frame, StompPostReceiptFunction function) {
manager.sendReply(this, frame, function);
}
public boolean validateUser(final String login, final String pass, final RemotingConnection connection) {
@ -660,7 +660,7 @@ public final class StompConnection implements RemotingConnection {
}
}
void subscribe(String destination,
StompPostReceiptFunction subscribe(String destination,
String selector,
String ack,
String id,
@ -694,7 +694,7 @@ public final class StompConnection implements RemotingConnection {
}
try {
manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal);
return manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal);
} catch (ActiveMQStompException e) {
throw e;
} catch (Exception e) {
@ -743,7 +743,7 @@ public final class StompConnection implements RemotingConnection {
//send a ping stomp frame
public void ping(StompFrame pingFrame) {
manager.sendReply(this, pingFrame);
manager.sendReply(this, pingFrame, null);
}
public void physicalSend(StompFrame frame) throws Exception {

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.stomp;
public interface StompPostReceiptFunction {
void afterReceipt();
}

View File

@ -33,9 +33,9 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerSession;
@ -281,7 +281,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
});
}
public void sendReply(final StompConnection connection, final StompFrame frame) {
public void sendReply(final StompConnection connection, final StompFrame frame, final StompPostReceiptFunction function) {
server.getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
public void onError(final int errorCode, final String errorMessage) {
@ -295,7 +295,13 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
@Override
public void done() {
send(connection, frame);
if (frame != null) {
send(connection, frame);
}
if (function != null) {
function.afterReceipt();
}
}
});
}
@ -361,7 +367,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
}
// Inner classes -------------------------------------------------
public void subscribe(StompConnection connection,
public StompPostReceiptFunction subscribe(StompConnection connection,
String subscriptionID,
String durableSubscriptionName,
String destination,
@ -375,7 +381,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
}
long consumerID = server.getStorageManager().generateID();
stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(), durableSubscriptionName, destination, selector, ack);
return stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(), durableSubscriptionName, destination, selector, ack);
}
public void unsubscribe(StompConnection connection,

View File

@ -242,7 +242,7 @@ public class StompSession implements SessionCallback {
StompFrame frame = connection.getFrameHandler().createStompFrame(Stomp.Responses.ERROR);
frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
frame.setBody("consumer with ID " + consumerId + " disconnected by server");
connection.sendFrame(frame);
connection.sendFrame(frame, null);
}
}
@ -278,7 +278,7 @@ public class StompSession implements SessionCallback {
session.commit();
}
public void addSubscription(long consumerID,
public StompPostReceiptFunction addSubscription(long consumerID,
String subscriptionID,
String clientID,
String durableSubscriptionName,
@ -287,13 +287,11 @@ public class StompSession implements SessionCallback {
String ack) throws Exception {
SimpleString queueName = SimpleString.toSimpleString(destination);
boolean pubSub = false;
int receiveCredits = consumerCredits;
if (ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
receiveCredits = -1;
}
final int receiveCredits = ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO) ? -1 : consumerCredits;
Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(SimpleString.toSimpleString(destination))).getRoutingTypes();
if (routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST)) {
boolean topic = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
if (topic) {
// subscribes to a topic
pubSub = true;
if (durableSubscriptionName != null) {
@ -308,15 +306,12 @@ public class StompSession implements SessionCallback {
queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector), true, false);
}
session.createConsumer(consumerID, queueName, null, false, false, receiveCredits);
} else {
session.createConsumer(consumerID, queueName, SimpleString.toSimpleString(selector), false, false, receiveCredits);
}
final ServerConsumer consumer = topic ? session.createConsumer(consumerID, queueName, null, false, false, 0) : session.createConsumer(consumerID, queueName, SimpleString.toSimpleString(selector), false, false, 0);
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, pubSub);
subscriptions.put(consumerID, subscription);
session.start();
return () -> consumer.receiveCredits(receiveCredits);
}
public boolean unsubscribe(String id, String durableSubscriptionName, String clientID) throws Exception {

View File

@ -96,7 +96,7 @@ public abstract class VersionedStompFrameHandler {
} else if (Stomp.Commands.ABORT.equals(request.getCommand())) {
response = onAbort(request);
} else if (Stomp.Commands.SUBSCRIBE.equals(request.getCommand())) {
response = onSubscribe(request);
return handleSubscribe(request);
} else if (Stomp.Commands.UNSUBSCRIBE.equals(request.getCommand())) {
response = onUnsubscribe(request);
} else if (Stomp.Commands.CONNECT.equals(request.getCommand())) {
@ -120,6 +120,21 @@ public abstract class VersionedStompFrameHandler {
return response;
}
private StompFrame handleSubscribe(StompFrame request) {
StompFrame response = null;
try {
StompPostReceiptFunction postProcessFunction = onSubscribe(request);
response = postprocess(request);
if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED)) {
response.addHeader(Stomp.Headers.Response.RECEIPT_ID, request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
}
connection.sendFrame(response, postProcessFunction);
return null;
} catch (ActiveMQStompException e) {
return e.getFrame();
}
}
public abstract StompFrame onConnect(StompFrame frame);
public abstract StompFrame onDisconnect(StompFrame frame);
@ -240,31 +255,22 @@ public abstract class VersionedStompFrameHandler {
return response;
}
public StompFrame onSubscribe(StompFrame frame) {
StompFrame response = null;
try {
String destination = getDestination(frame);
public StompPostReceiptFunction onSubscribe(StompFrame frame) throws ActiveMQStompException {
String destination = getDestination(frame);
String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR);
String ack = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
String id = frame.getHeader(Stomp.Headers.Subscribe.ID);
String durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
if (durableSubscriptionName == null) {
durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
}
RoutingType routingType = getRoutingType(frame.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), frame.getHeader(Headers.Subscribe.DESTINATION));
boolean noLocal = false;
if (frame.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
}
connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType);
} catch (ActiveMQStompException e) {
response = e.getFrame();
String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR);
String ack = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
String id = frame.getHeader(Stomp.Headers.Subscribe.ID);
String durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
if (durableSubscriptionName == null) {
durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
}
return response;
RoutingType routingType = getRoutingType(frame.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), frame.getHeader(Headers.Subscribe.DESTINATION));
boolean noLocal = false;
if (frame.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
}
return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType);
}
public String getDestination(StompFrame request) {
@ -334,7 +340,7 @@ public abstract class VersionedStompFrameHandler {
//sends an ERROR frame back to client if possible then close the connection
public void onError(ActiveMQStompException e) {
this.connection.sendFrame(e.getFrame());
this.connection.sendFrame(e.getFrame(), null);
connection.destroy();
}