ARTEMIS-2581 Duplicate Detection on AMQP should be configurable
There is an optimization in AMQP, that properties are only parsed over demand.
It happens that after ARTEMIS-2294 (commit 2dd0671698
),
every send would request for the property on the message, resulting the properties to always be parsed upon send.
Even when there's no use of application properties.
This commit is contained in:
parent
73156cb79d
commit
13278cc45f
|
@ -1,3 +1,3 @@
|
|||
|
||||
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||
<acceptor name="amqp">tcp://${host}:${amqp.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
|
||||
<acceptor name="amqp">tcp://${host}:${amqp.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
||||
|
|
|
@ -81,13 +81,15 @@ ${jdbc}
|
|||
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
|
||||
<!-- amqpCredits: The number of credits sent to AMQP producers -->
|
||||
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
|
||||
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
|
||||
as duplicate detection requires applicationProperties to be parsed on the server. -->
|
||||
|
||||
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
|
||||
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
|
||||
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
|
||||
|
||||
<!-- Acceptor for every supported protocol -->
|
||||
<acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
|
||||
<acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
|
||||
${amqp-acceptor}${stomp-acceptor}${hornetq-acceptor}${mqtt-acceptor}
|
||||
</acceptors>
|
||||
|
||||
|
|
|
@ -196,6 +196,13 @@ public class AMQPMessage extends RefCountMessage {
|
|||
this.coreMessageObjectPools = null;
|
||||
}
|
||||
|
||||
/** This will return application properties without attempting to decode it.
|
||||
* That means, if applicationProperties were never parsed before, this will return null, even if there is application properties.
|
||||
* This was created as an internal method for testing, as we need to validate if the application properties are not decoded until needed. */
|
||||
public ApplicationProperties getDecodedApplicationProperties() {
|
||||
return applicationProperties;
|
||||
}
|
||||
|
||||
// Access to the AMQP message data using safe copies freshly decoded from the current
|
||||
// AMQP message data stored in this message wrapper. Changes to these values cannot
|
||||
// be used to influence the underlying AMQP message data, the standard AMQPMessage API
|
||||
|
|
|
@ -69,6 +69,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
|
|||
|
||||
private int amqpLowCredits = AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
|
||||
|
||||
private boolean amqpDuplicateDetection = true;
|
||||
|
||||
private boolean amqpUseModifiedForTransientDeliveryErrors = AmqpSupport.AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
|
||||
|
||||
private int initialRemoteMaxFrameSize = 4 * 1024;
|
||||
|
@ -106,6 +108,15 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
|
|||
|
||||
}
|
||||
|
||||
public boolean isAmqpDuplicateDetection() {
|
||||
return amqpDuplicateDetection;
|
||||
}
|
||||
|
||||
public ProtonProtocolManager setAmqpDuplicateDetection(boolean duplicateDetection) {
|
||||
this.amqpDuplicateDetection = duplicateDetection;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProtocolManagerFactory<AmqpInterceptor> getFactory() {
|
||||
return factory;
|
||||
|
|
|
@ -74,7 +74,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
|||
|
||||
protected final AMQPSessionCallback sessionSPI;
|
||||
|
||||
RoutingContext routingContext = new RoutingContextImpl(null);
|
||||
final RoutingContext routingContext;
|
||||
|
||||
/**
|
||||
* We create this AtomicRunnable with setRan.
|
||||
|
@ -126,6 +126,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
|||
AMQPSessionContext protonSession,
|
||||
Receiver receiver) {
|
||||
this.connection = connection;
|
||||
this.routingContext = new RoutingContextImpl(null).setDuplicateDetection(connection.getProtocolManager().isAmqpDuplicateDetection());
|
||||
this.protonSession = protonSession;
|
||||
this.receiver = receiver;
|
||||
this.sessionSPI = sessionSPI;
|
||||
|
|
|
@ -1048,7 +1048,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
|
||||
applyExpiryDelay(message, address);
|
||||
|
||||
if (!checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
|
||||
if (context.isDuplicateDetection() && !checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
|
||||
return RoutingStatus.DUPLICATED_ID;
|
||||
}
|
||||
|
||||
|
|
|
@ -82,5 +82,9 @@ public interface RoutingContext {
|
|||
|
||||
boolean isReusable(Message message, int version);
|
||||
|
||||
boolean isDuplicateDetection();
|
||||
|
||||
RoutingContext setDuplicateDetection(boolean value);
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -59,6 +59,19 @@ public final class RoutingContextImpl implements RoutingContext {
|
|||
|
||||
private final Executor executor;
|
||||
|
||||
private boolean duplicateDetection = true;
|
||||
|
||||
@Override
|
||||
public boolean isDuplicateDetection() {
|
||||
return duplicateDetection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RoutingContextImpl setDuplicateDetection(boolean value) {
|
||||
this.duplicateDetection = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RoutingContextImpl(final Transaction transaction) {
|
||||
this(transaction, null);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,122 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.apache.qpid.proton.amqp.messaging.Data;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* This test will validate if application properties are only parsed when there's a filter.
|
||||
* You have to disable duplciate-detction to have this optimization working.
|
||||
*/
|
||||
public class PropertyParseOptimizationTest extends AmqpClientTestSupport {
|
||||
|
||||
private String noDuplicateAcceptor = new String("tcp://localhost:" + (AMQP_PORT + 8));
|
||||
|
||||
@Override
|
||||
protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
|
||||
server.getConfiguration().addAcceptorConfiguration("noDuplicate", noDuplicateAcceptor + "?protocols=AMQP;useEpoll=false;amqpDuplicateDetection=false");
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendWithPropertiesAndFilter() throws Exception {
|
||||
int size = 10 * 1024;
|
||||
AmqpClient client = createAmqpClient(new URI(noDuplicateAcceptor));
|
||||
AmqpConnection connection = client.createConnection();
|
||||
addConnection(connection);
|
||||
connection.connect();
|
||||
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getQueueName());
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
|
||||
LinkedListIterator<MessageReference> iterator = queueView.iterator();
|
||||
|
||||
iterator.close();
|
||||
assertNotNull(queueView);
|
||||
assertEquals(0, queueView.getMessageCount());
|
||||
|
||||
session.begin();
|
||||
for (int m = 0; m < 10; m++) {
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setDurable(true);
|
||||
message.setApplicationProperty("odd", (m % 2 == 0));
|
||||
byte[] bytes = new byte[size];
|
||||
for (int i = 0; i < bytes.length; i++) {
|
||||
bytes[i] = (byte) 'z';
|
||||
}
|
||||
|
||||
message.setBytes(bytes);
|
||||
sender.send(message);
|
||||
}
|
||||
session.commit();
|
||||
|
||||
Wait.assertEquals(10, queueView::getMessageCount);
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
|
||||
MessageReference reference = iterator.next();
|
||||
AMQPMessage message = (AMQPMessage) reference.getMessage();
|
||||
// if this rule fails it means something is requesting the application property for the message,
|
||||
// or the optimization is gone.
|
||||
// be careful if you decide to change this rule, as we have done extensive test to get this in place.
|
||||
Assert.assertNull("Application properties on AMQP Messages should only be parsed over demand", message.getDecodedApplicationProperties());
|
||||
}
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName(), "odd=true");
|
||||
receiver.flow(10);
|
||||
for (int i = 0; i < 5; i++) {
|
||||
AmqpMessage msgReceived = receiver.receive(10, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(msgReceived);
|
||||
Data body = (Data) msgReceived.getWrappedMessage().getBody();
|
||||
byte[] bodyArray = body.getValue().getArray();
|
||||
for (int bI = 0; bI < size; bI++) {
|
||||
Assert.assertEquals((byte) 'z', bodyArray[bI]);
|
||||
}
|
||||
msgReceived.accept(true);
|
||||
}
|
||||
|
||||
receiver.flow(1);
|
||||
Assert.assertNull(receiver.receiveNoWait());
|
||||
Wait.assertEquals(5, queueView::getMessageCount);
|
||||
|
||||
receiver.close();
|
||||
|
||||
connection.close();
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue