diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 598062eb320..4c6e27352b8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -26,8 +26,8 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.Unicode; import org.elasticsearch.common.bloom.BloomFilter; +import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.lucene.IndexWriters; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.ReaderSearcherHolder; import org.elasticsearch.common.lucene.uid.UidField; @@ -62,6 +62,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -122,6 +123,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private volatile boolean possibleMergeNeeded = false; + // we use flushNeeded here, since if there are no changes, then the commit won't write + // will not really happen, and then the commitUserData and the new translog will not be reflected + private volatile boolean flushNeeded = false; + private volatile int disableFlushCounter = 0; // indexing searcher is initialized @@ -141,6 +146,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private final Object failedEngineMutex = new Object(); private final CopyOnWriteArrayList failedEngineListeners = new CopyOnWriteArrayList(); + private final AtomicLong translogIdGenerator = new AtomicLong(); + @Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexSettingsService indexSettingsService, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog, @@ -227,11 +234,23 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } try { - translog.newTranslog(newTransactionLogId()); + if (IndexReader.indexExists(store.directory())) { + Map commitUserData = IndexReader.getCommitUserData(store.directory()); + if (commitUserData.containsKey(Translog.TRANSLOG_ID_KEY)) { + translogIdGenerator.set(Long.parseLong(commitUserData.get(Translog.TRANSLOG_ID_KEY))); + } else { + translogIdGenerator.set(System.currentTimeMillis()); + indexWriter.commit(MapBuilder.newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogIdGenerator.get())).map()); + } + } else { + translogIdGenerator.set(System.currentTimeMillis()); + indexWriter.commit(MapBuilder.newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogIdGenerator.get())).map()); + } + translog.newTranslog(translogIdGenerator.get()); this.nrtResource = buildNrtResource(indexWriter); if (indexingSearcher.get() != null) { - indexingSearcher.get().release(); indexingSearcher.set(null); + indexingSearcher.get().release(); } } catch (IOException e) { try { @@ -266,6 +285,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { innerCreate(create, writer); dirty = true; possibleMergeNeeded = true; + flushNeeded = true; } catch (IOException e) { throw new CreateFailedEngineException(shardId, create, e); } catch (OutOfMemoryError e) { @@ -374,6 +394,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { innerIndex(index, writer); dirty = true; possibleMergeNeeded = true; + flushNeeded = true; } catch (IOException e) { throw new IndexFailedEngineException(shardId, index, e); } catch (OutOfMemoryError e) { @@ -475,6 +496,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { innerDelete(delete, writer); dirty = true; possibleMergeNeeded = true; + flushNeeded = true; } catch (IOException e) { throw new DeleteFailedEngineException(shardId, delete, e); } catch (OutOfMemoryError e) { @@ -572,6 +594,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { translog.add(new Translog.DeleteByQuery(delete)); dirty = true; possibleMergeNeeded = true; + flushNeeded = true; } catch (IOException e) { throw new DeleteByQueryFailedEngineException(shardId, delete, e); } finally { @@ -681,10 +704,17 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { // to be allocated to a different node indexWriter.close(); indexWriter = createWriter(); + + if (flushNeeded) { + flushNeeded = false; + long translogId = translogIdGenerator.incrementAndGet(); + indexWriter.commit(MapBuilder.newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map()); + translog.newTranslog(translogId); + } + AcquirableResource current = nrtResource; nrtResource = buildNrtResource(indexWriter); current.markForClose(); - translog.newTranslog(newTransactionLogId()); } catch (Exception e) { throw new FlushFailedEngineException(shardId, e); } catch (OutOfMemoryError e) { @@ -692,14 +722,18 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { throw new FlushFailedEngineException(shardId, e); } } else { - try { - indexWriter.commit(); - translog.newTranslog(newTransactionLogId()); - } catch (Exception e) { - throw new FlushFailedEngineException(shardId, e); - } catch (OutOfMemoryError e) { - failEngine(e); - throw new FlushFailedEngineException(shardId, e); + if (flushNeeded) { + flushNeeded = false; + try { + long translogId = translogIdGenerator.incrementAndGet(); + indexWriter.commit(MapBuilder.newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map()); + translog.newTranslog(translogId); + } catch (Exception e) { + throw new FlushFailedEngineException(shardId, e); + } catch (OutOfMemoryError e) { + failEngine(e); + throw new FlushFailedEngineException(shardId, e); + } } } } finally { @@ -1013,12 +1047,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { config.setMaxThreadStates(indexConcurrency); indexWriter = new IndexWriter(store.directory(), config); - - // we commit here on a fresh index since we want to have a commit point to support snapshotting - if (create) { - indexWriter.commit(); - } - } catch (IOException e) { safeClose(indexWriter); throw e; @@ -1070,14 +1098,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { return newAcquirableResource(new ReaderSearcherHolder(indexSearcher)); } - private long newTransactionLogId() throws IOException { - try { - return IndexWriters.rollbackSegmentInfos(indexWriter).getVersion(); - } catch (Exception e) { - return IndexReader.getCurrentVersion(store.directory()); - } - } - private static class RobinSearchResult implements Searcher { private final AcquirableResource nrtHolder; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index 3dfecd7464e..a36997eea8e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -43,6 +43,7 @@ import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.util.Map; import java.util.concurrent.ScheduledFuture; /** @@ -84,9 +85,16 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen @Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException { recoveryStatus.index().startTime(System.currentTimeMillis()); long version = -1; + long translogId = -1; try { if (IndexReader.indexExists(indexShard.store().directory())) { version = IndexReader.getCurrentVersion(indexShard.store().directory()); + Map commitUserData = IndexReader.getCommitUserData(indexShard.store().directory()); + if (commitUserData.containsKey(Translog.TRANSLOG_ID_KEY)) { + translogId = Long.parseLong(commitUserData.get(Translog.TRANSLOG_ID_KEY)); + } else { + translogId = version; + } } } catch (IOException e) { throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e); @@ -108,7 +116,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen } recoveryStatus.translog().startTime(System.currentTimeMillis()); - if (version == -1) { + if (translogId == -1) { // no translog files, bail indexShard.start("post recovery from gateway, no translog"); // no index, just start the shard and bail @@ -118,9 +126,9 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen // move an existing translog, if exists, to "recovering" state, and start reading from it FsTranslog translog = (FsTranslog) indexShard.translog(); - File recoveringTranslogFile = new File(translog.location(), "translog-" + version + ".recovering"); + File recoveringTranslogFile = new File(translog.location(), "translog-" + translogId + ".recovering"); if (!recoveringTranslogFile.exists()) { - File translogFile = new File(translog.location(), "translog-" + version); + File translogFile = new File(translog.location(), "translog-" + translogId); if (translogFile.exists()) { for (int i = 0; i < 3; i++) { if (translogFile.renameTo(recoveringTranslogFile)) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java index 3cb5471a08b..ff486fc474d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -40,6 +40,8 @@ import java.io.InputStream; @ThreadSafe public interface Translog extends IndexShardComponent { + public static final String TRANSLOG_ID_KEY = "translog_id"; + /** * Returns the id of the current transaction log. */