diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/SuspendableRefContainer.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/SuspendableRefContainer.java new file mode 100644 index 00000000000..2afb78591dd --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/SuspendableRefContainer.java @@ -0,0 +1,117 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.lease.Releasable; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Container that represents a resource with reference counting capabilities. Provides operations to suspend acquisition of new references. + * This is useful for resource management when resources are intermittently unavailable. + * + * Assumes less than Integer.MAX_VALUE references are concurrently being held at one point in time. + */ +public final class SuspendableRefContainer { + private static final int TOTAL_PERMITS = Integer.MAX_VALUE; + private final Semaphore semaphore; + + public SuspendableRefContainer() { + // fair semaphore to ensure that blockAcquisition() does not starve under thread contention + this.semaphore = new Semaphore(TOTAL_PERMITS, true); + } + + /** + * Tries acquiring a reference. Returns reference holder if reference acquisition is not blocked at the time of invocation (see + * {@link #blockAcquisition()}). Returns null if reference acquisition is blocked at the time of invocation. + * + * @return reference holder if reference acquisition is not blocked, null otherwise + * @throws InterruptedException if the current thread is interrupted + */ + public Releasable tryAcquire() throws InterruptedException { + if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the untimed tryAcquire methods do not honor the fairness setting + return idempotentRelease(1); + } else { + return null; + } + } + + /** + * Acquires a reference. Blocks if reference acquisition is blocked at the time of invocation. + * + * @return reference holder + * @throws InterruptedException if the current thread is interrupted + */ + public Releasable acquire() throws InterruptedException { + semaphore.acquire(); + return idempotentRelease(1); + } + + /** + * Acquires a reference. Blocks if reference acquisition is blocked at the time of invocation. + * + * @return reference holder + */ + public Releasable acquireUninterruptibly() { + semaphore.acquireUninterruptibly(); + return idempotentRelease(1); + } + + /** + * Disables reference acquisition and waits until all existing references are released. + * When released, reference acquisition is enabled again. + * This guarantees that between successful acquisition and release, no one is holding a reference. + * + * @return references holder to all references + */ + public Releasable blockAcquisition() { + semaphore.acquireUninterruptibly(TOTAL_PERMITS); + return idempotentRelease(TOTAL_PERMITS); + } + + /** + * Helper method that ensures permits are only released once + * + * @return reference holder + */ + private Releasable idempotentRelease(int permits) { + AtomicBoolean closed = new AtomicBoolean(); + return () -> { + if (closed.compareAndSet(false, true)) { + semaphore.release(permits); + } + }; + } + + /** + * Returns the number of references currently being held. + */ + public int activeRefs() { + int availablePermits = semaphore.availablePermits(); + if (availablePermits == 0) { + // when blockAcquisition is holding all permits + return 0; + } else { + return TOTAL_PERMITS - availablePermits; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/SuspendableRefContainerTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/SuspendableRefContainerTests.java new file mode 100644 index 00000000000..83db2d4a7c6 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/SuspendableRefContainerTests.java @@ -0,0 +1,115 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class SuspendableRefContainerTests extends ESTestCase { + + public void testBasicAcquire() throws InterruptedException { + SuspendableRefContainer refContainer = new SuspendableRefContainer(); + assertThat(refContainer.activeRefs(), equalTo(0)); + + Releasable lock1 = randomLockingMethod(refContainer); + assertThat(refContainer.activeRefs(), equalTo(1)); + Releasable lock2 = randomLockingMethod(refContainer); + assertThat(refContainer.activeRefs(), equalTo(2)); + lock1.close(); + assertThat(refContainer.activeRefs(), equalTo(1)); + lock1.close(); // check idempotence + assertThat(refContainer.activeRefs(), equalTo(1)); + lock2.close(); + assertThat(refContainer.activeRefs(), equalTo(0)); + } + + public void testAcquisitionBlockingBlocksNewAcquisitions() throws InterruptedException { + SuspendableRefContainer refContainer = new SuspendableRefContainer(); + assertThat(refContainer.activeRefs(), equalTo(0)); + + try (Releasable block = refContainer.blockAcquisition()) { + assertThat(refContainer.activeRefs(), equalTo(0)); + assertThat(refContainer.tryAcquire(), nullValue()); + assertThat(refContainer.activeRefs(), equalTo(0)); + } + try (Releasable lock = refContainer.tryAcquire()) { + assertThat(refContainer.activeRefs(), equalTo(1)); + } + + // same with blocking acquire + AtomicBoolean acquired = new AtomicBoolean(); + Thread t = new Thread(() -> { + try (Releasable lock = randomBoolean() ? refContainer.acquire() : refContainer.acquireUninterruptibly()) { + acquired.set(true); + assertThat(refContainer.activeRefs(), equalTo(1)); + } catch (InterruptedException e) { + fail("Interrupted"); + } + }); + try (Releasable block = refContainer.blockAcquisition()) { + assertThat(refContainer.activeRefs(), equalTo(0)); + t.start(); + // check that blocking acquire really blocks + assertThat(acquired.get(), equalTo(false)); + assertThat(refContainer.activeRefs(), equalTo(0)); + } + t.join(); + assertThat(acquired.get(), equalTo(true)); + assertThat(refContainer.activeRefs(), equalTo(0)); + } + + public void testAcquisitionBlockingWaitsOnExistingAcquisitions() throws InterruptedException { + SuspendableRefContainer refContainer = new SuspendableRefContainer(); + + AtomicBoolean acquired = new AtomicBoolean(); + Thread t = new Thread(() -> { + try (Releasable block = refContainer.blockAcquisition()) { + acquired.set(true); + assertThat(refContainer.activeRefs(), equalTo(0)); + } + }); + try (Releasable lock = randomLockingMethod(refContainer)) { + assertThat(refContainer.activeRefs(), equalTo(1)); + t.start(); + assertThat(acquired.get(), equalTo(false)); + assertThat(refContainer.activeRefs(), equalTo(1)); + } + t.join(); + assertThat(acquired.get(), equalTo(true)); + assertThat(refContainer.activeRefs(), equalTo(0)); + } + + private Releasable randomLockingMethod(SuspendableRefContainer refContainer) throws InterruptedException { + switch (randomInt(2)) { + case 0: return refContainer.tryAcquire(); + case 1: return refContainer.acquire(); + case 2: return refContainer.acquireUninterruptibly(); + } + throw new IllegalArgumentException("randomLockingMethod inconsistent"); + } +}