Add operation counter for IndexShard
Adds a container that represents a resource with reference counting capabilities. Provides operations to suspend acquisition of new references. Useful for resource management when resources are intermittently unavailable. Closes #15956
This commit is contained in:
parent
d91a898f6a
commit
e1006ea400
|
@ -0,0 +1,117 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.common.util.concurrent;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.lease.Releasable;
|
||||||
|
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Container that represents a resource with reference counting capabilities. Provides operations to suspend acquisition of new references.
|
||||||
|
* This is useful for resource management when resources are intermittently unavailable.
|
||||||
|
*
|
||||||
|
* Assumes less than Integer.MAX_VALUE references are concurrently being held at one point in time.
|
||||||
|
*/
|
||||||
|
public final class SuspendableRefContainer {
|
||||||
|
private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
|
||||||
|
private final Semaphore semaphore;
|
||||||
|
|
||||||
|
public SuspendableRefContainer() {
|
||||||
|
// fair semaphore to ensure that blockAcquisition() does not starve under thread contention
|
||||||
|
this.semaphore = new Semaphore(TOTAL_PERMITS, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tries acquiring a reference. Returns reference holder if reference acquisition is not blocked at the time of invocation (see
|
||||||
|
* {@link #blockAcquisition()}). Returns null if reference acquisition is blocked at the time of invocation.
|
||||||
|
*
|
||||||
|
* @return reference holder if reference acquisition is not blocked, null otherwise
|
||||||
|
* @throws InterruptedException if the current thread is interrupted
|
||||||
|
*/
|
||||||
|
public Releasable tryAcquire() throws InterruptedException {
|
||||||
|
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the untimed tryAcquire methods do not honor the fairness setting
|
||||||
|
return idempotentRelease(1);
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires a reference. Blocks if reference acquisition is blocked at the time of invocation.
|
||||||
|
*
|
||||||
|
* @return reference holder
|
||||||
|
* @throws InterruptedException if the current thread is interrupted
|
||||||
|
*/
|
||||||
|
public Releasable acquire() throws InterruptedException {
|
||||||
|
semaphore.acquire();
|
||||||
|
return idempotentRelease(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires a reference. Blocks if reference acquisition is blocked at the time of invocation.
|
||||||
|
*
|
||||||
|
* @return reference holder
|
||||||
|
*/
|
||||||
|
public Releasable acquireUninterruptibly() {
|
||||||
|
semaphore.acquireUninterruptibly();
|
||||||
|
return idempotentRelease(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Disables reference acquisition and waits until all existing references are released.
|
||||||
|
* When released, reference acquisition is enabled again.
|
||||||
|
* This guarantees that between successful acquisition and release, no one is holding a reference.
|
||||||
|
*
|
||||||
|
* @return references holder to all references
|
||||||
|
*/
|
||||||
|
public Releasable blockAcquisition() {
|
||||||
|
semaphore.acquireUninterruptibly(TOTAL_PERMITS);
|
||||||
|
return idempotentRelease(TOTAL_PERMITS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method that ensures permits are only released once
|
||||||
|
*
|
||||||
|
* @return reference holder
|
||||||
|
*/
|
||||||
|
private Releasable idempotentRelease(int permits) {
|
||||||
|
AtomicBoolean closed = new AtomicBoolean();
|
||||||
|
return () -> {
|
||||||
|
if (closed.compareAndSet(false, true)) {
|
||||||
|
semaphore.release(permits);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of references currently being held.
|
||||||
|
*/
|
||||||
|
public int activeRefs() {
|
||||||
|
int availablePermits = semaphore.availablePermits();
|
||||||
|
if (availablePermits == 0) {
|
||||||
|
// when blockAcquisition is holding all permits
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return TOTAL_PERMITS - availablePermits;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,115 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.common.util.concurrent;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.lease.Releasable;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
|
||||||
|
public class SuspendableRefContainerTests extends ESTestCase {
|
||||||
|
|
||||||
|
public void testBasicAcquire() throws InterruptedException {
|
||||||
|
SuspendableRefContainer refContainer = new SuspendableRefContainer();
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||||
|
|
||||||
|
Releasable lock1 = randomLockingMethod(refContainer);
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||||
|
Releasable lock2 = randomLockingMethod(refContainer);
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(2));
|
||||||
|
lock1.close();
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||||
|
lock1.close(); // check idempotence
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||||
|
lock2.close();
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAcquisitionBlockingBlocksNewAcquisitions() throws InterruptedException {
|
||||||
|
SuspendableRefContainer refContainer = new SuspendableRefContainer();
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||||
|
|
||||||
|
try (Releasable block = refContainer.blockAcquisition()) {
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||||
|
assertThat(refContainer.tryAcquire(), nullValue());
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||||
|
}
|
||||||
|
try (Releasable lock = refContainer.tryAcquire()) {
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
// same with blocking acquire
|
||||||
|
AtomicBoolean acquired = new AtomicBoolean();
|
||||||
|
Thread t = new Thread(() -> {
|
||||||
|
try (Releasable lock = randomBoolean() ? refContainer.acquire() : refContainer.acquireUninterruptibly()) {
|
||||||
|
acquired.set(true);
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
fail("Interrupted");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try (Releasable block = refContainer.blockAcquisition()) {
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||||
|
t.start();
|
||||||
|
// check that blocking acquire really blocks
|
||||||
|
assertThat(acquired.get(), equalTo(false));
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||||
|
}
|
||||||
|
t.join();
|
||||||
|
assertThat(acquired.get(), equalTo(true));
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAcquisitionBlockingWaitsOnExistingAcquisitions() throws InterruptedException {
|
||||||
|
SuspendableRefContainer refContainer = new SuspendableRefContainer();
|
||||||
|
|
||||||
|
AtomicBoolean acquired = new AtomicBoolean();
|
||||||
|
Thread t = new Thread(() -> {
|
||||||
|
try (Releasable block = refContainer.blockAcquisition()) {
|
||||||
|
acquired.set(true);
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try (Releasable lock = randomLockingMethod(refContainer)) {
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||||
|
t.start();
|
||||||
|
assertThat(acquired.get(), equalTo(false));
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||||
|
}
|
||||||
|
t.join();
|
||||||
|
assertThat(acquired.get(), equalTo(true));
|
||||||
|
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Releasable randomLockingMethod(SuspendableRefContainer refContainer) throws InterruptedException {
|
||||||
|
switch (randomInt(2)) {
|
||||||
|
case 0: return refContainer.tryAcquire();
|
||||||
|
case 1: return refContainer.acquire();
|
||||||
|
case 2: return refContainer.acquireUninterruptibly();
|
||||||
|
}
|
||||||
|
throw new IllegalArgumentException("randomLockingMethod inconsistent");
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue