From 24e73a2c83ff334047a5bb9858be7be22c4e04bf Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 4 May 2015 14:20:39 +0200 Subject: [PATCH] [TRANSLOG] Remove channel refcounting for assertions This refcounting doesn't work for shadow replicas since we open the same translog file from more than one node while running a rolling restart. This functionality is also superseeded by our filesystem abstraction which detects file leaks under the hood. --- .../index/translog/fs/ChannelReference.java | 41 +----------- .../index/translog/fs/FsChannelReader.java | 2 - .../index/translog/fs/FsTranslog.java | 65 +++---------------- .../index/engine/InternalEngineTests.java | 40 ++---------- .../translog/fs/AbstractTranslogTests.java | 26 -------- 5 files changed, 15 insertions(+), 159 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/translog/fs/ChannelReference.java b/src/main/java/org/elasticsearch/index/translog/fs/ChannelReference.java index f5a9ed2ee08..8e62da169af 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/ChannelReference.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/ChannelReference.java @@ -31,6 +31,8 @@ import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.file.OpenOption; import java.nio.file.Path; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.Map; class ChannelReference extends AbstractRefCounted { @@ -47,8 +49,6 @@ class ChannelReference extends AbstractRefCounted { this.channel = FileChannel.open(file, openOptions); try { this.stream = TranslogStreams.translogStreamFor(file); - final Map existing = openedFiles.put(file().toString(), ConcurrentCollections.newConcurrentMap()); - assert existing == null || existing.size() == 0 : "a channel for the file[" + file + "] was previously opened from " + ExceptionsHelper.stackTrace(Iterables.getFirst(existing.values(), null)); } catch (Throwable t) { IOUtils.closeWhileHandlingException(channel); throw t; @@ -67,27 +67,6 @@ class ChannelReference extends AbstractRefCounted { return this.stream; } - /** - * called to add this owner to the list of reference holders (used for leakage detection). - * also asserts that there is no double "attachment" - */ - boolean assertAttach(FsChannelReader owner) { - Map ownerMap = openedFiles.get(file().toString()); - Throwable previous = ownerMap.put(owner, new RuntimeException(file.toString() + " attached", null)); - assert previous == null : "double attachment by the same owner"; - return true; - } - - /** removes an owner to the least of list holders (used for leakage detection). - * also asserts that this owner did attach before. - */ - boolean assertDetach(FsChannelReader owner) { - Map ownerMap = openedFiles.get(file().toString()); - Throwable previous = ownerMap.remove(owner); - assert previous != null : "reader detaches, but was never attached"; - return true; - } - @Override public String toString() { return "channel: file [" + file + "], ref count [" + refCount() + "]"; @@ -96,21 +75,5 @@ class ChannelReference extends AbstractRefCounted { @Override protected void closeInternal() { IOUtils.closeWhileHandlingException(channel); - assert openedFiles.remove(file().toString()) != null; } - - // per file, which objects refer to it and a throwable of the allocation code - static final Map> openedFiles; - - static { - boolean assertsEnabled = false; - assert (assertsEnabled = true); - if (assertsEnabled) { - openedFiles = ConcurrentCollections.newConcurrentMap(); - } else { - openedFiles = null; - } - } - - } diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsChannelReader.java b/src/main/java/org/elasticsearch/index/translog/fs/FsChannelReader.java index b3af0219a31..31ec0a07209 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsChannelReader.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsChannelReader.java @@ -45,7 +45,6 @@ public abstract class FsChannelReader implements Closeable, Comparable files = new ArrayList<>(); - for (Map.Entry> file : ChannelReference.openedFiles.entrySet()) { - files.add(file.getKey()); - for (RuntimeException allocator : file.getValue().values()) { - if (exampleAllocator == null) { - exampleAllocator = new RuntimeException(file.getKey() + " is still open", allocator); - } else { - exampleAllocator.addSuppressed(allocator); - } - } - } - if (exampleAllocator != null) { - throw new AssertionError("some translog files are still open [" + Strings.collectionToCommaDelimitedString(files) + "]", exampleAllocator); - } - } - - /** force close an open reference captured in assertion code * */ - public static void assertForceCloseAllReferences() { - if (ChannelReference.openedFiles == null) { - return; - } - for (Map.Entry> file : ChannelReference.openedFiles.entrySet()) { - IOUtils.closeWhileHandlingException(file.getValue().keySet()); - } - } - - /** gets a list of unreferenced files (only works if assertions are enabled, returns an empty array otherwise) */ - public String[] getUnreferenced() throws IOException { - if (ChannelReference.openedFiles == null) { - return Strings.EMPTY_ARRAY; // not supported - } - ArrayList result = new ArrayList<>(); - try (ReleasableLock lock = writeLock.acquire()) { - try (DirectoryStream stream = Files.newDirectoryStream(location, TRANSLOG_FILE_PREFIX + "[0-9]*")) { - for (Path file : stream) { - final long id = parseIdFromFileName(file); - if (id < 0) { - logger.trace("failed to extract translog id from [{}]", file); - } else if (ChannelReference.openedFiles.containsKey(file.toString()) == false) { - result.add(file.toString()); - } - } - } - } - return result.toArray(Strings.EMPTY_ARRAY); - } - @Override public void markCommitted(final long translogId) throws FileNotFoundException { try (ReleasableLock lock = writeLock.acquire()) { diff --git a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5ab877f7fa6..bb1bde36ca0 100644 --- a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -179,42 +179,10 @@ public class InternalEngineTests extends ElasticsearchTestCase { @After public void tearDown() throws Exception { super.tearDown(); - try { - assertTranslogNotLeaking((FsTranslog) engine.translog()); - assertTranslogNotLeaking((FsTranslog) replicaEngine.translog()); - } finally { - IOUtils.close( - replicaEngine, storeReplica, - engine, store); - terminate(threadPool); - } - assertTranslogFilesClosed(); - } - - protected void assertTranslogNotLeaking(final FsTranslog translog) throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - try { - assertThat(translog.getUnreferenced(), emptyArray()); - } catch (IOException e) { - throw new ElasticsearchException("error while checking for unreferenced files in translog", e); - } - } - }); - } - - protected void assertTranslogFilesClosed() throws Exception { - try { - assertBusy(new Runnable() { - @Override - public void run() { - FsTranslog.assertAllClosed(); - } - }); - } finally { - FsTranslog.assertForceCloseAllReferences(); - } + IOUtils.close( + replicaEngine, storeReplica, + engine, store); + terminate(threadPool); } diff --git a/src/test/java/org/elasticsearch/index/translog/fs/AbstractTranslogTests.java b/src/test/java/org/elasticsearch/index/translog/fs/AbstractTranslogTests.java index 4e1c8c0c674..9b96a852996 100644 --- a/src/test/java/org/elasticsearch/index/translog/fs/AbstractTranslogTests.java +++ b/src/test/java/org/elasticsearch/index/translog/fs/AbstractTranslogTests.java @@ -72,7 +72,6 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase { super.afterIfSuccessful(); if (translog.isOpen()) { - assertNotLeaking(); if (translog.currentId() > 1) { translog.markCommitted(translog.currentId()); assertFileDeleted(translog, translog.currentId() - 1); @@ -106,27 +105,6 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase { protected abstract FsTranslog create() throws IOException; - protected void assertNotLeaking() throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - try { - assertThat(translog.getUnreferenced(), emptyArray()); - } catch (IOException e) { - logger.warn("error while checking for unreferenced files in translog"); - } - } - }); - } - - protected void assertTranslogFilesClosed() throws Exception { - assertBusy(new Runnable() { - @Override - public void run() { - FsTranslog.assertAllClosed(); - } - }); - } protected void addToTranslogAndList(Translog translog, ArrayList list, Translog.Operation op) { list.add(op); @@ -354,7 +332,6 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase { ArrayList secOps = new ArrayList<>(); addToTranslogAndList(translog, secOps, new Translog.Index("test", "2", new byte[]{2})); assertThat(firstSnapshot.estimatedTotalOperations(), equalTo(1)); - assertNotLeaking(); Translog.Snapshot secondSnapshot = translog.newSnapshot(); translog.add(new Translog.Index("test", "3", new byte[]{3})); @@ -362,17 +339,14 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase { assertThat(secondSnapshot.estimatedTotalOperations(), equalTo(1)); assertFileIsPresent(translog, 1); assertFileIsPresent(translog, 2); - assertNotLeaking(); firstSnapshot.close(); assertFileDeleted(translog, 1); assertFileIsPresent(translog, 2); secondSnapshot.close(); assertFileIsPresent(translog, 2); // it's the current nothing should be deleted - assertNotLeaking(); translog.newTranslog(); translog.markCommitted(translog.currentId()); - assertNotLeaking(); assertFileIsPresent(translog, 3); // it's the current nothing should be deleted assertFileDeleted(translog, 2);