This closes #4387
This commit is contained in:
commit
347f9d7aed
|
@ -42,18 +42,14 @@ public class SizeAwareMetric {
|
|||
private static final AtomicIntegerFieldUpdater<SizeAwareMetric> flagUpdater = AtomicIntegerFieldUpdater.newUpdater(SizeAwareMetric.class, "flag");
|
||||
private volatile int flag = NOT_USED;
|
||||
|
||||
private long maxElements;
|
||||
private long maxElements = -1; // disabled by default
|
||||
|
||||
private long lowerMarkElements;
|
||||
|
||||
private long maxSize;
|
||||
private long maxSize = -1; // disabled by default
|
||||
|
||||
private long lowerMarkSize;
|
||||
|
||||
private boolean sizeEnabled = false;
|
||||
|
||||
private boolean elementsEnabled = false;
|
||||
|
||||
private AddCallback onSizeCallback;
|
||||
|
||||
private Runnable overCallback;
|
||||
|
@ -62,8 +58,6 @@ public class SizeAwareMetric {
|
|||
|
||||
/** To be used in a case where we just measure elements */
|
||||
public SizeAwareMetric() {
|
||||
this.sizeEnabled = false;
|
||||
this.elementsEnabled = false;
|
||||
}
|
||||
|
||||
|
||||
|
@ -78,8 +72,6 @@ public class SizeAwareMetric {
|
|||
this.lowerMarkElements = lowerMarkElements;
|
||||
this.maxSize = maxSize;
|
||||
this.lowerMarkSize = lowerMarkSize;
|
||||
this.sizeEnabled = maxSize >= 0;
|
||||
this.elementsEnabled = maxElements >= 0;
|
||||
}
|
||||
|
||||
public boolean isOver() {
|
||||
|
@ -99,12 +91,7 @@ public class SizeAwareMetric {
|
|||
}
|
||||
|
||||
public boolean isElementsEnabled() {
|
||||
return elementsEnabled;
|
||||
}
|
||||
|
||||
public SizeAwareMetric setElementsEnabled(boolean elementsEnabled) {
|
||||
this.elementsEnabled = elementsEnabled;
|
||||
return this;
|
||||
return maxElements >= 0;
|
||||
}
|
||||
|
||||
public long getElements() {
|
||||
|
@ -112,12 +99,7 @@ public class SizeAwareMetric {
|
|||
}
|
||||
|
||||
public boolean isSizeEnabled() {
|
||||
return sizeEnabled;
|
||||
}
|
||||
|
||||
public SizeAwareMetric setSizeEnabled(boolean sizeEnabled) {
|
||||
this.sizeEnabled = sizeEnabled;
|
||||
return this;
|
||||
return maxSize >= 0;
|
||||
}
|
||||
|
||||
public SizeAwareMetric setOnSizeCallback(AddCallback onSize) {
|
||||
|
@ -214,56 +196,69 @@ public class SizeAwareMetric {
|
|||
}
|
||||
|
||||
private void checkUnder(long currentElements, long currentSize) {
|
||||
if (sizeEnabled) {
|
||||
if (currentSize <= lowerMarkSize && changeFlag(OVER_SIZE, PENDING_FREE)) {
|
||||
// checking if we need to switch from OVER_SIZE to OVER_ELEMENTS, to avoid calling under needless
|
||||
if (!(elementsEnabled && currentElements >= maxElements && changeFlag(PENDING_FREE, OVER_ELEMENTS))) {
|
||||
try {
|
||||
under();
|
||||
} finally {
|
||||
changeFlag(PENDING_FREE, FREE);
|
||||
}
|
||||
}
|
||||
return; // we must return now as we already checked for the elements portion
|
||||
if (isUnderSize(currentSize) && changeFlag(OVER_SIZE, PENDING_FREE)) {
|
||||
if (isOverElements(currentElements) && changeFlag(PENDING_FREE, OVER_ELEMENTS)) {
|
||||
logger.debug("Switch from OVER_SIZE to OVER_ELEMENTS [currentSize={}, currentElements={}, lowerMarkSize={}, maxElements={}]",
|
||||
currentSize, currentElements, lowerMarkSize, maxElements);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
logger.debug("UnderSize [currentSize={}, lowerMarkSize={}]", currentSize, lowerMarkSize);
|
||||
under();
|
||||
} finally {
|
||||
changeFlag(PENDING_FREE, FREE);
|
||||
}
|
||||
}
|
||||
|
||||
if (elementsEnabled) {
|
||||
if (currentElements <= lowerMarkElements && changeFlag(OVER_ELEMENTS, PENDING_FREE)) {
|
||||
// checking if we need to switch from OVER_ELEMENTS to OVER_SIZE, to avoid calling under needless
|
||||
if (!(sizeEnabled && currentSize >= maxSize && changeFlag(PENDING_FREE, OVER_SIZE))) {
|
||||
// this is checking the other side from size (elements).
|
||||
// on this case we are just switching sides and we should not fire under();
|
||||
try {
|
||||
under();
|
||||
} finally {
|
||||
changeFlag(PENDING_FREE, FREE);
|
||||
}
|
||||
}
|
||||
return; // this return here is moot I know. I am keeping it here for consistence with the size portion
|
||||
// and in case eventually further checks are added on this method, this needs to be reviewed.
|
||||
if (isUnderElements(currentElements) && changeFlag(OVER_ELEMENTS, PENDING_FREE)) {
|
||||
if (isOverSize(currentSize) && changeFlag(PENDING_FREE, OVER_SIZE)) {
|
||||
logger.debug("Switch from OVER_ELEMENTS to OVER_SIZE [currentElements={}, currentSize={}, lowerMarkElements={}, maxSize={}]",
|
||||
currentElements, currentSize, lowerMarkElements, maxSize);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
logger.debug("UnderElements [currentElements={}, lowerMarkElements={}]", currentElements, lowerMarkElements);
|
||||
under();
|
||||
} finally {
|
||||
changeFlag(PENDING_FREE, FREE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isUnderSize(long currentSize) {
|
||||
return isSizeEnabled() && currentSize < lowerMarkSize;
|
||||
}
|
||||
|
||||
private boolean isOverSize(long currentSize) {
|
||||
return isSizeEnabled() && currentSize >= maxSize;
|
||||
}
|
||||
|
||||
private boolean isUnderElements(long currentElements) {
|
||||
return isElementsEnabled() && currentElements < lowerMarkElements;
|
||||
}
|
||||
|
||||
private boolean isOverElements(long currentElements) {
|
||||
return isElementsEnabled() && currentElements >= 0 && currentElements >= maxElements;
|
||||
}
|
||||
|
||||
private void checkOver(long currentElements, long currentSize) {
|
||||
if (sizeEnabled) {
|
||||
if (currentSize >= maxSize && changeFlag(FREE, PENDING_OVER_SIZE)) {
|
||||
try {
|
||||
over();
|
||||
} finally {
|
||||
changeFlag(PENDING_OVER_SIZE, OVER_SIZE);
|
||||
}
|
||||
if (isOverSize(currentSize) && changeFlag(FREE, PENDING_OVER_SIZE)) {
|
||||
try {
|
||||
logger.debug("OverSize [currentSize={}, maxSize={}]", currentSize, maxSize);
|
||||
over();
|
||||
} finally {
|
||||
changeFlag(PENDING_OVER_SIZE, OVER_SIZE);
|
||||
}
|
||||
}
|
||||
|
||||
if (elementsEnabled && currentElements >= 0) {
|
||||
if (currentElements >= maxElements && changeFlag(FREE, PENDING_OVER_ELEMENTS)) {
|
||||
try {
|
||||
over();
|
||||
} finally {
|
||||
changeFlag(PENDING_OVER_ELEMENTS, OVER_ELEMENTS);
|
||||
}
|
||||
if (isOverElements(currentElements) && changeFlag(FREE, PENDING_OVER_ELEMENTS)) {
|
||||
try {
|
||||
logger.debug("currentElements [currentSize={}, maxElements={}]", currentElements, maxElements);
|
||||
over();
|
||||
} finally {
|
||||
changeFlag(PENDING_OVER_ELEMENTS, OVER_ELEMENTS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ public class SizeAwareMetricTest {
|
|||
|
||||
ExecutorService executor;
|
||||
|
||||
private void setupExecutor(int threads) throws Exception {
|
||||
private void setupExecutor(int threads) {
|
||||
if (executor == null) {
|
||||
executor = Executors.newFixedThreadPool(threads);
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ public class SizeAwareMetricTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testWithParent() throws Exception {
|
||||
public void testWithParent() {
|
||||
AtomicBoolean childBoolean = new AtomicBoolean(false);
|
||||
AtomicBoolean parentBoolean = new AtomicBoolean(false);
|
||||
|
||||
|
@ -227,7 +227,7 @@ public class SizeAwareMetricTest {
|
|||
|
||||
|
||||
@Test
|
||||
public void testMaxElements() throws Exception {
|
||||
public void testMaxElements() {
|
||||
SizeAwareMetric metric = new SizeAwareMetric(10000, 500, 10,10);
|
||||
|
||||
AtomicBoolean over = new AtomicBoolean(false);
|
||||
|
@ -251,7 +251,7 @@ public class SizeAwareMetricTest {
|
|||
|
||||
}
|
||||
@Test
|
||||
public void testMaxElementsReleaseNonSizeParentMetric() throws Exception {
|
||||
public void testMaxElementsReleaseNonSizeParentMetric() {
|
||||
SizeAwareMetric metricMain = new SizeAwareMetric(10000, 500, 10,10);
|
||||
SizeAwareMetric metric = new SizeAwareMetric(10000, 500, 1000,1000);
|
||||
|
||||
|
@ -299,7 +299,7 @@ public class SizeAwareMetricTest {
|
|||
|
||||
|
||||
@Test
|
||||
public void testMaxElementsReleaseNonSize() throws Exception {
|
||||
public void testMaxElementsReleaseNonSize() {
|
||||
SizeAwareMetric metric = new SizeAwareMetric(10000, 500, 10,10);
|
||||
|
||||
AtomicBoolean over = new AtomicBoolean(false);
|
||||
|
@ -350,7 +350,7 @@ public class SizeAwareMetricTest {
|
|||
final AtomicBoolean globalMetricOver = new AtomicBoolean(false);
|
||||
final AtomicBoolean[] metricOverArray = new AtomicBoolean[THREADS];
|
||||
|
||||
SizeAwareMetric globalMetric = new SizeAwareMetric(10000, 500, 0, 0);
|
||||
SizeAwareMetric globalMetric = new SizeAwareMetric(10000, 500, 10000, 500);
|
||||
|
||||
SizeAwareMetric[] metric = new SizeAwareMetric[THREADS];
|
||||
|
||||
|
@ -369,24 +369,24 @@ public class SizeAwareMetricTest {
|
|||
CyclicBarrier flagStart = new CyclicBarrier(THREADS + 1);
|
||||
for (int istart = 0; istart < THREADS; istart++) {
|
||||
final AtomicBoolean metricOver = new AtomicBoolean(false);
|
||||
final SizeAwareMetric themetric = new SizeAwareMetric(1000, 500, 0, 0);
|
||||
themetric.setOnSizeCallback(globalMetric::addSize);
|
||||
themetric.setOverCallback(() -> {
|
||||
final SizeAwareMetric theMetric = new SizeAwareMetric(1000, 500, 1000, 500);
|
||||
theMetric.setOnSizeCallback(globalMetric::addSize);
|
||||
theMetric.setOverCallback(() -> {
|
||||
metricOver.set(true);
|
||||
metricOverCalls.incrementAndGet();
|
||||
});
|
||||
themetric.setUnderCallback(() -> {
|
||||
theMetric.setUnderCallback(() -> {
|
||||
metricOver.set(false);
|
||||
metricUnderCalls.incrementAndGet();
|
||||
});
|
||||
metric[istart] = themetric;
|
||||
metric[istart] = theMetric;
|
||||
metricOverArray[istart] = metricOver;
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
flagStart.await(10, TimeUnit.SECONDS);
|
||||
|
||||
for (int iadd = 0; iadd < ELEMENTS; iadd++) {
|
||||
themetric.addSize(1);
|
||||
theMetric.addSize(1);
|
||||
}
|
||||
latchDone.countDown();
|
||||
} catch (Throwable e) {
|
||||
|
@ -463,7 +463,7 @@ public class SizeAwareMetricTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateMax() throws Exception {
|
||||
public void testUpdateMax() {
|
||||
AtomicBoolean over = new AtomicBoolean(false);
|
||||
SizeAwareMetric metric = new SizeAwareMetric(1000, 500, -1, -1);
|
||||
metric.setOverCallback(() -> over.set(true));
|
||||
|
@ -472,31 +472,31 @@ public class SizeAwareMetricTest {
|
|||
metric.addSize(900);
|
||||
Assert.assertFalse(over.get());
|
||||
|
||||
metric.setMax(800, 700, 0, 0);
|
||||
metric.setMax(800, 700, -1, -1);
|
||||
Assert.assertTrue(over.get());
|
||||
|
||||
metric.addSize(-200);
|
||||
metric.addSize(-201);
|
||||
Assert.assertFalse(over.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisabled() throws Exception {
|
||||
public void testDisabled() {
|
||||
AtomicBoolean over = new AtomicBoolean(false);
|
||||
SizeAwareMetric metric = new SizeAwareMetric(0, 0, -1, -1);
|
||||
metric.setSizeEnabled(false);
|
||||
SizeAwareMetric metric = new SizeAwareMetric(-1, -1, -1, -1);
|
||||
metric.setOverCallback(() -> over.set(true));
|
||||
metric.addSize(100);
|
||||
|
||||
Assert.assertEquals(100, metric.getSize());
|
||||
Assert.assertEquals(1, metric.getElements());
|
||||
Assert.assertFalse(over.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleNonSized() throws Exception {
|
||||
public void testMultipleNonSized() {
|
||||
AtomicBoolean over = new AtomicBoolean(false);
|
||||
final SizeAwareMetric metricMain = new SizeAwareMetric(0, 0, 1, 1);
|
||||
SizeAwareMetric metric = new SizeAwareMetric(0, 0, 1, 1);
|
||||
metric.setSizeEnabled(false);
|
||||
final SizeAwareMetric metricMain = new SizeAwareMetric(-1, -1, -1, -1);
|
||||
SizeAwareMetric metric = new SizeAwareMetric(-1, -1, -1, -1);
|
||||
|
||||
metric.setOverCallback(() -> over.set(true));
|
||||
metric.setOnSizeCallback(metricMain::addSize);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
|
@ -516,30 +516,29 @@ public class SizeAwareMetricTest {
|
|||
Assert.assertEquals(200, metric.getSize());
|
||||
Assert.assertEquals(10, metricMain.getElements());
|
||||
Assert.assertEquals(10, metric.getElements());
|
||||
|
||||
Assert.assertFalse(over.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResetNeverUsed() throws Exception {
|
||||
SizeAwareMetric metric = new SizeAwareMetric(0, 0, 0, 0);
|
||||
public void testResetNeverUsed() {
|
||||
AtomicBoolean over = new AtomicBoolean(false);
|
||||
|
||||
SizeAwareMetric metric = new SizeAwareMetric(0, 0, 0, 0);
|
||||
metric.setOverCallback(() -> over.set(true));
|
||||
metric.setElementsEnabled(true);
|
||||
metric.setSizeEnabled(true);
|
||||
metric.setMax(0, 0, 0, 0);
|
||||
|
||||
Assert.assertFalse(over.get());
|
||||
Assert.assertFalse(metric.isOver());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSwitchSides() throws Exception {
|
||||
public void testSwitchSides() {
|
||||
SizeAwareMetric metric = new SizeAwareMetric(2000, 2000, 1, 1);
|
||||
AtomicBoolean over = new AtomicBoolean(false);
|
||||
|
||||
metric.setOverCallback(() -> over.set(true));
|
||||
metric.setUnderCallback(() -> over.set(false));
|
||||
metric.setElementsEnabled(true);
|
||||
metric.setSizeEnabled(true);
|
||||
|
||||
metric.addSize(2500, true);
|
||||
|
||||
|
@ -633,11 +632,18 @@ public class SizeAwareMetricTest {
|
|||
|
||||
Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
|
||||
|
||||
|
||||
Assert.assertEquals(0, metric.getSize());
|
||||
Assert.assertEquals(0, metric.getElements());
|
||||
Assert.assertEquals(0, errors.get());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsistency() {
|
||||
SizeAwareMetric metric = new SizeAwareMetric(-1, -1, -1, -1);
|
||||
Assert.assertFalse(metric.isSizeEnabled());
|
||||
Assert.assertFalse(metric.isElementsEnabled());
|
||||
metric.setMax(1, 1, 1, 1);
|
||||
Assert.assertTrue(metric.isSizeEnabled());
|
||||
Assert.assertTrue(metric.isElementsEnabled());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -142,8 +142,6 @@ public final class PagingManagerImpl implements PagingManager {
|
|||
this.maxSize = maxSize;
|
||||
this.maxMessages = maxMessages;
|
||||
this.globalSizeMetric = new SizeAwareMetric(maxSize, maxSize, maxMessages, maxMessages);
|
||||
globalSizeMetric.setSizeEnabled(maxSize >= 0);
|
||||
globalSizeMetric.setElementsEnabled(maxMessages >= 0);
|
||||
globalSizeMetric.setOverCallback(() -> setGlobalFull(true));
|
||||
globalSizeMetric.setUnderCallback(() -> setGlobalFull(false));
|
||||
this.managerExecutor = pagingSPI.newExecutor();
|
||||
|
|
|
@ -217,9 +217,6 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
private void configureSizeMetric() {
|
||||
size.setMax(maxSize, maxSize, maxMessages, maxMessages);
|
||||
size.setSizeEnabled(maxSize >= 0);
|
||||
size.setElementsEnabled(maxMessages >= 0);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue