From 1d4139f27844c9ca4cc9b5cbdc4576806b778cea Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 15 May 2023 11:47:10 -0400 Subject: [PATCH] ARTEMIS-3809 Fixing LargeMessageController timeout Say packets stopped flowing, and you are calling receive(0); The Controller should check if packets didn't arrive and throw a proper exception --- .../impl/LargeMessageControllerImpl.java | 2 + .../impl/LargeMessageControllerImplTest.java | 83 +++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 artemis-core-client/src/test/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImplTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java index a98ca81e93..71838744ac 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java @@ -303,6 +303,8 @@ public class LargeMessageControllerImpl implements LargeMessageController { throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage(); } } + + packetAdded = false; } checkException(); diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImplTest.java new file mode 100644 index 0000000000..a2b486d4b1 --- /dev/null +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImplTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.client.impl; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.invoke.MethodHandles; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LargeMessageControllerImplTest { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Test(timeout = 10_000) + public void testControllerTimeout() throws Exception { + + ClientConsumerInternal consumerMock = Mockito.mock(ClientConsumerInternal.class); + LargeMessageControllerImpl largeMessageController = new LargeMessageControllerImpl(consumerMock, 1000, 1); + + AtomicInteger bytesWritten = new AtomicInteger(); + AtomicInteger errors = new AtomicInteger(0); + final byte filling = (byte) 3; + + largeMessageController.setOutputStream(new OutputStream() { + @Override + public void write(int b) throws IOException { + bytesWritten.incrementAndGet(); + if (b != filling) { + errors.incrementAndGet(); + } + } + }); + + largeMessageController.addPacket(createBytes(100, filling), 1000, true); + + int exceptionCounter = 0; + try { + largeMessageController.waitCompletion(0); + } catch (ActiveMQException e) { + logger.debug(e.getMessage(), e); + exceptionCounter++; + } + + Assert.assertEquals(1, exceptionCounter); + + largeMessageController.addPacket(createBytes(900, filling), 1000, false); + + Assert.assertTrue(largeMessageController.waitCompletion(0)); + Assert.assertEquals(1000, bytesWritten.get()); + Assert.assertEquals(0, errors.get()); + } + + byte[] createBytes(int size, byte fill) { + byte[] bytes = new byte[size]; + Arrays.fill(bytes, fill); + return bytes; + } + + +}