This commit is contained in:
Clebert Suconic 2020-05-15 09:23:14 -04:00
commit 5db1aad860
6 changed files with 443 additions and 34 deletions

View File

@ -24,6 +24,8 @@ public interface JournalFile {
void incNegCount(JournalFile file);
void incNegCount(JournalFile file, int delta);
int getPosCount();
void incPosCount();

View File

@ -92,12 +92,28 @@ public class JournalFileImpl implements JournalFile {
@Override
public void incNegCount(final JournalFile file) {
if (file != this) {
totalNegativeToOthers.incrementAndGet();
incNegCount(file, 1);
}
@Override
public void incNegCount(final JournalFile file, int delta) {
if (delta <= 0) {
throw new IllegalArgumentException("delta must be > 0");
}
AtomicInteger previous = negCounts.putIfAbsent(file, new AtomicInteger(1));
if (file != this) {
totalNegativeToOthers.addAndGet(delta);
}
// GC-free path: including capturing lambdas
AtomicInteger previous = negCounts.get(file);
if (previous != null) {
previous.incrementAndGet();
previous.addAndGet(delta);
return;
}
// no counter yet: slow path, allocating
previous = negCounts.putIfAbsent(file, new AtomicInteger(delta));
// racy attempt to create the counter
if (previous != null) {
previous.addAndGet(delta);
}
}

View File

@ -16,11 +16,6 @@
*/
package org.apache.activemq.artemis.core.journal.impl;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.artemis.api.core.Pair;
/**
* This holds the relationship a record has with other files in regard to reference counting.
* Note: This class used to be called PosFiles
@ -29,11 +24,15 @@ import org.apache.activemq.artemis.api.core.Pair;
*/
public class JournalRecord {
// use a very small size to account for near empty cases
private static int INITIAL_FILES_CAPACITY = 5;
private final JournalFile addFile;
private final int size;
private List<Pair<JournalFile, Integer>> updateFiles;
// use this singleton to save using a separated boolean field to mark the "deleted" state
// that would enlarge JournalRecord of several bytes
private static final ObjIntIntArrayList<JournalFile> DELETED = new ObjIntIntArrayList<>(0);
private ObjIntIntArrayList<JournalFile> fileUpdates;
public JournalRecord(final JournalFile addFile, final int size) {
this.addFile = addFile;
@ -45,26 +44,45 @@ public class JournalRecord {
addFile.addSize(size);
}
void addUpdateFile(final JournalFile updateFile, final int size) {
if (updateFiles == null) {
updateFiles = new ArrayList<>();
void addUpdateFile(final JournalFile updateFile, final int bytes) {
checkNotDeleted();
if (bytes == 0) {
return;
}
updateFiles.add(new Pair<>(updateFile, size));
if (fileUpdates == null) {
fileUpdates = new ObjIntIntArrayList<>(INITIAL_FILES_CAPACITY);
}
final int files = fileUpdates.size();
if (files > 0) {
final int lastIndex = files - 1;
if (fileUpdates.addToIntsIfMatch(lastIndex, updateFile, bytes, 1)) {
updateFile.incPosCount();
updateFile.addSize(bytes);
return;
}
}
fileUpdates.add(updateFile, bytes, 1);
updateFile.incPosCount();
updateFile.addSize(size);
updateFile.addSize(bytes);
}
void delete(final JournalFile file) {
file.incNegCount(addFile);
addFile.decSize(size);
if (updateFiles != null) {
for (Pair<JournalFile, Integer> updFile : updateFiles) {
file.incNegCount(updFile.getA());
updFile.getA().decSize(updFile.getB());
checkNotDeleted();
final ObjIntIntArrayList<JournalFile> fileUpdates = this.fileUpdates;
try {
file.incNegCount(addFile);
addFile.decSize(size);
if (fileUpdates != null) {
// not-capturing lambda to save allocation
fileUpdates.forEach((updFile, bytes, posCount, f) -> {
f.incNegCount(updFile, posCount);
updFile.decSize(bytes);
}, file);
}
} finally {
if (fileUpdates != null) {
fileUpdates.clear();
this.fileUpdates = DELETED;
}
}
}
@ -74,16 +92,23 @@ public class JournalRecord {
StringBuilder buffer = new StringBuilder();
buffer.append("JournalRecord(add=" + addFile.getFile().getFileName());
if (updateFiles != null) {
for (Pair<JournalFile, Integer> update : updateFiles) {
buffer.append(", update=" + update.getA().getFile().getFileName());
final ObjIntIntArrayList<JournalFile> fileUpdates = this.fileUpdates;
if (fileUpdates != null) {
if (fileUpdates == DELETED) {
buffer.append(", deleted");
} else {
fileUpdates.forEach((file, ignoredA, ignoredB, builder) -> builder.append(", update=").append(file.getFile().getFileName()), buffer);
}
}
buffer.append(")");
return buffer.toString();
}
private void checkNotDeleted() {
if (fileUpdates == DELETED) {
throw new IllegalStateException("the record is already deleted");
}
}
}

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.journal.impl;
import java.util.Arrays;
import java.util.Objects;
/**
* Ordered collection of (T, int, int) tuples with positive integers and not null T.<br>
* This isn't supposed to be a generic collection, but with some effort can became one and
* could be moved into commons utils.
*/
final class ObjIntIntArrayList<T> {
private static final Object[] EMPTY_OBJECTS = new Object[0];
private static final long[] EMPTY_INTS = new long[0];
private Object[] objects;
private long[] ints;
private int size;
ObjIntIntArrayList(int initialCapacity) {
objects = initialCapacity == 0 ? EMPTY_OBJECTS : new Object[initialCapacity];
ints = initialCapacity == 0 ? EMPTY_INTS : new long[initialCapacity];
size = 0;
}
private static long packInts(int a, int b) {
if (a < 0 || b < 0) {
throw new IllegalArgumentException("a and b must be >= 0");
}
return (((long) a) << 32) | (b & 0xFFFFFFFFL);
}
private static int unpackA(long ints) {
return (int) (ints >> 32);
}
private static int unpackB(long ints) {
return (int) ints;
}
private void ensureCapacity() {
final int currentCapacity = objects.length;
final int expectedCapacity = size + 1;
if (expectedCapacity - currentCapacity <= 0) {
return;
}
grow(expectedCapacity, currentCapacity);
}
private void grow(int expectedCapacity, int currentCapacity) {
assert expectedCapacity - currentCapacity > 0;
int newCapacity = currentCapacity + (currentCapacity >> 1);
// to cover the 0,1 cases
if (newCapacity - expectedCapacity < 0) {
newCapacity = expectedCapacity;
}
if (newCapacity - 2147483639 > 0) {
newCapacity = hugeCapacity(expectedCapacity);
}
final Object[] oldObjects = objects;
final long[] oldInts = ints;
try {
final Object[] newObjects = Arrays.copyOf(oldObjects, newCapacity);
final long[] newInts = Arrays.copyOf(oldInts, newCapacity);
objects = newObjects;
ints = newInts;
} catch (OutOfMemoryError outOfMemoryError) {
// restore previous ones
objects = oldObjects;
ints = oldInts;
throw outOfMemoryError;
}
}
private static int hugeCapacity(int minCapacity) {
if (minCapacity < 0) {
throw new OutOfMemoryError();
} else {
return minCapacity > 2147483639 ? 2147483647 : 2147483639;
}
}
public int size() {
return size;
}
public boolean addToIntsIfMatch(int index, T e, int deltaA, int deltaB) {
Objects.requireNonNull(e, "e must be not null");
if (deltaA < 0) {
throw new IllegalArgumentException("deltaA must be >= 0");
}
if (deltaB < 0) {
throw new IllegalArgumentException("deltaB must be >= 0");
}
if (index < 0 || index >= size) {
throw new IndexOutOfBoundsException("index must be >=0 and <" + size);
}
final Object elm = objects[index];
if (!Objects.equals(elm, e)) {
return false;
}
final long packedInts = ints[index];
final int oldA = unpackA(packedInts);
final long newA = oldA + deltaA;
// overflow check
if (newA < oldA) {
return false;
}
final int oldB = unpackB(packedInts);
final long newB = oldB + deltaB;
// overflow check
if (newB < oldB) {
return false;
}
ints[index] = packInts((int) newA, (int) newB);
return true;
}
public void add(final T e, final int a, int b) {
Objects.requireNonNull(e, "e must be not null");
final long packedInts = packInts(a, b);
ensureCapacity();
objects[size] = e;
ints[size] = packedInts;
size++;
}
public void clear() {
Arrays.fill(objects, 0, size, null);
size = 0;
}
@FunctionalInterface
public interface ObjIntIntConsumerOneArg<T, A> {
void accept(T e, int a, int b, A arg);
}
public <A> void forEach(ObjIntIntConsumerOneArg<? super T, ? super A> onFile, A arg) {
for (int i = 0, size = this.size; i < size; i++) {
final T e = (T) objects[i];
final long packedInts = ints[i];
onFile.accept(e, unpackA(packedInts), unpackB(packedInts), arg);
}
}
}

View File

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.journal.impl;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
public class ObjIntIntArrayListTest {
@Test
public void addShouldAppendTuplesInOrder() {
List<Integer> expectedAList = new ArrayList<>();
List<Integer> expectedBList = new ArrayList<>();
List<Integer> expectedCList = new ArrayList<>();
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
final int elementsToAdd = 10;
assertEquals(0, list.size());
for (int i = 0; i < elementsToAdd; i++) {
Integer a = i;
Integer b = i + 1;
Integer c = i + 2;
list.add(a, b, c);
assertEquals(i + 1, list.size());
expectedAList.add(a);
expectedBList.add(b);
expectedCList.add(c);
}
List<Integer> aList = new ArrayList<>();
List<Integer> bList = new ArrayList<>();
List<Integer> cList = new ArrayList<>();
final Object expectedArg = new Object();
list.forEach((a, b, c, arg) -> {
aList.add(a);
bList.add(b);
cList.add(c);
Assert.assertSame(expectedArg, arg);
}, expectedArg);
assertThat(aList, equalTo(expectedAList));
assertThat(bList, equalTo(expectedBList));
assertThat(cList, equalTo(expectedCList));
}
@Test(expected = NullPointerException.class)
public void addShouldFailAppendNull() {
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
list.add(null, 1, 2);
}
@Test(expected = IllegalArgumentException.class)
public void addShouldFailAppendNegativeA() {
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
list.add(0, -1, 1);
}
@Test(expected = IllegalArgumentException.class)
public void addShouldFailAppendNegativeB() {
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
list.add(0, 1, -1);
}
@Test
public void clearShouldDeleteAllElements() {
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
list.add(1, 3, 3);
assertEquals(1, list.size());
list.clear();
assertEquals(0, list.size());
list.forEach((a, b, c, ignored) -> {
Assert.fail("the list should be empty");
}, null);
}
@Test
public void updateIfMatchShouldReturnFalseOnMissingElement() {
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
Integer e = 1;
list.add(e, 2, 3);
Assert.assertFalse(list.addToIntsIfMatch(0, 2, 1, 1));
list.forEach((a, b, c, ignored) -> {
Assert.assertEquals(e, a);
Assert.assertEquals(2, b);
Assert.assertEquals(3, c);
}, null);
}
@Test
public void updateIfMatchShouldReturnFalseOnOverflowOfA() {
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
Integer e = 1;
list.add(e, Integer.MAX_VALUE, 3);
Assert.assertFalse(list.addToIntsIfMatch(0, e, Integer.MAX_VALUE, 1));
list.forEach((a, b, c, ignored) -> {
Assert.assertEquals(e, a);
Assert.assertEquals(Integer.MAX_VALUE, b);
Assert.assertEquals(3, c);
}, null);
}
@Test
public void updateIfMatchShouldReturnFalseOnOverflowOfB() {
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
Integer e = 1;
list.add(e, 2, Integer.MAX_VALUE);
Assert.assertFalse(list.addToIntsIfMatch(0, e, 1, Integer.MAX_VALUE));
list.forEach((a, b, c, ignored) -> {
Assert.assertEquals(e, a);
Assert.assertEquals(2, b);
Assert.assertEquals(Integer.MAX_VALUE, c);
}, null);
}
@Test(expected = IndexOutOfBoundsException.class)
public void updateIfMatchShouldFailOnNegativeIndex() {
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
list.addToIntsIfMatch(-1, 1, 1, 1);
}
@Test(expected = IndexOutOfBoundsException.class)
public void updateIfMatchShouldFailBeyondSize() {
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
list.addToIntsIfMatch(0, 1, 1, 1);
}
@Test(expected = NullPointerException.class)
public void updateIfMatchShouldFailOnNull() {
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
list.add(1, 2, 3);
list.addToIntsIfMatch(0, null, 1, 1);
}
@Test(expected = IllegalArgumentException.class)
public void updateIfMatchShouldFailOnNegativeDeltaA() {
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
Integer e = 1;
list.add(1, 2, 3);
list.addToIntsIfMatch(0, e, -1, 1);
}
@Test(expected = IllegalArgumentException.class)
public void updateIfMatchShouldFailOnNegativeDeltaB() {
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
Integer e = 1;
list.add(e, 2, 3);
list.addToIntsIfMatch(0, e, 1, -1);
}
@Test
public void updateIfMatchShouldModifyExistingTuple() {
ObjIntIntArrayList<Integer> list = new ObjIntIntArrayList<>(0);
list.add(1, 2, 3);
list.add(2, 3, 4);
list.add(3, 4, 5);
Assert.assertTrue(list.addToIntsIfMatch(0, 1, 1, 1));
Assert.assertTrue(list.addToIntsIfMatch(1, 2, 1, 1));
Assert.assertTrue(list.addToIntsIfMatch(2, 3, 1, 1));
Assert.assertEquals(3, list.size());
List<Integer> eList = new ArrayList<>();
List<Integer> aList = new ArrayList<>();
List<Integer> bList = new ArrayList<>();
list.forEach((e, a, b, ignore) -> {
eList.add(e);
aList.add(a);
bList.add(b);
}, null);
Assert.assertEquals(3, eList.size());
Assert.assertEquals(3, aList.size());
Assert.assertEquals(3, bList.size());
for (int i = 0; i < 3; i++) {
final int index = i;
final Integer expectedE = index + 1;
final Integer expectedA = expectedE + 2;
final Integer expectedB = expectedE + 3;
Assert.assertEquals(expectedE, eList.get(index));
Assert.assertEquals(expectedA, aList.get(index));
Assert.assertEquals(expectedB, bList.get(index));
}
}
}

View File

@ -746,13 +746,18 @@ public class ReclaimerTest extends ActiveMQTestBase {
@Override
public void incNegCount(final JournalFile file) {
incNegCount(file, 1);
}
@Override
public void incNegCount(JournalFile file, int delta) {
Integer count = negCounts.get(file);
int c = count == null ? 1 : count.intValue() + 1;
int c = count == null ? delta : count.intValue() + delta;
negCounts.put(file, c);
totalDep++;
totalDep += delta;
}
@Override