This commit is contained in:
Clebert Suconic 2019-12-19 17:54:21 -05:00
commit 0c9d1cf0cd
9 changed files with 164 additions and 4 deletions

View File

@ -1,3 +1,3 @@
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://${host}:${amqp.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<acceptor name="amqp">tcp://${host}:${amqp.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>

View File

@ -81,13 +81,15 @@ ${jdbc}
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
as duplicate detection requires applicationProperties to be parsed on the server. -->
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
${amqp-acceptor}${stomp-acceptor}${hornetq-acceptor}${mqtt-acceptor}
</acceptors>

View File

@ -196,6 +196,13 @@ public class AMQPMessage extends RefCountMessage {
this.coreMessageObjectPools = null;
}
/** This will return application properties without attempting to decode it.
* That means, if applicationProperties were never parsed before, this will return null, even if there is application properties.
* This was created as an internal method for testing, as we need to validate if the application properties are not decoded until needed. */
public ApplicationProperties getDecodedApplicationProperties() {
return applicationProperties;
}
// Access to the AMQP message data using safe copies freshly decoded from the current
// AMQP message data stored in this message wrapper. Changes to these values cannot
// be used to influence the underlying AMQP message data, the standard AMQPMessage API

View File

@ -69,6 +69,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
private int amqpLowCredits = AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
private boolean amqpDuplicateDetection = true;
private boolean amqpUseModifiedForTransientDeliveryErrors = AmqpSupport.AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
private int initialRemoteMaxFrameSize = 4 * 1024;
@ -106,6 +108,15 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
}
public boolean isAmqpDuplicateDetection() {
return amqpDuplicateDetection;
}
public ProtonProtocolManager setAmqpDuplicateDetection(boolean duplicateDetection) {
this.amqpDuplicateDetection = duplicateDetection;
return this;
}
@Override
public ProtocolManagerFactory<AmqpInterceptor> getFactory() {
return factory;

View File

@ -74,7 +74,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
protected final AMQPSessionCallback sessionSPI;
RoutingContext routingContext = new RoutingContextImpl(null);
final RoutingContext routingContext;
/**
* We create this AtomicRunnable with setRan.
@ -126,6 +126,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
AMQPSessionContext protonSession,
Receiver receiver) {
this.connection = connection;
this.routingContext = new RoutingContextImpl(null).setDuplicateDetection(connection.getProtocolManager().isAmqpDuplicateDetection());
this.protonSession = protonSession;
this.receiver = receiver;
this.sessionSPI = sessionSPI;

View File

@ -1048,7 +1048,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
applyExpiryDelay(message, address);
if (!checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
if (context.isDuplicateDetection() && !checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
return RoutingStatus.DUPLICATED_ID;
}

View File

@ -82,5 +82,9 @@ public interface RoutingContext {
boolean isReusable(Message message, int version);
boolean isDuplicateDetection();
RoutingContext setDuplicateDetection(boolean value);
}

View File

@ -59,6 +59,19 @@ public final class RoutingContextImpl implements RoutingContext {
private final Executor executor;
private boolean duplicateDetection = true;
@Override
public boolean isDuplicateDetection() {
return duplicateDetection;
}
@Override
public RoutingContextImpl setDuplicateDetection(boolean value) {
this.duplicateDetection = value;
return this;
}
public RoutingContextImpl(final Transaction transaction) {
this(transaction, null);
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.junit.Assert;
import org.junit.Test;
/**
* This test will validate if application properties are only parsed when there's a filter.
* You have to disable duplciate-detction to have this optimization working.
*/
public class PropertyParseOptimizationTest extends AmqpClientTestSupport {
private String noDuplicateAcceptor = new String("tcp://localhost:" + (AMQP_PORT + 8));
@Override
protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
server.getConfiguration().addAcceptorConfiguration("noDuplicate", noDuplicateAcceptor + "?protocols=AMQP;useEpoll=false;amqpDuplicateDetection=false");
}
@Test(timeout = 60000)
public void testSendWithPropertiesAndFilter() throws Exception {
int size = 10 * 1024;
AmqpClient client = createAmqpClient(new URI(noDuplicateAcceptor));
AmqpConnection connection = client.createConnection();
addConnection(connection);
connection.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
Queue queueView = getProxyToQueue(getQueueName());
LinkedListIterator<MessageReference> iterator = queueView.iterator();
iterator.close();
assertNotNull(queueView);
assertEquals(0, queueView.getMessageCount());
session.begin();
for (int m = 0; m < 10; m++) {
AmqpMessage message = new AmqpMessage();
message.setDurable(true);
message.setApplicationProperty("odd", (m % 2 == 0));
byte[] bytes = new byte[size];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte) 'z';
}
message.setBytes(bytes);
sender.send(message);
}
session.commit();
Wait.assertEquals(10, queueView::getMessageCount);
while (iterator.hasNext()) {
MessageReference reference = iterator.next();
AMQPMessage message = (AMQPMessage) reference.getMessage();
// if this rule fails it means something is requesting the application property for the message,
// or the optimization is gone.
// be careful if you decide to change this rule, as we have done extensive test to get this in place.
Assert.assertNull("Application properties on AMQP Messages should only be parsed over demand", message.getDecodedApplicationProperties());
}
AmqpReceiver receiver = session.createReceiver(getQueueName(), "odd=true");
receiver.flow(10);
for (int i = 0; i < 5; i++) {
AmqpMessage msgReceived = receiver.receive(10, TimeUnit.SECONDS);
Assert.assertNotNull(msgReceived);
Data body = (Data) msgReceived.getWrappedMessage().getBody();
byte[] bodyArray = body.getValue().getArray();
for (int bI = 0; bI < size; bI++) {
Assert.assertEquals((byte) 'z', bodyArray[bI]);
}
msgReceived.accept(true);
}
receiver.flow(1);
Assert.assertNull(receiver.receiveNoWait());
Wait.assertEquals(5, queueView::getMessageCount);
receiver.close();
connection.close();
}
}