Add page tracking to MockPageCacheRecycler.
This found an issue in BytesRefHash that forgot to release the start offsets. Close #4814
This commit is contained in:
parent
3586157467
commit
af1513f908
|
@ -162,7 +162,7 @@ public final class BytesRefHash extends AbstractHash {
|
|||
super.release();
|
||||
success = true;
|
||||
} finally {
|
||||
Releasables.release(success, bytes, hashes);
|
||||
Releasables.release(success, bytes, hashes, startOffsets);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.elasticsearch.cache.recycler;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.recycler.Recycler.V;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -26,9 +28,20 @@ import org.elasticsearch.test.TestCluster;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
public class MockPageCacheRecycler extends PageCacheRecycler {
|
||||
|
||||
private static final ConcurrentMap<Object, Throwable> ACQUIRED_PAGES = Maps.newConcurrentMap();
|
||||
|
||||
public static void ensureAllPagesAreReleased() {
|
||||
if (ACQUIRED_PAGES.size() > 0) {
|
||||
final Throwable t = ACQUIRED_PAGES.entrySet().iterator().next().getValue();
|
||||
throw new RuntimeException(ACQUIRED_PAGES.size() + " pages have not been released", t);
|
||||
}
|
||||
ACQUIRED_PAGES.clear();
|
||||
}
|
||||
|
||||
private final Random random;
|
||||
|
||||
@Inject
|
||||
|
@ -38,13 +51,45 @@ public class MockPageCacheRecycler extends PageCacheRecycler {
|
|||
random = new Random(seed);
|
||||
}
|
||||
|
||||
private static <T> V<T> wrap(final V<T> v) {
|
||||
ACQUIRED_PAGES.put(v, new Throwable());
|
||||
final Thread t = Thread.currentThread();
|
||||
return new V<T>() {
|
||||
|
||||
@Override
|
||||
public boolean release() throws ElasticsearchException {
|
||||
if (t != Thread.currentThread()) {
|
||||
// Releasing from a different thread doesn't break anything but this is bad practice as pages should be acquired
|
||||
// as late as possible and released as soon as possible in a try/finally fashion
|
||||
throw new RuntimeException("Page was allocated in " + t + " but released in " + Thread.currentThread());
|
||||
}
|
||||
final Throwable t = ACQUIRED_PAGES.remove(v);
|
||||
if (t == null) {
|
||||
throw new IllegalStateException("Releasing a page that has not been acquired");
|
||||
}
|
||||
return v.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public T v() {
|
||||
return v.v();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRecycled() {
|
||||
return v.isRecycled();
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public V<byte[]> bytePage(boolean clear) {
|
||||
final V<byte[]> page = super.bytePage(clear);
|
||||
if (!clear) {
|
||||
random.nextBytes(page.v());
|
||||
}
|
||||
return page;
|
||||
return wrap(page);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -55,7 +100,7 @@ public class MockPageCacheRecycler extends PageCacheRecycler {
|
|||
page.v()[i] = random.nextInt();
|
||||
}
|
||||
}
|
||||
return page;
|
||||
return wrap(page);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -66,7 +111,7 @@ public class MockPageCacheRecycler extends PageCacheRecycler {
|
|||
page.v()[i] = random.nextLong();
|
||||
}
|
||||
}
|
||||
return page;
|
||||
return wrap(page);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -77,7 +122,12 @@ public class MockPageCacheRecycler extends PageCacheRecycler {
|
|||
page.v()[i] = random.nextDouble() - 0.5;
|
||||
}
|
||||
}
|
||||
return page;
|
||||
return wrap(page);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V<Object[]> objectPage() {
|
||||
return wrap(super.objectPage());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -36,6 +36,9 @@ public class BytesRefHashTests extends ElasticsearchTestCase {
|
|||
BytesRefHash hash;
|
||||
|
||||
private void newHash() {
|
||||
if (hash != null) {
|
||||
hash.release();
|
||||
}
|
||||
// Test high load factors to make sure that collision resolution works fine
|
||||
final float maxLoadFactor = 0.6f + randomFloat() * 0.39f;
|
||||
hash = new BytesRefHash(randomIntBetween(0, 100), maxLoadFactor, BigArraysTests.randomCacheRecycler());
|
||||
|
@ -48,7 +51,8 @@ public class BytesRefHashTests extends ElasticsearchTestCase {
|
|||
}
|
||||
|
||||
public void testDuell() {
|
||||
final BytesRef[] values = new BytesRef[randomIntBetween(1, 100000)];
|
||||
final int len = randomIntBetween(1, 100000);
|
||||
final BytesRef[] values = new BytesRef[len];
|
||||
for (int i = 0; i < values.length; ++i) {
|
||||
values[i] = new BytesRef(randomAsciiOfLength(5));
|
||||
}
|
||||
|
@ -111,6 +115,7 @@ public class BytesRefHashTests extends ElasticsearchTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
hash.release();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -150,6 +155,7 @@ public class BytesRefHashTests extends ElasticsearchTestCase {
|
|||
}
|
||||
newHash();
|
||||
}
|
||||
hash.release();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -190,6 +196,7 @@ public class BytesRefHashTests extends ElasticsearchTestCase {
|
|||
assertAllIn(strings, hash);
|
||||
newHash();
|
||||
}
|
||||
hash.release();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -225,6 +232,7 @@ public class BytesRefHashTests extends ElasticsearchTestCase {
|
|||
assertAllIn(strings, hash);
|
||||
newHash();
|
||||
}
|
||||
hash.release();
|
||||
}
|
||||
|
||||
private void assertAllIn(Set<String> strings, BytesRefHash hash) {
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.lucene.util.AbstractRandomizedTest;
|
|||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.TimeUnits;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cache.recycler.MockPageCacheRecycler;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
|
||||
|
@ -34,6 +35,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
|||
import org.elasticsearch.test.engine.MockInternalEngine;
|
||||
import org.elasticsearch.test.junit.listeners.LoggingListener;
|
||||
import org.elasticsearch.test.store.MockDirectoryHelper;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
|
@ -109,6 +111,11 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
|
|||
return new File(uri);
|
||||
}
|
||||
|
||||
@After
|
||||
public void ensureAllPagesReleased() {
|
||||
MockPageCacheRecycler.ensureAllPagesAreReleased();
|
||||
}
|
||||
|
||||
public static void ensureAllFilesClosed() throws IOException {
|
||||
try {
|
||||
for (MockDirectoryHelper.ElasticsearchMockDirectoryWrapper w : MockDirectoryHelper.wrappers) {
|
||||
|
|
Loading…
Reference in New Issue