[TRANSLOG] Remove channel refcounting for assertions

This refcounting doesn't work for shadow replicas since we open
the same translog file from more than one node while running a rolling
restart. This functionality is also superseeded by our filesystem abstraction
which detects file leaks under the hood.
This commit is contained in:
Simon Willnauer 2015-05-04 14:20:39 +02:00
parent bbffca193c
commit 24e73a2c83
5 changed files with 15 additions and 159 deletions

View File

@ -31,6 +31,8 @@ import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;
class ChannelReference extends AbstractRefCounted {
@ -47,8 +49,6 @@ class ChannelReference extends AbstractRefCounted {
this.channel = FileChannel.open(file, openOptions);
try {
this.stream = TranslogStreams.translogStreamFor(file);
final Map<FsChannelReader, RuntimeException> existing = openedFiles.put(file().toString(), ConcurrentCollections.<FsChannelReader, RuntimeException>newConcurrentMap());
assert existing == null || existing.size() == 0 : "a channel for the file[" + file + "] was previously opened from " + ExceptionsHelper.stackTrace(Iterables.getFirst(existing.values(), null));
} catch (Throwable t) {
IOUtils.closeWhileHandlingException(channel);
throw t;
@ -67,27 +67,6 @@ class ChannelReference extends AbstractRefCounted {
return this.stream;
}
/**
* called to add this owner to the list of reference holders (used for leakage detection).
* also asserts that there is no double "attachment"
*/
boolean assertAttach(FsChannelReader owner) {
Map<FsChannelReader, RuntimeException> ownerMap = openedFiles.get(file().toString());
Throwable previous = ownerMap.put(owner, new RuntimeException(file.toString() + " attached", null));
assert previous == null : "double attachment by the same owner";
return true;
}
/** removes an owner to the least of list holders (used for leakage detection).
* also asserts that this owner did attach before.
*/
boolean assertDetach(FsChannelReader owner) {
Map<FsChannelReader, RuntimeException> ownerMap = openedFiles.get(file().toString());
Throwable previous = ownerMap.remove(owner);
assert previous != null : "reader detaches, but was never attached";
return true;
}
@Override
public String toString() {
return "channel: file [" + file + "], ref count [" + refCount() + "]";
@ -96,21 +75,5 @@ class ChannelReference extends AbstractRefCounted {
@Override
protected void closeInternal() {
IOUtils.closeWhileHandlingException(channel);
assert openedFiles.remove(file().toString()) != null;
}
// per file, which objects refer to it and a throwable of the allocation code
static final Map<String, Map<FsChannelReader, RuntimeException>> openedFiles;
static {
boolean assertsEnabled = false;
assert (assertsEnabled = true);
if (assertsEnabled) {
openedFiles = ConcurrentCollections.newConcurrentMap();
} else {
openedFiles = null;
}
}
}

View File

@ -45,7 +45,6 @@ public abstract class FsChannelReader implements Closeable, Comparable<FsChannel
this.id = id;
this.channelReference = channelReference;
this.channel = channelReference.channel();
assert channelReference.assertAttach(this);
}
public long translogId() {
@ -117,7 +116,6 @@ public abstract class FsChannelReader implements Closeable, Comparable<FsChannel
}
protected void doClose() throws IOException {
assert channelReference.assertDetach(this);
channelReference.decRef();
}

View File

@ -151,9 +151,15 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog,
if (indexSettingsService != null) {
indexSettingsService.addListener(applySettings);
}
recoverFromFiles();
// now that we know which files are there, create a new current one.
current = createTranslogFile(null);
try {
recoverFromFiles();
// now that we know which files are there, create a new current one.
current = createTranslogFile(null);
} catch (Throwable t) {
// close the opened translog files if we fail to create a new translog...
IOUtils.closeWhileHandlingException(uncommittedTranslogs);
throw t;
}
}
/** recover all translog files found on disk */
@ -269,59 +275,6 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog,
return size;
}
/** asserts that all files were closed, if not throws an {@link AssertionError} with details regarding the open files */
public static void assertAllClosed() {
if (ChannelReference.openedFiles == null) {
return;
}
RuntimeException exampleAllocator = null;
ArrayList<String> files = new ArrayList<>();
for (Map.Entry<String, Map<FsChannelReader, RuntimeException>> file : ChannelReference.openedFiles.entrySet()) {
files.add(file.getKey());
for (RuntimeException allocator : file.getValue().values()) {
if (exampleAllocator == null) {
exampleAllocator = new RuntimeException(file.getKey() + " is still open", allocator);
} else {
exampleAllocator.addSuppressed(allocator);
}
}
}
if (exampleAllocator != null) {
throw new AssertionError("some translog files are still open [" + Strings.collectionToCommaDelimitedString(files) + "]", exampleAllocator);
}
}
/** force close an open reference captured in assertion code * */
public static void assertForceCloseAllReferences() {
if (ChannelReference.openedFiles == null) {
return;
}
for (Map.Entry<String, Map<FsChannelReader, RuntimeException>> file : ChannelReference.openedFiles.entrySet()) {
IOUtils.closeWhileHandlingException(file.getValue().keySet());
}
}
/** gets a list of unreferenced files (only works if assertions are enabled, returns an empty array otherwise) */
public String[] getUnreferenced() throws IOException {
if (ChannelReference.openedFiles == null) {
return Strings.EMPTY_ARRAY; // not supported
}
ArrayList<String> result = new ArrayList<>();
try (ReleasableLock lock = writeLock.acquire()) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(location, TRANSLOG_FILE_PREFIX + "[0-9]*")) {
for (Path file : stream) {
final long id = parseIdFromFileName(file);
if (id < 0) {
logger.trace("failed to extract translog id from [{}]", file);
} else if (ChannelReference.openedFiles.containsKey(file.toString()) == false) {
result.add(file.toString());
}
}
}
}
return result.toArray(Strings.EMPTY_ARRAY);
}
@Override
public void markCommitted(final long translogId) throws FileNotFoundException {
try (ReleasableLock lock = writeLock.acquire()) {

View File

@ -179,42 +179,10 @@ public class InternalEngineTests extends ElasticsearchTestCase {
@After
public void tearDown() throws Exception {
super.tearDown();
try {
assertTranslogNotLeaking((FsTranslog) engine.translog());
assertTranslogNotLeaking((FsTranslog) replicaEngine.translog());
} finally {
IOUtils.close(
replicaEngine, storeReplica,
engine, store);
terminate(threadPool);
}
assertTranslogFilesClosed();
}
protected void assertTranslogNotLeaking(final FsTranslog translog) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
try {
assertThat(translog.getUnreferenced(), emptyArray());
} catch (IOException e) {
throw new ElasticsearchException("error while checking for unreferenced files in translog", e);
}
}
});
}
protected void assertTranslogFilesClosed() throws Exception {
try {
assertBusy(new Runnable() {
@Override
public void run() {
FsTranslog.assertAllClosed();
}
});
} finally {
FsTranslog.assertForceCloseAllReferences();
}
IOUtils.close(
replicaEngine, storeReplica,
engine, store);
terminate(threadPool);
}

