ARTEMIS-3720 Max Number Of Messages on a deciding policy for paging (global and address-settings)

This commit is contained in:
Clebert Suconic 2021-12-16 13:37:03 -05:00 committed by clebertsuconic
parent 2b5a25a106
commit 0966f2d7a0
26 changed files with 1581 additions and 101 deletions

View File

@ -0,0 +1,267 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.jboss.logging.Logger;
public class SizeAwareMetric {
public interface AddCallback {
void add(int delta, boolean sizeOnly);
}
private static final Logger logger = Logger.getLogger(SizeAwareMetric.class);
private static final int PENDING_FREE = 0, FREE = 1, PENDING_OVER_SIZE = 2, OVER_SIZE = 3, PENDING_OVER_ELEMENTS = 4, OVER_ELEMENTS = 5, NOT_USED = -1;
private static final AtomicLongFieldUpdater<SizeAwareMetric> elementsUpdater = AtomicLongFieldUpdater.newUpdater(SizeAwareMetric.class, "elements");
private volatile long elements;
private static final AtomicLongFieldUpdater<SizeAwareMetric> sizeUpdater = AtomicLongFieldUpdater.newUpdater(SizeAwareMetric.class, "size");
private volatile long size;
private static final AtomicIntegerFieldUpdater<SizeAwareMetric> flagUpdater = AtomicIntegerFieldUpdater.newUpdater(SizeAwareMetric.class, "flag");
private volatile int flag = NOT_USED;
private long maxElements;
private long lowerMarkElements;
private long maxSize;
private long lowerMarkSize;
private boolean sizeEnabled = false;
private boolean elementsEnabled = false;
private AddCallback onSizeCallback;
private Runnable overCallback;
private Runnable underCallback;
public SizeAwareMetric(long maxSize, long lowerMarkSize, long maxElements, long lowerMarkElements) {
if (lowerMarkSize > maxSize) {
throw new IllegalArgumentException("lowerMark must be <= maxSize");
}
if (lowerMarkElements > maxElements) {
throw new IllegalArgumentException("lowerMarkElements must be <= maxElements");
}
this.maxElements = maxElements;
this.lowerMarkElements = lowerMarkElements;
this.maxSize = maxSize;
this.lowerMarkSize = lowerMarkSize;
this.sizeEnabled = maxSize >= 0;
this.elementsEnabled = maxElements >= 0;
}
public boolean isOver() {
return flag > FREE; // equivalent to flag != FREE && flag != NOT_USED;
}
public boolean isOverSize() {
return flag == OVER_SIZE;
}
public boolean isOverElements() {
return flag == OVER_ELEMENTS;
}
public long getSize() {
return size;
}
public boolean isElementsEnabled() {
return elementsEnabled;
}
public SizeAwareMetric setElementsEnabled(boolean elementsEnabled) {
this.elementsEnabled = elementsEnabled;
return this;
}
public long getElements() {
return elements;
}
public boolean isSizeEnabled() {
return sizeEnabled;
}
public SizeAwareMetric setSizeEnabled(boolean sizeEnabled) {
this.sizeEnabled = sizeEnabled;
return this;
}
public SizeAwareMetric setOnSizeCallback(AddCallback onSize) {
this.onSizeCallback = onSize;
return this;
}
public SizeAwareMetric setOverCallback(Runnable over) {
this.overCallback = over;
return this;
}
public SizeAwareMetric setUnderCallback(Runnable under) {
this.underCallback = under;
return this;
}
protected void over() {
if (overCallback != null) {
try {
overCallback.run();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
protected void under() {
if (underCallback != null) {
try {
underCallback.run();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
private boolean changeFlag(int expected, int newValue) {
return flagUpdater.compareAndSet(this, expected, newValue);
}
public final long addSize(final int delta) {
return addSize(delta, false);
}
public final long addSize(final int delta, final boolean sizeOnly) {
if (delta == 0) {
if (logger.isDebugEnabled()) {
logger.debug("SizeAwareMetric ignored with size 0", new Exception("trace"));
}
return sizeUpdater.get(this);
}
changeFlag(NOT_USED, FREE);
if (onSizeCallback != null) {
try {
onSizeCallback.add(delta, sizeOnly);
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
long currentSize = sizeUpdater.addAndGet(this, delta);
long currentElements;
if (sizeOnly) {
currentElements = elementsUpdater.get(this);
} else if (delta > 0) {
currentElements = elementsUpdater.incrementAndGet(this);
} else {
currentElements = elementsUpdater.decrementAndGet(this);
}
if (delta > 0) {
checkOver(currentElements, currentSize);
} else { // (delta < 0)
checkUnder(currentElements, currentSize);
}
return currentSize;
}
public void setMax(long maxSize, long lowerMarkSize, long maxElements, long lowerMarkElements) {
this.maxSize = maxSize;
this.lowerMarkSize = lowerMarkSize;
this.maxElements = maxElements;
this.lowerMarkElements = lowerMarkElements;
long currentSize = sizeUpdater.get(this);
long currentElements = elementsUpdater.get(this);
checkOver(currentElements, currentSize);
checkUnder(currentElements, currentSize);
}
private void checkUnder(long currentElements, long currentSize) {
if (sizeEnabled) {
if (currentSize <= lowerMarkSize && changeFlag(OVER_SIZE, PENDING_FREE)) {
// checking if we need to switch from OVER_SIZE to OVER_ELEMENTS, to avoid calling under needless
if (!(elementsEnabled && currentElements >= maxElements && changeFlag(PENDING_FREE, OVER_ELEMENTS))) {
try {
under();
} finally {
changeFlag(PENDING_FREE, FREE);
}
}
return; // we must return now as we already checked for the elements portion
}
}
if (elementsEnabled) {
if (currentElements <= lowerMarkElements && changeFlag(OVER_ELEMENTS, PENDING_FREE)) {
// checking if we need to switch from OVER_ELEMENTS to OVER_SIZE, to avoid calling under needless
if (!(sizeEnabled && currentSize >= maxSize && changeFlag(PENDING_FREE, OVER_SIZE))) {
// this is checking the other side from size (elements).
// on this case we are just switching sides and we should not fire under();
try {
under();
} finally {
changeFlag(PENDING_FREE, FREE);
}
}
return; // this return here is moot I know. I am keeping it here for consistence with the size portion
// and in case eventually further checks are added on this method, this needs to be reviewed.
}
}
}
private void checkOver(long currentElements, long currentSize) {
if (sizeEnabled) {
if (currentSize >= maxSize && changeFlag(FREE, PENDING_OVER_SIZE)) {
try {
over();
} finally {
changeFlag(PENDING_OVER_SIZE, OVER_SIZE);
}
}
}
if (elementsEnabled && currentElements >= 0) {
if (currentElements >= maxElements && changeFlag(FREE, PENDING_OVER_ELEMENTS)) {
try {
over();
} finally {
changeFlag(PENDING_OVER_ELEMENTS, OVER_ELEMENTS);
}
}
}
}
@Override
public String toString() {
return "SizeAwareMetric{" + "elements=" + elements + ", size=" + size + '}';
}
}

View File

@ -0,0 +1,644 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class SizeAwareMetricTest {
ExecutorService executor;
private void setupExecutor(int threads) throws Exception {
if (executor == null) {
executor = Executors.newFixedThreadPool(threads);
}
}
@After
public void shutdownExecutor() throws Exception {
if (executor != null) {
executor.shutdownNow();
Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
executor = null;
}
}
@Test
public void testWithParent() throws Exception {
AtomicBoolean childBoolean = new AtomicBoolean(false);
AtomicBoolean parentBoolean = new AtomicBoolean(false);
SizeAwareMetric child = new SizeAwareMetric(5, 5, 2, 2);
SizeAwareMetric parent = new SizeAwareMetric(10, 10, 15, 15);
child.setOnSizeCallback(parent::addSize);
child.setOverCallback(() -> childBoolean.set(true));
child.setUnderCallback(() -> childBoolean.set(false));
parent.setOverCallback(() -> parentBoolean.set(true));
parent.setUnderCallback(() -> parentBoolean.set(false));
for (int i = 0; i < 4; i++) {
child.addSize(1, true);
}
Assert.assertEquals(4, child.getSize());
Assert.assertEquals(4, parent.getSize());
Assert.assertEquals(0, child.getElements());
Assert.assertEquals(0, parent.getElements());
Assert.assertFalse(childBoolean.get());
Assert.assertFalse(parentBoolean.get());
child.addSize(1, true);
Assert.assertEquals(5, child.getSize());
Assert.assertTrue(childBoolean.get());
Assert.assertFalse(parentBoolean.get());
Assert.assertEquals(0, child.getElements());
Assert.assertEquals(0, parent.getElements());
child.addSize(-5, true);
Assert.assertEquals(0, child.getSize());
Assert.assertEquals(0, parent.getSize());
for (int i = 0; i < 5; i++) {
child.addSize(1, false);
}
Assert.assertEquals(5, child.getSize());
Assert.assertEquals(5, parent.getSize());
Assert.assertEquals(5, child.getElements());
Assert.assertEquals(5, parent.getElements());
Assert.assertTrue(childBoolean.get());
Assert.assertFalse(parentBoolean.get());
Assert.assertTrue(child.isOverElements());
for (int i = 0; i < 5; i++) {
child.addSize(1, false);
}
Assert.assertEquals(10, child.getSize());
Assert.assertEquals(10, parent.getSize());
Assert.assertEquals(10, child.getElements());
Assert.assertEquals(10, parent.getElements());
Assert.assertTrue(childBoolean.get());
Assert.assertTrue(parentBoolean.get());
Assert.assertTrue(child.isOverElements());
Assert.assertFalse(parent.isOverElements());
Assert.assertTrue(parent.isOverSize());
}
@Test
public void testMultipleSizeAdd() throws Exception {
final int THREADS = 10;
final int ELEMENTS = 1000;
setupExecutor(THREADS);
final AtomicInteger errors = new AtomicInteger(0);
final AtomicInteger globalMetricOverCalls = new AtomicInteger(0);
final AtomicInteger metricOverCalls = new AtomicInteger(0);
final AtomicInteger globalMetricUnderCalls = new AtomicInteger(0);
final AtomicInteger metricUnderCalls = new AtomicInteger(0);
final AtomicBoolean globalMetricOver = new AtomicBoolean(false);
final AtomicBoolean metricOver = new AtomicBoolean(false);
SizeAwareMetric metric = new SizeAwareMetric(1000, 500, -1, -1);
SizeAwareMetric globalMetric = new SizeAwareMetric(10000, 500, -1, -1);
metric.setOnSizeCallback(globalMetric::addSize);
metric.setOverCallback(() -> {
metricOver.set(true);
metricOverCalls.incrementAndGet();
});
metric.setUnderCallback(() -> {
metricOver.set(false);
metricUnderCalls.incrementAndGet();
});
globalMetric.setOverCallback(() -> {
globalMetricOver.set(true);
globalMetricOverCalls.incrementAndGet();
});
globalMetric.setUnderCallback(() -> {
globalMetricOver.set(false);
globalMetricOverCalls.incrementAndGet();
});
ReusableLatch latchDone = new ReusableLatch(THREADS);
CyclicBarrier flagStart = new CyclicBarrier(THREADS + 1);
for (int istart = 0; istart < THREADS; istart++) {
executor.execute(() -> {
try {
flagStart.await(10, TimeUnit.SECONDS);
for (int iadd = 0; iadd < ELEMENTS; iadd++) {
metric.addSize(1, false);
}
latchDone.countDown();
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
});
}
flagStart.await(10, TimeUnit.SECONDS);
Assert.assertTrue(latchDone.await(10, TimeUnit.SECONDS));
Assert.assertTrue(metricOver.get());
Assert.assertTrue(metric.isOver());
Assert.assertTrue(metric.isOverSize());
Assert.assertFalse(metric.isOverElements());
Assert.assertTrue(globalMetricOver.get());
Assert.assertTrue(globalMetric.isOver());
Assert.assertEquals(1, metricOverCalls.get());
Assert.assertEquals(1, globalMetricOverCalls.get());
Assert.assertEquals(0, metricUnderCalls.get());
Assert.assertEquals(0, globalMetricUnderCalls.get());
Assert.assertEquals(ELEMENTS * THREADS, metric.getSize());
Assert.assertEquals(ELEMENTS * THREADS, metric.getElements());
Assert.assertEquals(ELEMENTS * THREADS, globalMetric.getSize());
Assert.assertEquals(ELEMENTS * THREADS, globalMetric.getElements());
Assert.assertEquals(0, errors.get());
latchDone.setCount(10);
for (int istart = 0; istart < 10; istart++) {
executor.execute(() -> {
try {
flagStart.await(10, TimeUnit.SECONDS);
for (int iadd = 0; iadd < ELEMENTS; iadd++) {
metric.addSize(-1, false);
}
latchDone.countDown();
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
});
}
flagStart.await(10, TimeUnit.SECONDS);
Assert.assertTrue(latchDone.await(10, TimeUnit.SECONDS));
Assert.assertEquals(0, globalMetric.getSize());
Assert.assertEquals(0, globalMetric.getElements());
Assert.assertEquals(0, metric.getSize());
Assert.assertEquals(0, metric.getElements());
Assert.assertFalse(globalMetricOver.get());
Assert.assertFalse(globalMetric.isOver());
}
@Test
public void testMaxElements() throws Exception {
SizeAwareMetric metric = new SizeAwareMetric(10000, 500, 10,10);
AtomicBoolean over = new AtomicBoolean(false);
metric.setOverCallback(() -> over.set(true));
metric.setUnderCallback(() -> over.set(false));
for (int i = 0; i < 11; i++) {
metric.addSize(10, false);
}
Assert.assertTrue(over.get());
Assert.assertEquals(110, metric.getSize());
Assert.assertEquals(11, metric.getElements());
metric.addSize(1000, false);
for (int i = 0; i < 12; i++) {
metric.addSize(-10, false);
}
Assert.assertFalse(over.get());
}
@Test
public void testMaxElementsReleaseNonSizeParentMetric() throws Exception {
SizeAwareMetric metricMaster = new SizeAwareMetric(10000, 500, 10,10);
SizeAwareMetric metric = new SizeAwareMetric(10000, 500, 1000,1000);
metric.setOnSizeCallback(metricMaster::addSize);
AtomicBoolean over = new AtomicBoolean(false);
metricMaster.setOverCallback(() -> over.set(true));
metricMaster.setUnderCallback(() -> over.set(false));
for (int i = 0; i < 11; i++) {
metric.addSize(10);
}
metric.addSize(1000, true);
Assert.assertEquals(1110L, metricMaster.getSize());
Assert.assertEquals(11, metricMaster.getElements());
Assert.assertEquals(1110L, metric.getSize());
Assert.assertEquals(11, metric.getElements());
Assert.assertTrue(metricMaster.isOverElements());
Assert.assertFalse(metricMaster.isOverSize());
Assert.assertFalse(metric.isOverElements());
Assert.assertFalse(metric.isOverSize());
Assert.assertTrue(over.get());
metric.addSize(-1000, true);
Assert.assertEquals(110L, metricMaster.getSize());
Assert.assertEquals(11, metricMaster.getElements());
Assert.assertTrue(metricMaster.isOverElements());
Assert.assertFalse(metricMaster.isOverSize());
Assert.assertTrue(over.get());
for (int i = 0; i < 11; i++) {
metric.addSize(-10);
}
Assert.assertEquals(0L, metricMaster.getSize());
Assert.assertEquals(0L, metricMaster.getElements());
Assert.assertFalse(metricMaster.isOver());
Assert.assertEquals(0L, metric.getSize());
Assert.assertEquals(0L, metric.getElements());
Assert.assertFalse(metric.isOver());
Assert.assertFalse(over.get());
}
@Test
public void testMaxElementsReleaseNonSize() throws Exception {
SizeAwareMetric metric = new SizeAwareMetric(10000, 500, 10,10);
AtomicBoolean over = new AtomicBoolean(false);
metric.setOverCallback(() -> over.set(true));
metric.setUnderCallback(() -> over.set(false));
for (int i = 0; i < 11; i++) {
metric.addSize(10);
}
metric.addSize(1000, true);
Assert.assertEquals(1110L, metric.getSize());
Assert.assertEquals(11, metric.getElements());
Assert.assertTrue(metric.isOverElements());
Assert.assertFalse(metric.isOverSize());
Assert.assertTrue(over.get());
metric.addSize(-1000, true);
Assert.assertEquals(110L, metric.getSize());
Assert.assertEquals(11, metric.getElements());
Assert.assertTrue(metric.isOverElements());
Assert.assertFalse(metric.isOverSize());
Assert.assertTrue(over.get());
for (int i = 0; i < 11; i++) {
metric.addSize(-10);
}
Assert.assertEquals(0L, metric.getSize());
Assert.assertEquals(0L, metric.getElements());
Assert.assertFalse(metric.isOver());
Assert.assertFalse(over.get());
}
@Test
public void testMultipleSizeAddMultipleInstances() throws Exception {
final int THREADS = 10, ELEMENTS = 1000;
setupExecutor(THREADS);
final AtomicInteger errors = new AtomicInteger(0);
final AtomicInteger globalMetricOverCalls = new AtomicInteger(0);
final AtomicInteger metricOverCalls = new AtomicInteger(0);
final AtomicInteger globalMetricUnderCalls = new AtomicInteger(0);
final AtomicInteger metricUnderCalls = new AtomicInteger(0);
final AtomicBoolean globalMetricOver = new AtomicBoolean(false);
final AtomicBoolean[] metricOverArray = new AtomicBoolean[THREADS];
SizeAwareMetric globalMetric = new SizeAwareMetric(10000, 500, 0, 0);
SizeAwareMetric[] metric = new SizeAwareMetric[THREADS];
globalMetric.setOverCallback(() -> {
globalMetricOver.set(true);
globalMetricOverCalls.incrementAndGet();
});
globalMetric.setUnderCallback(() -> {
globalMetricOver.set(false);
globalMetricUnderCalls.incrementAndGet();
});
ReusableLatch latchDone = new ReusableLatch(THREADS);
CyclicBarrier flagStart = new CyclicBarrier(THREADS + 1);
for (int istart = 0; istart < THREADS; istart++) {
final AtomicBoolean metricOver = new AtomicBoolean(false);
final SizeAwareMetric themetric = new SizeAwareMetric(1000, 500, 0, 0);
themetric.setOnSizeCallback(globalMetric::addSize);
themetric.setOverCallback(() -> {
metricOver.set(true);
metricOverCalls.incrementAndGet();
});
themetric.setUnderCallback(() -> {
metricOver.set(false);
metricUnderCalls.incrementAndGet();
});
metric[istart] = themetric;
metricOverArray[istart] = metricOver;
executor.execute(() -> {
try {
flagStart.await(10, TimeUnit.SECONDS);
for (int iadd = 0; iadd < ELEMENTS; iadd++) {
themetric.addSize(1);
}
latchDone.countDown();
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
});
}
flagStart.await(10, TimeUnit.SECONDS);
Assert.assertTrue(latchDone.await(10, TimeUnit.SECONDS));
for (SizeAwareMetric theMetric : metric) {
Assert.assertTrue(theMetric.isOver());
Assert.assertEquals(ELEMENTS, theMetric.getSize());
Assert.assertEquals(ELEMENTS, theMetric.getElements());
}
for (AtomicBoolean theBool : metricOverArray) {
Assert.assertTrue(theBool.get());
}
Assert.assertTrue(globalMetricOver.get());
Assert.assertTrue(globalMetric.isOver());
Assert.assertEquals(10, metricOverCalls.get());
Assert.assertEquals(1, globalMetricOverCalls.get());
Assert.assertEquals(0, metricUnderCalls.get());
Assert.assertEquals(0, globalMetricUnderCalls.get());
Assert.assertEquals(ELEMENTS * THREADS, globalMetric.getSize());
Assert.assertEquals(ELEMENTS * THREADS, globalMetric.getElements());
Assert.assertEquals(0, errors.get());
latchDone.setCount(10);
for (int istart = 0; istart < 10; istart++) {
SizeAwareMetric theMetric = metric[istart];
executor.execute(() -> {
try {
flagStart.await(10, TimeUnit.SECONDS);
for (int iadd = 0; iadd < ELEMENTS; iadd++) {
theMetric.addSize(-1);
}
latchDone.countDown();
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
});
}
flagStart.await(10, TimeUnit.SECONDS);
Assert.assertTrue(latchDone.await(10, TimeUnit.SECONDS));
Assert.assertEquals(0, globalMetric.getSize());
Assert.assertEquals(0, globalMetric.getElements());
for (SizeAwareMetric theMetric : metric) {
Assert.assertEquals(0, theMetric.getSize());
Assert.assertEquals(0, theMetric.getElements());
}
Assert.assertEquals(10, metricOverCalls.get());
Assert.assertEquals(1, globalMetricOverCalls.get());
Assert.assertEquals(10, metricUnderCalls.get());
Assert.assertEquals(1, globalMetricUnderCalls.get());
Assert.assertFalse(globalMetricOver.get());
Assert.assertFalse(globalMetric.isOver());
for (AtomicBoolean theBool : metricOverArray) {
Assert.assertFalse(theBool.get());
}
}
@Test
public void testUpdateMax() throws Exception {
AtomicBoolean over = new AtomicBoolean(false);
SizeAwareMetric metric = new SizeAwareMetric(1000, 500, -1, -1);
metric.setOverCallback(() -> over.set(true));
metric.setUnderCallback(() -> over.set(false));
metric.addSize(900);
Assert.assertFalse(over.get());
metric.setMax(800, 700, 0, 0);
Assert.assertTrue(over.get());
metric.addSize(-200);
Assert.assertFalse(over.get());
}
@Test
public void testDisabled() throws Exception {
AtomicBoolean over = new AtomicBoolean(false);
SizeAwareMetric metric = new SizeAwareMetric(0, 0, -1, -1);
metric.setSizeEnabled(false);
metric.setOverCallback(() -> over.set(true));
metric.addSize(100);
Assert.assertEquals(100, metric.getSize());
Assert.assertEquals(1, metric.getElements());
Assert.assertFalse(over.get());
}
@Test
public void testMultipleNonSized() throws Exception {
AtomicBoolean over = new AtomicBoolean(false);
final SizeAwareMetric metricMaster = new SizeAwareMetric(0, 0, 1, 1);
SizeAwareMetric metric = new SizeAwareMetric(0, 0, 1, 1);
metric.setSizeEnabled(false);
metric.setOverCallback(() -> over.set(true));
metric.setOnSizeCallback(metricMaster::addSize);
for (int i = 0; i < 10; i++) {
metric.addSize(10, true);
}
Assert.assertEquals(100, metricMaster.getSize());
Assert.assertEquals(100, metric.getSize());
Assert.assertEquals(0, metricMaster.getElements());
Assert.assertEquals(0, metric.getElements());
for (int i = 0; i < 10; i++) {
metric.addSize(10, false);
}
Assert.assertEquals(200, metricMaster.getSize());
Assert.assertEquals(200, metric.getSize());
Assert.assertEquals(10, metricMaster.getElements());
Assert.assertEquals(10, metric.getElements());
}
@Test
public void testResetNeverUsed() throws Exception {
SizeAwareMetric metric = new SizeAwareMetric(0, 0, 0, 0);
AtomicBoolean over = new AtomicBoolean(false);
metric.setOverCallback(() -> over.set(true));
metric.setElementsEnabled(true);
metric.setSizeEnabled(true);
metric.setMax(0, 0, 0, 0);
Assert.assertFalse(over.get());
Assert.assertFalse(metric.isOver());
}
@Test
public void testSwitchSides() throws Exception {
SizeAwareMetric metric = new SizeAwareMetric(2000, 2000, 1, 1);
AtomicBoolean over = new AtomicBoolean(false);
metric.setOverCallback(() -> over.set(true));
metric.setUnderCallback(() -> over.set(false));
metric.setElementsEnabled(true);
metric.setSizeEnabled(true);
metric.addSize(2500, true);
Assert.assertTrue(over.get());
metric.addSize(1000);
Assert.assertTrue(metric.isOverSize());
metric.addSize(-2500, true);
// Even though we are free from maxSize, we are still bound by maxElements, it should still be over
Assert.assertTrue(over.get());
Assert.assertTrue("Switch did not work", metric.isOverElements());
Assert.assertEquals(1, metric.getElements());
Assert.assertEquals(1000, metric.getSize());
metric.addSize(5000, true);
Assert.assertTrue(metric.isOverElements());
Assert.assertEquals(6000, metric.getSize());
metric.addSize(-1000);
Assert.assertTrue(metric.isOverSize());
Assert.assertEquals(0, metric.getElements());
Assert.assertEquals(5000, metric.getSize());
metric.addSize(-5000, true);
Assert.assertFalse(metric.isOver());
Assert.assertEquals(0, metric.getSize());
Assert.assertEquals(0, metric.getElements());
}
@Test
public void testMTOverAndUnder() throws Exception {
final int THREADS = 10;
final int ELEMENTS = 100;
setupExecutor(THREADS * 2);
SizeAwareMetric metric = new SizeAwareMetric(1, 1, -1, -1);
AtomicInteger overCounter = new AtomicInteger(0);
AtomicInteger errors = new AtomicInteger(0);
metric.setOverCallback(() -> {
int value = overCounter.incrementAndGet();
if (value > 1) {
new Exception("Value = " + value).printStackTrace();
errors.incrementAndGet();
}
});
metric.setUnderCallback(() -> {
int value = overCounter.decrementAndGet();
if (value < 0) {
new Exception("Value = " + value).printStackTrace();
errors.incrementAndGet();
}
});
CyclicBarrier flagStart = new CyclicBarrier(THREADS * 2);
CountDownLatch done = new CountDownLatch(THREADS * 2);
for (int i = 0; i < THREADS; i++) {
executor.execute(() -> {
try {
flagStart.await(10, TimeUnit.SECONDS);
for (int repeat = 0; repeat < ELEMENTS; repeat++) {
metric.addSize(1);
}
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
done.countDown();
});
executor.execute(() -> {
try {
flagStart.await(10, TimeUnit.SECONDS);
for (int repeat = 0; repeat < ELEMENTS; repeat++) {
metric.addSize(-1);
}
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
done.countDown();
});
}
Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
Assert.assertEquals(0, metric.getSize());
Assert.assertEquals(0, metric.getElements());
Assert.assertEquals(0, errors.get());
}
}

View File

@ -499,6 +499,8 @@ public final class ActiveMQDefaultConfiguration {
public static final long DEFAULT_GLOBAL_MAX_SIZE = Runtime.getRuntime().maxMemory() / 2;
public static final long DEFAULT_GLOBAL_MAX_MESSAGES = -1;
public static final int DEFAULT_MAX_DISK_USAGE;
static {
@ -1495,6 +1497,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_GLOBAL_MAX_SIZE;
}
public static long getDefaultMaxGlobalMessages() {
return DEFAULT_GLOBAL_MAX_MESSAGES;
}
public static int getDefaultMaxDiskUsage() {
return DEFAULT_MAX_DISK_USAGE;
}

View File

@ -515,7 +515,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
if (owner != null && memoryEstimate != -1) {
// the memory has already been tracked and needs to be updated to reflect the new decoding
int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data);
((PagingStore)owner).addSize(addition);
((PagingStore)owner).addSize(addition, false);
final int updatedEstimate = memoryEstimate + addition;
memoryEstimate = updatedEstimate;
}

View File

@ -1236,6 +1236,10 @@ public interface Configuration {
Configuration setGlobalMaxSize(long globalMaxSize);
Configuration setGlobalMaxMessages(long globalMaxMessages);
long getGlobalMaxMessages();
int getMaxDiskUsage();
Configuration setMaxDiskUsage(int maxDiskUsage);

View File

@ -348,6 +348,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private Long globalMaxSize;
private Long globalMaxMessages;
private boolean amqpUseCoreSubscriptionNaming = ActiveMQDefaultConfiguration.getDefaultAmqpUseCoreSubscriptionNaming();
private int maxDiskUsage = ActiveMQDefaultConfiguration.getDefaultMaxDiskUsage();
@ -567,6 +569,21 @@ public class ConfigurationImpl implements Configuration, Serializable {
return globalMaxSize;
}
@Override
public ConfigurationImpl setGlobalMaxMessages(long maxMessages) {
this.globalMaxMessages = maxMessages;
return this;
}
@Override
public long getGlobalMaxMessages() {
if (globalMaxMessages == null) {
this.globalMaxMessages = ActiveMQDefaultConfiguration.getDefaultMaxGlobalMessages();
}
return globalMaxMessages;
}
@Override
public ConfigurationImpl setPersistenceEnabled(final boolean enable) {
persistenceEnabled = enable;

View File

@ -213,6 +213,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String MAX_SIZE_BYTES_NODE_NAME = "max-size-bytes";
private static final String MAX_MESSAGES_NODE_NAME = "max-size-messages";
private static final String MAX_SIZE_BYTES_REJECT_THRESHOLD_NODE_NAME = "max-size-bytes-reject-threshold";
private static final String ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME = "address-full-policy";
@ -305,6 +307,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String GLOBAL_MAX_SIZE = "global-max-size";
private static final String GLOBAL_MAX_MESSAGES = "global-max-messages";
private static final String MAX_DISK_USAGE = "max-disk-usage";
private static final String DISK_SCAN_PERIOD = "disk-scan-period";
@ -446,6 +450,13 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setGlobalMaxSize(globalMaxSize);
}
long globalMaxMessages = getLong(e, GLOBAL_MAX_MESSAGES, -1, Validators.MINUS_ONE_OR_GT_ZERO);
if (globalMaxSize > 0) {
config.setGlobalMaxMessages(globalMaxMessages);
}
config.setMaxDiskUsage(getInteger(e, MAX_DISK_USAGE, config.getMaxDiskUsage(), Validators.PERCENTAGE_OR_MINUS_ONE));
config.setDiskScanPeriod(getInteger(e, DISK_SCAN_PERIOD, config.getDiskScanPeriod(), Validators.MINUS_ONE_OR_GT_ZERO));
@ -1245,6 +1256,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
addressSettings.setMaxRedeliveryDelay(XMLUtil.parseLong(child));
} else if (MAX_SIZE_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMaxSizeBytes(ByteUtil.convertTextBytes(getTrimmedTextContent(child)));
} else if (MAX_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMaxSizeMessages(XMLUtil.parseInt(child));
} else if (MAX_SIZE_BYTES_REJECT_THRESHOLD_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMaxSizeBytesRejectThreshold(ByteUtil.convertTextBytes(getTrimmedTextContent(child)));
} else if (PAGE_SIZE_BYTES_NODE_NAME.equalsIgnoreCase(name)) {

View File

@ -97,9 +97,25 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
/**
* Add size at the global count level.
* if totalSize &gt; globalMaxSize it will return true
* if sizeOnly = true, only the size portion is updated. If false both the counter for bytes and number of messages is updated.
*/
PagingManager addSize(int size);
PagingManager addSize(int size, boolean sizeOnly);
/**
* An utility method to call addSize(size, false);
* this is a good fit for an IntConsumer.
*/
default PagingManager addSize(int size) {
return addSize(size, false);
}
/**
* An utility method to call addSize(size, true);
* this is a good fit for an IntConsumer.
*/
default PagingManager addSizeOnly(int size) {
return addSize(size, true);
}
boolean isUsingGlobalSize();

View File

@ -128,7 +128,16 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
void stopPaging() throws Exception;
void addSize(int size);
/** *
*
* @param size
* @param sizeOnly if false we won't increment the number of messages. (add references for example)
*/
void addSize(int size, boolean sizeOnly);
default void addSize(int size) {
addSize(size, false);
}
boolean checkMemory(Runnable runnable);

View File

@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -39,6 +38,7 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.SizeAwareMetric;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.jboss.logging.Logger;
@ -67,11 +67,18 @@ public final class PagingManagerImpl implements PagingManager {
private PagingStoreFactory pagingStoreFactory;
private final AtomicLong globalSizeBytes = new AtomicLong(0);
private volatile boolean globalFull;
private final AtomicLong numberOfMessages = new AtomicLong(0);
private void setGlobalFull(boolean globalFull) {
synchronized (memoryCallback) {
this.globalFull = globalFull;
checkMemoryRelease();
}
}
private final long maxSize;
private final SizeAwareMetric globalSizeMetric;
private long maxSize;
private volatile boolean cleanupEnabled = true;
@ -91,9 +98,6 @@ public final class PagingManagerImpl implements PagingManager {
private final SimpleString managementAddress;
// for tests.. not part of the API
public void replacePageStoreFactory(PagingStoreFactory factory) {
this.pagingStoreFactory = factory;
@ -107,15 +111,32 @@ public final class PagingManagerImpl implements PagingManager {
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final long maxSize,
final long maxMessages,
final SimpleString managementAddress) {
pagingStoreFactory = pagingSPI;
this.addressSettingsRepository = addressSettingsRepository;
addressSettingsRepository.registerListener(this);
this.maxSize = maxSize;
this.globalSizeMetric = new SizeAwareMetric(maxSize, maxSize, maxMessages, maxMessages);
globalSizeMetric.setSizeEnabled(maxSize >= 0);
globalSizeMetric.setElementsEnabled(maxMessages >= 0);
globalSizeMetric.setOverCallback(() -> setGlobalFull(true));
globalSizeMetric.setUnderCallback(() -> setGlobalFull(false));
this.memoryExecutor = pagingSPI.newExecutor();
this.managementAddress = managementAddress;
}
SizeAwareMetric getSizeAwareMetric() {
return globalSizeMetric;
}
/** To be used in tests only called through PagingManagerTestAccessor */
void resetMaxSize(long maxSize, long maxElements) {
this.maxSize = maxSize;
this.globalSizeMetric.setMax(maxSize, maxSize, maxElements, maxElements);
}
@Override
public long getMaxSize() {
return maxSize;
@ -123,13 +144,13 @@ public final class PagingManagerImpl implements PagingManager {
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) {
this(pagingSPI, addressSettingsRepository, -1, null);
this(pagingSPI, addressSettingsRepository, -1, -1, null);
}
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final SimpleString managementAddress) {
this(pagingSPI, addressSettingsRepository, -1, managementAddress);
this(pagingSPI, addressSettingsRepository, -1, -1, managementAddress);
}
@Override
@ -150,33 +171,23 @@ public final class PagingManagerImpl implements PagingManager {
}
@Override
public PagingManagerImpl addSize(int size) {
if (size > 0) {
numberOfMessages.incrementAndGet();
} else {
numberOfMessages.decrementAndGet();
}
long newSize = globalSizeBytes.addAndGet(size);
public PagingManagerImpl addSize(int size, boolean sizeOnly) {
long newSize = globalSizeMetric.addSize(size, sizeOnly);
if (newSize < 0) {
ActiveMQServerLogger.LOGGER.negativeGlobalAddressSize(newSize);
}
if (size < 0) {
checkMemoryRelease();
}
return this;
}
@Override
public long getGlobalSize() {
return globalSizeBytes.get();
return globalSizeMetric.getSize();
}
protected void checkMemoryRelease() {
if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) && !blockedStored.isEmpty()) {
if (!diskFull && (maxSize < 0 || !globalFull) && !blockedStored.isEmpty()) {
if (!memoryCallback.isEmpty()) {
if (memoryExecutor != null) {
memoryExecutor.execute(this::memoryReleased);
@ -277,7 +288,7 @@ public final class PagingManagerImpl implements PagingManager {
@Override
public boolean isGlobalFull() {
return diskFull || maxSize > 0 && globalSizeBytes.get() >= maxSize;
return diskFull || maxSize > 0 && globalFull;
}
@Override
@ -449,7 +460,7 @@ public final class PagingManagerImpl implements PagingManager {
}
public void debug() {
logger.info("size = " + globalSizeBytes + " bytes, messages = " + numberOfMessages);
logger.info("size = " + globalSizeMetric.getSize() + " bytes, messages = " + globalSizeMetric.getElements());
}
@Override

View File

@ -28,7 +28,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@ -59,6 +58,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.SizeAwareMetric;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.jboss.logging.Logger;
@ -91,6 +91,8 @@ public class PagingStoreImpl implements PagingStore {
private long maxSize;
private long maxMessages;
private int pageSize;
private volatile AddressFullMessagePolicy addressFullMessagePolicy;
@ -104,7 +106,9 @@ public class PagingStoreImpl implements PagingStore {
private final ArtemisExecutor executor;
// Bytes consumed by the queue on the memory
private final AtomicLong sizeInBytes = new AtomicLong();
private final SizeAwareMetric size;
private volatile boolean full;
private int numberOfPages;
@ -166,6 +170,10 @@ public class PagingStoreImpl implements PagingStore {
this.storeName = storeName;
this.size = new SizeAwareMetric(maxSize, maxSize, -1, -1).
setUnderCallback(this::underSized).setOverCallback(this::overSized).
setOnSizeCallback(pagingManager::addSize);
applySetting(addressSettings);
this.executor = executor;
@ -189,6 +197,22 @@ public class PagingStoreImpl implements PagingStore {
this.usingGlobalMaxSize = pagingManager.isUsingGlobalSize();
}
private void overSized() {
full = true;
}
private void underSized() {
full = false;
checkReleasedMemory();
}
private void configureSizeMetric() {
size.setMax(maxSize, maxSize, maxMessages, maxMessages);
size.setSizeEnabled(maxSize >= 0);
size.setElementsEnabled(maxMessages >= 0);
}
/**
* @param addressSettings
*/
@ -196,6 +220,10 @@ public class PagingStoreImpl implements PagingStore {
public void applySetting(final AddressSettings addressSettings) {
maxSize = addressSettings.getMaxSizeBytes();
maxMessages = addressSettings.getMaxSizeMessages();
configureSizeMetric();
pageSize = addressSettings.getPageSizeBytes();
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
@ -247,7 +275,7 @@ public class PagingStoreImpl implements PagingStore {
@Override
public long getAddressSize() {
return sizeInBytes.get();
return size.getSize();
}
@Override
@ -502,7 +530,7 @@ public class PagingStoreImpl implements PagingStore {
final boolean isPaging = this.paging;
if (isPaging) {
paging = false;
ActiveMQServerLogger.LOGGER.pageStoreStop(storeName, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
ActiveMQServerLogger.LOGGER.pageStoreStop(storeName, size.getSize(), maxSize, pagingManager.getGlobalSize());
}
this.cursorProvider.onPageModeCleared();
} finally {
@ -547,7 +575,7 @@ public class PagingStoreImpl implements PagingStore {
}
}
paging = true;
ActiveMQServerLogger.LOGGER.pageStoreStart(storeName, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
ActiveMQServerLogger.LOGGER.pageStoreStart(storeName, size.getSize(), maxSize, pagingManager.getGlobalSize());
return true;
} finally {
@ -712,7 +740,7 @@ public class PagingStoreImpl implements PagingStore {
return false;
}
} else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() >= maxSize || pagingManager.isGlobalFull()) {
if (pagingManager.isDiskFull() || maxSize > 0 && this.full || pagingManager.isGlobalFull()) {
if (runWhenBlocking != null) {
runWhenBlocking.run();
}
@ -724,7 +752,7 @@ public class PagingStoreImpl implements PagingStore {
// has been added, but the check to execute was done before the element was added
// NOTE! We do not fix this race by locking the whole thing, doing this check provides
// MUCH better performance in a highly concurrent environment
if (!pagingManager.isGlobalFull() && (sizeInBytes.get() < maxSize || maxSize < 0)) {
if (!pagingManager.isGlobalFull() && (!full || maxSize < 0)) {
// run it now
atomicRunWhenAvailable.run();
} else {
@ -736,7 +764,7 @@ public class PagingStoreImpl implements PagingStore {
if (pagingManager.isDiskFull()) {
ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
} else {
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, size.getSize(), maxSize, pagingManager.getGlobalSize());
}
blocking = true;
}
@ -754,9 +782,9 @@ public class PagingStoreImpl implements PagingStore {
}
@Override
public void addSize(final int size) {
boolean globalFull = pagingManager.addSize(size).isGlobalFull();
long newSize = sizeInBytes.addAndGet(size);
public void addSize(final int size, boolean sizeOnly) {
long newSize = this.size.addSize(size, sizeOnly);
boolean globalFull = pagingManager.isGlobalFull();
if (newSize < 0) {
ActiveMQServerLogger.LOGGER.negativeAddressSize(newSize, address.toString());
@ -764,13 +792,13 @@ public class PagingStoreImpl implements PagingStore {
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
if (usingGlobalMaxSize && !globalFull || maxSize != -1) {
checkReleaseMemory(globalFull, newSize);
checkReleasedMemory();
}
return;
} else if (addressFullMessagePolicy == AddressFullMessagePolicy.PAGE) {
if (size > 0) {
if (maxSize != -1 && newSize > maxSize || globalFull) {
if (globalFull || full) {
startPaging();
}
}
@ -781,15 +809,11 @@ public class PagingStoreImpl implements PagingStore {
@Override
public boolean checkReleasedMemory() {
return checkReleaseMemory(pagingManager.isGlobalFull(), sizeInBytes.get());
}
public boolean checkReleaseMemory(boolean globalFull, long newSize) {
if (!blockedViaAddressControl && !globalFull && (newSize < maxSize || maxSize < 0)) {
if (!blockedViaAddressControl && !pagingManager.isGlobalFull() && (!full || maxSize < 0)) {
if (!onMemoryFreedRunnables.isEmpty()) {
executor.execute(this::memoryReleased);
if (blocking) {
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize);
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, size.getSize(), maxSize);
blocking = false;
return true;
}
@ -824,7 +848,7 @@ public class PagingStoreImpl implements PagingStore {
// Address is full, we just pretend we are paging, and drop the data
if (!printedDropMessagesWarning) {
printedDropMessagesWarning = true;
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, size.getSize(), maxSize, pagingManager.getGlobalSize());
}
return true;
} else {
@ -979,7 +1003,7 @@ public class PagingStoreImpl implements PagingStore {
@Override
public void refUp(Message message, int count) {
this.addSize(MessageReferenceImpl.getMemoryEstimate());
this.addSize(MessageReferenceImpl.getMemoryEstimate(), true);
}
@Override
@ -988,7 +1012,7 @@ public class PagingStoreImpl implements PagingStore {
// this could happen on paged messages since they are not routed and refUp is never called
return;
}
this.addSize(-MessageReferenceImpl.getMemoryEstimate());
this.addSize(-MessageReferenceImpl.getMemoryEstimate(), true);
}
private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception {

View File

@ -2952,7 +2952,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public PagingManager createPagingManager() throws Exception {
return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize(), configuration.getManagementAddress());
return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize(), configuration.getGlobalMaxMessages(), configuration.getManagementAddress());
}
protected PagingStoreFactory getPagingStoreFactory() throws Exception {

View File

@ -210,7 +210,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final AtomicInteger pagedReferences = new AtomicInteger(0);
// The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
private final AtomicInteger queueMemorySize = new AtomicInteger(0);
final AtomicInteger queueMemorySize = new AtomicInteger(0);
protected final QueueMessageMetrics pendingMetrics = new QueueMessageMetrics(this, "pending");
@ -1025,7 +1025,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
int count = messageReference.getMessage().refUp();
if (count == 1) {
if (messageReference.getMessage().getOwner() != null) {
((PagingStore)messageReference.getMessage().getOwner()).addSize(messageReference.getMessageMemoryEstimate());
((PagingStore)messageReference.getMessage().getOwner()).addSize(messageReference.getMessageMemoryEstimate(), false);
}
}
if (pagingStore != null) {
@ -1038,7 +1038,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
int count = messageReference.getMessage().refDown();
if (count == 0) {
if (messageReference.getMessage().getOwner() != null) {
((PagingStore)messageReference.getMessage().getOwner()).addSize(-messageReference.getMessageMemoryEstimate());
((PagingStore)messageReference.getMessage().getOwner()).addSize(-messageReference.getMessageMemoryEstimate(), false);
}
}
if (pagingStore != null) {

View File

@ -40,6 +40,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
*/
public static final long DEFAULT_MAX_SIZE_BYTES = -1;
public static final long DEFAULT_MAX_SIZE_MESSAGES = -1;
public static final AddressFullMessagePolicy DEFAULT_ADDRESS_FULL_MESSAGE_POLICY = AddressFullMessagePolicy.PAGE;
public static final int DEFAULT_PAGE_SIZE = 10 * 1024 * 1024;
@ -139,6 +141,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Long maxSizeBytes = null;
private Long maxSizeMessages = null;
private Integer pageSizeBytes = null;
private Integer pageMaxCache = null;
@ -276,6 +280,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public AddressSettings(AddressSettings other) {
this.addressFullMessagePolicy = other.addressFullMessagePolicy;
this.maxSizeBytes = other.maxSizeBytes;
this.maxSizeMessages = other.maxSizeMessages;
this.pageSizeBytes = other.pageSizeBytes;
this.pageMaxCache = other.pageMaxCache;
this.dropMessagesWhenFull = other.dropMessagesWhenFull;
@ -609,6 +614,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return maxSizeBytes != null ? maxSizeBytes : AddressSettings.DEFAULT_MAX_SIZE_BYTES;
}
public long getMaxSizeMessages() {
return maxSizeMessages != null ? maxSizeMessages : AddressSettings.DEFAULT_MAX_SIZE_MESSAGES;
}
public AddressSettings setMaxSizeMessages(final long maxSizeMessages) {
this.maxSizeMessages = maxSizeMessages;
return this;
}
public AddressSettings setMaxSizeBytes(final long maxSizeBytes) {
this.maxSizeBytes = maxSizeBytes;
return this;
@ -986,6 +1000,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (maxSizeBytes == null) {
maxSizeBytes = merged.maxSizeBytes;
}
if (maxSizeMessages == null) {
maxSizeMessages = merged.maxSizeMessages;
}
if (pageMaxCache == null) {
pageMaxCache = merged.pageMaxCache;
}
@ -1412,6 +1429,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
configDeleteDiverts = null;
}
}
if (buffer.readableBytes() > 0) {
maxSizeMessages = BufferHelper.readNullableLong(buffer);
}
}
@Override
@ -1479,7 +1500,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) +
BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit) +
BufferHelper.sizeOfNullableInteger(slowConsumerThresholdMeasurementUnit.getValue()) +
BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp);
BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp) +
BufferHelper.sizeOfNullableLong(maxSizeMessages);
}
@Override
@ -1613,6 +1635,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableBoolean(buffer, enableIngressTimestamp);
buffer.writeNullableSimpleString(configDeleteDiverts != null ? new SimpleString(configDeleteDiverts.toString()) : null);
BufferHelper.writeNullableLong(buffer, maxSizeMessages);
}
/* (non-Javadoc)
@ -1688,6 +1712,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((managementMessageAttributeSizeLimit == null) ? 0 : managementMessageAttributeSizeLimit.hashCode());
result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode());
result = prime * result + ((enableIngressTimestamp == null) ? 0 : enableIngressTimestamp.hashCode());
result = prime * result + ((maxSizeMessages == null) ? 0 : maxSizeMessages.hashCode());
return result;
}
@ -2054,6 +2079,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
} else if (!enableIngressTimestamp.equals(other.enableIngressTimestamp))
return false;
if (maxSizeMessages == null) {
if (other.maxSizeMessages != null)
return false;
} else if (!maxSizeMessages.equals(other.maxSizeMessages))
return false;
return true;
}

View File

@ -894,6 +894,15 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="global-max-messages" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Number of messages before all addresses will enter into their Full Policy configured.
It works in conjunction with global-max-size, being watever value hits its maximum first.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="global-max-size" type="xsd:string" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
@ -3934,6 +3943,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="max-size-messages" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the maximum number of messages allowed on the address. This is used in PAGING, BLOCK and FAIL policies. It does not support notations and it is a simple number of messages allowed.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1" maxOccurs="1"
minOccurs="0">
<xsd:annotation>

View File

@ -357,6 +357,35 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
}
}
@Test
public void testParseMaxSizeOnAddressSettings() throws Exception {
String configStr = "<configuration><address-settings>" + "\n" + "<address-setting match=\"foo\">" + "\n" + "<max-size-messages>123</max-size-messages>\n" + "</address-setting>" + "\n" + "</address-settings></configuration>" + "\n";
FileConfigurationParser parser = new FileConfigurationParser();
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
Configuration configuration = parser.parseMainConfig(input);
AddressSettings settings = configuration.getAddressesSettings().get("foo");
Assert.assertEquals(123, settings.getMaxSizeMessages());
}
// you should not use K, M notations on address settings max-size-messages
@Test
public void testExpectedErrorOverMaxMessageNotation() throws Exception {
String configStr = "<configuration><address-settings>" + "\n" + "<address-setting match=\"foo\">" + "\n" + "<max-size-messages>123K</max-size-messages>\n" + "</address-setting>" + "\n" + "</address-settings></configuration>" + "\n";
FileConfigurationParser parser = new FileConfigurationParser();
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
boolean valid = false;
try {
parser.parseMainConfig(input);
} catch (Throwable expected) {
valid = true;
}
Assert.assertTrue("Exception expected", valid);
}
private static String bridgePart = "<bridges>\n" +
" <bridge name=\"my-bridge\">\n" +
" <queue-name>sausage-factory</queue-name>\n" +
@ -416,6 +445,47 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
Configuration configuration = parser.parseMainConfig(inputStream);
}
@Test
public void testMaxSize() throws Exception {
StringPrintStream stringPrintStream = new StringPrintStream();
PrintStream stream = stringPrintStream.newStream();
stream.println("<configuration><core>");
stream.println("<global-max-size>10M</global-max-size>");
stream.println("<global-max-messages>1000</global-max-messages>");
stream.println("</core></configuration>");
ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes());
FileConfigurationParser parser = new FileConfigurationParser();
Configuration configuration = parser.parseMainConfig(inputStream);
Assert.assertEquals(10 * 1024 * 1024, configuration.getGlobalMaxSize());
Assert.assertEquals(1000, configuration.getGlobalMaxMessages());
}
@Test
public void testExceptionMaxSize() throws Exception {
StringPrintStream stringPrintStream = new StringPrintStream();
PrintStream stream = stringPrintStream.newStream();
stream.println("<configuration><core>");
stream.println("<global-max-messages>1000K</global-max-messages>");
stream.println("</core></configuration>");
ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes());
FileConfigurationParser parser = new FileConfigurationParser();
boolean exceptionHappened = false;
try {
parser.parseMainConfig(inputStream);
} catch (Throwable e) {
exceptionHappened = true;
}
Assert.assertTrue("Exception expected parsing notation for global-max-messages", exceptionHappened);
}
@Test
public void testRetentionJournalOptionsDays() throws Exception {
testStreamDatesOption("DAYS", TimeUnit.DAYS);

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.paging.impl;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.utils.SizeAwareMetric;
/** Use this class to access things that are meant on test only */
public class PagingManagerTestAccessor {
public static void resetMaxSize(PagingManager pagingManager, long maxSize, long maxElements) {
((PagingManagerImpl)pagingManager).resetMaxSize(maxSize, maxElements);
}
public static SizeAwareMetric globalSizeAwareMetric(PagingManager pagingManager) {
return ((PagingManagerImpl)pagingManager).getSizeAwareMetric();
}
}

View File

@ -309,7 +309,7 @@ public class FileMoveManagerTest {
PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null, new OrderedExecutorFactory(threadPool), true, null);
PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1, ActiveMQDefaultConfiguration.getDefaultManagementAddress());
PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1, -1, ActiveMQDefaultConfiguration.getDefaultManagementAddress());
managerImpl.start();

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.core.server.Queue;
public class QueueImplTestAccessor {
public static int getQueueMemorySize(Queue queue) {
return ((QueueImpl)queue).queueMemorySize.get();
}
}

View File

@ -65,6 +65,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
addressSettingsToMerge.setMaxDeliveryAttempts(1000);
addressSettingsToMerge.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
addressSettingsToMerge.setMaxSizeBytes(1001);
addressSettingsToMerge.setMaxSizeMessages(101);
addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
addressSettingsToMerge.setRedeliveryDelay(1003);
addressSettingsToMerge.setPageSizeBytes(1004);
@ -76,6 +77,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
Assert.assertEquals(addressSettings.getExpiryAddress(), exp);
Assert.assertEquals(addressSettings.getMaxDeliveryAttempts(), 1000);
Assert.assertEquals(addressSettings.getMaxSizeBytes(), 1001);
Assert.assertEquals(addressSettings.getMaxSizeMessages(), 101);
Assert.assertEquals(addressSettings.getMessageCounterHistoryDayLimit(), 1002);
Assert.assertEquals(addressSettings.getRedeliveryDelay(), 1003);
Assert.assertEquals(addressSettings.getPageSizeBytes(), 1004);

View File

@ -22,6 +22,9 @@ import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor;
import org.apache.activemq.artemis.core.server.impl.QueueImplTestAccessor;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonObject;
import javax.management.Notification;
@ -30,7 +33,6 @@ import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularDataSupport;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.text.DateFormat;
import java.util.Arrays;
@ -43,7 +45,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -71,13 +72,11 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
@ -1579,16 +1578,9 @@ public class QueueControlTest extends ManagementTestBase {
final LocalQueueBinding binding2 = (LocalQueueBinding) server.getPostOffice().getBinding(dlq);
Queue q2 = binding2.getQueue();
Field queueMemorySizeField = QueueImpl.class.getDeclaredField("queueMemorySize");
queueMemorySizeField.setAccessible(true);
//Get memory size counters to verify
AtomicInteger queueMemorySize1 = (AtomicInteger) queueMemorySizeField.get(q);
AtomicInteger queueMemorySize2 = (AtomicInteger) queueMemorySizeField.get(q2);
//Verify that original queue has a memory size greater than 0 and DLQ is 0
assertTrue(queueMemorySize1.get() > 0);
assertEquals(0, queueMemorySize2.get());
assertTrue(QueueImplTestAccessor.getQueueMemorySize(q) > 0);
assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
// Read and rollback all messages to DLQ
ClientConsumer clientConsumer = session.createConsumer(qName);
@ -1603,8 +1595,8 @@ public class QueueControlTest extends ManagementTestBase {
Assert.assertNull(clientConsumer.receiveImmediate());
//Verify that original queue has a memory size of 0 and DLQ is greater than 0 after rollback
assertEquals(0, queueMemorySize1.get());
assertTrue(queueMemorySize2.get() > 0);
assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q));
assertTrue(QueueImplTestAccessor.getQueueMemorySize(q2) > 0);
QueueControl dlqQueueControl = createManagementControl(dla, dlq);
assertMessageMetrics(dlqQueueControl, numMessagesToTest, durable);
@ -1616,8 +1608,8 @@ public class QueueControlTest extends ManagementTestBase {
assertMessageMetrics(dlqQueueControl, 0, durable);
//Verify that original queue has a memory size of greater than 0 and DLQ is 0 after move
assertTrue(queueMemorySize1.get() > 0);
assertEquals(0, queueMemorySize2.get());
assertTrue(QueueImplTestAccessor.getQueueMemorySize(q) > 0);
assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
// .. and that the messages is now on the original queue once more.
for (int i = 0; i < numMessagesToTest; i++) {
@ -1630,8 +1622,8 @@ public class QueueControlTest extends ManagementTestBase {
clientConsumer.close();
//Verify that original queue and DLQ have a memory size of 0
assertEquals(0, queueMemorySize1.get());
assertEquals(0, queueMemorySize2.get());
assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q));
assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
}
/**
@ -1662,17 +1654,12 @@ public class QueueControlTest extends ManagementTestBase {
final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(queue);
Queue q = binding.getQueue();
Field queueMemorySizeField = QueueImpl.class.getDeclaredField("queueMemorySize");
queueMemorySizeField.setAccessible(true);
//Get memory size counters to verify
AtomicInteger queueMemorySize = (AtomicInteger) queueMemorySizeField.get(q);
QueueControl queueControl = createManagementControl(address, queue);
assertMessageMetrics(queueControl, 1, durable);
//verify memory usage is greater than 0
Assert.assertTrue(queueMemorySize.get() > 0);
Assert.assertTrue(QueueImplTestAccessor.getQueueMemorySize(q) > 0);
// moved all messages to otherQueue
int movedMessagesCount = queueControl.moveMessages(null, otherQueue.toString());
@ -1680,7 +1667,7 @@ public class QueueControlTest extends ManagementTestBase {
assertMessageMetrics(queueControl, 0, durable);
//verify memory usage is 0 after move
Assert.assertEquals(0, queueMemorySize.get());
Assert.assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q));
// check there is no message to consume from queue
consumeMessages(0, session, queue);
@ -1873,7 +1860,7 @@ public class QueueControlTest extends ManagementTestBase {
}
final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(queue);
Assert.assertEquals(10, binding.getQueue().getMessageCount());
Wait.assertEquals(10, () -> binding.getQueue().getMessageCount());
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertEquals(10, queueControl.getMessageCount());
@ -2206,10 +2193,7 @@ public class QueueControlTest extends ManagementTestBase {
final int MESSAGE_SIZE = 1024 * 3; // 3k
// reset maxSize for Paging mode
Field maxSizField = PagingManagerImpl.class.getDeclaredField("maxSize");
maxSizField.setAccessible(true);
maxSizField.setLong(server.getPagingManager(), 10240);
PagingManagerTestAccessor.resetMaxSize(server.getPagingManager(), 10240, 0);
clearDataRecreateServerDirs();
SimpleString address = RandomUtil.randomSimpleString();
@ -2230,7 +2214,7 @@ public class QueueControlTest extends ManagementTestBase {
bb.put(getSamplebyte(j));
}
final int numberOfMessages = 8000;
final int numberOfMessages = 100;
ClientMessage message;
for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(true);
@ -2250,10 +2234,7 @@ public class QueueControlTest extends ManagementTestBase {
Assert.assertEquals(numberOfMessages, removedMatchedMessagesCount);
assertMessageMetrics(queueControl, 0, durable);
Field queueMemoprySizeField = QueueImpl.class.getDeclaredField("queueMemorySize");
queueMemoprySizeField.setAccessible(true);
AtomicInteger queueMemorySize = (AtomicInteger) queueMemoprySizeField.get(queue);
Assert.assertEquals(0, queueMemorySize.get());
Assert.assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(queue));
session.deleteQueue(queueName);
}
@ -3826,12 +3807,9 @@ public class QueueControlTest extends ManagementTestBase {
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertEquals(0, queueControl.getScheduledCount());
Field queueMemorySizeField = QueueImpl.class.getDeclaredField("queueMemorySize");
queueMemorySizeField.setAccessible(true);
final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(queue);
Queue q = binding.getQueue();
AtomicInteger queueMemorySize1 = (AtomicInteger) queueMemorySizeField.get(q);
assertEquals(0, queueMemorySize1.get());
assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q));
ClientProducer producer = session.createProducer(address);
ClientMessage message = session.createMessage(durable);
@ -3843,7 +3821,7 @@ public class QueueControlTest extends ManagementTestBase {
Assert.assertEquals(0, queueControl.getMessageCount());
//Verify that original queue has a memory size of 0
assertEquals(0, queueMemorySize1.get());
assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q));
session.deleteQueue(queue);
}

View File

@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnection;
import org.junit.Assert;
import org.junit.Test;
/**
@ -58,7 +59,8 @@ public class ProducerFlowControlTest extends ProducerFlowControlBaseTest {
pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
msg = (TextMessage) consumer.receive();
msg = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(msg);
assertEquals("Message 2", msg.getText());
msg.acknowledge();

View File

@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.paging;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.SizeAwareMetric;
import org.junit.Assert;
import org.junit.Test;
public class MaxMessagesPagingTest extends ActiveMQTestBase {
protected static final int PAGE_MAX = 100 * 1024;
protected static final int PAGE_SIZE = 10 * 1024;
protected ActiveMQServer server;
@Test
public void testGlobalMaxMessages() throws Exception {
final SimpleString ADDRESS = new SimpleString("testGlobalMaxMessages");
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig();
final int PAGE_MAX = 100 * 1024;
final int PAGE_SIZE = 10 * 1024;
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
server.getConfiguration().setGlobalMaxMessages(100);
server.start();
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
session.createQueue(new QueueConfiguration(ADDRESS).setAddress(ADDRESS));
ClientProducer producer = session.createProducer(ADDRESS);
ClientMessage message = null;
int messageSize = 1 * 1024;
for (int i = 0; i < 30; i++) {
message = session.createMessage(true);
message.getBodyBuffer().writerIndex(0);
message.getBodyBuffer().writeBytes(new byte[messageSize]);
for (int j = 1; j <= messageSize; j++) {
message.getBodyBuffer().writeInt(j);
}
producer.send(message);
}
Queue queue = server.locateQueue(ADDRESS);
Wait.assertTrue(queue.getPagingStore()::isPaging);
ClientConsumer consumer = session.createConsumer(ADDRESS);
session.start();
for (int i = 0; i < 30; i++) {
message = consumer.receive(5000);
Assert.assertNotNull(message);
message.acknowledge();
}
session.commit();
Wait.assertFalse(queue.getPagingStore()::isPaging);
messageSize = 1;
for (int i = 0; i < 102; i++) {
message = session.createMessage(true);
message.getBodyBuffer().writerIndex(0);
message.getBodyBuffer().writeBytes(new byte[messageSize]);
producer.send(message);
if (i == 30) {
// it should not kick based on the size of the address
Wait.assertFalse(queue.getPagingStore()::isPaging);
}
}
Wait.assertTrue(queue.getPagingStore()::isPaging);
SizeAwareMetric globalSizeMetric = PagingManagerTestAccessor.globalSizeAwareMetric(server.getPagingManager());
// this is validating the test is actually validating paging after over elements
Assert.assertTrue(globalSizeMetric.isOverElements());
Assert.assertFalse(globalSizeMetric.isOverSize());
session.close();
}
@Test
public void testGlobalMaxMessagesMultipleQueues() throws Exception {
final String baseAddress = "testGlobal";
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig();
final int PAGE_MAX = 100 * 1024;
final int PAGE_SIZE = 10 * 1024;
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
server.getConfiguration().setGlobalMaxMessages(50);
server.start();
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
for (int adr = 1; adr <= 2; adr++) {
SimpleString address = new SimpleString(baseAddress + adr);
session.createQueue(new QueueConfiguration(address).setAddress(address));
}
for (int adr = 1; adr <= 2; adr++) {
SimpleString address = new SimpleString(baseAddress + adr);
ClientProducer producer = session.createProducer(address);
ClientMessage message = null;
for (int i = 0; i < 30; i++) {
message = session.createMessage(true);
message.getBodyBuffer().writerIndex(0);
message.getBodyBuffer().writeBytes(new byte[1]);
producer.send(message);
}
Queue queue = server.locateQueue(address);
if (adr == 1) {
// first address is fine
Wait.assertFalse(queue.getPagingStore()::isPaging);
} else {
// on second one we reach max
Wait.assertTrue(queue.getPagingStore()::isPaging);
}
}
SizeAwareMetric globalSizeMetric = PagingManagerTestAccessor.globalSizeAwareMetric(server.getPagingManager());
// this is validating the test is actually validating paging after over elements
Assert.assertTrue(globalSizeMetric.isOverElements());
Assert.assertFalse(globalSizeMetric.isOverSize());
session.close();
}
@Test
public void testMaxOnAddress() throws Exception {
final String baseAddress = "testMaxOnAddress";
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig();
final int PAGE_MAX = 100 * 1024;
final int PAGE_SIZE = 10 * 1024;
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
server.getConfiguration().setGlobalMaxMessages(50);
server.start();
AddressSettings max5 = new AddressSettings().setMaxSizeMessages(5);
server.getAddressSettingsRepository().addMatch("#", max5);
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
for (int adr = 1; adr <= 2; adr++) {
SimpleString address = new SimpleString(baseAddress + adr);
session.createQueue(new QueueConfiguration(address).setAddress(address));
}
for (int adr = 1; adr <= 1; adr++) {
SimpleString address = new SimpleString(baseAddress + adr);
ClientProducer producer = session.createProducer(address);
ClientMessage message = null;
Queue queue = server.locateQueue(address);
for (int i = 0; i < 10; i++) {
message = session.createMessage(true);
message.getBodyBuffer().writerIndex(0);
message.getBodyBuffer().writeBytes(new byte[1]);
producer.send(message);
if (i >= 4) {
Wait.assertTrue(queue.getPagingStore()::isPaging);
} else {
Assert.assertFalse(queue.getPagingStore().isPaging());
}
}
}
}
@Test
public void testMaxOnAddressHitGlobal() throws Exception {
final String baseAddress = "testMaxOnAddress";
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig();
final int PAGE_MAX = 100 * 1024;
final int PAGE_SIZE = 10 * 1024;
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
server.getConfiguration().setGlobalMaxMessages(40);
server.start();
AddressSettings max5 = new AddressSettings().setMaxSizeMessages(5).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
server.getAddressSettingsRepository().addMatch("#", max5);
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
for (int adr = 0; adr < 11; adr++) {
SimpleString address = new SimpleString(baseAddress + adr);
session.createQueue(new QueueConfiguration(address).setAddress(address));
ClientProducer producer = session.createProducer(address);
ClientMessage message = null;
Queue queue = server.locateQueue(address);
for (int i = 0; i < 4; i++) {
message = session.createMessage(true);
message.getBodyBuffer().writerIndex(0);
message.getBodyBuffer().writeBytes(new byte[1]);
producer.send(message);
}
if (adr >= 9) {
Wait.assertTrue(queue.getPagingStore()::isPaging);
} else {
Assert.assertFalse(queue.getPagingStore().isPaging());
}
}
}
}

View File

@ -404,6 +404,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
}
@Override
public void addSize(int size, boolean sizeOnly) {
}
@Override
public void addSize(int size) {

View File

@ -910,7 +910,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
}
@Override
public FakePagingManager addSize(int s) {
public FakePagingManager addSize(int s, boolean sizeOnly) {
globalSize.addAndGet(s);
return this;
}

View File

@ -81,7 +81,7 @@ public class FakePagingManager implements PagingManager {
}
@Override
public FakePagingManager addSize(int size) {
public FakePagingManager addSize(int size, boolean sizeOnly) {
return this;
}