ARTEMIS-3065 AMQP Anonymous producer would eventually block

This commit is contained in:
Clebert Suconic 2021-01-12 14:52:33 -05:00
parent 65d6efa2ed
commit 78c0792989
6 changed files with 226 additions and 3 deletions

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@ -138,6 +139,10 @@ public class AMQPSessionCallback implements SessionCallback {
return coreMessageObjectPools;
}
public ProtonProtocolManager getProtocolManager() {
return manager;
}
@Override
public boolean isWritable(ReadyListener callback, Object protocolContext) {
ProtonServerSenderContext senderContext = (ProtonServerSenderContext) protocolContext;
@ -603,10 +608,17 @@ public class AMQPSessionCallback implements SessionCallback {
public void flow(final SimpleString address,
Runnable runnable) {
try {
PagingManager pagingManager = manager.getServer().getPagingManager();
if (address == null) {
pagingManager.checkMemory(runnable);
PagingManager pagingManager = manager.getServer().getPagingManager();
if (manager != null && manager.getServer() != null &&
manager.getServer().getAddressSettingsRepository() != null &&
manager.getServer().getAddressSettingsRepository().getMatch("#").getAddressFullMessagePolicy().equals(AddressFullMessagePolicy.PAGE)) {
// If it's paging, we only check for disk full
pagingManager.checkStorage(runnable);
} else {
pagingManager.checkMemory(runnable);
}
} else {
final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
if (store != null) {

View File

@ -66,4 +66,9 @@ public interface ActiveMQAMQPProtocolLogger extends BasicLogger {
"\nSuccess on Server AMQP Connection {0} on {1} after {2} retries" +
"\n*******************************************************************************************************************************\n", format = Message.Format.MESSAGE_FORMAT)
void successReconnect(String name, String hostAndPort, int currentRetry);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 111004, value = "AddressFullPolicy clash on an anonymous producer between destinations {0}(addressFullPolicy={1}) and {2}(addressFullPolicy={3}). This could lead to semantic inconsistencies on your clients. Notice you could have other instances of this scenario however this message will only be logged once. log.debug output would show all instances of this event.",
format = Message.Format.MESSAGE_FORMAT)
void incompatibleAddressFullMessagePolicy(String oldAddress, String oldPolicy, String newAddress, String newPolicy);
}

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@ -34,6 +35,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPSecurityException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Modified;
@ -57,6 +59,9 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver {
private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class);
protected SimpleString address;
protected SimpleString lastAddress;
protected AddressFullMessagePolicy lastAddressPolicy;
protected boolean addressAlreadyClashed = false;
protected final Runnable spiFlow = this::sessionSPIFlow;
@ -174,6 +179,10 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver {
protected void actualDelivery(AMQPMessage message, Delivery delivery, Receiver receiver, Transaction tx) {
try {
if (sessionSPI != null) {
// message could be null on unit tests (Mocking from ProtonServerReceiverContextTest).
if (address == null && message != null) {
validateAddressOnAnonymousLink(message);
}
sessionSPI.serverSend(this, tx, receiver, delivery, address, routingContext, message);
}
} catch (Exception e) {
@ -184,6 +193,23 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver {
}
}
private void validateAddressOnAnonymousLink(AMQPMessage message) throws Exception {
SimpleString newAddress = message.getAddressSimpleString();
if (newAddress != null && !newAddress.equals(lastAddress)) {
AddressFullMessagePolicy currentPolicy = sessionSPI.getProtocolManager().getServer().getPagingManager().getPageStore(newAddress).getAddressFullMessagePolicy();
if (lastAddressPolicy != null && lastAddressPolicy != currentPolicy) {
if (!addressAlreadyClashed) {
addressAlreadyClashed = true; // print the warning only once
ActiveMQAMQPProtocolLogger.LOGGER.incompatibleAddressFullMessagePolicy(lastAddress.toString(), "" + lastAddressPolicy, newAddress.toString(), "" + currentPolicy);
}
log.debug("AddressFullPolicy clash between " + lastAddress + "/" + lastAddressPolicy + " and " + newAddress + "/" + lastAddressPolicy);
}
this.lastAddress = message.getAddressSimpleString();
this.lastAddressPolicy = currentPolicy;
}
}
public void deliveryFailed(Delivery delivery, Receiver receiver, Exception e) {
connection.runNow(() -> {
DeliveryState deliveryState = determineDeliveryState(((Source) receiver.getSource()),
@ -262,7 +288,7 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver {
connection.requireInHandler();
// Use the SessionSPI to allocate producer credits, or default, always allocate credit.
if (sessionSPI != null) {
sessionSPI.flow(address, creditRunnable);
sessionSPI.flow(address != null ? address : lastAddress, creditRunnable);
} else {
creditRunnable.run();
}

View File

@ -121,4 +121,13 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
*/
void checkMemory(Runnable runWhenAvailable);
/**
* Use this when you have no refernce of an address. (anonymous AMQP Producers for example)
* @param runWhenAvailable
*/
default void checkStorage(Runnable runWhenAvailable) {
checkMemory(runWhenAvailable);
}
}

View File

@ -120,6 +120,10 @@ public final class PagingManagerImpl implements PagingManager {
this.managementAddress = managementAddress;
}
public long getMaxSize() {
return maxSize;
}
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) {
this(pagingSPI, addressSettingsRepository, -1, null);
@ -255,6 +259,14 @@ public final class PagingManagerImpl implements PagingManager {
runWhenAvailable.run();
}
@Override
public void checkStorage(Runnable runWhenAvailable) {
if (diskFull) {
memoryCallback.add(AtomicRunnable.checkAtomic(runWhenAvailable));
return;
}
runWhenAvailable.run();
}
private void memoryReleased() {
Runnable runnable;

View File

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class AnonymousProducerPageTest extends ActiveMQTestBase {
protected final String protocol;
@Parameterized.Parameters(name = "protocol={0}")
public static Collection getParams() {
return Arrays.asList(new Object[][]{
{"AMQP"}, {"CORE"}, {"OPENWIRE"}});
}
public AnonymousProducerPageTest(String protocol) {
this.protocol = protocol;
}
protected static final String NETTY_ACCEPTOR = "netty-acceptor";
ActiveMQServer server;
@Before
public void createServer() throws Exception {
int port = 5672;
this.server = addServer(this.createServer(true, true));
server.getConfiguration().getAddressesSettings().clear();
server.getConfiguration().addAddressesSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE));
server.getConfiguration().getAcceptorConfigurations().clear();
server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(server, port));
server.getConfiguration().setName(getName());
server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory() + port);
server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory() + port);
server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory() + port);
server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory());
server.getConfiguration().setJMXManagementEnabled(true);
server.getConfiguration().setMessageExpiryScanPeriod(100);
server.start();
}
protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
HashMap<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
params.put(TransportConstants.PROTOCOLS_PROP_NAME, getConfiguredProtocols());
HashMap<String, Object> amqpParams = new HashMap<>();
configureAMQPAcceptorParameters(amqpParams);
TransportConfiguration tc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, NETTY_ACCEPTOR, amqpParams);
configureAMQPAcceptorParameters(tc);
return tc;
}
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
// None by default
}
protected void configureAMQPAcceptorParameters(TransportConfiguration tc) {
// None by default
}
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Test(timeout = 60_000)
public void testNotBlockOnGlobalMaxSizeWithAnonymousProduce() throws Exception {
final int MSG_SIZE = 1000;
final StringBuilder builder = new StringBuilder();
for (int i = 0; i < MSG_SIZE; i++) {
builder.append('0');
}
final String data = builder.toString();
final int MSG_COUNT = 3_000;
// sending size to explode max size
server.getPagingManager().addSize((int) ((PagingManagerImpl) server.getPagingManager()).getMaxSize());
server.getPagingManager().addSize(100_000);
server.getAddressSettingsRepository().addMatch("blockedQueue", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(null);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
javax.jms.Queue jmsQueue = session.createQueue(getName());
for (int i = 0; i < MSG_COUNT; i++) {
TextMessage message = session.createTextMessage(data);
producer.send(jmsQueue, message);
}
session.commit();
if (protocol.equals("AMQP")) {
// this is only valid for AMQP
validatePolicyMismatch(session, producer);
}
connection.close();
}
private void validatePolicyMismatch(Session session, MessageProducer producer) throws JMSException {
AssertionLoggerHandler.startCapture();
try {
producer.send(session.createQueue("blockedQueue"), session.createMessage());
session.commit();
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ111004"));
AssertionLoggerHandler.clear();
producer.send(session.createQueue(getName()), session.createMessage());
session.commit();
Assert.assertFalse("The warning should be printed only once", AssertionLoggerHandler.findText("AMQ111004"));
} finally {
AssertionLoggerHandler.stopCapture();
}
}
}