diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java index 703051d8704..90b7956366a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -73,7 +73,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent { void optimize(Optimize optimize) throws EngineException; - void snapshot(SnapshotHandler snapshotHandler) throws EngineException; + T snapshot(SnapshotHandler snapshotHandler) throws EngineException; void recover(RecoveryHandler recoveryHandler) throws EngineException; @@ -105,9 +105,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent { /** */ - static interface SnapshotHandler { + static interface SnapshotHandler { - void snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException; + T snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException; } static interface Searcher extends Releasable { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 0967654c2eb..c0f34132c63 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -330,7 +330,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, } } - @Override public void snapshot(SnapshotHandler snapshotHandler) throws EngineException { + @Override public T snapshot(SnapshotHandler snapshotHandler) throws EngineException { SnapshotIndexCommit snapshotIndexCommit = null; Translog.Snapshot traslogSnapshot = null; rwl.readLock().lock(); @@ -345,7 +345,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, } try { - snapshotHandler.snapshot(snapshotIndexCommit, traslogSnapshot); + return snapshotHandler.snapshot(snapshotIndexCommit, traslogSnapshot); } finally { snapshotIndexCommit.release(); traslogSnapshot.release(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java index 037ee40d0de..ce0433c2e7d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java @@ -23,10 +23,14 @@ import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.shard.IndexShardComponent; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.util.SizeValue; +import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.component.CloseableIndexComponent; +import static org.elasticsearch.util.TimeValue.*; + /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexComponent { @@ -38,7 +42,7 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo /** * Snapshots the given shard into the gateway. */ - void snapshot(Snapshot snapshot); + SnapshotStatus snapshot(Snapshot snapshot); /** * Returns true if this gateway requires scheduling management for snapshot @@ -109,4 +113,146 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo return lastTranslogSize; } } + + class SnapshotStatus { + + public static SnapshotStatus NA = new SnapshotStatus(timeValueMillis(0), new Index(0, new SizeValue(0), timeValueMillis(0)), new Translog(0, timeValueMillis(0))); + + private TimeValue totalTime; + + private Index index; + + private Translog translog; + + public SnapshotStatus(TimeValue totalTime, Index index, Translog translog) { + this.index = index; + this.translog = translog; + this.totalTime = totalTime; + } + + public TimeValue totalTime() { + return this.totalTime; + } + + public Index index() { + return index; + } + + public Translog translog() { + return translog; + } + + public static class Translog { + private int numberOfOperations; + private TimeValue time; + + public Translog(int numberOfOperations, TimeValue time) { + this.numberOfOperations = numberOfOperations; + this.time = time; + } + + public int numberOfOperations() { + return numberOfOperations; + } + + public TimeValue time() { + return time; + } + } + + public static class Index { + private int numberOfFiles; + private SizeValue totalSize; + private TimeValue time; + + public Index(int numberOfFiles, SizeValue totalSize, TimeValue time) { + this.numberOfFiles = numberOfFiles; + this.totalSize = totalSize; + this.time = time; + } + + public TimeValue time() { + return this.time; + } + + public int numberOfFiles() { + return numberOfFiles; + } + + public SizeValue totalSize() { + return totalSize; + } + } + } + + class RecoveryStatus { + + private Index index; + + private Translog translog; + + public RecoveryStatus(Index index, Translog translog) { + this.index = index; + this.translog = translog; + } + + public Index index() { + return index; + } + + public Translog translog() { + return translog; + } + + public static class Translog { + private long translogId; + private int numberOfOperations; + private SizeValue totalSize; + + public Translog(long translogId, int numberOfOperations, SizeValue totalSize) { + this.translogId = translogId; + this.numberOfOperations = numberOfOperations; + this.totalSize = totalSize; + } + + /** + * The translog id recovered, -1 indicating no translog. + */ + public long translogId() { + return translogId; + } + + public int numberOfOperations() { + return numberOfOperations; + } + + public SizeValue totalSize() { + return totalSize; + } + } + + public static class Index { + private long version; + private int numberOfFiles; + private SizeValue totalSize; + + public Index(long version, int numberOfFiles, SizeValue totalSize) { + this.version = version; + this.numberOfFiles = numberOfFiles; + this.totalSize = totalSize; + } + + public long version() { + return this.version; + } + + public int numberOfFiles() { + return numberOfFiles; + } + + public SizeValue totalSize() { + return totalSize; + } + } + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 637c107bc64..1c8268ecce3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -44,7 +44,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class IndexShardGatewayService extends AbstractIndexShardComponent implements CloseableIndexComponent { @@ -108,7 +108,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem indexShard.recovering(); logger.debug("Starting recovery from {}", shardGateway); StopWatch stopWatch = new StopWatch().start(); - RecoveryStatus recoveryStatus = shardGateway.recover(); + IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover(); lastIndexVersion = recoveryStatus.index().version(); lastTranslogId = recoveryStatus.translog().translogId(); @@ -121,9 +121,9 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem stopWatch.stop(); if (logger.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); - sb.append("Recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("]\n"); - sb.append(" Index : numberOfFiles [").append(recoveryStatus.index().numberOfFiles()).append("] with totalSize [").append(recoveryStatus.index().totalSize()).append("]\n"); - sb.append(" Translog : translogId [").append(recoveryStatus.translog().translogId()).append(", numberOfOperations [").append(recoveryStatus.translog().numberOfOperations()).append("] with totalSize [").append(recoveryStatus.translog().totalSize()).append("]"); + sb.append("Recovery completed from ").append(shardGateway).append(", took[").append(stopWatch.totalTime()).append("]\n"); + sb.append(" Index : number_of_files[").append(recoveryStatus.index().numberOfFiles()).append("] with total_size[").append(recoveryStatus.index().totalSize()).append("]\n"); + sb.append(" Translog : translog_id[").append(recoveryStatus.translog().translogId()).append("], number_of_operations[").append(recoveryStatus.translog().numberOfOperations()).append("] with total_size[").append(recoveryStatus.translog().totalSize()).append("]"); logger.debug(sb.toString()); } // refresh the shard @@ -147,18 +147,30 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem return; } try { - indexShard.snapshot(new Engine.SnapshotHandler() { - @Override public void snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException { + IndexShardGateway.SnapshotStatus snapshotStatus = indexShard.snapshot(new Engine.SnapshotHandler() { + @Override public IndexShardGateway.SnapshotStatus snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException { if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogSize != translogSnapshot.size()) { - shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogSize)); + IndexShardGateway.SnapshotStatus snapshotStatus = + shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogSize)); lastIndexVersion = snapshotIndexCommit.getVersion(); lastTranslogId = translogSnapshot.translogId(); lastTranslogSize = translogSnapshot.size(); + return snapshotStatus; } + return IndexShardGateway.SnapshotStatus.NA; } }); + if (snapshotStatus != IndexShardGateway.SnapshotStatus.NA) { + if (logger.isDebugEnabled()) { + StringBuilder sb = new StringBuilder(); + sb.append("Snapshot completed to ").append(shardGateway).append(", took[").append(snapshotStatus.totalTime()).append("]\n"); + sb.append(" Index : number_of_files[").append(snapshotStatus.index().numberOfFiles()).append("] with total_size[").append(snapshotStatus.index().totalSize()).append("], took[").append(snapshotStatus.index().time()).append("]\n"); + sb.append(" Translog : number_of_operations[").append(snapshotStatus.translog().numberOfOperations()).append("], took[").append(snapshotStatus.translog().time()).append("]"); + logger.debug(sb.toString()); + } + } } catch (IllegalIndexShardStateException e) { // ignore, that's fine } catch (IndexShardGatewaySnapshotFailedException e) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/RecoveryStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/RecoveryStatus.java deleted file mode 100644 index 73e3b14972b..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/RecoveryStatus.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.gateway; - -import org.elasticsearch.util.SizeValue; - -/** - * @author kimchy (Shay Banon) - */ -public class RecoveryStatus { - - private Index index; - - private Translog translog; - - public RecoveryStatus(Index index, Translog translog) { - this.index = index; - this.translog = translog; - } - - public Index index() { - return index; - } - - public Translog translog() { - return translog; - } - - public static class Translog { - private long translogId; - private int numberOfOperations; - private SizeValue totalSize; - - public Translog(long translogId, int numberOfOperations, SizeValue totalSize) { - this.translogId = translogId; - this.numberOfOperations = numberOfOperations; - this.totalSize = totalSize; - } - - /** - * The translog id recovered, -1 indicating no translog. - */ - public long translogId() { - return translogId; - } - - public int numberOfOperations() { - return numberOfOperations; - } - - public SizeValue totalSize() { - return totalSize; - } - } - - public static class Index { - private long version; - private int numberOfFiles; - private SizeValue totalSize; - - public Index(long version, int numberOfFiles, SizeValue totalSize) { - this.numberOfFiles = numberOfFiles; - this.totalSize = totalSize; - } - - public long version() { - return this.version; - } - - public int numberOfFiles() { - return numberOfFiles; - } - - public SizeValue totalSize() { - return totalSize; - } - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java index 5b37e4ce80f..e3d08d9bca1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java @@ -26,7 +26,6 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.gateway.IndexShardGateway; import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException; -import org.elasticsearch.index.gateway.RecoveryStatus; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; @@ -37,6 +36,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.util.SizeUnit; import org.elasticsearch.util.SizeValue; +import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.io.stream.DataInputStreamInput; import org.elasticsearch.util.io.stream.DataOutputStreamOutput; import org.elasticsearch.util.io.stream.StreamOutput; @@ -105,14 +105,19 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog); } - @Override public void snapshot(Snapshot snapshot) { + @Override public SnapshotStatus snapshot(Snapshot snapshot) { + long totalTimeStart = System.currentTimeMillis(); boolean indexDirty = false; boolean translogDirty = false; final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit(); final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot(); + int indexNumberOfFiles = 0; + long indexTotalFilesSize = 0; + long indexTime = 0; if (snapshot.indexChanged()) { + long time = System.currentTimeMillis(); indexDirty = true; // snapshot into the index final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length); @@ -144,6 +149,12 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements } } } + indexNumberOfFiles++; + try { + indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(fileName); + } catch (IOException e) { + // ignore... + } threadPool.execute(new Runnable() { @Override public void run() { try { @@ -164,6 +175,7 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements if (lastException.get() != null) { throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", lastException.get()); } + indexTime = System.currentTimeMillis() - time; } // we reopen the RAF each snapshot and not keep an open one since we want to make sure we // can sync it to disk later on (close it as well) @@ -172,15 +184,20 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements // if we have a different trnaslogId we want to flush the full translog to a new file (based on the translogId). // If we still work on existing translog, just append the latest translog operations + int translogNumberOfOperations = 0; + long translogTime = 0; if (snapshot.newTranslogCreated()) { translogDirty = true; try { + long time = System.currentTimeMillis(); translogRaf = new RandomAccessFile(translogFile, "rw"); StreamOutput out = new DataOutputStreamOutput(translogRaf); out.writeInt(-1); // write the number of operations header with -1 currently for (Translog.Operation operation : translogSnapshot) { + translogNumberOfOperations++; writeTranslogOperation(out, operation); } + translogTime = System.currentTimeMillis() - time; } catch (Exception e) { try { translogRaf.close(); @@ -192,13 +209,16 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements } else if (snapshot.sameTranslogNewOperations()) { translogDirty = true; try { + long time = System.currentTimeMillis(); translogRaf = new RandomAccessFile(translogFile, "rw"); // seek to the end, since we append translogRaf.seek(translogRaf.length()); StreamOutput out = new DataOutputStreamOutput(translogRaf); for (Translog.Operation operation : translogSnapshot.skipTo(snapshot.lastTranslogSize())) { + translogNumberOfOperations++; writeTranslogOperation(out, operation); } + translogTime = System.currentTimeMillis() - time; } catch (Exception e) { try { if (translogRaf != null) { @@ -214,8 +234,12 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements // now write the segments file and update the translog header try { if (indexDirty) { + indexNumberOfFiles++; + indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(snapshotIndexCommit.getSegmentsFileName()); + long time = System.currentTimeMillis(); copyFromDirectory(snapshotIndexCommit.getDirectory(), snapshotIndexCommit.getSegmentsFileName(), new File(locationIndex, snapshotIndexCommit.getSegmentsFileName())); + indexTime += (System.currentTimeMillis() - time); } } catch (Exception e) { try { @@ -269,6 +293,10 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements } } } + + return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart), + new SnapshotStatus.Index(indexNumberOfFiles, new SizeValue(indexTotalFilesSize), new TimeValue(indexTime)), + new SnapshotStatus.Translog(translogNumberOfOperations, new TimeValue(translogTime))); } private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java index cc4c4e2d77e..1995c0826b5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java @@ -22,7 +22,6 @@ package org.elasticsearch.index.gateway.none; import com.google.inject.Inject; import org.elasticsearch.index.gateway.IndexShardGateway; import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; -import org.elasticsearch.index.gateway.RecoveryStatus; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; @@ -33,7 +32,7 @@ import org.elasticsearch.util.SizeValue; import org.elasticsearch.util.settings.Settings; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class NoneIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway { @@ -50,8 +49,8 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement return new RecoveryStatus(new RecoveryStatus.Index(-1, 0, new SizeValue(0, SizeUnit.BYTES)), new RecoveryStatus.Translog(-1, 0, new SizeValue(0, SizeUnit.BYTES))); } - @Override public void snapshot(Snapshot snapshot) { - // nothing to do here + @Override public SnapshotStatus snapshot(Snapshot snapshot) { + return SnapshotStatus.NA; } @Override public boolean requiresSnapshotScheduling() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java index 6f0621cc994..145865f8194 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java @@ -67,7 +67,7 @@ public interface IndexShard extends IndexShardComponent, CloseableComponent { void optimize(Engine.Optimize optimize) throws ElasticSearchException; - void snapshot(Engine.SnapshotHandler snapshotHandler) throws EngineException; + T snapshot(Engine.SnapshotHandler snapshotHandler) throws EngineException; void recover(Engine.RecoveryHandler recoveryHandler) throws EngineException; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 502ffa5c7dd..a3530256073 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -364,9 +364,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I engine.optimize(optimize); } - public void snapshot(Engine.SnapshotHandler snapshotHandler) throws EngineException { + public T snapshot(Engine.SnapshotHandler snapshotHandler) throws EngineException { readAllowed(); - engine.snapshot(snapshotHandler); + return engine.snapshot(snapshotHandler); } public void recover(Engine.RecoveryHandler recoveryHandler) throws EngineException { diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java index 21c95cbbcc1..63e42eb16d0 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java @@ -268,8 +268,8 @@ public abstract class AbstractSimpleEngineTests { final ExecutorService executorService = Executors.newCachedThreadPool(); - engine.snapshot(new Engine.SnapshotHandler() { - @Override public void snapshot(final SnapshotIndexCommit snapshotIndexCommit1, final Translog.Snapshot translogSnapshot1) { + engine.snapshot(new Engine.SnapshotHandler() { + @Override public Void snapshot(final SnapshotIndexCommit snapshotIndexCommit1, final Translog.Snapshot translogSnapshot1) { assertThat(snapshotIndexCommit1, snapshotIndexCommitExists()); assertThat(translogSnapshot1, translogSize(1)); Translog.Create create1 = (Translog.Create) translogSnapshot1.iterator().next(); @@ -294,16 +294,18 @@ public abstract class AbstractSimpleEngineTests { assertThat(snapshotIndexCommit1, snapshotIndexCommitExists()); - engine.snapshot(new Engine.SnapshotHandler() { - @Override public void snapshot(SnapshotIndexCommit snapshotIndexCommit2, Translog.Snapshot translogSnapshot2) throws EngineException { + engine.snapshot(new Engine.SnapshotHandler() { + @Override public Void snapshot(SnapshotIndexCommit snapshotIndexCommit2, Translog.Snapshot translogSnapshot2) throws EngineException { assertThat(snapshotIndexCommit1, snapshotIndexCommitExists()); assertThat(snapshotIndexCommit2, snapshotIndexCommitExists()); assertThat(snapshotIndexCommit2.getSegmentsFileName(), not(equalTo(snapshotIndexCommit1.getSegmentsFileName()))); assertThat(translogSnapshot2, translogSize(1)); Translog.Create create3 = (Translog.Create) translogSnapshot2.iterator().next(); assertThat(create3.source(), equalTo(B_3)); + return null; } }); + return null; } });