diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java new file mode 100644 index 0000000000..22c8b98385 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +import org.jboss.logging.Logger; + +public class SizeAwareMetric { + + public interface AddCallback { + void add(int delta, boolean sizeOnly); + } + + private static final Logger logger = Logger.getLogger(SizeAwareMetric.class); + + private static final int PENDING_FREE = 0, FREE = 1, PENDING_OVER_SIZE = 2, OVER_SIZE = 3, PENDING_OVER_ELEMENTS = 4, OVER_ELEMENTS = 5, NOT_USED = -1; + + private static final AtomicLongFieldUpdater elementsUpdater = AtomicLongFieldUpdater.newUpdater(SizeAwareMetric.class, "elements"); + private volatile long elements; + + private static final AtomicLongFieldUpdater sizeUpdater = AtomicLongFieldUpdater.newUpdater(SizeAwareMetric.class, "size"); + private volatile long size; + + private static final AtomicIntegerFieldUpdater flagUpdater = AtomicIntegerFieldUpdater.newUpdater(SizeAwareMetric.class, "flag"); + private volatile int flag = NOT_USED; + + private long maxElements; + + private long lowerMarkElements; + + private long maxSize; + + private long lowerMarkSize; + + private boolean sizeEnabled = false; + + private boolean elementsEnabled = false; + + private AddCallback onSizeCallback; + + private Runnable overCallback; + + private Runnable underCallback; + + public SizeAwareMetric(long maxSize, long lowerMarkSize, long maxElements, long lowerMarkElements) { + if (lowerMarkSize > maxSize) { + throw new IllegalArgumentException("lowerMark must be <= maxSize"); + } + if (lowerMarkElements > maxElements) { + throw new IllegalArgumentException("lowerMarkElements must be <= maxElements"); + } + this.maxElements = maxElements; + this.lowerMarkElements = lowerMarkElements; + this.maxSize = maxSize; + this.lowerMarkSize = lowerMarkSize; + this.sizeEnabled = maxSize >= 0; + this.elementsEnabled = maxElements >= 0; + } + + public boolean isOver() { + return flag > FREE; // equivalent to flag != FREE && flag != NOT_USED; + } + + public boolean isOverSize() { + return flag == OVER_SIZE; + } + + public boolean isOverElements() { + return flag == OVER_ELEMENTS; + } + + public long getSize() { + return size; + } + + public boolean isElementsEnabled() { + return elementsEnabled; + } + + public SizeAwareMetric setElementsEnabled(boolean elementsEnabled) { + this.elementsEnabled = elementsEnabled; + return this; + } + + public long getElements() { + return elements; + } + + public boolean isSizeEnabled() { + return sizeEnabled; + } + + public SizeAwareMetric setSizeEnabled(boolean sizeEnabled) { + this.sizeEnabled = sizeEnabled; + return this; + } + + public SizeAwareMetric setOnSizeCallback(AddCallback onSize) { + this.onSizeCallback = onSize; + return this; + } + + public SizeAwareMetric setOverCallback(Runnable over) { + this.overCallback = over; + return this; + } + + public SizeAwareMetric setUnderCallback(Runnable under) { + this.underCallback = under; + return this; + } + + protected void over() { + if (overCallback != null) { + try { + overCallback.run(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + } + } + + protected void under() { + if (underCallback != null) { + try { + underCallback.run(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + } + } + + private boolean changeFlag(int expected, int newValue) { + return flagUpdater.compareAndSet(this, expected, newValue); + } + + public final long addSize(final int delta) { + return addSize(delta, false); + } + + public final long addSize(final int delta, final boolean sizeOnly) { + + if (delta == 0) { + if (logger.isDebugEnabled()) { + logger.debug("SizeAwareMetric ignored with size 0", new Exception("trace")); + } + return sizeUpdater.get(this); + } + + changeFlag(NOT_USED, FREE); + + if (onSizeCallback != null) { + try { + onSizeCallback.add(delta, sizeOnly); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + } + + long currentSize = sizeUpdater.addAndGet(this, delta); + + long currentElements; + if (sizeOnly) { + currentElements = elementsUpdater.get(this); + } else if (delta > 0) { + currentElements = elementsUpdater.incrementAndGet(this); + } else { + currentElements = elementsUpdater.decrementAndGet(this); + } + + if (delta > 0) { + checkOver(currentElements, currentSize); + } else { // (delta < 0) + checkUnder(currentElements, currentSize); + } + + return currentSize; + } + + public void setMax(long maxSize, long lowerMarkSize, long maxElements, long lowerMarkElements) { + this.maxSize = maxSize; + this.lowerMarkSize = lowerMarkSize; + this.maxElements = maxElements; + this.lowerMarkElements = lowerMarkElements; + long currentSize = sizeUpdater.get(this); + long currentElements = elementsUpdater.get(this); + checkOver(currentElements, currentSize); + checkUnder(currentElements, currentSize); + } + + private void checkUnder(long currentElements, long currentSize) { + if (sizeEnabled) { + if (currentSize <= lowerMarkSize && changeFlag(OVER_SIZE, PENDING_FREE)) { + // checking if we need to switch from OVER_SIZE to OVER_ELEMENTS, to avoid calling under needless + if (!(elementsEnabled && currentElements >= maxElements && changeFlag(PENDING_FREE, OVER_ELEMENTS))) { + try { + under(); + } finally { + changeFlag(PENDING_FREE, FREE); + } + } + return; // we must return now as we already checked for the elements portion + } + } + + if (elementsEnabled) { + if (currentElements <= lowerMarkElements && changeFlag(OVER_ELEMENTS, PENDING_FREE)) { + // checking if we need to switch from OVER_ELEMENTS to OVER_SIZE, to avoid calling under needless + if (!(sizeEnabled && currentSize >= maxSize && changeFlag(PENDING_FREE, OVER_SIZE))) { + // this is checking the other side from size (elements). + // on this case we are just switching sides and we should not fire under(); + try { + under(); + } finally { + changeFlag(PENDING_FREE, FREE); + } + } + return; // this return here is moot I know. I am keeping it here for consistence with the size portion + // and in case eventually further checks are added on this method, this needs to be reviewed. + } + } + } + + private void checkOver(long currentElements, long currentSize) { + if (sizeEnabled) { + if (currentSize >= maxSize && changeFlag(FREE, PENDING_OVER_SIZE)) { + try { + over(); + } finally { + changeFlag(PENDING_OVER_SIZE, OVER_SIZE); + } + } + } + + if (elementsEnabled && currentElements >= 0) { + if (currentElements >= maxElements && changeFlag(FREE, PENDING_OVER_ELEMENTS)) { + try { + over(); + } finally { + changeFlag(PENDING_OVER_ELEMENTS, OVER_ELEMENTS); + } + } + } + } + + @Override + public String toString() { + return "SizeAwareMetric{" + "elements=" + elements + ", size=" + size + '}'; + } +} diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SizeAwareMetricTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SizeAwareMetricTest.java new file mode 100644 index 0000000000..f2f39a1fbe --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SizeAwareMetricTest.java @@ -0,0 +1,644 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class SizeAwareMetricTest { + + ExecutorService executor; + + private void setupExecutor(int threads) throws Exception { + if (executor == null) { + executor = Executors.newFixedThreadPool(threads); + } + } + + @After + public void shutdownExecutor() throws Exception { + if (executor != null) { + executor.shutdownNow(); + Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + executor = null; + } + } + + @Test + public void testWithParent() throws Exception { + AtomicBoolean childBoolean = new AtomicBoolean(false); + AtomicBoolean parentBoolean = new AtomicBoolean(false); + + SizeAwareMetric child = new SizeAwareMetric(5, 5, 2, 2); + SizeAwareMetric parent = new SizeAwareMetric(10, 10, 15, 15); + + child.setOnSizeCallback(parent::addSize); + child.setOverCallback(() -> childBoolean.set(true)); + child.setUnderCallback(() -> childBoolean.set(false)); + + parent.setOverCallback(() -> parentBoolean.set(true)); + parent.setUnderCallback(() -> parentBoolean.set(false)); + + for (int i = 0; i < 4; i++) { + child.addSize(1, true); + } + + Assert.assertEquals(4, child.getSize()); + Assert.assertEquals(4, parent.getSize()); + Assert.assertEquals(0, child.getElements()); + Assert.assertEquals(0, parent.getElements()); + Assert.assertFalse(childBoolean.get()); + Assert.assertFalse(parentBoolean.get()); + + child.addSize(1, true); + Assert.assertEquals(5, child.getSize()); + Assert.assertTrue(childBoolean.get()); + Assert.assertFalse(parentBoolean.get()); + Assert.assertEquals(0, child.getElements()); + Assert.assertEquals(0, parent.getElements()); + + + child.addSize(-5, true); + + Assert.assertEquals(0, child.getSize()); + Assert.assertEquals(0, parent.getSize()); + + + for (int i = 0; i < 5; i++) { + child.addSize(1, false); + } + + Assert.assertEquals(5, child.getSize()); + Assert.assertEquals(5, parent.getSize()); + Assert.assertEquals(5, child.getElements()); + Assert.assertEquals(5, parent.getElements()); + Assert.assertTrue(childBoolean.get()); + Assert.assertFalse(parentBoolean.get()); + Assert.assertTrue(child.isOverElements()); + + for (int i = 0; i < 5; i++) { + child.addSize(1, false); + } + + Assert.assertEquals(10, child.getSize()); + Assert.assertEquals(10, parent.getSize()); + Assert.assertEquals(10, child.getElements()); + Assert.assertEquals(10, parent.getElements()); + + Assert.assertTrue(childBoolean.get()); + Assert.assertTrue(parentBoolean.get()); + Assert.assertTrue(child.isOverElements()); + Assert.assertFalse(parent.isOverElements()); + Assert.assertTrue(parent.isOverSize()); + + } + + @Test + public void testMultipleSizeAdd() throws Exception { + final int THREADS = 10; + final int ELEMENTS = 1000; + setupExecutor(THREADS); + + final AtomicInteger errors = new AtomicInteger(0); + + final AtomicInteger globalMetricOverCalls = new AtomicInteger(0); + final AtomicInteger metricOverCalls = new AtomicInteger(0); + final AtomicInteger globalMetricUnderCalls = new AtomicInteger(0); + final AtomicInteger metricUnderCalls = new AtomicInteger(0); + + final AtomicBoolean globalMetricOver = new AtomicBoolean(false); + final AtomicBoolean metricOver = new AtomicBoolean(false); + + SizeAwareMetric metric = new SizeAwareMetric(1000, 500, -1, -1); + SizeAwareMetric globalMetric = new SizeAwareMetric(10000, 500, -1, -1); + + metric.setOnSizeCallback(globalMetric::addSize); + metric.setOverCallback(() -> { + metricOver.set(true); + metricOverCalls.incrementAndGet(); + }); + metric.setUnderCallback(() -> { + metricOver.set(false); + metricUnderCalls.incrementAndGet(); + }); + + globalMetric.setOverCallback(() -> { + globalMetricOver.set(true); + globalMetricOverCalls.incrementAndGet(); + }); + globalMetric.setUnderCallback(() -> { + globalMetricOver.set(false); + globalMetricOverCalls.incrementAndGet(); + }); + + ReusableLatch latchDone = new ReusableLatch(THREADS); + + CyclicBarrier flagStart = new CyclicBarrier(THREADS + 1); + for (int istart = 0; istart < THREADS; istart++) { + executor.execute(() -> { + try { + flagStart.await(10, TimeUnit.SECONDS); + + for (int iadd = 0; iadd < ELEMENTS; iadd++) { + metric.addSize(1, false); + } + latchDone.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + }); + } + + flagStart.await(10, TimeUnit.SECONDS); + Assert.assertTrue(latchDone.await(10, TimeUnit.SECONDS)); + + Assert.assertTrue(metricOver.get()); + Assert.assertTrue(metric.isOver()); + Assert.assertTrue(metric.isOverSize()); + Assert.assertFalse(metric.isOverElements()); + + Assert.assertTrue(globalMetricOver.get()); + Assert.assertTrue(globalMetric.isOver()); + + Assert.assertEquals(1, metricOverCalls.get()); + Assert.assertEquals(1, globalMetricOverCalls.get()); + Assert.assertEquals(0, metricUnderCalls.get()); + Assert.assertEquals(0, globalMetricUnderCalls.get()); + + Assert.assertEquals(ELEMENTS * THREADS, metric.getSize()); + Assert.assertEquals(ELEMENTS * THREADS, metric.getElements()); + Assert.assertEquals(ELEMENTS * THREADS, globalMetric.getSize()); + Assert.assertEquals(ELEMENTS * THREADS, globalMetric.getElements()); + + Assert.assertEquals(0, errors.get()); + + latchDone.setCount(10); + + for (int istart = 0; istart < 10; istart++) { + executor.execute(() -> { + try { + flagStart.await(10, TimeUnit.SECONDS); + + for (int iadd = 0; iadd < ELEMENTS; iadd++) { + metric.addSize(-1, false); + } + latchDone.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + }); + } + + flagStart.await(10, TimeUnit.SECONDS); + Assert.assertTrue(latchDone.await(10, TimeUnit.SECONDS)); + + Assert.assertEquals(0, globalMetric.getSize()); + Assert.assertEquals(0, globalMetric.getElements()); + Assert.assertEquals(0, metric.getSize()); + Assert.assertEquals(0, metric.getElements()); + Assert.assertFalse(globalMetricOver.get()); + Assert.assertFalse(globalMetric.isOver()); + } + + + @Test + public void testMaxElements() throws Exception { + SizeAwareMetric metric = new SizeAwareMetric(10000, 500, 10,10); + + AtomicBoolean over = new AtomicBoolean(false); + metric.setOverCallback(() -> over.set(true)); + metric.setUnderCallback(() -> over.set(false)); + + for (int i = 0; i < 11; i++) { + metric.addSize(10, false); + } + + Assert.assertTrue(over.get()); + Assert.assertEquals(110, metric.getSize()); + Assert.assertEquals(11, metric.getElements()); + metric.addSize(1000, false); + + for (int i = 0; i < 12; i++) { + metric.addSize(-10, false); + } + + Assert.assertFalse(over.get()); + + } + @Test + public void testMaxElementsReleaseNonSizeParentMetric() throws Exception { + SizeAwareMetric metricMaster = new SizeAwareMetric(10000, 500, 10,10); + SizeAwareMetric metric = new SizeAwareMetric(10000, 500, 1000,1000); + + metric.setOnSizeCallback(metricMaster::addSize); + + AtomicBoolean over = new AtomicBoolean(false); + metricMaster.setOverCallback(() -> over.set(true)); + metricMaster.setUnderCallback(() -> over.set(false)); + + for (int i = 0; i < 11; i++) { + metric.addSize(10); + } + metric.addSize(1000, true); + + Assert.assertEquals(1110L, metricMaster.getSize()); + Assert.assertEquals(11, metricMaster.getElements()); + Assert.assertEquals(1110L, metric.getSize()); + Assert.assertEquals(11, metric.getElements()); + Assert.assertTrue(metricMaster.isOverElements()); + Assert.assertFalse(metricMaster.isOverSize()); + Assert.assertFalse(metric.isOverElements()); + Assert.assertFalse(metric.isOverSize()); + Assert.assertTrue(over.get()); + + metric.addSize(-1000, true); + + Assert.assertEquals(110L, metricMaster.getSize()); + Assert.assertEquals(11, metricMaster.getElements()); + Assert.assertTrue(metricMaster.isOverElements()); + Assert.assertFalse(metricMaster.isOverSize()); + Assert.assertTrue(over.get()); + + for (int i = 0; i < 11; i++) { + metric.addSize(-10); + } + + Assert.assertEquals(0L, metricMaster.getSize()); + Assert.assertEquals(0L, metricMaster.getElements()); + Assert.assertFalse(metricMaster.isOver()); + Assert.assertEquals(0L, metric.getSize()); + Assert.assertEquals(0L, metric.getElements()); + Assert.assertFalse(metric.isOver()); + Assert.assertFalse(over.get()); + } + + + @Test + public void testMaxElementsReleaseNonSize() throws Exception { + SizeAwareMetric metric = new SizeAwareMetric(10000, 500, 10,10); + + AtomicBoolean over = new AtomicBoolean(false); + metric.setOverCallback(() -> over.set(true)); + metric.setUnderCallback(() -> over.set(false)); + + for (int i = 0; i < 11; i++) { + metric.addSize(10); + } + metric.addSize(1000, true); + + Assert.assertEquals(1110L, metric.getSize()); + Assert.assertEquals(11, metric.getElements()); + Assert.assertTrue(metric.isOverElements()); + Assert.assertFalse(metric.isOverSize()); + Assert.assertTrue(over.get()); + + metric.addSize(-1000, true); + + Assert.assertEquals(110L, metric.getSize()); + Assert.assertEquals(11, metric.getElements()); + Assert.assertTrue(metric.isOverElements()); + Assert.assertFalse(metric.isOverSize()); + Assert.assertTrue(over.get()); + + for (int i = 0; i < 11; i++) { + metric.addSize(-10); + } + + Assert.assertEquals(0L, metric.getSize()); + Assert.assertEquals(0L, metric.getElements()); + Assert.assertFalse(metric.isOver()); + Assert.assertFalse(over.get()); + } + + @Test + public void testMultipleSizeAddMultipleInstances() throws Exception { + final int THREADS = 10, ELEMENTS = 1000; + setupExecutor(THREADS); + + final AtomicInteger errors = new AtomicInteger(0); + + final AtomicInteger globalMetricOverCalls = new AtomicInteger(0); + final AtomicInteger metricOverCalls = new AtomicInteger(0); + final AtomicInteger globalMetricUnderCalls = new AtomicInteger(0); + final AtomicInteger metricUnderCalls = new AtomicInteger(0); + + final AtomicBoolean globalMetricOver = new AtomicBoolean(false); + final AtomicBoolean[] metricOverArray = new AtomicBoolean[THREADS]; + + SizeAwareMetric globalMetric = new SizeAwareMetric(10000, 500, 0, 0); + + SizeAwareMetric[] metric = new SizeAwareMetric[THREADS]; + + + globalMetric.setOverCallback(() -> { + globalMetricOver.set(true); + globalMetricOverCalls.incrementAndGet(); + }); + globalMetric.setUnderCallback(() -> { + globalMetricOver.set(false); + globalMetricUnderCalls.incrementAndGet(); + }); + + ReusableLatch latchDone = new ReusableLatch(THREADS); + + CyclicBarrier flagStart = new CyclicBarrier(THREADS + 1); + for (int istart = 0; istart < THREADS; istart++) { + final AtomicBoolean metricOver = new AtomicBoolean(false); + final SizeAwareMetric themetric = new SizeAwareMetric(1000, 500, 0, 0); + themetric.setOnSizeCallback(globalMetric::addSize); + themetric.setOverCallback(() -> { + metricOver.set(true); + metricOverCalls.incrementAndGet(); + }); + themetric.setUnderCallback(() -> { + metricOver.set(false); + metricUnderCalls.incrementAndGet(); + }); + metric[istart] = themetric; + metricOverArray[istart] = metricOver; + executor.execute(() -> { + try { + flagStart.await(10, TimeUnit.SECONDS); + + for (int iadd = 0; iadd < ELEMENTS; iadd++) { + themetric.addSize(1); + } + latchDone.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + }); + } + + flagStart.await(10, TimeUnit.SECONDS); + Assert.assertTrue(latchDone.await(10, TimeUnit.SECONDS)); + + for (SizeAwareMetric theMetric : metric) { + Assert.assertTrue(theMetric.isOver()); + Assert.assertEquals(ELEMENTS, theMetric.getSize()); + Assert.assertEquals(ELEMENTS, theMetric.getElements()); + } + + for (AtomicBoolean theBool : metricOverArray) { + Assert.assertTrue(theBool.get()); + } + + Assert.assertTrue(globalMetricOver.get()); + Assert.assertTrue(globalMetric.isOver()); + + Assert.assertEquals(10, metricOverCalls.get()); + Assert.assertEquals(1, globalMetricOverCalls.get()); + Assert.assertEquals(0, metricUnderCalls.get()); + Assert.assertEquals(0, globalMetricUnderCalls.get()); + + Assert.assertEquals(ELEMENTS * THREADS, globalMetric.getSize()); + Assert.assertEquals(ELEMENTS * THREADS, globalMetric.getElements()); + + Assert.assertEquals(0, errors.get()); + + latchDone.setCount(10); + + for (int istart = 0; istart < 10; istart++) { + SizeAwareMetric theMetric = metric[istart]; + executor.execute(() -> { + try { + flagStart.await(10, TimeUnit.SECONDS); + + for (int iadd = 0; iadd < ELEMENTS; iadd++) { + theMetric.addSize(-1); + } + latchDone.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + }); + } + + flagStart.await(10, TimeUnit.SECONDS); + Assert.assertTrue(latchDone.await(10, TimeUnit.SECONDS)); + + Assert.assertEquals(0, globalMetric.getSize()); + Assert.assertEquals(0, globalMetric.getElements()); + for (SizeAwareMetric theMetric : metric) { + Assert.assertEquals(0, theMetric.getSize()); + Assert.assertEquals(0, theMetric.getElements()); + } + + Assert.assertEquals(10, metricOverCalls.get()); + Assert.assertEquals(1, globalMetricOverCalls.get()); + Assert.assertEquals(10, metricUnderCalls.get()); + Assert.assertEquals(1, globalMetricUnderCalls.get()); + Assert.assertFalse(globalMetricOver.get()); + Assert.assertFalse(globalMetric.isOver()); + for (AtomicBoolean theBool : metricOverArray) { + Assert.assertFalse(theBool.get()); + } + } + + @Test + public void testUpdateMax() throws Exception { + AtomicBoolean over = new AtomicBoolean(false); + SizeAwareMetric metric = new SizeAwareMetric(1000, 500, -1, -1); + metric.setOverCallback(() -> over.set(true)); + metric.setUnderCallback(() -> over.set(false)); + + metric.addSize(900); + Assert.assertFalse(over.get()); + + metric.setMax(800, 700, 0, 0); + Assert.assertTrue(over.get()); + + metric.addSize(-200); + Assert.assertFalse(over.get()); + } + + @Test + public void testDisabled() throws Exception { + AtomicBoolean over = new AtomicBoolean(false); + SizeAwareMetric metric = new SizeAwareMetric(0, 0, -1, -1); + metric.setSizeEnabled(false); + metric.setOverCallback(() -> over.set(true)); + metric.addSize(100); + Assert.assertEquals(100, metric.getSize()); + Assert.assertEquals(1, metric.getElements()); + Assert.assertFalse(over.get()); + } + + @Test + public void testMultipleNonSized() throws Exception { + AtomicBoolean over = new AtomicBoolean(false); + final SizeAwareMetric metricMaster = new SizeAwareMetric(0, 0, 1, 1); + SizeAwareMetric metric = new SizeAwareMetric(0, 0, 1, 1); + metric.setSizeEnabled(false); + metric.setOverCallback(() -> over.set(true)); + metric.setOnSizeCallback(metricMaster::addSize); + for (int i = 0; i < 10; i++) { + metric.addSize(10, true); + } + + Assert.assertEquals(100, metricMaster.getSize()); + Assert.assertEquals(100, metric.getSize()); + Assert.assertEquals(0, metricMaster.getElements()); + Assert.assertEquals(0, metric.getElements()); + + for (int i = 0; i < 10; i++) { + metric.addSize(10, false); + } + + Assert.assertEquals(200, metricMaster.getSize()); + Assert.assertEquals(200, metric.getSize()); + Assert.assertEquals(10, metricMaster.getElements()); + Assert.assertEquals(10, metric.getElements()); + } + + @Test + public void testResetNeverUsed() throws Exception { + SizeAwareMetric metric = new SizeAwareMetric(0, 0, 0, 0); + AtomicBoolean over = new AtomicBoolean(false); + + metric.setOverCallback(() -> over.set(true)); + metric.setElementsEnabled(true); + metric.setSizeEnabled(true); + metric.setMax(0, 0, 0, 0); + Assert.assertFalse(over.get()); + Assert.assertFalse(metric.isOver()); + } + + @Test + public void testSwitchSides() throws Exception { + SizeAwareMetric metric = new SizeAwareMetric(2000, 2000, 1, 1); + AtomicBoolean over = new AtomicBoolean(false); + + metric.setOverCallback(() -> over.set(true)); + metric.setUnderCallback(() -> over.set(false)); + metric.setElementsEnabled(true); + metric.setSizeEnabled(true); + + metric.addSize(2500, true); + + Assert.assertTrue(over.get()); + + metric.addSize(1000); + + Assert.assertTrue(metric.isOverSize()); + + metric.addSize(-2500, true); + + // Even though we are free from maxSize, we are still bound by maxElements, it should still be over + Assert.assertTrue(over.get()); + Assert.assertTrue("Switch did not work", metric.isOverElements()); + + Assert.assertEquals(1, metric.getElements()); + Assert.assertEquals(1000, metric.getSize()); + + metric.addSize(5000, true); + + Assert.assertTrue(metric.isOverElements()); + Assert.assertEquals(6000, metric.getSize()); + + metric.addSize(-1000); + + Assert.assertTrue(metric.isOverSize()); + Assert.assertEquals(0, metric.getElements()); + Assert.assertEquals(5000, metric.getSize()); + + metric.addSize(-5000, true); + Assert.assertFalse(metric.isOver()); + Assert.assertEquals(0, metric.getSize()); + Assert.assertEquals(0, metric.getElements()); + } + + + @Test + public void testMTOverAndUnder() throws Exception { + final int THREADS = 10; + final int ELEMENTS = 100; + setupExecutor(THREADS * 2); + + SizeAwareMetric metric = new SizeAwareMetric(1, 1, -1, -1); + AtomicInteger overCounter = new AtomicInteger(0); + AtomicInteger errors = new AtomicInteger(0); + metric.setOverCallback(() -> { + int value = overCounter.incrementAndGet(); + if (value > 1) { + new Exception("Value = " + value).printStackTrace(); + errors.incrementAndGet(); + } + }); + metric.setUnderCallback(() -> { + int value = overCounter.decrementAndGet(); + if (value < 0) { + new Exception("Value = " + value).printStackTrace(); + errors.incrementAndGet(); + } + }); + + CyclicBarrier flagStart = new CyclicBarrier(THREADS * 2); + + CountDownLatch done = new CountDownLatch(THREADS * 2); + + for (int i = 0; i < THREADS; i++) { + executor.execute(() -> { + try { + flagStart.await(10, TimeUnit.SECONDS); + for (int repeat = 0; repeat < ELEMENTS; repeat++) { + metric.addSize(1); + } + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + done.countDown(); + }); + executor.execute(() -> { + try { + flagStart.await(10, TimeUnit.SECONDS); + for (int repeat = 0; repeat < ELEMENTS; repeat++) { + metric.addSize(-1); + } + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + done.countDown(); + }); + } + + Assert.assertTrue(done.await(10, TimeUnit.SECONDS)); + + + Assert.assertEquals(0, metric.getSize()); + Assert.assertEquals(0, metric.getElements()); + Assert.assertEquals(0, errors.get()); + + } + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index cbaa2353f6..d8da748ac9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -499,6 +499,8 @@ public final class ActiveMQDefaultConfiguration { public static final long DEFAULT_GLOBAL_MAX_SIZE = Runtime.getRuntime().maxMemory() / 2; + public static final long DEFAULT_GLOBAL_MAX_MESSAGES = -1; + public static final int DEFAULT_MAX_DISK_USAGE; static { @@ -1495,6 +1497,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_GLOBAL_MAX_SIZE; } + public static long getDefaultMaxGlobalMessages() { + return DEFAULT_GLOBAL_MAX_MESSAGES; + } + public static int getDefaultMaxDiskUsage() { return DEFAULT_MAX_DISK_USAGE; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 71f1286d17..3f6851aeea 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -515,7 +515,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. if (owner != null && memoryEstimate != -1) { // the memory has already been tracked and needs to be updated to reflect the new decoding int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data); - ((PagingStore)owner).addSize(addition); + ((PagingStore)owner).addSize(addition, false); final int updatedEstimate = memoryEstimate + addition; memoryEstimate = updatedEstimate; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 16cfcf60f0..8addffcfbc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -1236,6 +1236,10 @@ public interface Configuration { Configuration setGlobalMaxSize(long globalMaxSize); + Configuration setGlobalMaxMessages(long globalMaxMessages); + + long getGlobalMaxMessages(); + int getMaxDiskUsage(); Configuration setMaxDiskUsage(int maxDiskUsage); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 8ab3507971..d8e633bb88 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -348,6 +348,8 @@ public class ConfigurationImpl implements Configuration, Serializable { private Long globalMaxSize; + private Long globalMaxMessages; + private boolean amqpUseCoreSubscriptionNaming = ActiveMQDefaultConfiguration.getDefaultAmqpUseCoreSubscriptionNaming(); private int maxDiskUsage = ActiveMQDefaultConfiguration.getDefaultMaxDiskUsage(); @@ -567,6 +569,21 @@ public class ConfigurationImpl implements Configuration, Serializable { return globalMaxSize; } + + @Override + public ConfigurationImpl setGlobalMaxMessages(long maxMessages) { + this.globalMaxMessages = maxMessages; + return this; + } + + @Override + public long getGlobalMaxMessages() { + if (globalMaxMessages == null) { + this.globalMaxMessages = ActiveMQDefaultConfiguration.getDefaultMaxGlobalMessages(); + } + return globalMaxMessages; + } + @Override public ConfigurationImpl setPersistenceEnabled(final boolean enable) { persistenceEnabled = enable; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index c78cf0d24f..8ffc514604 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -213,6 +213,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String MAX_SIZE_BYTES_NODE_NAME = "max-size-bytes"; + private static final String MAX_MESSAGES_NODE_NAME = "max-size-messages"; + private static final String MAX_SIZE_BYTES_REJECT_THRESHOLD_NODE_NAME = "max-size-bytes-reject-threshold"; private static final String ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME = "address-full-policy"; @@ -305,6 +307,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String GLOBAL_MAX_SIZE = "global-max-size"; + private static final String GLOBAL_MAX_MESSAGES = "global-max-messages"; + private static final String MAX_DISK_USAGE = "max-disk-usage"; private static final String DISK_SCAN_PERIOD = "disk-scan-period"; @@ -446,6 +450,13 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { config.setGlobalMaxSize(globalMaxSize); } + long globalMaxMessages = getLong(e, GLOBAL_MAX_MESSAGES, -1, Validators.MINUS_ONE_OR_GT_ZERO); + + if (globalMaxSize > 0) { + config.setGlobalMaxMessages(globalMaxMessages); + } + + config.setMaxDiskUsage(getInteger(e, MAX_DISK_USAGE, config.getMaxDiskUsage(), Validators.PERCENTAGE_OR_MINUS_ONE)); config.setDiskScanPeriod(getInteger(e, DISK_SCAN_PERIOD, config.getDiskScanPeriod(), Validators.MINUS_ONE_OR_GT_ZERO)); @@ -1245,6 +1256,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { addressSettings.setMaxRedeliveryDelay(XMLUtil.parseLong(child)); } else if (MAX_SIZE_BYTES_NODE_NAME.equalsIgnoreCase(name)) { addressSettings.setMaxSizeBytes(ByteUtil.convertTextBytes(getTrimmedTextContent(child))); + } else if (MAX_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) { + addressSettings.setMaxSizeMessages(XMLUtil.parseInt(child)); } else if (MAX_SIZE_BYTES_REJECT_THRESHOLD_NODE_NAME.equalsIgnoreCase(name)) { addressSettings.setMaxSizeBytesRejectThreshold(ByteUtil.convertTextBytes(getTrimmedTextContent(child))); } else if (PAGE_SIZE_BYTES_NODE_NAME.equalsIgnoreCase(name)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java index 7b1f2d2924..d6bd9c261c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java @@ -97,9 +97,25 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository /** * Add size at the global count level. - * if totalSize > globalMaxSize it will return true + * if sizeOnly = true, only the size portion is updated. If false both the counter for bytes and number of messages is updated. */ - PagingManager addSize(int size); + PagingManager addSize(int size, boolean sizeOnly); + + /** + * An utility method to call addSize(size, false); + * this is a good fit for an IntConsumer. + */ + default PagingManager addSize(int size) { + return addSize(size, false); + } + + /** + * An utility method to call addSize(size, true); + * this is a good fit for an IntConsumer. + */ + default PagingManager addSizeOnly(int size) { + return addSize(size, true); + } boolean isUsingGlobalSize(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 05d030a06d..9049dfa611 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -128,7 +128,16 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener void stopPaging() throws Exception; - void addSize(int size); + /** * + * + * @param size + * @param sizeOnly if false we won't increment the number of messages. (add references for example) + */ + void addSize(int size, boolean sizeOnly); + + default void addSize(int size) { + addSize(size, false); + } boolean checkMemory(Runnable runnable); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 579d10b295..16aec18601 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.core.SimpleString; @@ -39,6 +38,7 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.utils.ByteUtil; +import org.apache.activemq.artemis.utils.SizeAwareMetric; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; import org.jboss.logging.Logger; @@ -67,11 +67,18 @@ public final class PagingManagerImpl implements PagingManager { private PagingStoreFactory pagingStoreFactory; - private final AtomicLong globalSizeBytes = new AtomicLong(0); + private volatile boolean globalFull; - private final AtomicLong numberOfMessages = new AtomicLong(0); + private void setGlobalFull(boolean globalFull) { + synchronized (memoryCallback) { + this.globalFull = globalFull; + checkMemoryRelease(); + } + } - private final long maxSize; + private final SizeAwareMetric globalSizeMetric; + + private long maxSize; private volatile boolean cleanupEnabled = true; @@ -91,9 +98,6 @@ public final class PagingManagerImpl implements PagingManager { private final SimpleString managementAddress; - - - // for tests.. not part of the API public void replacePageStoreFactory(PagingStoreFactory factory) { this.pagingStoreFactory = factory; @@ -107,15 +111,32 @@ public final class PagingManagerImpl implements PagingManager { public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository addressSettingsRepository, final long maxSize, + final long maxMessages, final SimpleString managementAddress) { pagingStoreFactory = pagingSPI; this.addressSettingsRepository = addressSettingsRepository; addressSettingsRepository.registerListener(this); this.maxSize = maxSize; + this.globalSizeMetric = new SizeAwareMetric(maxSize, maxSize, maxMessages, maxMessages); + globalSizeMetric.setSizeEnabled(maxSize >= 0); + globalSizeMetric.setElementsEnabled(maxMessages >= 0); + globalSizeMetric.setOverCallback(() -> setGlobalFull(true)); + globalSizeMetric.setUnderCallback(() -> setGlobalFull(false)); this.memoryExecutor = pagingSPI.newExecutor(); this.managementAddress = managementAddress; } + SizeAwareMetric getSizeAwareMetric() { + return globalSizeMetric; + } + + + /** To be used in tests only called through PagingManagerTestAccessor */ + void resetMaxSize(long maxSize, long maxElements) { + this.maxSize = maxSize; + this.globalSizeMetric.setMax(maxSize, maxSize, maxElements, maxElements); + } + @Override public long getMaxSize() { return maxSize; @@ -123,13 +144,13 @@ public final class PagingManagerImpl implements PagingManager { public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository addressSettingsRepository) { - this(pagingSPI, addressSettingsRepository, -1, null); + this(pagingSPI, addressSettingsRepository, -1, -1, null); } public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository addressSettingsRepository, final SimpleString managementAddress) { - this(pagingSPI, addressSettingsRepository, -1, managementAddress); + this(pagingSPI, addressSettingsRepository, -1, -1, managementAddress); } @Override @@ -150,33 +171,23 @@ public final class PagingManagerImpl implements PagingManager { } @Override - public PagingManagerImpl addSize(int size) { - - if (size > 0) { - numberOfMessages.incrementAndGet(); - } else { - numberOfMessages.decrementAndGet(); - } - - long newSize = globalSizeBytes.addAndGet(size); + public PagingManagerImpl addSize(int size, boolean sizeOnly) { + long newSize = globalSizeMetric.addSize(size, sizeOnly); if (newSize < 0) { ActiveMQServerLogger.LOGGER.negativeGlobalAddressSize(newSize); } - if (size < 0) { - checkMemoryRelease(); - } return this; } @Override public long getGlobalSize() { - return globalSizeBytes.get(); + return globalSizeMetric.getSize(); } protected void checkMemoryRelease() { - if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) && !blockedStored.isEmpty()) { + if (!diskFull && (maxSize < 0 || !globalFull) && !blockedStored.isEmpty()) { if (!memoryCallback.isEmpty()) { if (memoryExecutor != null) { memoryExecutor.execute(this::memoryReleased); @@ -277,7 +288,7 @@ public final class PagingManagerImpl implements PagingManager { @Override public boolean isGlobalFull() { - return diskFull || maxSize > 0 && globalSizeBytes.get() >= maxSize; + return diskFull || maxSize > 0 && globalFull; } @Override @@ -449,7 +460,7 @@ public final class PagingManagerImpl implements PagingManager { } public void debug() { - logger.info("size = " + globalSizeBytes + " bytes, messages = " + numberOfMessages); + logger.info("size = " + globalSizeMetric.getSize() + " bytes, messages = " + globalSizeMetric.getElements()); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index e78588943b..f5a03c6396 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; @@ -59,6 +58,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperation; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.utils.FutureLatch; +import org.apache.activemq.artemis.utils.SizeAwareMetric; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; import org.jboss.logging.Logger; @@ -91,6 +91,8 @@ public class PagingStoreImpl implements PagingStore { private long maxSize; + private long maxMessages; + private int pageSize; private volatile AddressFullMessagePolicy addressFullMessagePolicy; @@ -104,7 +106,9 @@ public class PagingStoreImpl implements PagingStore { private final ArtemisExecutor executor; // Bytes consumed by the queue on the memory - private final AtomicLong sizeInBytes = new AtomicLong(); + private final SizeAwareMetric size; + + private volatile boolean full; private int numberOfPages; @@ -166,6 +170,10 @@ public class PagingStoreImpl implements PagingStore { this.storeName = storeName; + this.size = new SizeAwareMetric(maxSize, maxSize, -1, -1). + setUnderCallback(this::underSized).setOverCallback(this::overSized). + setOnSizeCallback(pagingManager::addSize); + applySetting(addressSettings); this.executor = executor; @@ -189,6 +197,22 @@ public class PagingStoreImpl implements PagingStore { this.usingGlobalMaxSize = pagingManager.isUsingGlobalSize(); } + private void overSized() { + full = true; + } + + private void underSized() { + full = false; + checkReleasedMemory(); + } + + private void configureSizeMetric() { + size.setMax(maxSize, maxSize, maxMessages, maxMessages); + size.setSizeEnabled(maxSize >= 0); + size.setElementsEnabled(maxMessages >= 0); + + } + /** * @param addressSettings */ @@ -196,6 +220,10 @@ public class PagingStoreImpl implements PagingStore { public void applySetting(final AddressSettings addressSettings) { maxSize = addressSettings.getMaxSizeBytes(); + maxMessages = addressSettings.getMaxSizeMessages(); + + configureSizeMetric(); + pageSize = addressSettings.getPageSizeBytes(); addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy(); @@ -247,7 +275,7 @@ public class PagingStoreImpl implements PagingStore { @Override public long getAddressSize() { - return sizeInBytes.get(); + return size.getSize(); } @Override @@ -502,7 +530,7 @@ public class PagingStoreImpl implements PagingStore { final boolean isPaging = this.paging; if (isPaging) { paging = false; - ActiveMQServerLogger.LOGGER.pageStoreStop(storeName, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize()); + ActiveMQServerLogger.LOGGER.pageStoreStop(storeName, size.getSize(), maxSize, pagingManager.getGlobalSize()); } this.cursorProvider.onPageModeCleared(); } finally { @@ -547,7 +575,7 @@ public class PagingStoreImpl implements PagingStore { } } paging = true; - ActiveMQServerLogger.LOGGER.pageStoreStart(storeName, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize()); + ActiveMQServerLogger.LOGGER.pageStoreStart(storeName, size.getSize(), maxSize, pagingManager.getGlobalSize()); return true; } finally { @@ -712,7 +740,7 @@ public class PagingStoreImpl implements PagingStore { return false; } } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) { - if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() >= maxSize || pagingManager.isGlobalFull()) { + if (pagingManager.isDiskFull() || maxSize > 0 && this.full || pagingManager.isGlobalFull()) { if (runWhenBlocking != null) { runWhenBlocking.run(); } @@ -724,7 +752,7 @@ public class PagingStoreImpl implements PagingStore { // has been added, but the check to execute was done before the element was added // NOTE! We do not fix this race by locking the whole thing, doing this check provides // MUCH better performance in a highly concurrent environment - if (!pagingManager.isGlobalFull() && (sizeInBytes.get() < maxSize || maxSize < 0)) { + if (!pagingManager.isGlobalFull() && (!full || maxSize < 0)) { // run it now atomicRunWhenAvailable.run(); } else { @@ -736,7 +764,7 @@ public class PagingStoreImpl implements PagingStore { if (pagingManager.isDiskFull()) { ActiveMQServerLogger.LOGGER.blockingDiskFull(address); } else { - ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize()); + ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, size.getSize(), maxSize, pagingManager.getGlobalSize()); } blocking = true; } @@ -754,9 +782,9 @@ public class PagingStoreImpl implements PagingStore { } @Override - public void addSize(final int size) { - boolean globalFull = pagingManager.addSize(size).isGlobalFull(); - long newSize = sizeInBytes.addAndGet(size); + public void addSize(final int size, boolean sizeOnly) { + long newSize = this.size.addSize(size, sizeOnly); + boolean globalFull = pagingManager.isGlobalFull(); if (newSize < 0) { ActiveMQServerLogger.LOGGER.negativeAddressSize(newSize, address.toString()); @@ -764,13 +792,13 @@ public class PagingStoreImpl implements PagingStore { if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { if (usingGlobalMaxSize && !globalFull || maxSize != -1) { - checkReleaseMemory(globalFull, newSize); + checkReleasedMemory(); } return; } else if (addressFullMessagePolicy == AddressFullMessagePolicy.PAGE) { if (size > 0) { - if (maxSize != -1 && newSize > maxSize || globalFull) { + if (globalFull || full) { startPaging(); } } @@ -781,15 +809,11 @@ public class PagingStoreImpl implements PagingStore { @Override public boolean checkReleasedMemory() { - return checkReleaseMemory(pagingManager.isGlobalFull(), sizeInBytes.get()); - } - - public boolean checkReleaseMemory(boolean globalFull, long newSize) { - if (!blockedViaAddressControl && !globalFull && (newSize < maxSize || maxSize < 0)) { + if (!blockedViaAddressControl && !pagingManager.isGlobalFull() && (!full || maxSize < 0)) { if (!onMemoryFreedRunnables.isEmpty()) { executor.execute(this::memoryReleased); if (blocking) { - ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize); + ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, size.getSize(), maxSize); blocking = false; return true; } @@ -824,7 +848,7 @@ public class PagingStoreImpl implements PagingStore { // Address is full, we just pretend we are paging, and drop the data if (!printedDropMessagesWarning) { printedDropMessagesWarning = true; - ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize()); + ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, size.getSize(), maxSize, pagingManager.getGlobalSize()); } return true; } else { @@ -979,7 +1003,7 @@ public class PagingStoreImpl implements PagingStore { @Override public void refUp(Message message, int count) { - this.addSize(MessageReferenceImpl.getMemoryEstimate()); + this.addSize(MessageReferenceImpl.getMemoryEstimate(), true); } @Override @@ -988,7 +1012,7 @@ public class PagingStoreImpl implements PagingStore { // this could happen on paged messages since they are not routed and refUp is never called return; } - this.addSize(-MessageReferenceImpl.getMemoryEstimate()); + this.addSize(-MessageReferenceImpl.getMemoryEstimate(), true); } private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index f76d0b22d0..fafc09417b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2952,7 +2952,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public PagingManager createPagingManager() throws Exception { - return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize(), configuration.getManagementAddress()); + return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize(), configuration.getGlobalMaxMessages(), configuration.getManagementAddress()); } protected PagingStoreFactory getPagingStoreFactory() throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index e91e5e3bb8..520ecc7aeb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -210,7 +210,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final AtomicInteger pagedReferences = new AtomicInteger(0); // The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage - private final AtomicInteger queueMemorySize = new AtomicInteger(0); + final AtomicInteger queueMemorySize = new AtomicInteger(0); protected final QueueMessageMetrics pendingMetrics = new QueueMessageMetrics(this, "pending"); @@ -1025,7 +1025,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { int count = messageReference.getMessage().refUp(); if (count == 1) { if (messageReference.getMessage().getOwner() != null) { - ((PagingStore)messageReference.getMessage().getOwner()).addSize(messageReference.getMessageMemoryEstimate()); + ((PagingStore)messageReference.getMessage().getOwner()).addSize(messageReference.getMessageMemoryEstimate(), false); } } if (pagingStore != null) { @@ -1038,7 +1038,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { int count = messageReference.getMessage().refDown(); if (count == 0) { if (messageReference.getMessage().getOwner() != null) { - ((PagingStore)messageReference.getMessage().getOwner()).addSize(-messageReference.getMessageMemoryEstimate()); + ((PagingStore)messageReference.getMessage().getOwner()).addSize(-messageReference.getMessageMemoryEstimate(), false); } } if (pagingStore != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index fe21c2ffa5..baa68e7cca 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -40,6 +40,8 @@ public class AddressSettings implements Mergeable, Serializable */ public static final long DEFAULT_MAX_SIZE_BYTES = -1; + public static final long DEFAULT_MAX_SIZE_MESSAGES = -1; + public static final AddressFullMessagePolicy DEFAULT_ADDRESS_FULL_MESSAGE_POLICY = AddressFullMessagePolicy.PAGE; public static final int DEFAULT_PAGE_SIZE = 10 * 1024 * 1024; @@ -139,6 +141,8 @@ public class AddressSettings implements Mergeable, Serializable private Long maxSizeBytes = null; + private Long maxSizeMessages = null; + private Integer pageSizeBytes = null; private Integer pageMaxCache = null; @@ -276,6 +280,7 @@ public class AddressSettings implements Mergeable, Serializable public AddressSettings(AddressSettings other) { this.addressFullMessagePolicy = other.addressFullMessagePolicy; this.maxSizeBytes = other.maxSizeBytes; + this.maxSizeMessages = other.maxSizeMessages; this.pageSizeBytes = other.pageSizeBytes; this.pageMaxCache = other.pageMaxCache; this.dropMessagesWhenFull = other.dropMessagesWhenFull; @@ -609,6 +614,15 @@ public class AddressSettings implements Mergeable, Serializable return maxSizeBytes != null ? maxSizeBytes : AddressSettings.DEFAULT_MAX_SIZE_BYTES; } + public long getMaxSizeMessages() { + return maxSizeMessages != null ? maxSizeMessages : AddressSettings.DEFAULT_MAX_SIZE_MESSAGES; + } + + public AddressSettings setMaxSizeMessages(final long maxSizeMessages) { + this.maxSizeMessages = maxSizeMessages; + return this; + } + public AddressSettings setMaxSizeBytes(final long maxSizeBytes) { this.maxSizeBytes = maxSizeBytes; return this; @@ -986,6 +1000,9 @@ public class AddressSettings implements Mergeable, Serializable if (maxSizeBytes == null) { maxSizeBytes = merged.maxSizeBytes; } + if (maxSizeMessages == null) { + maxSizeMessages = merged.maxSizeMessages; + } if (pageMaxCache == null) { pageMaxCache = merged.pageMaxCache; } @@ -1412,6 +1429,10 @@ public class AddressSettings implements Mergeable, Serializable configDeleteDiverts = null; } } + + if (buffer.readableBytes() > 0) { + maxSizeMessages = BufferHelper.readNullableLong(buffer); + } } @Override @@ -1479,7 +1500,8 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) + BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit) + BufferHelper.sizeOfNullableInteger(slowConsumerThresholdMeasurementUnit.getValue()) + - BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp); + BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp) + + BufferHelper.sizeOfNullableLong(maxSizeMessages); } @Override @@ -1613,6 +1635,8 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableBoolean(buffer, enableIngressTimestamp); buffer.writeNullableSimpleString(configDeleteDiverts != null ? new SimpleString(configDeleteDiverts.toString()) : null); + + BufferHelper.writeNullableLong(buffer, maxSizeMessages); } /* (non-Javadoc) @@ -1688,6 +1712,7 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((managementMessageAttributeSizeLimit == null) ? 0 : managementMessageAttributeSizeLimit.hashCode()); result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode()); result = prime * result + ((enableIngressTimestamp == null) ? 0 : enableIngressTimestamp.hashCode()); + result = prime * result + ((maxSizeMessages == null) ? 0 : maxSizeMessages.hashCode()); return result; } @@ -2054,6 +2079,12 @@ public class AddressSettings implements Mergeable, Serializable } else if (!enableIngressTimestamp.equals(other.enableIngressTimestamp)) return false; + if (maxSizeMessages == null) { + if (other.maxSizeMessages != null) + return false; + } else if (!maxSizeMessages.equals(other.maxSizeMessages)) + return false; + return true; } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 41176866c5..65fe241a52 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -894,6 +894,15 @@ + + + + Number of messages before all addresses will enter into their Full Policy configured. + It works in conjunction with global-max-size, being watever value hits its maximum first. + + + + @@ -3934,6 +3943,14 @@ + + + + the maximum number of messages allowed on the address. This is used in PAGING, BLOCK and FAIL policies. It does not support notations and it is a simple number of messages allowed. + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java index 6ee93210a0..9cad4d7788 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java @@ -357,6 +357,35 @@ public class FileConfigurationParserTest extends ActiveMQTestBase { } } + @Test + public void testParseMaxSizeOnAddressSettings() throws Exception { + String configStr = "" + "\n" + "" + "\n" + "123\n" + "" + "\n" + "" + "\n"; + + FileConfigurationParser parser = new FileConfigurationParser(); + ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8)); + + Configuration configuration = parser.parseMainConfig(input); + AddressSettings settings = configuration.getAddressesSettings().get("foo"); + Assert.assertEquals(123, settings.getMaxSizeMessages()); + } + + // you should not use K, M notations on address settings max-size-messages + @Test + public void testExpectedErrorOverMaxMessageNotation() throws Exception { + String configStr = "" + "\n" + "" + "\n" + "123K\n" + "" + "\n" + "" + "\n"; + + FileConfigurationParser parser = new FileConfigurationParser(); + ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8)); + + boolean valid = false; + try { + parser.parseMainConfig(input); + } catch (Throwable expected) { + valid = true; + } + Assert.assertTrue("Exception expected", valid); + } + private static String bridgePart = "\n" + " \n" + " sausage-factory\n" + @@ -416,6 +445,47 @@ public class FileConfigurationParserTest extends ActiveMQTestBase { Configuration configuration = parser.parseMainConfig(inputStream); } + @Test + public void testMaxSize() throws Exception { + StringPrintStream stringPrintStream = new StringPrintStream(); + PrintStream stream = stringPrintStream.newStream(); + + stream.println(""); + stream.println("10M"); + stream.println("1000"); + stream.println(""); + + ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes()); + FileConfigurationParser parser = new FileConfigurationParser(); + Configuration configuration = parser.parseMainConfig(inputStream); + + Assert.assertEquals(10 * 1024 * 1024, configuration.getGlobalMaxSize()); + Assert.assertEquals(1000, configuration.getGlobalMaxMessages()); + } + + @Test + public void testExceptionMaxSize() throws Exception { + StringPrintStream stringPrintStream = new StringPrintStream(); + PrintStream stream = stringPrintStream.newStream(); + + stream.println(""); + stream.println("1000K"); + stream.println(""); + + ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes()); + FileConfigurationParser parser = new FileConfigurationParser(); + boolean exceptionHappened = false; + + try { + parser.parseMainConfig(inputStream); + } catch (Throwable e) { + exceptionHappened = true; + } + + Assert.assertTrue("Exception expected parsing notation for global-max-messages", exceptionHappened); + + } + @Test public void testRetentionJournalOptionsDays() throws Exception { testStreamDatesOption("DAYS", TimeUnit.DAYS); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerTestAccessor.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerTestAccessor.java new file mode 100644 index 0000000000..e33c582cbe --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerTestAccessor.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.paging.impl; + +import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.utils.SizeAwareMetric; + +/** Use this class to access things that are meant on test only */ +public class PagingManagerTestAccessor { + + public static void resetMaxSize(PagingManager pagingManager, long maxSize, long maxElements) { + ((PagingManagerImpl)pagingManager).resetMaxSize(maxSize, maxElements); + } + + public static SizeAwareMetric globalSizeAwareMetric(PagingManager pagingManager) { + return ((PagingManagerImpl)pagingManager).getSizeAwareMetric(); + } +} diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java index 20ff3577d9..d7e0920870 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java @@ -309,7 +309,7 @@ public class FileMoveManagerTest { PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null, new OrderedExecutorFactory(threadPool), true, null); - PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1, ActiveMQDefaultConfiguration.getDefaultManagementAddress()); + PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1, -1, ActiveMQDefaultConfiguration.getDefaultManagementAddress()); managerImpl.start(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueImplTestAccessor.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueImplTestAccessor.java new file mode 100644 index 0000000000..2731e8c4b1 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueImplTestAccessor.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.Queue; + +public class QueueImplTestAccessor { + + public static int getQueueMemorySize(Queue queue) { + return ((QueueImpl)queue).queueMemorySize.get(); + } + +} diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java index aa67e3e104..722d24b3e1 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java @@ -65,6 +65,7 @@ public class AddressSettingsTest extends ActiveMQTestBase { addressSettingsToMerge.setMaxDeliveryAttempts(1000); addressSettingsToMerge.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP); addressSettingsToMerge.setMaxSizeBytes(1001); + addressSettingsToMerge.setMaxSizeMessages(101); addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002); addressSettingsToMerge.setRedeliveryDelay(1003); addressSettingsToMerge.setPageSizeBytes(1004); @@ -76,6 +77,7 @@ public class AddressSettingsTest extends ActiveMQTestBase { Assert.assertEquals(addressSettings.getExpiryAddress(), exp); Assert.assertEquals(addressSettings.getMaxDeliveryAttempts(), 1000); Assert.assertEquals(addressSettings.getMaxSizeBytes(), 1001); + Assert.assertEquals(addressSettings.getMaxSizeMessages(), 101); Assert.assertEquals(addressSettings.getMessageCounterHistoryDayLimit(), 1002); Assert.assertEquals(addressSettings.getRedeliveryDelay(), 1003); Assert.assertEquals(addressSettings.getPageSizeBytes(), 1004); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index e602872f71..271d711126 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -22,6 +22,9 @@ import javax.jms.DeliveryMode; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor; +import org.apache.activemq.artemis.core.server.impl.QueueImplTestAccessor; import org.apache.activemq.artemis.json.JsonArray; import org.apache.activemq.artemis.json.JsonObject; import javax.management.Notification; @@ -30,7 +33,6 @@ import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.TabularDataSupport; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; -import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.text.DateFormat; import java.util.Arrays; @@ -43,7 +45,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -71,13 +72,11 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.management.impl.QueueControlImpl; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl; -import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil; @@ -1579,16 +1578,9 @@ public class QueueControlTest extends ManagementTestBase { final LocalQueueBinding binding2 = (LocalQueueBinding) server.getPostOffice().getBinding(dlq); Queue q2 = binding2.getQueue(); - Field queueMemorySizeField = QueueImpl.class.getDeclaredField("queueMemorySize"); - queueMemorySizeField.setAccessible(true); - - //Get memory size counters to verify - AtomicInteger queueMemorySize1 = (AtomicInteger) queueMemorySizeField.get(q); - AtomicInteger queueMemorySize2 = (AtomicInteger) queueMemorySizeField.get(q2); - //Verify that original queue has a memory size greater than 0 and DLQ is 0 - assertTrue(queueMemorySize1.get() > 0); - assertEquals(0, queueMemorySize2.get()); + assertTrue(QueueImplTestAccessor.getQueueMemorySize(q) > 0); + assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2)); // Read and rollback all messages to DLQ ClientConsumer clientConsumer = session.createConsumer(qName); @@ -1603,8 +1595,8 @@ public class QueueControlTest extends ManagementTestBase { Assert.assertNull(clientConsumer.receiveImmediate()); //Verify that original queue has a memory size of 0 and DLQ is greater than 0 after rollback - assertEquals(0, queueMemorySize1.get()); - assertTrue(queueMemorySize2.get() > 0); + assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q)); + assertTrue(QueueImplTestAccessor.getQueueMemorySize(q2) > 0); QueueControl dlqQueueControl = createManagementControl(dla, dlq); assertMessageMetrics(dlqQueueControl, numMessagesToTest, durable); @@ -1616,8 +1608,8 @@ public class QueueControlTest extends ManagementTestBase { assertMessageMetrics(dlqQueueControl, 0, durable); //Verify that original queue has a memory size of greater than 0 and DLQ is 0 after move - assertTrue(queueMemorySize1.get() > 0); - assertEquals(0, queueMemorySize2.get()); + assertTrue(QueueImplTestAccessor.getQueueMemorySize(q) > 0); + assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2)); // .. and that the messages is now on the original queue once more. for (int i = 0; i < numMessagesToTest; i++) { @@ -1630,8 +1622,8 @@ public class QueueControlTest extends ManagementTestBase { clientConsumer.close(); //Verify that original queue and DLQ have a memory size of 0 - assertEquals(0, queueMemorySize1.get()); - assertEquals(0, queueMemorySize2.get()); + assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q)); + assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2)); } /** @@ -1662,17 +1654,12 @@ public class QueueControlTest extends ManagementTestBase { final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(queue); Queue q = binding.getQueue(); - Field queueMemorySizeField = QueueImpl.class.getDeclaredField("queueMemorySize"); - queueMemorySizeField.setAccessible(true); - - //Get memory size counters to verify - AtomicInteger queueMemorySize = (AtomicInteger) queueMemorySizeField.get(q); QueueControl queueControl = createManagementControl(address, queue); assertMessageMetrics(queueControl, 1, durable); //verify memory usage is greater than 0 - Assert.assertTrue(queueMemorySize.get() > 0); + Assert.assertTrue(QueueImplTestAccessor.getQueueMemorySize(q) > 0); // moved all messages to otherQueue int movedMessagesCount = queueControl.moveMessages(null, otherQueue.toString()); @@ -1680,7 +1667,7 @@ public class QueueControlTest extends ManagementTestBase { assertMessageMetrics(queueControl, 0, durable); //verify memory usage is 0 after move - Assert.assertEquals(0, queueMemorySize.get()); + Assert.assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q)); // check there is no message to consume from queue consumeMessages(0, session, queue); @@ -1873,7 +1860,7 @@ public class QueueControlTest extends ManagementTestBase { } final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(queue); - Assert.assertEquals(10, binding.getQueue().getMessageCount()); + Wait.assertEquals(10, () -> binding.getQueue().getMessageCount()); QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(10, queueControl.getMessageCount()); @@ -2206,10 +2193,7 @@ public class QueueControlTest extends ManagementTestBase { final int MESSAGE_SIZE = 1024 * 3; // 3k - // reset maxSize for Paging mode - Field maxSizField = PagingManagerImpl.class.getDeclaredField("maxSize"); - maxSizField.setAccessible(true); - maxSizField.setLong(server.getPagingManager(), 10240); + PagingManagerTestAccessor.resetMaxSize(server.getPagingManager(), 10240, 0); clearDataRecreateServerDirs(); SimpleString address = RandomUtil.randomSimpleString(); @@ -2230,7 +2214,7 @@ public class QueueControlTest extends ManagementTestBase { bb.put(getSamplebyte(j)); } - final int numberOfMessages = 8000; + final int numberOfMessages = 100; ClientMessage message; for (int i = 0; i < numberOfMessages; i++) { message = session.createMessage(true); @@ -2250,10 +2234,7 @@ public class QueueControlTest extends ManagementTestBase { Assert.assertEquals(numberOfMessages, removedMatchedMessagesCount); assertMessageMetrics(queueControl, 0, durable); - Field queueMemoprySizeField = QueueImpl.class.getDeclaredField("queueMemorySize"); - queueMemoprySizeField.setAccessible(true); - AtomicInteger queueMemorySize = (AtomicInteger) queueMemoprySizeField.get(queue); - Assert.assertEquals(0, queueMemorySize.get()); + Assert.assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(queue)); session.deleteQueue(queueName); } @@ -3826,12 +3807,9 @@ public class QueueControlTest extends ManagementTestBase { QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(0, queueControl.getScheduledCount()); - Field queueMemorySizeField = QueueImpl.class.getDeclaredField("queueMemorySize"); - queueMemorySizeField.setAccessible(true); final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(queue); Queue q = binding.getQueue(); - AtomicInteger queueMemorySize1 = (AtomicInteger) queueMemorySizeField.get(q); - assertEquals(0, queueMemorySize1.get()); + assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q)); ClientProducer producer = session.createProducer(address); ClientMessage message = session.createMessage(durable); @@ -3843,7 +3821,7 @@ public class QueueControlTest extends ManagementTestBase { Assert.assertEquals(0, queueControl.getMessageCount()); //Verify that original queue has a memory size of 0 - assertEquals(0, queueMemorySize1.get()); + assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q)); session.deleteQueue(queue); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java index 8587985847..31445391a9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.ActiveMQConnection; +import org.junit.Assert; import org.junit.Test; /** @@ -58,7 +59,8 @@ public class ProducerFlowControlTest extends ProducerFlowControlBaseTest { pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2"); assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS)); - msg = (TextMessage) consumer.receive(); + msg = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(msg); assertEquals("Message 2", msg.getText()); msg.acknowledge(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java new file mode 100644 index 0000000000..25bbffc79b --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.paging; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.SizeAwareMetric; +import org.junit.Assert; +import org.junit.Test; + +public class MaxMessagesPagingTest extends ActiveMQTestBase { + + protected static final int PAGE_MAX = 100 * 1024; + protected static final int PAGE_SIZE = 10 * 1024; + protected ActiveMQServer server; + + @Test + public void testGlobalMaxMessages() throws Exception { + final SimpleString ADDRESS = new SimpleString("testGlobalMaxMessages"); + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig(); + + final int PAGE_MAX = 100 * 1024; + + final int PAGE_SIZE = 10 * 1024; + + ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX); + server.getConfiguration().setGlobalMaxMessages(100); + server.start(); + + ServerLocator locator = createInVMNonHALocator(); + + locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + + ClientSession session = sf.createSession(null, null, false, true, true, false, 0); + + session.createQueue(new QueueConfiguration(ADDRESS).setAddress(ADDRESS)); + + ClientProducer producer = session.createProducer(ADDRESS); + + ClientMessage message = null; + + int messageSize = 1 * 1024; + + for (int i = 0; i < 30; i++) { + message = session.createMessage(true); + + message.getBodyBuffer().writerIndex(0); + + message.getBodyBuffer().writeBytes(new byte[messageSize]); + + for (int j = 1; j <= messageSize; j++) { + message.getBodyBuffer().writeInt(j); + } + + producer.send(message); + } + + Queue queue = server.locateQueue(ADDRESS); + + Wait.assertTrue(queue.getPagingStore()::isPaging); + + ClientConsumer consumer = session.createConsumer(ADDRESS); + + session.start(); + + for (int i = 0; i < 30; i++) { + message = consumer.receive(5000); + Assert.assertNotNull(message); + message.acknowledge(); + } + session.commit(); + + Wait.assertFalse(queue.getPagingStore()::isPaging); + + messageSize = 1; + + for (int i = 0; i < 102; i++) { + message = session.createMessage(true); + + message.getBodyBuffer().writerIndex(0); + + message.getBodyBuffer().writeBytes(new byte[messageSize]); + + producer.send(message); + if (i == 30) { + // it should not kick based on the size of the address + Wait.assertFalse(queue.getPagingStore()::isPaging); + } + } + + Wait.assertTrue(queue.getPagingStore()::isPaging); + + SizeAwareMetric globalSizeMetric = PagingManagerTestAccessor.globalSizeAwareMetric(server.getPagingManager()); + + // this is validating the test is actually validating paging after over elements + Assert.assertTrue(globalSizeMetric.isOverElements()); + Assert.assertFalse(globalSizeMetric.isOverSize()); + + session.close(); + } + + @Test + public void testGlobalMaxMessagesMultipleQueues() throws Exception { + final String baseAddress = "testGlobal"; + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig(); + + final int PAGE_MAX = 100 * 1024; + + final int PAGE_SIZE = 10 * 1024; + + ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX); + server.getConfiguration().setGlobalMaxMessages(50); + server.start(); + + ServerLocator locator = createInVMNonHALocator(); + + locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + + ClientSession session = sf.createSession(null, null, false, true, true, false, 0); + + for (int adr = 1; adr <= 2; adr++) { + SimpleString address = new SimpleString(baseAddress + adr); + session.createQueue(new QueueConfiguration(address).setAddress(address)); + } + + for (int adr = 1; adr <= 2; adr++) { + SimpleString address = new SimpleString(baseAddress + adr); + ClientProducer producer = session.createProducer(address); + + ClientMessage message = null; + + for (int i = 0; i < 30; i++) { + message = session.createMessage(true); + + message.getBodyBuffer().writerIndex(0); + + message.getBodyBuffer().writeBytes(new byte[1]); + + producer.send(message); + } + + Queue queue = server.locateQueue(address); + if (adr == 1) { + // first address is fine + Wait.assertFalse(queue.getPagingStore()::isPaging); + } else { + // on second one we reach max + Wait.assertTrue(queue.getPagingStore()::isPaging); + } + } + + SizeAwareMetric globalSizeMetric = PagingManagerTestAccessor.globalSizeAwareMetric(server.getPagingManager()); + + // this is validating the test is actually validating paging after over elements + Assert.assertTrue(globalSizeMetric.isOverElements()); + Assert.assertFalse(globalSizeMetric.isOverSize()); + + session.close(); + } + + @Test + public void testMaxOnAddress() throws Exception { + final String baseAddress = "testMaxOnAddress"; + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig(); + + final int PAGE_MAX = 100 * 1024; + + final int PAGE_SIZE = 10 * 1024; + + ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX); + server.getConfiguration().setGlobalMaxMessages(50); + server.start(); + + AddressSettings max5 = new AddressSettings().setMaxSizeMessages(5); + server.getAddressSettingsRepository().addMatch("#", max5); + + ServerLocator locator = createInVMNonHALocator(); + + locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + + ClientSession session = sf.createSession(null, null, false, true, true, false, 0); + + for (int adr = 1; adr <= 2; adr++) { + SimpleString address = new SimpleString(baseAddress + adr); + session.createQueue(new QueueConfiguration(address).setAddress(address)); + } + + for (int adr = 1; adr <= 1; adr++) { + SimpleString address = new SimpleString(baseAddress + adr); + ClientProducer producer = session.createProducer(address); + + ClientMessage message = null; + + Queue queue = server.locateQueue(address); + for (int i = 0; i < 10; i++) { + message = session.createMessage(true); + + message.getBodyBuffer().writerIndex(0); + + message.getBodyBuffer().writeBytes(new byte[1]); + + producer.send(message); + if (i >= 4) { + Wait.assertTrue(queue.getPagingStore()::isPaging); + } else { + Assert.assertFalse(queue.getPagingStore().isPaging()); + } + } + } + } + + @Test + public void testMaxOnAddressHitGlobal() throws Exception { + final String baseAddress = "testMaxOnAddress"; + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig(); + + final int PAGE_MAX = 100 * 1024; + + final int PAGE_SIZE = 10 * 1024; + + ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX); + server.getConfiguration().setGlobalMaxMessages(40); + server.start(); + + AddressSettings max5 = new AddressSettings().setMaxSizeMessages(5).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + server.getAddressSettingsRepository().addMatch("#", max5); + + ServerLocator locator = createInVMNonHALocator(); + + locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + + ClientSession session = sf.createSession(null, null, false, true, true, false, 0); + + for (int adr = 0; adr < 11; adr++) { + SimpleString address = new SimpleString(baseAddress + adr); + session.createQueue(new QueueConfiguration(address).setAddress(address)); + ClientProducer producer = session.createProducer(address); + + ClientMessage message = null; + + Queue queue = server.locateQueue(address); + for (int i = 0; i < 4; i++) { + message = session.createMessage(true); + + message.getBodyBuffer().writerIndex(0); + + message.getBodyBuffer().writeBytes(new byte[1]); + + producer.send(message); + } + + if (adr >= 9) { + Wait.assertTrue(queue.getPagingStore()::isPaging); + } else { + Assert.assertFalse(queue.getPagingStore().isPaging()); + } + } + } + +} \ No newline at end of file diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index ad39c3a3ed..d15251452e 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -404,6 +404,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase { } + @Override + public void addSize(int size, boolean sizeOnly) { + } + @Override public void addSize(int size) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index 7188c1c2e2..d30a5aa7c6 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -910,7 +910,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase { } @Override - public FakePagingManager addSize(int s) { + public FakePagingManager addSize(int s, boolean sizeOnly) { globalSize.addAndGet(s); return this; } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java index a7ee37e48e..bed3ea5ad6 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java @@ -81,7 +81,7 @@ public class FakePagingManager implements PagingManager { } @Override - public FakePagingManager addSize(int size) { + public FakePagingManager addSize(int size, boolean sizeOnly) { return this; }