diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index c48a9802eb..86bbf18c6b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -195,7 +195,7 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ if (!state.isAttached() && sessionExpiryInterval > 0 && state.getDisconnectedTime() + (sessionExpiryInterval * 1000) < System.currentTimeMillis()) { toRemove.add(entry.getKey()); } - if (state.isWill() && !state.isAttached() && state.isFailed() && !state.isWillSent() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) { + if (state.isWill() && !state.isAttached() && state.isFailed() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) { state.getSession().sendWillMessage(); } } @@ -203,7 +203,7 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ for (String key : toRemove) { logger.debugf("Removing state for session: %s", key); MQTTSessionState state = removeSessionState(key); - if (state != null && state.isWill() && !state.isAttached() && state.isFailed() && !state.isWillSent()) { + if (state != null && state.isWill() && !state.isAttached() && state.isFailed()) { state.getSession().sendWillMessage(); } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index db4295b100..ba4edc8ad5 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -256,32 +256,30 @@ public class MQTTSession { } public void sendWillMessage() { - try { - MqttProperties properties; - if (state.getWillUserProperties() == null) { - properties = MqttProperties.NO_PROPERTIES; - } else { - properties = new MqttProperties(); - for (MqttProperties.MqttProperty userProperty : state.getWillUserProperties()) { - properties.add(userProperty); + if (state.getWillStatus() == MQTTSessionState.WillStatus.NOT_SENT) { + try { + state.setWillStatus(MQTTSessionState.WillStatus.SENDING); + MqttProperties properties; + if (state.getWillUserProperties() == null) { + properties = MqttProperties.NO_PROPERTIES; + } else { + properties = new MqttProperties(); + for (MqttProperties.MqttProperty userProperty : state.getWillUserProperties()) { + properties.add(userProperty); + } } + MqttPublishMessage publishMessage = MqttMessageBuilders.publish().messageId(0).qos(MqttQoS.valueOf(state.getWillQoSLevel())).retained(state.isWillRetain()).topicName(state.getWillTopic()).payload(state.getWillMessage() == null ? new EmptyByteBuf(PooledByteBufAllocator.DEFAULT) : state.getWillMessage()).properties(properties).build(); + logger.debugf("%s sending will message: %s", this, publishMessage); + getMqttPublishManager().sendToQueue(publishMessage, true); + state.setWillStatus(MQTTSessionState.WillStatus.SENT); + state.setWillMessage(null); + } catch (ActiveMQSecurityException e) { + state.setWillStatus(MQTTSessionState.WillStatus.NOT_SENT); + MQTTLogger.LOGGER.authorizationFailureSendingWillMessage(e.getMessage()); + } catch (Exception e) { + state.setWillStatus(MQTTSessionState.WillStatus.NOT_SENT); + MQTTLogger.LOGGER.errorSendingWillMessage(e); } - MqttPublishMessage publishMessage = MqttMessageBuilders.publish() - .messageId(0) - .qos(MqttQoS.valueOf(state.getWillQoSLevel())) - .retained(state.isWillRetain()) - .topicName(state.getWillTopic()) - .payload(state.getWillMessage() == null ? new EmptyByteBuf(PooledByteBufAllocator.DEFAULT) : state.getWillMessage()) - .properties(properties) - .build(); - logger.debugf("%s sending will message: %s", this, publishMessage); - getMqttPublishManager().sendToQueue(publishMessage, true); - state.setWillSent(true); - state.setWillMessage(null); - } catch (ActiveMQSecurityException e) { - MQTTLogger.LOGGER.authorizationFailureSendingWillMessage(e.getMessage()); - } catch (Exception e) { - MQTTLogger.LOGGER.errorSendingWillMessage(e); } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java index 90b2e5233f..b3524229ac 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java @@ -78,7 +78,7 @@ public class MQTTSessionState { private List<? extends MqttProperties.MqttProperty> willUserProperties; - private boolean willSent = false; + private WillStatus willStatus = WillStatus.NOT_SENT; private boolean failed = false; @@ -113,7 +113,7 @@ public class MQTTSessionState { willMessage.clear(); willMessage = null; } - willSent = false; + willStatus = WillStatus.NOT_SENT; failed = false; willDelayInterval = 0; willRetain = false; @@ -282,12 +282,12 @@ public class MQTTSessionState { return willUserProperties; } - public boolean isWillSent() { - return willSent; + public WillStatus getWillStatus() { + return willStatus; } - public void setWillSent(boolean willSent) { - this.willSent = willSent; + public void setWillStatus(WillStatus willStatus) { + this.willStatus = willStatus; } public boolean isFailed() { @@ -448,4 +448,34 @@ public class MQTTSessionState { public String toString() { return "MQTTSessionState[" + "session=" + session + ", clientId='" + clientId + "', subscriptions=" + subscriptions + ", messageRefStore=" + messageRefStore + ", addressMessageMap=" + addressMessageMap + ", pubRec=" + pubRec + ", attached=" + attached + ", outboundStore=" + outboundStore + ", disconnectedTime=" + disconnectedTime + ", sessionExpiryInterval=" + clientSessionExpiryInterval + ", isWill=" + isWill + ", willMessage=" + willMessage + ", willTopic='" + willTopic + "', willQoSLevel=" + willQoSLevel + ", willRetain=" + willRetain + ", willDelayInterval=" + willDelayInterval + ", failed=" + failed + ", maxPacketSize=" + clientMaxPacketSize + ']'; } + + public enum WillStatus { + NOT_SENT, SENT, SENDING; + + public byte getStatus() { + switch (this) { + case NOT_SENT: + return 0; + case SENT: + return 1; + case SENDING: + return 2; + default: + return -1; + } + } + + public static WillStatus getStatus(byte status) { + switch (status) { + case 0: + return NOT_SENT; + case 1: + return SENT; + case 2: + return SENDING; + default: + return null; + } + } + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index b04bbd7c2f..f3257b33a1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -251,6 +251,13 @@ public final class PagingManagerImpl implements PagingManager { } } + /* + * For tests only! + */ + protected void setDiskFull(boolean diskFull) { + this.diskFull = diskFull; + } + @Override public boolean isDiskFull() { return diskFull; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java new file mode 100644 index 0000000000..5aa5a5117e --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.paging.impl; + +public class PagingManagerImplAccessor { + public static void setDiskFull(PagingManagerImpl pagingManager, boolean diskFull) { + pagingManager.setDiskFull(diskFull); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java index c2af69ede9..2485f19d2a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java @@ -27,10 +27,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; +import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.utils.Wait; import org.eclipse.paho.mqttv5.client.MqttAsyncClient; @@ -225,4 +229,21 @@ public class MQTT5Test extends MQTT5TestSupport { server.start(); org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> getSubscriptionQueue(topic, clientId) != null, 3000, 10); } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testRecursiveWill() throws Exception { + AssertionLoggerHandler.startCapture(true); + try { + final String WILL_QUEUE = "will"; + server.createQueue(new QueueConfiguration(WILL_QUEUE).setRoutingType(RoutingType.ANYCAST)); + PagingManagerImplAccessor.setDiskFull((PagingManagerImpl) server.getPagingManager(), true); + MqttClient client = createPahoClient("willGenerator"); + MqttConnectionOptions options = new MqttConnectionOptionsBuilder().will(WILL_QUEUE, new MqttMessage(RandomUtil.randomBytes())).build(); + client.connect(options); + client.disconnectForcibly(0, 0, false); + Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ229119"), 2000, 100); + } finally { + AssertionLoggerHandler.stopCapture(); + } + } }