store the translog id in the commit point data of a lucene commit point. work done towards better concurrency on flush operation

This commit is contained in:
kimchy 2011-05-19 18:04:22 +03:00
parent 0d63fd68a8
commit 1911368feb
3 changed files with 59 additions and 29 deletions

View File

@ -26,8 +26,8 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.bloom.BloomFilter;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.IndexWriters;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.ReaderSearcherHolder;
import org.elasticsearch.common.lucene.uid.UidField;
@ -62,6 +62,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -122,6 +123,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private volatile boolean possibleMergeNeeded = false;
// we use flushNeeded here, since if there are no changes, then the commit won't write
// will not really happen, and then the commitUserData and the new translog will not be reflected
private volatile boolean flushNeeded = false;
private volatile int disableFlushCounter = 0;
// indexing searcher is initialized
@ -141,6 +146,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private final Object failedEngineMutex = new Object();
private final CopyOnWriteArrayList<FailedEngineListener> failedEngineListeners = new CopyOnWriteArrayList<FailedEngineListener>();
private final AtomicLong translogIdGenerator = new AtomicLong();
@Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
IndexSettingsService indexSettingsService,
Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
@ -227,11 +234,23 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
try {
translog.newTranslog(newTransactionLogId());
if (IndexReader.indexExists(store.directory())) {
Map<String, String> commitUserData = IndexReader.getCommitUserData(store.directory());
if (commitUserData.containsKey(Translog.TRANSLOG_ID_KEY)) {
translogIdGenerator.set(Long.parseLong(commitUserData.get(Translog.TRANSLOG_ID_KEY)));
} else {
translogIdGenerator.set(System.currentTimeMillis());
indexWriter.commit(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogIdGenerator.get())).map());
}
} else {
translogIdGenerator.set(System.currentTimeMillis());
indexWriter.commit(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogIdGenerator.get())).map());
}
translog.newTranslog(translogIdGenerator.get());
this.nrtResource = buildNrtResource(indexWriter);
if (indexingSearcher.get() != null) {
indexingSearcher.get().release();
indexingSearcher.set(null);
indexingSearcher.get().release();
}
} catch (IOException e) {
try {
@ -266,6 +285,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
innerCreate(create, writer);
dirty = true;
possibleMergeNeeded = true;
flushNeeded = true;
} catch (IOException e) {
throw new CreateFailedEngineException(shardId, create, e);
} catch (OutOfMemoryError e) {
@ -374,6 +394,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
innerIndex(index, writer);
dirty = true;
possibleMergeNeeded = true;
flushNeeded = true;
} catch (IOException e) {
throw new IndexFailedEngineException(shardId, index, e);
} catch (OutOfMemoryError e) {
@ -475,6 +496,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
innerDelete(delete, writer);
dirty = true;
possibleMergeNeeded = true;
flushNeeded = true;
} catch (IOException e) {
throw new DeleteFailedEngineException(shardId, delete, e);
} catch (OutOfMemoryError e) {
@ -572,6 +594,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
translog.add(new Translog.DeleteByQuery(delete));
dirty = true;
possibleMergeNeeded = true;
flushNeeded = true;
} catch (IOException e) {
throw new DeleteByQueryFailedEngineException(shardId, delete, e);
} finally {
@ -681,10 +704,17 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// to be allocated to a different node
indexWriter.close();
indexWriter = createWriter();
if (flushNeeded) {
flushNeeded = false;
long translogId = translogIdGenerator.incrementAndGet();
indexWriter.commit(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
translog.newTranslog(translogId);
}
AcquirableResource<ReaderSearcherHolder> current = nrtResource;
nrtResource = buildNrtResource(indexWriter);
current.markForClose();
translog.newTranslog(newTransactionLogId());
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
} catch (OutOfMemoryError e) {
@ -692,14 +722,18 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
throw new FlushFailedEngineException(shardId, e);
}
} else {
try {
indexWriter.commit();
translog.newTranslog(newTransactionLogId());
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new FlushFailedEngineException(shardId, e);
if (flushNeeded) {
flushNeeded = false;
try {
long translogId = translogIdGenerator.incrementAndGet();
indexWriter.commit(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
translog.newTranslog(translogId);
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new FlushFailedEngineException(shardId, e);
}
}
}
} finally {
@ -1013,12 +1047,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
config.setMaxThreadStates(indexConcurrency);
indexWriter = new IndexWriter(store.directory(), config);
// we commit here on a fresh index since we want to have a commit point to support snapshotting
if (create) {
indexWriter.commit();
}
} catch (IOException e) {
safeClose(indexWriter);
throw e;
@ -1070,14 +1098,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
return newAcquirableResource(new ReaderSearcherHolder(indexSearcher));
}
private long newTransactionLogId() throws IOException {
try {
return IndexWriters.rollbackSegmentInfos(indexWriter).getVersion();
} catch (Exception e) {
return IndexReader.getCurrentVersion(store.directory());
}
}
private static class RobinSearchResult implements Searcher {
private final AcquirableResource<ReaderSearcherHolder> nrtHolder;

View File

@ -43,6 +43,7 @@ import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
/**
@ -84,9 +85,16 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
@Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
recoveryStatus.index().startTime(System.currentTimeMillis());
long version = -1;
long translogId = -1;
try {
if (IndexReader.indexExists(indexShard.store().directory())) {
version = IndexReader.getCurrentVersion(indexShard.store().directory());
Map<String, String> commitUserData = IndexReader.getCommitUserData(indexShard.store().directory());
if (commitUserData.containsKey(Translog.TRANSLOG_ID_KEY)) {
translogId = Long.parseLong(commitUserData.get(Translog.TRANSLOG_ID_KEY));
} else {
translogId = version;
}
}
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
@ -108,7 +116,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
}
recoveryStatus.translog().startTime(System.currentTimeMillis());
if (version == -1) {
if (translogId == -1) {
// no translog files, bail
indexShard.start("post recovery from gateway, no translog");
// no index, just start the shard and bail
@ -118,9 +126,9 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
// move an existing translog, if exists, to "recovering" state, and start reading from it
FsTranslog translog = (FsTranslog) indexShard.translog();
File recoveringTranslogFile = new File(translog.location(), "translog-" + version + ".recovering");
File recoveringTranslogFile = new File(translog.location(), "translog-" + translogId + ".recovering");
if (!recoveringTranslogFile.exists()) {
File translogFile = new File(translog.location(), "translog-" + version);
File translogFile = new File(translog.location(), "translog-" + translogId);
if (translogFile.exists()) {
for (int i = 0; i < 3; i++) {
if (translogFile.renameTo(recoveringTranslogFile)) {

View File

@ -40,6 +40,8 @@ import java.io.InputStream;
@ThreadSafe
public interface Translog extends IndexShardComponent {
public static final String TRANSLOG_ID_KEY = "translog_id";
/**
* Returns the id of the current transaction log.
*/