[UTILITIES] Introduce a RefCounted interface and basic impl

We already have two places duplicating this rather hairy logic, this
commit intorduces a new RefCoutned interace and an abstract implementation
that can be used for delegation. It factors out all the reference counting
and adds single and multithreaded test for it.

Closes #8210
This commit is contained in:
Simon Willnauer 2014-10-23 17:23:18 +02:00
parent f72e0c89f7
commit 347ce36654
5 changed files with 320 additions and 65 deletions

View File

@ -0,0 +1,77 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.apache.lucene.store.AlreadyClosedException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A basic RefCounted implementation that is initialized with a
* ref count of 1 and calls {@link #closeInternal()} once it reaches
* a 0 ref count
*/
public abstract class AbstractRefCounted implements RefCounted {
private final AtomicInteger refCount = new AtomicInteger(1);
private final String name;
public AbstractRefCounted(String name) {
this.name = name;
}
@Override
public final void incRef() {
if (tryIncRef() == false) {
throw new AlreadyClosedException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]");
}
}
@Override
public final boolean tryIncRef() {
do {
int i = refCount.get();
if (i > 0) {
if (refCount.compareAndSet(i, i + 1)) {
return true;
}
} else {
return false;
}
} while (true);
}
@Override
public final void decRef() {
int i = refCount.decrementAndGet();
assert i >= 0;
if (i == 0) {
closeInternal();
}
}
/**
* Returns the current reference count.
*/
public int refCount() {
return this.refCount.get();
}
protected abstract void closeInternal();
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
/**
* An interface for objects that need to be notified when all reference
* to itself are not in user anymore. This implements basic reference counting
* for instance if async operations holding on to services that are close concurrently
* but should be functional until all async operations have joined
* Classes implementing this interface should ref counted at any time ie. if an object is used it's reference count should
* be increased before using it by calling #incRef and a corresponding #decRef must be called in a try/finally
* block to release the object again ie.:
* <pre>
* inst.incRef();
* try {
* // use the inst...
*
* } finally {
* inst.decRef();
* }
* </pre>
*/
public interface RefCounted {
/**
* Increments the refCount of this instance.
*
* @see #decRef
* @see #tryIncRef()
* @throws org.apache.lucene.store.AlreadyClosedException iff the reference counter can not be incremented.
*/
void incRef();
/**
* Tries to increment the refCount of this instance. This method will return <tt>true</tt> iff the refCount was
*
* @see #decRef()
* @see #incRef()
*/
boolean tryIncRef();
/**
* Decreases the refCount of this instance. If the refCount drops to 0, then this
* instance is considered as closed and should not be used anymore.
*
* @see #incRef
*/
void decRef();
}

View File

@ -26,7 +26,6 @@ import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.lucene46.Lucene46SegmentInfoFormat;
import org.apache.lucene.index.*;
import org.apache.lucene.store.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.Version;
@ -42,6 +41,8 @@ import org.elasticsearch.common.lucene.Directories;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.index.CloseableIndexComponent;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.settings.IndexSettings;
@ -53,7 +54,6 @@ import java.io.*;
import java.nio.file.NoSuchFileException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
@ -77,7 +77,7 @@ import java.util.zip.Checksum;
* }
* </pre>
*/
public class Store extends AbstractIndexShardComponent implements CloseableIndexComponent, Closeable {
public class Store extends AbstractIndexShardComponent implements CloseableIndexComponent, Closeable, RefCounted {
private static final String CODEC = "store";
private static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0
@ -86,11 +86,17 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
private static final String CORRUPTED = "corrupted_";
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicInteger refCount = new AtomicInteger(1);
private final CodecService codecService;
private final DirectoryService directoryService;
private final StoreDirectory directory;
private final DistributorDirectory distributorDirectory;
private final AbstractRefCounted refCounter = new AbstractRefCounted("store") {
@Override
protected void closeInternal() {
// close us once we are done
Store.this.closeInternal();
}
};
@Inject
public Store(ShardId shardId, @IndexSettings Settings indexSettings, CodecService codecService, DirectoryService directoryService, Distributor distributor) throws IOException {
@ -134,8 +140,8 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
final void ensureOpen() { // for testing
if (this.refCount.get() <= 0) {
throw new AlreadyClosedException("Store is already closed");
if (this.refCounter.refCount() <= 0) {
throw new AlreadyClosedException("store is already closed");
}
}
@ -211,10 +217,9 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
* @see #tryIncRef()
* @throws AlreadyClosedException iff the reference counter can not be incremented.
*/
@Override
public final void incRef() {
if (tryIncRef() == false) {
throw new AlreadyClosedException("Store is already closed can't increment refCount current count [" + refCount.get() + "]");
}
refCounter.incRef();
}
/**
@ -229,17 +234,9 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
* @see #decRef()
* @see #incRef()
*/
@Override
public final boolean tryIncRef() {
do {
int i = refCount.get();
if (i > 0) {
if (refCount.compareAndSet(i, i + 1)) {
return true;
}
} else {
return false;
}
} while (true);
return refCounter.tryIncRef();
}
/**
@ -247,13 +244,9 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
* store is closed.
* @see #incRef
*/
@Override
public final void decRef() {
int i = refCount.decrementAndGet();
assert i >= 0;
if (i == 0) {
closeInternal();
}
refCounter.decRef();
}
@Override

View File

@ -27,13 +27,13 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Iterator;
@ -42,7 +42,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -51,7 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class RecoveryStatus {
public class RecoveryStatus extends AbstractRefCounted {
private final ESLogger logger;
@ -72,13 +71,11 @@ public class RecoveryStatus {
private final AtomicBoolean finished = new AtomicBoolean();
// we start with 1 which will be decremented on cancel/close/failure
private final AtomicInteger refCount = new AtomicInteger(1);
private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
private final Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums();
public RecoveryStatus(InternalIndexShard indexShard, DiscoveryNode sourceNode, RecoveryState state, RecoveryTarget.RecoveryListener listener) {
super("recovery_status");
this.recoveryId = idGenerator.incrementAndGet();
this.listener = listener;
this.logger = Loggers.getLogger(getClass(), indexShard.indexSettings(), indexShard.shardId());
@ -260,40 +257,8 @@ public class RecoveryStatus {
return indexOutput;
}
/**
* Tries to increment the refCount of this RecoveryStatus instance. This method will return <tt>true</tt> iff the refCount was
* incremented successfully otherwise <tt>false</tt>. Be sure to always call a corresponding {@link #decRef}, in a finally clause;
*
* @see #decRef()
*/
public final boolean tryIncRef() {
do {
int i = refCount.get();
if (i > 0) {
if (refCount.compareAndSet(i, i + 1)) {
return true;
}
} else {
return false;
}
} while (true);
}
/**
* Decreases the refCount of this Store instance.If the refCount drops to 0, the recovery process this status represents
* is seen as done and resources and temporary files are deleted.
*
* @see #tryIncRef
*/
public final void decRef() {
int i = refCount.decrementAndGet();
assert i >= 0;
if (i == 0) {
closeInternal();
}
}
private void closeInternal() {
@Override
protected void closeInternal() {
try {
// clean open index outputs
Iterator<Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();

View File

@ -0,0 +1,153 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
/**
*/
public class RefCountedTest extends ElasticsearchTestCase {
@Test
public void testRefCount() throws IOException {
MyRefCounted counted = new MyRefCounted();
int incs = randomIntBetween(1, 100);
for (int i = 0; i < incs; i++) {
if (randomBoolean()) {
counted.incRef();
} else {
assertTrue(counted.tryIncRef());
}
counted.ensureOpen();
}
for (int i = 0; i < incs; i++) {
counted.decRef();
counted.ensureOpen();
}
counted.incRef();
counted.decRef();
for (int i = 0; i < incs; i++) {
if (randomBoolean()) {
counted.incRef();
} else {
assertTrue(counted.tryIncRef());
}
counted.ensureOpen();
}
for (int i = 0; i < incs; i++) {
counted.decRef();
counted.ensureOpen();
}
counted.decRef();
assertFalse(counted.tryIncRef());
try {
counted.incRef();
fail(" expected exception");
} catch (AlreadyClosedException ex) {
assertThat(ex.getMessage(), equalTo("test is already closed can't increment refCount current count [0]"));
}
try {
counted.ensureOpen();
fail(" expected exception");
} catch (AlreadyClosedException ex) {
assertThat(ex.getMessage(), equalTo("closed"));
}
}
@Test
public void testMultiThreaded() throws InterruptedException {
final MyRefCounted counted = new MyRefCounted();
Thread[] threads = new Thread[randomIntBetween(2, 5)];
final CountDownLatch latch = new CountDownLatch(1);
final CopyOnWriteArrayList<Throwable> exceptions = new CopyOnWriteArrayList<>();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread() {
@Override
public void run() {
try {
latch.await();
for (int j = 0; j < 10000; j++) {
counted.incRef();
try {
counted.ensureOpen();
} finally {
counted.decRef();
}
}
} catch (Throwable e) {
exceptions.add(e);
}
}
};
threads[i].start();
}
latch.countDown();
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
counted.decRef();
try {
counted.ensureOpen();
fail("expected to be closed");
} catch (AlreadyClosedException ex) {
assertThat(ex.getMessage(), equalTo("closed"));
}
assertThat(counted.refCount(), is(0));
assertThat(exceptions, Matchers.emptyIterable());
}
private final class MyRefCounted extends AbstractRefCounted {
private final AtomicBoolean closed = new AtomicBoolean(false);
public MyRefCounted() {
super("test");
}
@Override
protected void closeInternal() {
this.closed.set(true);
}
public void ensureOpen() {
if (closed.get()) {
assert this.refCount() == 0;
throw new AlreadyClosedException("closed");
}
}
}
}