View File

@ -72,7 +72,6 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase {
super.afterIfSuccessful();
if (translog.isOpen()) {
assertNotLeaking();
if (translog.currentId() > 1) {
translog.markCommitted(translog.currentId());
assertFileDeleted(translog, translog.currentId() - 1);
@ -106,27 +105,6 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase {
protected abstract FsTranslog create() throws IOException;
protected void assertNotLeaking() throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
try {
assertThat(translog.getUnreferenced(), emptyArray());
} catch (IOException e) {
logger.warn("error while checking for unreferenced files in translog");
}
}
});
}
protected void assertTranslogFilesClosed() throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
FsTranslog.assertAllClosed();
}
});
}
protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) {
list.add(op);
@ -354,7 +332,6 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase {
ArrayList<Translog.Operation> secOps = new ArrayList<>();
addToTranslogAndList(translog, secOps, new Translog.Index("test", "2", new byte[]{2}));
assertThat(firstSnapshot.estimatedTotalOperations(), equalTo(1));
assertNotLeaking();
Translog.Snapshot secondSnapshot = translog.newSnapshot();
translog.add(new Translog.Index("test", "3", new byte[]{3}));
@ -362,17 +339,14 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase {
assertThat(secondSnapshot.estimatedTotalOperations(), equalTo(1));
assertFileIsPresent(translog, 1);
assertFileIsPresent(translog, 2);
assertNotLeaking();
firstSnapshot.close();
assertFileDeleted(translog, 1);
assertFileIsPresent(translog, 2);
secondSnapshot.close();
assertFileIsPresent(translog, 2); // it's the current nothing should be deleted
assertNotLeaking();
translog.newTranslog();
translog.markCommitted(translog.currentId());
assertNotLeaking();
assertFileIsPresent(translog, 3); // it's the current nothing should be deleted
assertFileDeleted(translog, 2);