add snapshot logging information

This commit is contained in:
kimchy 2010-04-08 19:28:41 +03:00
parent 9244a46938
commit b0494a8415
10 changed files with 215 additions and 123 deletions

View File

@ -73,7 +73,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
void optimize(Optimize optimize) throws EngineException;
void snapshot(SnapshotHandler snapshotHandler) throws EngineException;
<T> T snapshot(SnapshotHandler<T> snapshotHandler) throws EngineException;
void recover(RecoveryHandler recoveryHandler) throws EngineException;
@ -105,9 +105,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
/**
*/
static interface SnapshotHandler {
static interface SnapshotHandler<T> {
void snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException;
T snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException;
}
static interface Searcher extends Releasable {

View File

@ -330,7 +330,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
}
@Override public void snapshot(SnapshotHandler snapshotHandler) throws EngineException {
@Override public <T> T snapshot(SnapshotHandler<T> snapshotHandler) throws EngineException {
SnapshotIndexCommit snapshotIndexCommit = null;
Translog.Snapshot traslogSnapshot = null;
rwl.readLock().lock();
@ -345,7 +345,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
try {
snapshotHandler.snapshot(snapshotIndexCommit, traslogSnapshot);
return snapshotHandler.snapshot(snapshotIndexCommit, traslogSnapshot);
} finally {
snapshotIndexCommit.release();
traslogSnapshot.release();

View File

@ -23,10 +23,14 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.CloseableIndexComponent;
import static org.elasticsearch.util.TimeValue.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public interface IndexShardGateway extends IndexShardComponent, CloseableIndexComponent {
@ -38,7 +42,7 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
/**
* Snapshots the given shard into the gateway.
*/
void snapshot(Snapshot snapshot);
SnapshotStatus snapshot(Snapshot snapshot);
/**
* Returns <tt>true</tt> if this gateway requires scheduling management for snapshot
@ -109,4 +113,146 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
return lastTranslogSize;
}
}
class SnapshotStatus {
public static SnapshotStatus NA = new SnapshotStatus(timeValueMillis(0), new Index(0, new SizeValue(0), timeValueMillis(0)), new Translog(0, timeValueMillis(0)));
private TimeValue totalTime;
private Index index;
private Translog translog;
public SnapshotStatus(TimeValue totalTime, Index index, Translog translog) {
this.index = index;
this.translog = translog;
this.totalTime = totalTime;
}
public TimeValue totalTime() {
return this.totalTime;
}
public Index index() {
return index;
}
public Translog translog() {
return translog;
}
public static class Translog {
private int numberOfOperations;
private TimeValue time;
public Translog(int numberOfOperations, TimeValue time) {
this.numberOfOperations = numberOfOperations;
this.time = time;
}
public int numberOfOperations() {
return numberOfOperations;
}
public TimeValue time() {
return time;
}
}
public static class Index {
private int numberOfFiles;
private SizeValue totalSize;
private TimeValue time;
public Index(int numberOfFiles, SizeValue totalSize, TimeValue time) {
this.numberOfFiles = numberOfFiles;
this.totalSize = totalSize;
this.time = time;
}
public TimeValue time() {
return this.time;
}
public int numberOfFiles() {
return numberOfFiles;
}
public SizeValue totalSize() {
return totalSize;
}
}
}
class RecoveryStatus {
private Index index;
private Translog translog;
public RecoveryStatus(Index index, Translog translog) {
this.index = index;
this.translog = translog;
}
public Index index() {
return index;
}
public Translog translog() {
return translog;
}
public static class Translog {
private long translogId;
private int numberOfOperations;
private SizeValue totalSize;
public Translog(long translogId, int numberOfOperations, SizeValue totalSize) {
this.translogId = translogId;
this.numberOfOperations = numberOfOperations;
this.totalSize = totalSize;
}
/**
* The translog id recovered, <tt>-1</tt> indicating no translog.
*/
public long translogId() {
return translogId;
}
public int numberOfOperations() {
return numberOfOperations;
}
public SizeValue totalSize() {
return totalSize;
}
}
public static class Index {
private long version;
private int numberOfFiles;
private SizeValue totalSize;
public Index(long version, int numberOfFiles, SizeValue totalSize) {
this.version = version;
this.numberOfFiles = numberOfFiles;
this.totalSize = totalSize;
}
public long version() {
return this.version;
}
public int numberOfFiles() {
return numberOfFiles;
}
public SizeValue totalSize() {
return totalSize;
}
}
}
}

View File

@ -44,7 +44,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class IndexShardGatewayService extends AbstractIndexShardComponent implements CloseableIndexComponent {
@ -108,7 +108,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
indexShard.recovering();
logger.debug("Starting recovery from {}", shardGateway);
StopWatch stopWatch = new StopWatch().start();
RecoveryStatus recoveryStatus = shardGateway.recover();
IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover();
lastIndexVersion = recoveryStatus.index().version();
lastTranslogId = recoveryStatus.translog().translogId();
@ -121,9 +121,9 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
stopWatch.stop();
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("]\n");
sb.append(" Index : numberOfFiles [").append(recoveryStatus.index().numberOfFiles()).append("] with totalSize [").append(recoveryStatus.index().totalSize()).append("]\n");
sb.append(" Translog : translogId [").append(recoveryStatus.translog().translogId()).append(", numberOfOperations [").append(recoveryStatus.translog().numberOfOperations()).append("] with totalSize [").append(recoveryStatus.translog().totalSize()).append("]");
sb.append("Recovery completed from ").append(shardGateway).append(", took[").append(stopWatch.totalTime()).append("]\n");
sb.append(" Index : number_of_files[").append(recoveryStatus.index().numberOfFiles()).append("] with total_size[").append(recoveryStatus.index().totalSize()).append("]\n");
sb.append(" Translog : translog_id[").append(recoveryStatus.translog().translogId()).append("], number_of_operations[").append(recoveryStatus.translog().numberOfOperations()).append("] with total_size[").append(recoveryStatus.translog().totalSize()).append("]");
logger.debug(sb.toString());
}
// refresh the shard
@ -147,18 +147,30 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
return;
}
try {
indexShard.snapshot(new Engine.SnapshotHandler() {
@Override public void snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {
IndexShardGateway.SnapshotStatus snapshotStatus = indexShard.snapshot(new Engine.SnapshotHandler<IndexShardGateway.SnapshotStatus>() {
@Override public IndexShardGateway.SnapshotStatus snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {
if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogSize != translogSnapshot.size()) {
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogSize));
IndexShardGateway.SnapshotStatus snapshotStatus =
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogSize));
lastIndexVersion = snapshotIndexCommit.getVersion();
lastTranslogId = translogSnapshot.translogId();
lastTranslogSize = translogSnapshot.size();
return snapshotStatus;
}
return IndexShardGateway.SnapshotStatus.NA;
}
});
if (snapshotStatus != IndexShardGateway.SnapshotStatus.NA) {
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Snapshot completed to ").append(shardGateway).append(", took[").append(snapshotStatus.totalTime()).append("]\n");
sb.append(" Index : number_of_files[").append(snapshotStatus.index().numberOfFiles()).append("] with total_size[").append(snapshotStatus.index().totalSize()).append("], took[").append(snapshotStatus.index().time()).append("]\n");
sb.append(" Translog : number_of_operations[").append(snapshotStatus.translog().numberOfOperations()).append("], took[").append(snapshotStatus.translog().time()).append("]");
logger.debug(sb.toString());
}
}
} catch (IllegalIndexShardStateException e) {
// ignore, that's fine
} catch (IndexShardGatewaySnapshotFailedException e) {

View File

@ -1,95 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway;
import org.elasticsearch.util.SizeValue;
/**
* @author kimchy (Shay Banon)
*/
public class RecoveryStatus {
private Index index;
private Translog translog;
public RecoveryStatus(Index index, Translog translog) {
this.index = index;
this.translog = translog;
}
public Index index() {
return index;
}
public Translog translog() {
return translog;
}
public static class Translog {
private long translogId;
private int numberOfOperations;
private SizeValue totalSize;
public Translog(long translogId, int numberOfOperations, SizeValue totalSize) {
this.translogId = translogId;
this.numberOfOperations = numberOfOperations;
this.totalSize = totalSize;
}
/**
* The translog id recovered, <tt>-1</tt> indicating no translog.
*/
public long translogId() {
return translogId;
}
public int numberOfOperations() {
return numberOfOperations;
}
public SizeValue totalSize() {
return totalSize;
}
}
public static class Index {
private long version;
private int numberOfFiles;
private SizeValue totalSize;
public Index(long version, int numberOfFiles, SizeValue totalSize) {
this.numberOfFiles = numberOfFiles;
this.totalSize = totalSize;
}
public long version() {
return this.version;
}
public int numberOfFiles() {
return numberOfFiles;
}
public SizeValue totalSize() {
return totalSize;
}
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
import org.elasticsearch.index.gateway.RecoveryStatus;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
@ -37,6 +36,7 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.io.stream.DataInputStreamInput;
import org.elasticsearch.util.io.stream.DataOutputStreamOutput;
import org.elasticsearch.util.io.stream.StreamOutput;
@ -105,14 +105,19 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog);
}
@Override public void snapshot(Snapshot snapshot) {
@Override public SnapshotStatus snapshot(Snapshot snapshot) {
long totalTimeStart = System.currentTimeMillis();
boolean indexDirty = false;
boolean translogDirty = false;
final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit();
final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot();
int indexNumberOfFiles = 0;
long indexTotalFilesSize = 0;
long indexTime = 0;
if (snapshot.indexChanged()) {
long time = System.currentTimeMillis();
indexDirty = true;
// snapshot into the index
final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
@ -144,6 +149,12 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
}
}
}
indexNumberOfFiles++;
try {
indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(fileName);
} catch (IOException e) {
// ignore...
}
threadPool.execute(new Runnable() {
@Override public void run() {
try {
@ -164,6 +175,7 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
if (lastException.get() != null) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", lastException.get());
}
indexTime = System.currentTimeMillis() - time;
}
// we reopen the RAF each snapshot and not keep an open one since we want to make sure we
// can sync it to disk later on (close it as well)
@ -172,15 +184,20 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
// if we have a different trnaslogId we want to flush the full translog to a new file (based on the translogId).
// If we still work on existing translog, just append the latest translog operations
int translogNumberOfOperations = 0;
long translogTime = 0;
if (snapshot.newTranslogCreated()) {
translogDirty = true;
try {
long time = System.currentTimeMillis();
translogRaf = new RandomAccessFile(translogFile, "rw");
StreamOutput out = new DataOutputStreamOutput(translogRaf);
out.writeInt(-1); // write the number of operations header with -1 currently
for (Translog.Operation operation : translogSnapshot) {
translogNumberOfOperations++;
writeTranslogOperation(out, operation);
}
translogTime = System.currentTimeMillis() - time;
} catch (Exception e) {
try {
translogRaf.close();
@ -192,13 +209,16 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
} else if (snapshot.sameTranslogNewOperations()) {
translogDirty = true;
try {
long time = System.currentTimeMillis();
translogRaf = new RandomAccessFile(translogFile, "rw");
// seek to the end, since we append
translogRaf.seek(translogRaf.length());
StreamOutput out = new DataOutputStreamOutput(translogRaf);
for (Translog.Operation operation : translogSnapshot.skipTo(snapshot.lastTranslogSize())) {
translogNumberOfOperations++;
writeTranslogOperation(out, operation);
}
translogTime = System.currentTimeMillis() - time;
} catch (Exception e) {
try {
if (translogRaf != null) {
@ -214,8 +234,12 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
// now write the segments file and update the translog header
try {
if (indexDirty) {
indexNumberOfFiles++;
indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(snapshotIndexCommit.getSegmentsFileName());
long time = System.currentTimeMillis();
copyFromDirectory(snapshotIndexCommit.getDirectory(), snapshotIndexCommit.getSegmentsFileName(),
new File(locationIndex, snapshotIndexCommit.getSegmentsFileName()));
indexTime += (System.currentTimeMillis() - time);
}
} catch (Exception e) {
try {
@ -269,6 +293,10 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
}
}
}
return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart),
new SnapshotStatus.Index(indexNumberOfFiles, new SizeValue(indexTotalFilesSize), new TimeValue(indexTime)),
new SnapshotStatus.Translog(translogNumberOfOperations, new TimeValue(translogTime)));
}
private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException {

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.gateway.none;
import com.google.inject.Inject;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.RecoveryStatus;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
@ -33,7 +32,7 @@ import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.settings.Settings;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class NoneIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {
@ -50,8 +49,8 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
return new RecoveryStatus(new RecoveryStatus.Index(-1, 0, new SizeValue(0, SizeUnit.BYTES)), new RecoveryStatus.Translog(-1, 0, new SizeValue(0, SizeUnit.BYTES)));
}
@Override public void snapshot(Snapshot snapshot) {
// nothing to do here
@Override public SnapshotStatus snapshot(Snapshot snapshot) {
return SnapshotStatus.NA;
}
@Override public boolean requiresSnapshotScheduling() {

View File

@ -67,7 +67,7 @@ public interface IndexShard extends IndexShardComponent, CloseableComponent {
void optimize(Engine.Optimize optimize) throws ElasticSearchException;
void snapshot(Engine.SnapshotHandler snapshotHandler) throws EngineException;
<T> T snapshot(Engine.SnapshotHandler<T> snapshotHandler) throws EngineException;
void recover(Engine.RecoveryHandler recoveryHandler) throws EngineException;

View File

@ -364,9 +364,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
engine.optimize(optimize);
}
public void snapshot(Engine.SnapshotHandler snapshotHandler) throws EngineException {
public <T> T snapshot(Engine.SnapshotHandler<T> snapshotHandler) throws EngineException {
readAllowed();
engine.snapshot(snapshotHandler);
return engine.snapshot(snapshotHandler);
}
public void recover(Engine.RecoveryHandler recoveryHandler) throws EngineException {

View File

@ -268,8 +268,8 @@ public abstract class AbstractSimpleEngineTests {
final ExecutorService executorService = Executors.newCachedThreadPool();
engine.snapshot(new Engine.SnapshotHandler() {
@Override public void snapshot(final SnapshotIndexCommit snapshotIndexCommit1, final Translog.Snapshot translogSnapshot1) {
engine.snapshot(new Engine.SnapshotHandler<Void>() {
@Override public Void snapshot(final SnapshotIndexCommit snapshotIndexCommit1, final Translog.Snapshot translogSnapshot1) {
assertThat(snapshotIndexCommit1, snapshotIndexCommitExists());
assertThat(translogSnapshot1, translogSize(1));
Translog.Create create1 = (Translog.Create) translogSnapshot1.iterator().next();
@ -294,16 +294,18 @@ public abstract class AbstractSimpleEngineTests {
assertThat(snapshotIndexCommit1, snapshotIndexCommitExists());
engine.snapshot(new Engine.SnapshotHandler() {
@Override public void snapshot(SnapshotIndexCommit snapshotIndexCommit2, Translog.Snapshot translogSnapshot2) throws EngineException {
engine.snapshot(new Engine.SnapshotHandler<Void>() {
@Override public Void snapshot(SnapshotIndexCommit snapshotIndexCommit2, Translog.Snapshot translogSnapshot2) throws EngineException {
assertThat(snapshotIndexCommit1, snapshotIndexCommitExists());
assertThat(snapshotIndexCommit2, snapshotIndexCommitExists());
assertThat(snapshotIndexCommit2.getSegmentsFileName(), not(equalTo(snapshotIndexCommit1.getSegmentsFileName())));
assertThat(translogSnapshot2, translogSize(1));
Translog.Create create3 = (Translog.Create) translogSnapshot2.iterator().next();
assertThat(create3.source(), equalTo(B_3));
return null;
}
});
return null;
}
});