ARTEMIS-2762 JournalRecord is using too much memory to track JournalFile updates
This commit is contained in:
parent
5a4057dbc0
commit
4e1af3aea3
|
@ -24,6 +24,8 @@ public interface JournalFile {
|
|||
|
||||
void incNegCount(JournalFile file);
|
||||
|
||||
void incNegCount(JournalFile file, int delta);
|
||||
|
||||
int getPosCount();
|
||||
|
||||
void incPosCount();
|
||||
|
|
|
@ -92,12 +92,28 @@ public class JournalFileImpl implements JournalFile {
|
|||
|
||||
@Override
|
||||
public void incNegCount(final JournalFile file) {
|
||||
if (file != this) {
|
||||
totalNegativeToOthers.incrementAndGet();
|
||||
incNegCount(file, 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incNegCount(final JournalFile file, int delta) {
|
||||
if (delta <= 0) {
|
||||
throw new IllegalArgumentException("delta must be > 0");
|
||||
}
|
||||
AtomicInteger previous = negCounts.putIfAbsent(file, new AtomicInteger(1));
|
||||
if (file != this) {
|
||||
totalNegativeToOthers.addAndGet(delta);
|
||||
}
|
||||
// GC-free path: including capturing lambdas
|
||||
AtomicInteger previous = negCounts.get(file);
|
||||
if (previous != null) {
|
||||
previous.incrementAndGet();
|
||||
previous.addAndGet(delta);
|
||||
return;
|
||||
}
|
||||
// no counter yet: slow path, allocating
|
||||
previous = negCounts.putIfAbsent(file, new AtomicInteger(delta));
|
||||
// racy attempt to create the counter
|
||||
if (previous != null) {
|
||||
previous.addAndGet(delta);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,11 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.journal.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
|
||||
/**
|
||||
* This holds the relationship a record has with other files in regard to reference counting.
|
||||
* Note: This class used to be called PosFiles
|
||||
|
@ -29,11 +24,15 @@ import org.apache.activemq.artemis.api.core.Pair;
|
|||
*/
|
||||
public class JournalRecord {
|
||||
|
||||
// use a very small size to account for near empty cases
|
||||
private static int INITIAL_FILES_CAPACITY = 5;
|
||||
private final JournalFile addFile;
|
||||
|
||||
private final int size;
|
||||
|
||||
private List<Pair<JournalFile, Integer>> updateFiles;
|
||||
// use this singleton to save using a separated boolean field to mark the "deleted" state
|
||||
// that would enlarge JournalRecord of several bytes
|
||||
private static final ObjIntIntArrayList<JournalFile> DELETED = new ObjIntIntArrayList<>(0);
|
||||
private ObjIntIntArrayList<JournalFile> fileUpdates;
|
||||
|
||||
public JournalRecord(final JournalFile addFile, final int size) {
|
||||
this.addFile = addFile;
|
||||
|
@ -45,26 +44,45 @@ public class JournalRecord {
|
|||
addFile.addSize(size);
|
||||
}
|
||||
|
||||
void addUpdateFile(final JournalFile updateFile, final int size) {
|
||||
if (updateFiles == null) {
|
||||
updateFiles = new ArrayList<>();
|
||||
void addUpdateFile(final JournalFile updateFile, final int bytes) {
|
||||
checkNotDeleted();
|
||||
if (bytes == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
updateFiles.add(new Pair<>(updateFile, size));
|
||||
|
||||
if (fileUpdates == null) {
|
||||
fileUpdates = new ObjIntIntArrayList<>(INITIAL_FILES_CAPACITY);
|
||||
}
|
||||
final int files = fileUpdates.size();
|
||||
if (files > 0) {
|
||||
final int lastIndex = files - 1;
|
||||
if (fileUpdates.addToIntsIfMatch(lastIndex, updateFile, bytes, 1)) {
|
||||
updateFile.incPosCount();
|
||||
updateFile.addSize(bytes);
|
||||
return;
|
||||
}
|
||||
}
|
||||
fileUpdates.add(updateFile, bytes, 1);
|
||||
updateFile.incPosCount();
|
||||
|
||||
updateFile.addSize(size);
|
||||
updateFile.addSize(bytes);
|
||||
}
|
||||
|
||||
void delete(final JournalFile file) {
|
||||
file.incNegCount(addFile);
|
||||
addFile.decSize(size);
|
||||
|
||||
if (updateFiles != null) {
|
||||
for (Pair<JournalFile, Integer> updFile : updateFiles) {
|
||||
file.incNegCount(updFile.getA());
|
||||
updFile.getA().decSize(updFile.getB());
|
||||
checkNotDeleted();
|
||||
final ObjIntIntArrayList<JournalFile> fileUpdates = this.fileUpdates;
|
||||
try {
|
||||
file.incNegCount(addFile);
|
||||
addFile.decSize(size);
|
||||
if (fileUpdates != null) {
|
||||
// not-capturing lambda to save allocation
|
||||
fileUpdates.forEach((updFile, bytes, posCount, f) -> {
|
||||
f.incNegCount(updFile, posCount);
|
||||
updFile.decSize(bytes);
|
||||
}, file);
|
||||
}
|
||||
} finally {
|
||||
if (fileUpdates != null) {
|
||||
fileUpdates.clear();
|
||||
this.fileUpdates = DELETED;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -74,16 +92,23 @@ public class JournalRecord {
|
|||
StringBuilder buffer = new StringBuilder();
|
||||
buffer.append("JournalRecord(add=" + addFile.getFile().getFileName());
|
||||
|
||||
if (updateFiles != null) {
|
||||
|
||||
for (Pair<JournalFile, Integer> update : updateFiles) {
|
||||
buffer.append(", update=" + update.getA().getFile().getFileName());
|
||||
final ObjIntIntArrayList<JournalFile> fileUpdates = this.fileUpdates;
|
||||
if (fileUpdates != null) {
|
||||
if (fileUpdates == DELETED) {
|
||||
buffer.append(", deleted");
|
||||
} else {
|
||||
fileUpdates.forEach((file, ignoredA, ignoredB, builder) -> builder.append(", update=").append(file.getFile().getFileName()), buffer);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
buffer.append(")");
|
||||
|
||||
return buffer.toString();
|
||||
}
|
||||
|
||||
private void checkNotDeleted() {
|
||||
if (fileUpdates == DELETED) {
|
||||
throw new IllegalStateException("the record is already deleted");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,161 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.journal.impl;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Ordered collection of (T, int, int) tuples with positive integers and not null T.<br>
|
||||
* This isn't supposed to be a generic collection, but with some effort can became one and
|
||||
* could be moved into commons utils.
|
||||
*/
|
||||
final class ObjIntIntArrayList<T> {
|
||||
|
||||
private static final Object[] EMPTY_OBJECTS = new Object[0];
|
||||
private static final long[] EMPTY_INTS = new long[0];
|
||||
private Object[] objects;
|
||||
private long[] ints;
|
||||
private int size;
|
||||
|
||||
ObjIntIntArrayList(int initialCapacity) {
|
||||
objects = initialCapacity == 0 ? EMPTY_OBJECTS : new Object[initialCapacity];
|
||||
ints = initialCapacity == 0 ? EMPTY_INTS : new long[initialCapacity];
|
||||
size = 0;
|
||||
}
|
||||
|
||||
private static long packInts(int a, int b) {
|
||||
if (a < 0 || b < 0) {
|
||||
throw new IllegalArgumentException("a and b must be >= 0");
|
||||
}
|
||||
return (((long) a) << 32) | (b & 0xFFFFFFFFL);
|
||||
}
|
||||
|
||||
private static int unpackA(long ints) {
|
||||
return (int) (ints >> 32);
|
||||
}
|
||||
|
||||
private static int unpackB(long ints) {
|
||||
return (int) ints;
|
||||
}
|
||||
|
||||
private void ensureCapacity() {
|
||||
final int currentCapacity = objects.length;
|
||||
final int expectedCapacity = size + 1;
|
||||
if (expectedCapacity - currentCapacity <= 0) {
|
||||
return;
|
||||
}
|
||||
grow(expectedCapacity, currentCapacity);
|
||||
}
|
||||
|
||||
private void grow(int expectedCapacity, int currentCapacity) {
|
||||
assert expectedCapacity - currentCapacity > 0;
|
||||
int newCapacity = currentCapacity + (currentCapacity >> 1);
|
||||
// to cover the 0,1 cases
|
||||
if (newCapacity - expectedCapacity < 0) {
|
||||
newCapacity = expectedCapacity;
|
||||
}
|
||||
if (newCapacity - 2147483639 > 0) {
|
||||
newCapacity = hugeCapacity(expectedCapacity);
|
||||
}
|
||||
final Object[] oldObjects = objects;
|
||||
final long[] oldInts = ints;
|
||||
try {
|
||||
final Object[] newObjects = Arrays.copyOf(oldObjects, newCapacity);
|
||||
final long[] newInts = Arrays.copyOf(oldInts, newCapacity);
|
||||
objects = newObjects;
|
||||
ints = newInts;
|
||||
} catch (OutOfMemoryError outOfMemoryError) {
|
||||
// restore previous ones
|
||||
objects = oldObjects;
|
||||
ints = oldInts;
|
||||
throw outOfMemoryError;
|
||||
}
|
||||
}
|
||||
|
||||
private static int hugeCapacity(int minCapacity) {
|
||||
if (minCapacity < 0) {
|
||||
throw new OutOfMemoryError();
|
||||
} else {
|
||||
return minCapacity > 2147483639 ? 2147483647 : 2147483639;
|
||||
}
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return size;
|
||||
}
|
||||
|
||||
public boolean addToIntsIfMatch(int index, T e, int deltaA, int deltaB) {
|
||||
Objects.requireNonNull(e, "e must be not null");
|
||||
if (deltaA < 0) {
|
||||
throw new IllegalArgumentException("deltaA must be >= 0");
|
||||
}
|
||||
if (deltaB < 0) {
|
||||
throw new IllegalArgumentException("deltaB must be >= 0");
|
||||
}
|
||||
if (index < 0 || index >= size) {
|
||||
throw new IndexOutOfBoundsException("index must be >=0 and <" + size);
|
||||
}
|
||||
final Object elm = objects[index];
|
||||
if (!Objects.equals(elm, e)) {
|
||||
return false;
|
||||
}
|
||||
final long packedInts = ints[index];
|
||||
final int oldA = unpackA(packedInts);
|
||||
final long newA = oldA + deltaA;
|
||||
// overflow check
|
||||
if (newA < oldA) {
|
||||
return false;
|
||||
}
|
||||
final int oldB = unpackB(packedInts);
|
||||
final long newB = oldB + deltaB;
|
||||
// overflow check
|
||||
if (newB < oldB) {
|
||||
return false;
|
||||
}
|
||||
ints[index] = packInts((int) newA, (int) newB);
|
||||
return true;
|
||||
}
|
||||
|
||||
public void add(final T e, final int a, int b) {
|
||||
Objects.requireNonNull(e, "e must be not null");
|
||||
final long packedInts = packInts(a, b);
|
||||
ensureCapacity();
|
||||
objects[size] = e;
|
||||
ints[size] = packedInts;
|
||||
size++;
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
Arrays.fill(objects, 0, size, null);
|
||||
size = 0;
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public interface ObjIntIntConsumerOneArg<T, A> {
|
||||
|
||||
void accept(T e, int a, int b, A arg);
|
||||
}
|
||||
|
||||
public <A> void forEach(ObjIntIntConsumerOneArg<? super T, ? super A> onFile, A arg) {
|
||||
for (int i = 0, size = this.size; i < size; i++) {
|
||||
final T e = (T) objects[i];
|
||||
final long packedInts = ints[i];
|
||||
onFile.accept(e, unpackA(packedInts), unpackB(packedInts), arg);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,200 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.journal.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class ObjIntIntArrayListTest {
|
||||
|
||||
@Test
|
||||
public void addShouldAppendTuplesInOrder() {
|
||||
List<Integer> expectedAList = new ArrayList<>();
|
||||
List<Integer> expectedBList = new ArrayList<>();
|
||||
List<Integer> expectedCList = new ArrayList<>();
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
final int elementsToAdd = 10;
|
||||
assertEquals(0, list.size());
|
||||
for (int i = 0; i < elementsToAdd; i++) {
|
||||
Integer a = i;
|
||||
Integer b = i + 1;
|
||||
Integer c = i + 2;
|
||||
list.add(a, b, c);
|
||||
assertEquals(i + 1, list.size());
|
||||
expectedAList.add(a);
|
||||
expectedBList.add(b);
|
||||
expectedCList.add(c);
|
||||
}
|
||||
List<Integer> aList = new ArrayList<>();
|
||||
List<Integer> bList = new ArrayList<>();
|
||||
List<Integer> cList = new ArrayList<>();
|
||||
final Object expectedArg = new Object();
|
||||
list.forEach((a, b, c, arg) -> {
|
||||
aList.add(a);
|
||||
bList.add(b);
|
||||
cList.add(c);
|
||||
Assert.assertSame(expectedArg, arg);
|
||||
}, expectedArg);
|
||||
assertThat(aList, equalTo(expectedAList));
|
||||
assertThat(bList, equalTo(expectedBList));
|
||||
assertThat(cList, equalTo(expectedCList));
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void addShouldFailAppendNull() {
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
list.add(null, 1, 2);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void addShouldFailAppendNegativeA() {
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
list.add(0, -1, 1);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void addShouldFailAppendNegativeB() {
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
list.add(0, 1, -1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void clearShouldDeleteAllElements() {
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
list.add(1, 3, 3);
|
||||
assertEquals(1, list.size());
|
||||
list.clear();
|
||||
assertEquals(0, list.size());
|
||||
list.forEach((a, b, c, ignored) -> {
|
||||
Assert.fail("the list should be empty");
|
||||
}, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void updateIfMatchShouldReturnFalseOnMissingElement() {
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
Integer e = 1;
|
||||
list.add(e, 2, 3);
|
||||
Assert.assertFalse(list.addToIntsIfMatch(0, 2, 1, 1));
|
||||
list.forEach((a, b, c, ignored) -> {
|
||||
Assert.assertEquals(e, a);
|
||||
Assert.assertEquals(2, b);
|
||||
Assert.assertEquals(3, c);
|
||||
}, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void updateIfMatchShouldReturnFalseOnOverflowOfA() {
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
Integer e = 1;
|
||||
list.add(e, Integer.MAX_VALUE, 3);
|
||||
Assert.assertFalse(list.addToIntsIfMatch(0, e, Integer.MAX_VALUE, 1));
|
||||
list.forEach((a, b, c, ignored) -> {
|
||||
Assert.assertEquals(e, a);
|
||||
Assert.assertEquals(Integer.MAX_VALUE, b);
|
||||
Assert.assertEquals(3, c);
|
||||
}, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void updateIfMatchShouldReturnFalseOnOverflowOfB() {
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
Integer e = 1;
|
||||
list.add(e, 2, Integer.MAX_VALUE);
|
||||
Assert.assertFalse(list.addToIntsIfMatch(0, e, 1, Integer.MAX_VALUE));
|
||||
list.forEach((a, b, c, ignored) -> {
|
||||
Assert.assertEquals(e, a);
|
||||
Assert.assertEquals(2, b);
|
||||
Assert.assertEquals(Integer.MAX_VALUE, c);
|
||||
}, null);
|
||||
}
|
||||
|
||||
@Test(expected = IndexOutOfBoundsException.class)
|
||||
public void updateIfMatchShouldFailOnNegativeIndex() {
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
list.addToIntsIfMatch(-1, 1, 1, 1);
|
||||
}
|
||||
|
||||
@Test(expected = IndexOutOfBoundsException.class)
|
||||
public void updateIfMatchShouldFailBeyondSize() {
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
list.addToIntsIfMatch(0, 1, 1, 1);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void updateIfMatchShouldFailOnNull() {
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
list.add(1, 2, 3);
|
||||
list.addToIntsIfMatch(0, null, 1, 1);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void updateIfMatchShouldFailOnNegativeDeltaA() {
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
Integer e = 1;
|
||||
list.add(1, 2, 3);
|
||||
list.addToIntsIfMatch(0, e, -1, 1);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void updateIfMatchShouldFailOnNegativeDeltaB() {
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
Integer e = 1;
|
||||
list.add(e, 2, 3);
|
||||
list.addToIntsIfMatch(0, e, 1, -1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void updateIfMatchShouldModifyExistingTuple() {
|
||||
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
|
||||
list.add(1, 2, 3);
|
||||
list.add(2, 3, 4);
|
||||
list.add(3, 4, 5);
|
||||
Assert.assertTrue(list.addToIntsIfMatch(0, 1, 1, 1));
|
||||
Assert.assertTrue(list.addToIntsIfMatch(1, 2, 1, 1));
|
||||
Assert.assertTrue(list.addToIntsIfMatch(2, 3, 1, 1));
|
||||
Assert.assertEquals(3, list.size());
|
||||
List<Integer> eList = new ArrayList<>();
|
||||
List<Integer> aList = new ArrayList<>();
|
||||
List<Integer> bList = new ArrayList<>();
|
||||
list.forEach((e, a, b, ignore) -> {
|
||||
eList.add(e);
|
||||
aList.add(a);
|
||||
bList.add(b);
|
||||
}, null);
|
||||
Assert.assertEquals(3, eList.size());
|
||||
Assert.assertEquals(3, aList.size());
|
||||
Assert.assertEquals(3, bList.size());
|
||||
for (int i = 0; i < 3; i++) {
|
||||
final int index = i;
|
||||
final Integer expectedE = index + 1;
|
||||
final Integer expectedA = expectedE + 2;
|
||||
final Integer expectedB = expectedE + 3;
|
||||
Assert.assertEquals(expectedE, eList.get(index));
|
||||
Assert.assertEquals(expectedA, aList.get(index));
|
||||
Assert.assertEquals(expectedB, bList.get(index));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -746,13 +746,18 @@ public class ReclaimerTest extends ActiveMQTestBase {
|
|||
|
||||
@Override
|
||||
public void incNegCount(final JournalFile file) {
|
||||
incNegCount(file, 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incNegCount(JournalFile file, int delta) {
|
||||
Integer count = negCounts.get(file);
|
||||
|
||||
int c = count == null ? 1 : count.intValue() + 1;
|
||||
int c = count == null ? delta : count.intValue() + delta;
|
||||
|
||||
negCounts.put(file, c);
|
||||
|
||||
totalDep++;
|
||||
totalDep += delta;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue