Engine: close snapshots before recovery counter

#9760 was a fix for the translog leaking due to a missing delete flag. That fix is not needed here, as we have a better solution that does not lose the flag. This commit takes the changes from 1.x in order to keep the code bases similar and to enjoy the extra tests.

Closes #9760
Boaz Leskes 2015-02-19 11:46:10 +01:00
parent 702b2abd1b
commit 0f1c779d2c
4 changed files with 48 additions and 46 deletions


@@ -28,7 +28,6 @@ import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
@@ -778,7 +777,7 @@ public class InternalEngine extends Engine {
recoveryHandler.phase1(phase1Snapshot);
} catch (Throwable e) {
maybeFailEngine("recovery phase 1", e);
Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot);
Releasables.closeWhileHandlingException(phase1Snapshot, onGoingRecoveries);
throw new RecoveryEngineException(shardId, 1, "Execution failed", wrapIfClosed(e));
}
@@ -787,14 +786,14 @@ public class InternalEngine extends Engine {
phase2Snapshot = translog.snapshot();
} catch (Throwable e) {
maybeFailEngine("snapshot recovery", e);
Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot);
Releasables.closeWhileHandlingException(phase1Snapshot, onGoingRecoveries);
throw new RecoveryEngineException(shardId, 2, "Snapshot failed", wrapIfClosed(e));
}
try {
recoveryHandler.phase2(phase2Snapshot);
} catch (Throwable e) {
maybeFailEngine("recovery phase 2", e);
Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot, phase2Snapshot);
Releasables.closeWhileHandlingException(phase1Snapshot, phase2Snapshot, onGoingRecoveries);
throw new RecoveryEngineException(shardId, 2, "Execution failed", wrapIfClosed(e));
}
@@ -810,8 +809,8 @@ public class InternalEngine extends Engine {
maybeFailEngine("recovery phase 3", e);
throw new RecoveryEngineException(shardId, 3, "Execution failed", wrapIfClosed(e));
} finally {
Releasables.close(success, onGoingRecoveries, writeLock, phase1Snapshot,
phase2Snapshot, phase3Snapshot); // hmm why can't we use try-with here?
Releasables.close(success, phase1Snapshot, phase2Snapshot, phase3Snapshot,
onGoingRecoveries, writeLock); // hmm why can't we use try-with here?
}
}
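The reordering above is the substance of the fix: the resources passed to Releasables.close and Releasables.closeWhileHandlingException are closed in the order given, so listing the translog snapshots before onGoingRecoveries means the snapshots are released before the recovery counter is decremented, and the clean-up that runs once recoveries reach zero no longer finds old translog files pinned by a still-open snapshot. Below is a minimal, self-contained sketch of the same close-in-argument-order idea using only plain JDK types; closeInOrder and the stand-in resources are illustrative and are not the Elasticsearch API.

// Illustrative sketch only (not the Elasticsearch API): resources passed to
// closeInOrder are closed left to right, so listing the snapshots before the
// recovery counter guarantees they are released first.
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

class CloseOrderSketch {

    // Closes each resource in argument order, keeping the first failure.
    static void closeInOrder(Closeable... resources) throws IOException {
        IOException failure = null;
        for (Closeable resource : resources) {
            try {
                if (resource != null) {
                    resource.close();
                }
            } catch (IOException e) {
                if (failure == null) {
                    failure = e;
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    public static void main(String[] args) throws IOException {
        AtomicInteger onGoingRecoveries = new AtomicInteger(1);

        // Stand-ins for the translog snapshots held open during recovery.
        Closeable phase1Snapshot = () -> System.out.println("phase1 snapshot closed");
        Closeable phase2Snapshot = () -> System.out.println("phase2 snapshot closed");

        // Stand-in for releasing the recovery counter: once it reaches zero,
        // unreferenced translog files become eligible for clean-up, so it
        // must run only after every snapshot has been closed.
        Closeable recoveryCounterRelease = () -> onGoingRecoveries.decrementAndGet();

        closeInOrder(phase1Snapshot, phase2Snapshot, recoveryCounterRelease);
    }
}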


@@ -121,8 +121,10 @@ public interface Translog extends IndexShardComponent, Closeable, Accountable {
/**
* Clears unreferenced transaction logs.
*
* @return the number of cleaned up files
*/
void clearUnreferenced();
int clearUnreferenced();
/**
* Sync's the translog.


@@ -43,12 +43,7 @@ import org.elasticsearch.index.translog.TranslogStreams;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.*;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
@@ -200,8 +195,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
}
@Override
public void clearUnreferenced() {
public int clearUnreferenced() {
rwl.writeLock().lock();
int deleted = 0;
try {
for (Path location : locations) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(location, TRANSLOG_FILE_PREFIX + "[0-9]*")) {
@@ -210,6 +206,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
try {
logger.trace("delete unreferenced translog file: " + file);
Files.delete(file);
deleted++;
} catch (Exception ex) {
logger.debug("failed to delete " + file, ex);
}
@@ -222,6 +219,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
} finally {
rwl.writeLock().unlock();
}
return deleted;
}
@Override
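The FsTranslog change mirrors the interface change: clearUnreferenced() now counts the files it deletes, which is what lets the test further down assert that recovery left zero unreferenced translog files behind. Here is a stripped-down sketch of the same delete-and-count pattern using only plain java.nio.file calls; the class name, prefix constant, and temp directory are examples, and the check that skips files still referenced by the live translog is omitted.

// Minimal sketch of "delete matching files and report how many", assuming only
// JDK java.nio.file APIs; TRANSLOG_FILE_PREFIX and the directory are examples.
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

class ClearUnreferencedSketch {

    static final String TRANSLOG_FILE_PREFIX = "translog-";

    // Deletes every file in `location` matching the translog naming pattern
    // and returns the number of files actually removed.
    // (The real implementation skips files that are still referenced; omitted here.)
    static int clearUnreferenced(Path location) throws IOException {
        int deleted = 0;
        try (DirectoryStream<Path> stream =
                 Files.newDirectoryStream(location, TRANSLOG_FILE_PREFIX + "[0-9]*")) {
            for (Path file : stream) {
                try {
                    Files.delete(file);
                    deleted++;
                } catch (IOException ex) {
                    // best effort: skip files that cannot be removed right now
                }
            }
        }
        return deleted;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("translog-sketch");
        Files.createFile(dir.resolve(TRANSLOG_FILE_PREFIX + "1"));
        Files.createFile(dir.resolve(TRANSLOG_FILE_PREFIX + "2"));
        System.out.println("deleted: " + clearUnreferenced(dir)); // deleted: 2
    }
}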


@@ -19,7 +19,6 @@
package org.elasticsearch.index.engine;
import com.google.common.base.Predicate;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@@ -35,7 +34,7 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.ExceptionsHelper;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
@@ -57,7 +56,6 @@ import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
@@ -84,19 +82,18 @@ import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static com.carrotsearch.randomizedtesting.RandomizedTest.*;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.test.ElasticsearchTestCase.awaitBusy;
import static org.elasticsearch.test.ElasticsearchTestCase.terminate;
import static org.hamcrest.Matchers.*;
public class InternalEngineTests extends ElasticsearchLuceneTestCase {
public static final String TRANSLOG_PRIMARY_LOCATION = "work/fs-translog/primary";
public static final String TRANSLOG_REPLICA_LOCATION = "work/fs-translog/replica";
protected final ShardId shardId = new ShardId(new Index("index"), 1);
protected ThreadPool threadPool;
@@ -107,8 +104,8 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
protected Translog translog;
protected Translog replicaTranslog;
protected Engine engine;
protected Engine replicaEngine;
protected InternalEngine engine;
protected InternalEngine replicaEngine;
private Settings defaultSettings;
private int indexConcurrency;
@@ -118,6 +115,9 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
// clean up shared directory
IOUtils.rm(Paths.get(TRANSLOG_PRIMARY_LOCATION));
IOUtils.rm(Paths.get(TRANSLOG_REPLICA_LOCATION));
CodecService codecService = new CodecService(shardId.index());
indexConcurrency = randomIntBetween(1, 20);
String name = Codec.getDefault().getName();
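The setUp() additions wipe the shared work/fs-translog directories (via Lucene's IOUtils.rm) so that translog files left over from a previous test run cannot mask, or cause, leaks in the current one. A rough equivalent using only java.nio.file, shown purely as an illustration of that clean-up step (the helper and class names are made up):

// Sketch of the per-test clean-up idea with plain JDK APIs. The diff uses
// Lucene's IOUtils.rm; walkFileTree below is an assumed stand-in, and the
// directory names come from the test constants.
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

class SharedDirCleanupSketch {

    // Recursively deletes a directory if it exists, so a test run always
    // starts from an empty shared translog location.
    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                Files.delete(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    public static void main(String[] args) throws IOException {
        deleteRecursively(Paths.get("work/fs-translog/primary"));
        deleteRecursively(Paths.get("work/fs-translog/replica"));
    }
}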
@@ -142,21 +142,21 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
Lucene.cleanLuceneIndex(storeReplica.directory());
translog = createTranslog();
engine = createEngine(store, translog);
LiveIndexWriterConfig currentIndexWriterConfig = ((InternalEngine)engine).getCurrentIndexWriterConfig();
LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
assertEquals(((InternalEngine)engine).config().getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
if (randomBoolean()) {
((InternalEngine)engine).config().setEnableGcDeletes(false);
engine.config().setEnableGcDeletes(false);
}
replicaTranslog = createTranslogReplica();
replicaEngine = createEngine(storeReplica, replicaTranslog);
currentIndexWriterConfig = ((InternalEngine)replicaEngine).getCurrentIndexWriterConfig();
currentIndexWriterConfig = replicaEngine.getCurrentIndexWriterConfig();
assertEquals(((InternalEngine)replicaEngine).config().getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
if (randomBoolean()) {
((InternalEngine)engine).config().setEnableGcDeletes(false);
engine.config().setEnableGcDeletes(false);
}
}
@@ -193,14 +193,14 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
}
protected Store createStore() throws IOException {
return createStore(newDirectory());
return createStore(newDirectory());
}
protected Store createStore(final Directory directory) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) {
@Override
public Directory[] build() throws IOException {
return new Directory[]{ directory };
return new Directory[]{directory};
}
@Override
@@ -212,11 +212,11 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
}
protected Translog createTranslog() throws IOException {
return new FsTranslog(shardId, EMPTY_SETTINGS, Paths.get("work/fs-translog/primary"));
return new FsTranslog(shardId, EMPTY_SETTINGS, Paths.get(TRANSLOG_PRIMARY_LOCATION));
}
protected Translog createTranslogReplica() throws IOException {
return new FsTranslog(shardId, EMPTY_SETTINGS, Paths.get("work/fs-translog/replica"));
return new FsTranslog(shardId, EMPTY_SETTINGS, Paths.get(TRANSLOG_REPLICA_LOCATION));
}
protected IndexDeletionPolicy createIndexDeletionPolicy() {
@@ -248,7 +248,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
IndexWriterConfig iwc = newIndexWriterConfig();
EngineConfig config = new EngineConfig(shardId, false/*per default optimization for auto generated ids is disabled*/, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService
, null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider,
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.FailedEngineListener() {
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.FailedEngineListener() {
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
@@ -306,7 +306,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
((InternalEngine)engine).config().setCompoundOnFlush(false);
((InternalEngine) engine).config().setCompoundOnFlush(false);
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, false);
engine.create(new Engine.Create(null, newUid("3"), doc3));
@@ -354,7 +354,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
assertThat(segments.get(1).isCompound(), equalTo(false));
((InternalEngine)engine).config().setCompoundOnFlush(true);
((InternalEngine) engine).config().setCompoundOnFlush(true);
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), B_3, false);
engine.create(new Engine.Create(null, newUid("4"), doc4));
engine.refresh("test");
@@ -381,11 +381,11 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
assertThat(segments.get(2).getDeletedDocs(), equalTo(0));
assertThat(segments.get(2).isCompound(), equalTo(true));
}
public void testVerboseSegments() throws Exception {
List<Segment> segments = engine.segments(true);
assertThat(segments.isEmpty(), equalTo(true));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
engine.refresh("test");
@@ -393,7 +393,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
segments = engine.segments(true);
assertThat(segments.size(), equalTo(1));
assertThat(segments.get(0).ramTree, notNullValue());
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2));
engine.refresh("test");
@@ -406,7 +406,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
assertThat(segments.get(0).ramTree, notNullValue());
assertThat(segments.get(1).ramTree, notNullValue());
assertThat(segments.get(2).ramTree, notNullValue());
}
@@ -416,7 +416,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS));
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
final Engine engine = createEngine(indexSettingsService, store, createTranslog(), mergeSchedulerProvider);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
@@ -438,7 +438,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
for (Segment segment : segments) {
assertThat(segment.getMergeId(), nullValue());
}
index = new Engine.Index(null, newUid("4"), doc);
engine.index(index);
engine.flush();
@@ -763,8 +763,13 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
});
// post recovery should flush the translog
MatcherAssert.assertThat(translog.snapshot(), TranslogSizeMatcher.translogSize(0));
// and we should not leak files
assertThat("there are unreferenced translog files left", translog.clearUnreferenced(), equalTo(0));
engine.flush();
assertThat("there are unreferenced translog files left, post flush", translog.clearUnreferenced(), equalTo(0));
engine.close();
}
@@ -1351,11 +1356,9 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
Store store = createStore();
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
Engine engine = new InternalEngine(config(indexSettingsService, store, createTranslog(), createMergeScheduler(indexSettingsService)));
((InternalEngine)engine).config().setEnableGcDeletes(false);
engine.config().setEnableGcDeletes(false);
// Add document
Document document = testDocument();
@@ -1441,7 +1444,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
wrapper.setRandomIOExceptionRateOnOpen(randomDouble());
try (Store store = createStore(wrapper)) {
int refCount = store.refCount();
assertTrue("refCount: "+ store.refCount(), store.refCount() > 0);
assertTrue("refCount: " + store.refCount(), store.refCount() > 0);
Translog translog = createTranslog();
InternalEngine holder;
try {
@@ -1451,7 +1454,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
continue;
}
holder.config().setFailEngineOnCorruption(true);
assertEquals(store.refCount(), refCount+1);
assertEquals(store.refCount(), refCount + 1);
final int numStarts = scaledRandomIntBetween(1, 5);
for (int j = 0; j < numStarts; j++) {
try {