Remove close method in PageCacheRecycler/Recycler (#41917)
The changes in #39317 brought to light some concurrency issues in the close method of Recyclers as we do not wait for threads running in the threadpool to be finished prior to the closing of the PageCacheRecycler and the Recyclers that are used internally. #41695 was opened to address the concurrent close issues but upon review, the closing of these classes is not really needed as the instances should be become available for garbage collection once there is no longer a reference to the closed node. Closes #41683
This commit is contained in:
parent
44c3418531
commit
80432a3552
|
@ -184,7 +184,6 @@ public abstract class TransportClient extends AbstractClient {
|
|||
resourcesToClose.add(circuitBreakerService);
|
||||
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
|
||||
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
|
||||
resourcesToClose.add(pageCacheRecycler);
|
||||
modules.add(settingsModule);
|
||||
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
|
||||
bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
|
||||
|
@ -376,7 +375,6 @@ public abstract class TransportClient extends AbstractClient {
|
|||
closeables.add(plugin);
|
||||
}
|
||||
closeables.add(() -> ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS));
|
||||
closeables.add(injector.getInstance(PageCacheRecycler.class));
|
||||
IOUtils.closeWhileHandlingException(closeables);
|
||||
}
|
||||
|
||||
|
|
|
@ -28,9 +28,4 @@ abstract class AbstractRecycler<T> implements Recycler<T> {
|
|||
this.c = c;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// no-op by default
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -37,13 +37,6 @@ public class ConcurrentDequeRecycler<T> extends DequeRecycler<T> {
|
|||
this.size = new AtomicInteger();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
assert deque.size() == size.get();
|
||||
super.close();
|
||||
size.set(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V<T> obtain() {
|
||||
final V<T> v = super.obtain();
|
||||
|
|
|
@ -36,16 +36,6 @@ public class DequeRecycler<T> extends AbstractRecycler<T> {
|
|||
this.maxSize = maxSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// call destroy() for every cached object
|
||||
for (T t : deque) {
|
||||
c.destroy(t);
|
||||
}
|
||||
// finally get rid of all references
|
||||
deque.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public V<T> obtain() {
|
||||
final T v = deque.pollFirst();
|
||||
|
|
|
@ -34,9 +34,4 @@ abstract class FilterRecycler<T> implements Recycler<T> {
|
|||
return wrap(getDelegate().obtain());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
getDelegate().close();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -31,11 +31,6 @@ public class NoneRecycler<T> extends AbstractRecycler<T> {
|
|||
return new NV<>(c.newInstance());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// no-op
|
||||
}
|
||||
|
||||
public static class NV<T> implements Recycler.V<T> {
|
||||
|
||||
T value;
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.elasticsearch.common.lease.Releasable;
|
|||
* A recycled object, note, implementations should support calling obtain and then recycle
|
||||
* on different threads.
|
||||
*/
|
||||
public interface Recycler<T> extends Releasable {
|
||||
public interface Recycler<T> {
|
||||
|
||||
interface Factory<T> {
|
||||
Recycler<T> build();
|
||||
|
@ -53,8 +53,6 @@ public interface Recycler<T> extends Releasable {
|
|||
|
||||
}
|
||||
|
||||
void close();
|
||||
|
||||
V<T> obtain();
|
||||
|
||||
}
|
||||
|
|
|
@ -145,13 +145,6 @@ public enum Recyclers {
|
|||
return recyclers[slot()];
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
for (Recycler<T> recycler : recyclers) {
|
||||
recycler.close();
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -20,8 +20,6 @@
|
|||
package org.elasticsearch.common.util;
|
||||
|
||||
import org.apache.lucene.util.RamUsageEstimator;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.recycler.AbstractRecyclerC;
|
||||
import org.elasticsearch.common.recycler.Recycler;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
|
@ -39,7 +37,7 @@ import static org.elasticsearch.common.recycler.Recyclers.dequeFactory;
|
|||
import static org.elasticsearch.common.recycler.Recyclers.none;
|
||||
|
||||
/** A recycler of fixed-size pages. */
|
||||
public class PageCacheRecycler implements Releasable {
|
||||
public class PageCacheRecycler {
|
||||
|
||||
public static final Setting<Type> TYPE_SETTING =
|
||||
new Setting<>("cache.recycler.page.type", Type.CONCURRENT.name(), Type::parse, Property.NodeScope);
|
||||
|
@ -73,11 +71,6 @@ public class PageCacheRecycler implements Releasable {
|
|||
NON_RECYCLING_INSTANCE = new PageCacheRecycler(Settings.builder().put(LIMIT_HEAP_SETTING.getKey(), "0%").build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
Releasables.close(true, bytePage, intPage, longPage, objectPage);
|
||||
}
|
||||
|
||||
public PageCacheRecycler(Settings settings) {
|
||||
final Type type = TYPE_SETTING.get(settings);
|
||||
final long limit = LIMIT_HEAP_SETTING.get(settings).getBytes();
|
||||
|
|
|
@ -376,7 +376,6 @@ public class Node implements Closeable {
|
|||
|
||||
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
|
||||
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
|
||||
resourcesToClose.add(pageCacheRecycler);
|
||||
modules.add(settingsModule);
|
||||
List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
|
||||
NetworkModule.getNamedWriteables().stream(),
|
||||
|
@ -842,8 +841,6 @@ public class Node implements Closeable {
|
|||
toClose.add(() -> stopWatch.stop().start("node_environment"));
|
||||
|
||||
toClose.add(injector.getInstance(NodeEnvironment.class));
|
||||
toClose.add(() -> stopWatch.stop().start("page_cache_recycler"));
|
||||
toClose.add(injector.getInstance(PageCacheRecycler.class));
|
||||
toClose.add(stopWatch::stop);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
|
|
@ -99,7 +99,6 @@ public abstract class AbstractRecyclerTestCase extends ESTestCase {
|
|||
assertNotSame(b1, b2);
|
||||
}
|
||||
o.close();
|
||||
r.close();
|
||||
}
|
||||
|
||||
public void testRecycle() {
|
||||
|
@ -111,7 +110,6 @@ public abstract class AbstractRecyclerTestCase extends ESTestCase {
|
|||
o = r.obtain();
|
||||
assertRecycled(o.v());
|
||||
o.close();
|
||||
r.close();
|
||||
}
|
||||
|
||||
public void testDoubleRelease() {
|
||||
|
@ -128,7 +126,6 @@ public abstract class AbstractRecyclerTestCase extends ESTestCase {
|
|||
final Recycler.V<byte[]> v2 = r.obtain();
|
||||
final Recycler.V<byte[]> v3 = r.obtain();
|
||||
assertNotSame(v2.v(), v3.v());
|
||||
r.close();
|
||||
}
|
||||
|
||||
public void testDestroyWhenOverCapacity() {
|
||||
|
@ -152,9 +149,6 @@ public abstract class AbstractRecyclerTestCase extends ESTestCase {
|
|||
// release first ref, verify for destruction
|
||||
o.close();
|
||||
assertDead(data);
|
||||
|
||||
// close the rest
|
||||
r.close();
|
||||
}
|
||||
|
||||
public void testClose() {
|
||||
|
@ -171,10 +165,6 @@ public abstract class AbstractRecyclerTestCase extends ESTestCase {
|
|||
|
||||
// verify that recycle() ran
|
||||
assertRecycled(data);
|
||||
|
||||
// closing the recycler should mark recycled instances via destroy()
|
||||
r.close();
|
||||
assertDead(data);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue