From 347ce36654449afcda68823bb774fafdb4c9619b Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 23 Oct 2014 17:23:18 +0200 Subject: [PATCH] [UTILITIES] Introduce a RefCounted interface and basic impl We already have two places duplicating this rather hairy logic, this commit intorduces a new RefCoutned interace and an abstract implementation that can be used for delegation. It factors out all the reference counting and adds single and multithreaded test for it. Closes #8210 --- .../util/concurrent/AbstractRefCounted.java | 77 +++++++++ .../common/util/concurrent/RefCounted.java | 67 ++++++++ .../org/elasticsearch/index/store/Store.java | 43 +++-- .../indices/recovery/RecoveryStatus.java | 45 +----- .../util/concurrent/RefCountedTest.java | 153 ++++++++++++++++++ 5 files changed, 320 insertions(+), 65 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java create mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java create mode 100644 src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTest.java diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java b/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java new file mode 100644 index 00000000000..6b74aec0da1 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.util.concurrent; + +import org.apache.lucene.store.AlreadyClosedException; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A basic RefCounted implementation that is initialized with a + * ref count of 1 and calls {@link #closeInternal()} once it reaches + * a 0 ref count + */ +public abstract class AbstractRefCounted implements RefCounted { + private final AtomicInteger refCount = new AtomicInteger(1); + private final String name; + + public AbstractRefCounted(String name) { + this.name = name; + } + + @Override + public final void incRef() { + if (tryIncRef() == false) { + throw new AlreadyClosedException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]"); + } + } + + @Override + public final boolean tryIncRef() { + do { + int i = refCount.get(); + if (i > 0) { + if (refCount.compareAndSet(i, i + 1)) { + return true; + } + } else { + return false; + } + } while (true); + } + + @Override + public final void decRef() { + int i = refCount.decrementAndGet(); + assert i >= 0; + if (i == 0) { + closeInternal(); + } + + } + + /** + * Returns the current reference count. + */ + public int refCount() { + return this.refCount.get(); + } + + protected abstract void closeInternal(); +} diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java b/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java new file mode 100644 index 00000000000..8ffbbc9b6e4 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +/** + * An interface for objects that need to be notified when all reference + * to itself are not in user anymore. This implements basic reference counting + * for instance if async operations holding on to services that are close concurrently + * but should be functional until all async operations have joined + * Classes implementing this interface should ref counted at any time ie. if an object is used it's reference count should + * be increased before using it by calling #incRef and a corresponding #decRef must be called in a try/finally + * block to release the object again ie.: + *
+ *      inst.incRef();
+ *      try {
+ *        // use the inst...
+ *
+ *      } finally {
+ *          inst.decRef();
+ *      }
+ * 
+ */ +public interface RefCounted { + + /** + * Increments the refCount of this instance. + * + * @see #decRef + * @see #tryIncRef() + * @throws org.apache.lucene.store.AlreadyClosedException iff the reference counter can not be incremented. + */ + void incRef(); + + /** + * Tries to increment the refCount of this instance. This method will return true iff the refCount was + * + * @see #decRef() + * @see #incRef() + */ + boolean tryIncRef(); + + /** + * Decreases the refCount of this instance. If the refCount drops to 0, then this + * instance is considered as closed and should not be used anymore. + * + * @see #incRef + */ + void decRef(); + +} diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index 402a511b40f..ab17f6e71ec 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -26,7 +26,6 @@ import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.lucene46.Lucene46SegmentInfoFormat; import org.apache.lucene.index.*; import org.apache.lucene.store.*; -import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.Version; @@ -42,6 +41,8 @@ import org.elasticsearch.common.lucene.Directories; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; +import org.elasticsearch.common.util.concurrent.RefCounted; import org.elasticsearch.index.CloseableIndexComponent; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.settings.IndexSettings; @@ -53,7 +54,6 @@ import java.io.*; import java.nio.file.NoSuchFileException; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -77,7 +77,7 @@ import java.util.zip.Checksum; * } * */ -public class Store extends AbstractIndexShardComponent implements CloseableIndexComponent, Closeable { +public class Store extends AbstractIndexShardComponent implements CloseableIndexComponent, Closeable, RefCounted { private static final String CODEC = "store"; private static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0 @@ -86,11 +86,17 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex private static final String CORRUPTED = "corrupted_"; private final AtomicBoolean isClosed = new AtomicBoolean(false); - private final AtomicInteger refCount = new AtomicInteger(1); private final CodecService codecService; private final DirectoryService directoryService; private final StoreDirectory directory; private final DistributorDirectory distributorDirectory; + private final AbstractRefCounted refCounter = new AbstractRefCounted("store") { + @Override + protected void closeInternal() { + // close us once we are done + Store.this.closeInternal(); + } + }; @Inject public Store(ShardId shardId, @IndexSettings Settings indexSettings, CodecService codecService, DirectoryService directoryService, Distributor distributor) throws IOException { @@ -134,8 +140,8 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex } final void ensureOpen() { // for testing - if (this.refCount.get() <= 0) { - throw new AlreadyClosedException("Store is already closed"); + if (this.refCounter.refCount() <= 0) { + throw new AlreadyClosedException("store is already closed"); } } @@ -211,10 +217,9 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex * @see #tryIncRef() * @throws AlreadyClosedException iff the reference counter can not be incremented. */ + @Override public final void incRef() { - if (tryIncRef() == false) { - throw new AlreadyClosedException("Store is already closed can't increment refCount current count [" + refCount.get() + "]"); - } + refCounter.incRef(); } /** @@ -229,17 +234,9 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex * @see #decRef() * @see #incRef() */ + @Override public final boolean tryIncRef() { - do { - int i = refCount.get(); - if (i > 0) { - if (refCount.compareAndSet(i, i + 1)) { - return true; - } - } else { - return false; - } - } while (true); + return refCounter.tryIncRef(); } /** @@ -247,13 +244,9 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex * store is closed. * @see #incRef */ + @Override public final void decRef() { - int i = refCount.decrementAndGet(); - assert i >= 0; - if (i == 0) { - closeInternal(); - } - + refCounter.decRef(); } @Override diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java index 2095eb68a4a..60bf2994f3e 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java @@ -27,13 +27,13 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; -import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.Iterator; @@ -42,7 +42,6 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -51,7 +50,7 @@ import java.util.concurrent.atomic.AtomicReference; */ -public class RecoveryStatus { +public class RecoveryStatus extends AbstractRefCounted { private final ESLogger logger; @@ -72,13 +71,11 @@ public class RecoveryStatus { private final AtomicBoolean finished = new AtomicBoolean(); - // we start with 1 which will be decremented on cancel/close/failure - private final AtomicInteger refCount = new AtomicInteger(1); - private final ConcurrentMap openIndexOutputs = ConcurrentCollections.newConcurrentMap(); private final Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums(); public RecoveryStatus(InternalIndexShard indexShard, DiscoveryNode sourceNode, RecoveryState state, RecoveryTarget.RecoveryListener listener) { + super("recovery_status"); this.recoveryId = idGenerator.incrementAndGet(); this.listener = listener; this.logger = Loggers.getLogger(getClass(), indexShard.indexSettings(), indexShard.shardId()); @@ -260,40 +257,8 @@ public class RecoveryStatus { return indexOutput; } - /** - * Tries to increment the refCount of this RecoveryStatus instance. This method will return true iff the refCount was - * incremented successfully otherwise false. Be sure to always call a corresponding {@link #decRef}, in a finally clause; - * - * @see #decRef() - */ - public final boolean tryIncRef() { - do { - int i = refCount.get(); - if (i > 0) { - if (refCount.compareAndSet(i, i + 1)) { - return true; - } - } else { - return false; - } - } while (true); - } - - /** - * Decreases the refCount of this Store instance.If the refCount drops to 0, the recovery process this status represents - * is seen as done and resources and temporary files are deleted. - * - * @see #tryIncRef - */ - public final void decRef() { - int i = refCount.decrementAndGet(); - assert i >= 0; - if (i == 0) { - closeInternal(); - } - } - - private void closeInternal() { + @Override + protected void closeInternal() { try { // clean open index outputs Iterator> iterator = openIndexOutputs.entrySet().iterator(); diff --git a/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTest.java b/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTest.java new file mode 100644 index 00000000000..3ff0767fa44 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.util.concurrent; + +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + */ +public class RefCountedTest extends ElasticsearchTestCase { + + @Test + public void testRefCount() throws IOException { + MyRefCounted counted = new MyRefCounted(); + + int incs = randomIntBetween(1, 100); + for (int i = 0; i < incs; i++) { + if (randomBoolean()) { + counted.incRef(); + } else { + assertTrue(counted.tryIncRef()); + } + counted.ensureOpen(); + } + + for (int i = 0; i < incs; i++) { + counted.decRef(); + counted.ensureOpen(); + } + + counted.incRef(); + counted.decRef(); + for (int i = 0; i < incs; i++) { + if (randomBoolean()) { + counted.incRef(); + } else { + assertTrue(counted.tryIncRef()); + } + counted.ensureOpen(); + } + + for (int i = 0; i < incs; i++) { + counted.decRef(); + counted.ensureOpen(); + } + + counted.decRef(); + assertFalse(counted.tryIncRef()); + try { + counted.incRef(); + fail(" expected exception"); + } catch (AlreadyClosedException ex) { + assertThat(ex.getMessage(), equalTo("test is already closed can't increment refCount current count [0]")); + } + + try { + counted.ensureOpen(); + fail(" expected exception"); + } catch (AlreadyClosedException ex) { + assertThat(ex.getMessage(), equalTo("closed")); + } + } + + @Test + public void testMultiThreaded() throws InterruptedException { + final MyRefCounted counted = new MyRefCounted(); + Thread[] threads = new Thread[randomIntBetween(2, 5)]; + final CountDownLatch latch = new CountDownLatch(1); + final CopyOnWriteArrayList exceptions = new CopyOnWriteArrayList<>(); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread() { + @Override + public void run() { + try { + latch.await(); + for (int j = 0; j < 10000; j++) { + counted.incRef(); + try { + counted.ensureOpen(); + } finally { + counted.decRef(); + } + } + } catch (Throwable e) { + exceptions.add(e); + } + } + }; + threads[i].start(); + } + latch.countDown(); + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + counted.decRef(); + try { + counted.ensureOpen(); + fail("expected to be closed"); + } catch (AlreadyClosedException ex) { + assertThat(ex.getMessage(), equalTo("closed")); + } + assertThat(counted.refCount(), is(0)); + assertThat(exceptions, Matchers.emptyIterable()); + + } + + private final class MyRefCounted extends AbstractRefCounted { + + private final AtomicBoolean closed = new AtomicBoolean(false); + + public MyRefCounted() { + super("test"); + } + + @Override + protected void closeInternal() { + this.closed.set(true); + } + + public void ensureOpen() { + if (closed.get()) { + assert this.refCount() == 0; + throw new AlreadyClosedException("closed"); + } + } + } +}