more work on reuse work dir, improved transaction log to move to stream based

This commit is contained in:
kimchy 2010-07-06 00:12:40 +03:00
parent b078c9206a
commit d4f86899e3
17 changed files with 361 additions and 238 deletions

View File

@ -58,14 +58,14 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
private final long lastIndexVersion;
private final long lastTranslogId;
private final int lastTranslogSize;
private final long lastTranslogLength;
public Snapshot(SnapshotIndexCommit indexCommit, Translog.Snapshot translogSnapshot, long lastIndexVersion, long lastTranslogId, int lastTranslogSize) {
public Snapshot(SnapshotIndexCommit indexCommit, Translog.Snapshot translogSnapshot, long lastIndexVersion, long lastTranslogId, long lastTranslogLength) {
this.indexCommit = indexCommit;
this.translogSnapshot = translogSnapshot;
this.lastIndexVersion = lastIndexVersion;
this.lastTranslogId = lastTranslogId;
this.lastTranslogSize = lastTranslogSize;
this.lastTranslogLength = lastTranslogLength;
}
/**
@ -92,7 +92,7 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
if (newTranslogCreated()) {
throw new ElasticSearchIllegalStateException("Should not be called when there is a new translog");
}
return translogSnapshot.size() > lastTranslogSize;
return translogSnapshot.length() > lastTranslogLength;
}
public SnapshotIndexCommit indexCommit() {
@ -111,8 +111,8 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
return lastTranslogId;
}
public int lastTranslogSize() {
return lastTranslogSize;
public long lastTranslogPosition() {
return lastTranslogLength;
}
}
@ -210,9 +210,11 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
private long translogId;
private int numberOfOperations;
private ByteSizeValue totalSize;
private long translogLength;
public Translog(long translogId, int numberOfOperations, ByteSizeValue totalSize) {
public Translog(long translogId, long translogLength, int numberOfOperations, ByteSizeValue totalSize) {
this.translogId = translogId;
this.translogLength = translogLength;
this.numberOfOperations = numberOfOperations;
this.totalSize = totalSize;
}
@ -224,6 +226,10 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
return translogId;
}
public long translogLength() {
return translogLength;
}
public int numberOfOperations() {
return numberOfOperations;
}

View File

@ -29,10 +29,7 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
@ -65,7 +62,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private volatile long lastTranslogId = -1;
private volatile int lastTranslogSize;
private volatile long lastTranslogLength;
private final AtomicBoolean recovered = new AtomicBoolean();
@ -133,7 +130,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
lastIndexVersion = recoveryStatus.index().version();
lastTranslogId = recoveryStatus.translog().translogId();
lastTranslogSize = recoveryStatus.translog().numberOfOperations();
lastTranslogLength = recoveryStatus.translog().translogLength();
// start the shard if the gateway has not started it already
if (indexShard.state() != IndexShardState.STARTED) {
@ -151,6 +148,18 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
// refresh the shard
indexShard.refresh(new Engine.Refresh(false));
scheduleSnapshotIfNeeded();
} catch (IndexShardGatewayRecoveryException e) {
if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) {
// got closed on us, just ignore this recovery
throw new IgnoreGatewayRecoveryException(shardId, "shard closed");
}
throw e;
} catch (IndexShardClosedException e) {
// got closed on us, just ignore this recovery
throw new IgnoreGatewayRecoveryException(shardId, "shard closed");
} catch (IndexShardNotStartedException e) {
// got closed on us, just ignore this recovery
throw new IgnoreGatewayRecoveryException(shardId, "shard closed");
} finally {
recoveryThrottler.recoveryDone(shardId, "gateway");
}
@ -171,14 +180,14 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
try {
IndexShardGateway.SnapshotStatus snapshotStatus = indexShard.snapshot(new Engine.SnapshotHandler<IndexShardGateway.SnapshotStatus>() {
@Override public IndexShardGateway.SnapshotStatus snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {
if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogSize != translogSnapshot.size()) {
if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogLength != translogSnapshot.length()) {
IndexShardGateway.SnapshotStatus snapshotStatus =
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogSize));
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogLength));
lastIndexVersion = snapshotIndexCommit.getVersion();
lastTranslogId = translogSnapshot.translogId();
lastTranslogSize = translogSnapshot.size();
lastTranslogLength = translogSnapshot.length();
return snapshotStatus;
}
return IndexShardGateway.SnapshotStatus.NA;

View File

@ -29,10 +29,11 @@ import org.elasticsearch.common.Hex;
import org.elasticsearch.common.blobstore.*;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.io.FastByteArrayInputStream;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput;
@ -55,12 +56,14 @@ import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
@ -73,8 +76,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.index.translog.TranslogStreams.*;
/**
* @author kimchy (shay.banon)
*/
@ -101,6 +102,8 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
protected final ConcurrentMap<String, String> cachedMd5 = ConcurrentCollections.newConcurrentMap();
private volatile SoftReference<FastByteArrayOutputStream> cachedBos = new SoftReference<FastByteArrayOutputStream>(new FastByteArrayOutputStream());
private volatile AppendableBlobContainer.AppendableBlob translogBlob;
protected BlobStoreIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway indexGateway,
@ -245,20 +248,25 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
translogBlob.append(new AppendableBlobContainer.AppendBlobListener() {
@Override public void withStream(StreamOutput os) throws IOException {
int deltaNumberOfOperations;
Iterable<Translog.Operation> operationsIt;
if (snapshot.newTranslogCreated()) {
deltaNumberOfOperations = translogSnapshot.size();
operationsIt = translogSnapshot;
} else {
deltaNumberOfOperations = translogSnapshot.size() - snapshot.lastTranslogSize();
operationsIt = translogSnapshot.skipTo(snapshot.lastTranslogSize());
if (!snapshot.newTranslogCreated()) {
translogSnapshot.seekForward(snapshot.lastTranslogPosition());
}
os.writeInt(deltaNumberOfOperations);
for (Translog.Operation operation : operationsIt) {
writeTranslogOperation(os, operation);
FastByteArrayOutputStream bos = cachedBos.get();
if (bos == null) {
bos = new FastByteArrayOutputStream();
cachedBos = new SoftReference<FastByteArrayOutputStream>(bos);
}
translogNumberOfOperations.set(deltaNumberOfOperations);
int totalNumberOfOperations = 0;
OutputStreamStreamOutput bosOs = new OutputStreamStreamOutput(bos);
while (translogSnapshot.hasNext()) {
bos.reset();
TranslogStreams.writeTranslogOperation(bosOs, translogSnapshot.next());
bosOs.flush();
os.writeVInt(bos.size());
os.writeBytes(bos.unsafeByteArray(), bos.size());
totalNumberOfOperations++;
}
translogNumberOfOperations.set(totalNumberOfOperations);
}
@Override public void onCompleted() {
@ -360,54 +368,105 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
translogId = IndexReader.getCurrentVersion(store.directory());
} catch (FileNotFoundException e) {
// no index, that fine
return new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0));
indexShard.start();
return new RecoveryStatus.Translog(-1, 0, 0, new ByteSizeValue(0));
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog, can't read current index version", e);
}
if (!translogContainer.blobExists("translog-" + translogId)) {
return new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0));
// no recovery file found, start the shard and bail
indexShard.start();
return new RecoveryStatus.Translog(-1, 0, 0, new ByteSizeValue(0));
}
try {
indexShard.performRecoveryPrepareForTranslog();
int totalOperations = 0;
byte[] translogData = translogContainer.readBlobFully("translog-" + translogId);
BytesStreamInput si = new BytesStreamInput(translogData);
while (true) {
// we recover them in parts, each part container the number of operations, and then the list of them
try {
int numberOfOperations = si.readInt();
ArrayList<Translog.Operation> operations = Lists.newArrayList();
for (int i = 0; i < numberOfOperations; i++) {
operations.add(readTranslogOperation(si));
totalOperations++;
final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger totalOperations = new AtomicInteger();
final AtomicLong totalSize = new AtomicLong();
translogContainer.readBlob("translog-" + translogId, new BlobContainer.ReadBlobListener() {
FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
boolean ignore = false;
@Override public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
if (ignore) {
return;
}
indexShard.performRecoveryOperations(operations);
if (si.position() == translogData.length) {
// we have reached the end of the stream, bail
break;
bos.write(data, offset, size);
BytesStreamInput si = new BytesStreamInput(bos.unsafeByteArray(), 0, bos.size());
int position;
while (true) {
try {
position = si.position();
int opSize = si.readVInt();
int curPos = si.position();
if ((si.position() + opSize) > bos.size()) {
break;
}
Translog.Operation operation = TranslogStreams.readTranslogOperation(si);
if ((si.position() - curPos) != opSize) {
logger.warn("mismatch in size, expected [{}], got [{}]", opSize, si.position() - curPos);
}
totalOperations.incrementAndGet();
indexShard.performRecoveryOperation(operation);
if (si.position() >= bos.size()) {
position = si.position();
break;
}
} catch (Exception e) {
logger.warn("failed to retrieve translog after [{}] operations, ignoring the rest, considered corrupted", e, totalOperations.get());
ignore = true;
latch.countDown();
return;
}
}
} catch (IOException e) {
if (si.position() >= translogData.length) {
// we have reached the end of hte stream, we wrote partial operations, ignore the exception...
} else {
throw e;
totalSize.addAndGet(position);
FastByteArrayOutputStream newBos = new FastByteArrayOutputStream();
int leftOver = bos.size() - position;
if (leftOver > 0) {
newBos.write(bos.unsafeByteArray(), position, leftOver);
}
bos = newBos;
}
@Override public synchronized void onCompleted() {
latch.countDown();
}
@Override public synchronized void onFailure(Throwable t) {
failure.set(t);
latch.countDown();
}
});
latch.await();
if (failure.get() != null) {
throw failure.get();
}
indexShard.performRecoveryFinalization();
// only if we can append to an existing translog we should use the current id and continue to append to it
long lastTranslogId = indexShard.translog().currentId();
Translog.Snapshot translogSnapshot = indexShard.translog().snapshot();
long lastTranslogLength = translogSnapshot.length();
translogSnapshot.release();
if (!translogContainer.canAppendToExistingBlob()) {
// flush the index, so we generate a new translog based on a new index version
indexShard.flush(new Engine.Flush());
lastTranslogId = -1;
lastTranslogLength = 0;
}
return new RecoveryStatus.Translog(lastTranslogId, totalOperations, new ByteSizeValue(translogData.length, ByteSizeUnit.BYTES));
} catch (Exception e) {
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog, can't read current index version", e);
return new RecoveryStatus.Translog(lastTranslogId, lastTranslogLength, totalOperations.get(), new ByteSizeValue(totalSize.get(), ByteSizeUnit.BYTES));
} catch (Throwable e) {
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog", e);
}
}

View File

@ -32,6 +32,8 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
@ -46,8 +48,14 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
// in the none case, we simply start the shard
// clean the store, there should be nothing there...
try {
indexShard.store().deleteContent();
} catch (IOException e) {
logger.warn("failed to clean store before starting shard", e);
}
indexShard.start();
return new RecoveryStatus(RecoveryStatus.Index.EMPTY, new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES)));
return new RecoveryStatus(RecoveryStatus.Index.EMPTY, new RecoveryStatus.Translog(-1, 0, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES)));
}
@Override public String type() {

View File

@ -244,16 +244,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
IndexShard indexShard = shardInjector.getInstance(IndexShard.class);
// if there is no gateway, clean the store, since we won't recover into it
if (indexGateway.type().equals(NoneGateway.TYPE)) {
Store store = shardInjector.getInstance(Store.class);
try {
store.deleteContent();
} catch (IOException e) {
logger.warn("failed to clean store on shard creation", e);
}
}
indicesLifecycle.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();

View File

@ -202,12 +202,12 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
}
throttlingWaitTime.stop();
logger.debug("starting recovery from {}", targetNode);
try {
if (closed) {
throw new IgnoreRecoveryException("Recovery closed");
}
logger.debug("starting recovery from {}", targetNode);
// build a list of the current files located locally, maybe we don't need to recover them...
StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(node, markAsRelocated, store.listWithMd5());
@ -253,6 +253,7 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
if (closed) {
throw new IgnoreRecoveryException("Recovery closed", e);
}
logger.trace("recovery from [{}] failed", e, targetNode);
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof ActionNotFoundTransportException || cause instanceof IndexShardNotStartedException) {
// the remote shard has not yet registered the action or not started yet, we need to ignore this recovery attempt, and restore the state previous to recovering
@ -367,11 +368,16 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
existingTotalSize += md.sizeInBytes();
useExisting = true;
if (logger.isTraceEnabled()) {
logger.trace("not recovering [{}], exists in local store and has md5 [{}]", name, md.md5());
logger.trace("recovery [phase1] to {}: not recovering [{}], exists in local store and has md5 [{}]", node, name, md.md5());
}
}
}
if (!useExisting) {
if (startRecoveryRequest.existingFiles.containsKey(name)) {
logger.trace("recovery [phase1] to {}: recovering [{}], exists in local store, but has different md5: remote [{}], local [{}]", node, name, startRecoveryRequest.existingFiles.get(name).md5(), md.md5());
} else {
logger.trace("recovery [phase1] to {}: recovering [{}], does not exists in remote", node, name);
}
recoveryStatus.phase1FileNames.add(name);
recoveryStatus.phase1FileSizes.add(md.sizeInBytes());
totalSize += md.sizeInBytes();
@ -457,17 +463,17 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
if (closed) {
throw new IndexShardClosedException(shardId);
}
logger.trace("recovery [phase2] to {}: sending [{}] transaction log operations", node, snapshot.size());
logger.trace("recovery [phase2] to {}: sending transaction log operations", node);
StopWatch stopWatch = new StopWatch().start();
transportService.submitRequest(node, prepareForTranslogOperationsTransportAction, VoidStreamable.INSTANCE, VoidTransportResponseHandler.INSTANCE).txGet();
sendSnapshot(snapshot);
int totalOperations = sendSnapshot(snapshot);
stopWatch.stop();
logger.trace("recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase2Time = stopWatch.totalTime().millis();
recoveryStatus.phase2Operations = snapshot.size();
recoveryStatus.phase2Operations = totalOperations;
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted in phase 2 files");
@ -482,9 +488,9 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
if (closed) {
throw new IndexShardClosedException(shardId);
}
logger.trace("recovery [phase3] to {}: sending [{}] transaction log operations", node, snapshot.size());
logger.trace("recovery [phase3] to {}: sending transaction log operations", node);
StopWatch stopWatch = new StopWatch().start();
sendSnapshot(snapshot);
int totalOperations = sendSnapshot(snapshot);
transportService.submitRequest(node, finalizeRecoveryTransportAction, VoidStreamable.INSTANCE, VoidTransportResponseHandler.INSTANCE).txGet();
if (startRecoveryRequest.markAsRelocated) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
@ -499,7 +505,7 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
stopWatch.stop();
logger.trace("recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase3Time = stopWatch.totalTime().millis();
recoveryStatus.phase3Operations = snapshot.size();
recoveryStatus.phase3Operations = totalOperations;
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted in phase 2 files");
@ -508,12 +514,25 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
}
}
private void sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException {
private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException {
TranslogOperationsRequest request = new TranslogOperationsRequest();
for (Translog.Operation operation : snapshot) {
request.operations.add(operation);
int translogBatchSize = 10; // TODO make this configurable
int counter = 0;
int totalOperations = 0;
while (snapshot.hasNext()) {
request.operations.add(snapshot.next());
totalOperations++;
if (++counter == translogBatchSize) {
transportService.submitRequest(node, translogOperationsTransportAction, request, VoidTransportResponseHandler.INSTANCE).txGet();
counter = 0;
request.operations.clear();
}
}
transportService.submitRequest(node, translogOperationsTransportAction, request, VoidTransportResponseHandler.INSTANCE).txGet();
// send the leftover
if (!request.operations.isEmpty()) {
transportService.submitRequest(node, translogOperationsTransportAction, request, VoidTransportResponseHandler.INSTANCE).txGet();
}
return totalOperations;
}
});
channel.sendResponse(recoveryStatus);
@ -696,7 +715,9 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
if (closed) {
throw new IndexShardClosedException(shardId);
}
indexShard.performRecoveryOperations(snapshot.operations);
for (Translog.Operation operation : snapshot.operations) {
indexShard.performRecoveryOperation(operation);
}
channel.sendResponse(VoidStreamable.INSTANCE);
} finally {
receiveSnapshotRecoveryThread = null;

View File

@ -98,7 +98,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
this.queryParserService = queryParserService;
this.indexCache = indexCache;
state = IndexShardState.CREATED;
logger.debug("Moved to state [CREATED]");
logger.debug("state: [CREATED]");
}
public Store store() {
@ -123,7 +123,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
if (this.shardRouting != null) {
if (!shardRouting.primary() && this.shardRouting.primary()) {
logger.warn("Suspect illegal state: Trying to move shard from primary mode to backup mode");
logger.warn("suspect illegal state: trying to move shard from primary mode to backup mode");
}
}
this.shardRouting = shardRouting;
@ -146,8 +146,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (state == IndexShardState.RECOVERING) {
throw new IndexShardRecoveringException(shardId);
}
logger.debug("state: [{}]->[{}]", state, IndexShardState.RECOVERING);
state = IndexShardState.RECOVERING;
logger.debug("Moved to state [RECOVERING]");
return returnValue;
}
}
@ -157,7 +157,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (this.state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
logger.debug("Restored to state [{}] from state [{}]", stateToRestore, state);
logger.debug("state: [{}]->[{}], restored after recovery", state, stateToRestore);
this.state = stateToRestore;
}
return this;
@ -168,7 +168,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
}
logger.debug("Moved to state [RELOCATED]");
logger.debug("state: [{}]->[{}]", state, IndexShardState.RELOCATED);
state = IndexShardState.RELOCATED;
}
return this;
@ -187,7 +187,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
engine.start();
scheduleRefresherIfNeeded();
logger.debug("Moved to state [STARTED]");
logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED);
state = IndexShardState.STARTED;
}
return this;
@ -217,7 +217,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
ParsedDocument doc = docMapper.parse(type, id, source);
if (logger.isTraceEnabled()) {
logger.trace("Indexing {}", doc);
logger.trace("index {}", doc);
}
engine.create(new Engine.Create(doc.doc(), docMapper.mappers().indexAnalyzer(), docMapper.type(), doc.id(), doc.source()));
return doc;
@ -235,7 +235,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
ParsedDocument doc = docMapper.parse(type, id, source);
if (logger.isTraceEnabled()) {
logger.trace("Indexing {}", doc);
logger.trace("index {}", doc);
}
engine.index(new Engine.Index(docMapper.uidMapper().term(doc.uid()), doc.doc(), docMapper.mappers().indexAnalyzer(), docMapper.type(), doc.id(), doc.source()));
return doc;
@ -257,7 +257,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private void innerDelete(Term uid) {
if (logger.isTraceEnabled()) {
logger.trace("Deleting [{}]", uid.text());
logger.trace("delete [{}]", uid.text());
}
engine.delete(new Engine.Delete(uid));
}
@ -282,7 +282,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
query = filterByTypesIfNeeded(query, types);
if (logger.isTraceEnabled()) {
logger.trace("Deleting By Query [{}]", query);
logger.trace("delete_by_query [{}]", query);
}
engine.delete(new Engine.DeleteByQuery(query, querySource, queryParserName, types));
@ -299,13 +299,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
int docId = Lucene.docId(searcher.reader(), docMapper.uidMapper().term(type, id));
if (docId == Lucene.NO_DOC) {
if (logger.isTraceEnabled()) {
logger.trace("Get for [{}#{}] returned no result", type, id);
logger.trace("get for [{}#{}] returned no result", type, id);
}
return null;
}
Document doc = searcher.reader().document(docId, docMapper.sourceMapper().fieldSelector());
if (logger.isTraceEnabled()) {
logger.trace("Get for [{}#{}] returned [{}]", type, id, doc);
logger.trace("get for [{}#{}] returned [{}]", type, id, doc);
}
return docMapper.sourceMapper().value(doc);
} catch (IOException e) {
@ -338,7 +338,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
try {
long count = Lucene.count(searcher.searcher(), query, minScore);
if (logger.isTraceEnabled()) {
logger.trace("Count of [{}] is [{}]", query, count);
logger.trace("count of [{}] is [{}]", query, count);
}
return count;
} catch (IOException e) {
@ -351,7 +351,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override public void refresh(Engine.Refresh refresh) throws ElasticSearchException {
writeAllowed();
if (logger.isTraceEnabled()) {
logger.trace("Refresh with {}", refresh);
logger.trace("refresh with {}", refresh);
}
engine.refresh(refresh);
}
@ -359,7 +359,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override public void flush(Engine.Flush flush) throws ElasticSearchException {
writeAllowed();
if (logger.isTraceEnabled()) {
logger.trace("Flush with {}", flush);
logger.trace("flush with {}", flush);
}
engine.flush(flush);
}
@ -367,7 +367,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override public void optimize(Engine.Optimize optimize) throws ElasticSearchException {
writeAllowed();
if (logger.isTraceEnabled()) {
logger.trace("Optimize with {}", optimize);
logger.trace("optimize with {}", optimize);
}
engine.optimize(optimize);
}
@ -399,7 +399,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
refreshScheduledFuture = null;
}
}
logger.debug("Moved to state [CLOSED]");
logger.debug("state: [{}]->[{}]", state, IndexShardState.CLOSED);
state = IndexShardState.CLOSED;
}
}
@ -416,42 +416,36 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
public void performRecoveryFinalization() throws ElasticSearchException {
synchronized (mutex) {
logger.debug("Moved to state [STARTED] post recovery (from another shard)");
logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED);
state = IndexShardState.STARTED;
}
scheduleRefresherIfNeeded();
engine.refresh(new Engine.Refresh(true));
}
public void performRecoveryOperations(Iterable<Translog.Operation> operations) throws ElasticSearchException {
public void performRecoveryOperation(Translog.Operation operation) throws ElasticSearchException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
applyTranslogOperations(operations);
}
private void applyTranslogOperations(Iterable<Translog.Operation> snapshot) {
for (Translog.Operation operation : snapshot) {
switch (operation.opType()) {
case CREATE:
Translog.Create create = (Translog.Create) operation;
innerCreate(create.type(), create.id(), create.source());
break;
case SAVE:
Translog.Index index = (Translog.Index) operation;
innerIndex(index.type(), index.id(), index.source());
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;
innerDelete(delete.uid());
break;
case DELETE_BY_QUERY:
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation;
innerDeleteByQuery(deleteByQuery.source(), deleteByQuery.queryParserName(), deleteByQuery.types());
break;
default:
throw new ElasticSearchIllegalStateException("No operation defined for [" + operation + "]");
}
switch (operation.opType()) {
case CREATE:
Translog.Create create = (Translog.Create) operation;
innerCreate(create.type(), create.id(), create.source());
break;
case SAVE:
Translog.Index index = (Translog.Index) operation;
innerIndex(index.type(), index.id(), index.source());
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;
innerDelete(delete.uid());
break;
case DELETE_BY_QUERY:
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation;
innerDeleteByQuery(deleteByQuery.source(), deleteByQuery.queryParserName(), deleteByQuery.types());
break;
default:
throw new ElasticSearchIllegalStateException("No operation defined for [" + operation + "]");
}
}

View File

@ -37,7 +37,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
@ThreadSafe
public interface Translog extends IndexShardComponent {
@ -89,19 +89,25 @@ public interface Translog extends IndexShardComponent {
* A snapshot of the transaction log, allows to iterate over all the transaction log operations.
*/
@NotThreadSafe
static interface Snapshot extends Iterable<Operation>, Releasable, Streamable {
static interface Snapshot extends Releasable {
/**
* The id of the translog the snapshot was taken with.
*/
long translogId();
/**
* The number of translog operations in the snapshot.
*/
int size();
long position();
Iterable<Operation> skipTo(int skipTo);
/**
* Returns the internal length (*not* number of operations) of this snapshot.
*/
long length();
boolean hasNext();
Operation next();
void seekForward(long position);
}
/**

View File

@ -20,37 +20,28 @@
package org.elasticsearch.index.translog.memory;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.collect.Iterables;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import static org.elasticsearch.index.translog.TranslogStreams.*;
import java.util.Queue;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class MemorySnapshot implements Translog.Snapshot {
private long id;
private final long id;
Translog.Operation[] operations;
private final Iterator<Translog.Operation> operationsIt;
public MemorySnapshot() {
}
private final long length;
public MemorySnapshot(Translog.Snapshot snapshot) {
this(snapshot.translogId(), Iterables.toArray(snapshot, Translog.Operation.class));
}
private long position = 0;
public MemorySnapshot(long id, Translog.Operation[] operations) {
public MemorySnapshot(long id, Queue<Translog.Operation> operations, long length) {
this.id = id;
this.operations = operations;
this.operationsIt = operations.iterator();
this.length = length;
}
@Override public long translogId() {
@ -61,34 +52,29 @@ public class MemorySnapshot implements Translog.Snapshot {
return true;
}
@Override public int size() {
return operations.length;
@Override public long length() {
return length;
}
@Override public Iterator<Translog.Operation> iterator() {
return Arrays.asList(operations).iterator();
@Override public long position() {
return this.position;
}
@Override public Iterable<Translog.Operation> skipTo(int skipTo) {
if (operations.length < skipTo) {
throw new ElasticSearchIllegalArgumentException("skipTo [" + skipTo + "] is bigger than size [" + size() + "]");
}
return Arrays.asList(Arrays.copyOfRange(operations, skipTo, operations.length));
}
@Override public void readFrom(StreamInput in) throws IOException {
id = in.readLong();
operations = new Translog.Operation[in.readVInt()];
for (int i = 0; i < operations.length; i++) {
operations[i] = readTranslogOperation(in);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeLong(id);
out.writeVInt(operations.length);
for (Translog.Operation op : operations) {
writeTranslogOperation(out, op);
@Override public boolean hasNext() {
return position < length;
}
@Override public Translog.Operation next() {
Translog.Operation operation = operationsIt.next();
position++;
return operation;
}
@Override public void seekForward(long position) {
long numberToSeek = this.position + position;
while (numberToSeek-- != 0) {
operationsIt.next();
}
this.position = position;
}
}

View File

@ -31,8 +31,8 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -47,11 +47,10 @@ public class MemoryTranslog extends AbstractIndexShardComponent implements Trans
private volatile long id;
// we use LinkedBlockingQueue and not LinkedTransferQueue since we clear it on #newTranslog
// and with LinkedTransferQueue, nodes are not really cleared, just marked causing for memory
// not to be cleaned properly (besides, clear is heavy..., "while ... poll").
private volatile Queue<Operation> operations;
private final AtomicInteger operationCounter = new AtomicInteger();
@Inject public MemoryTranslog(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
}
@ -72,18 +71,20 @@ public class MemoryTranslog extends AbstractIndexShardComponent implements Trans
synchronized (mutex) {
estimatedMemorySize.set(0);
operations = new LinkedTransferQueue<Operation>();
operationCounter.set(0);
this.id = id;
}
}
@Override public void add(Operation operation) throws TranslogException {
operations.add(operation);
operationCounter.incrementAndGet();
estimatedMemorySize.addAndGet(operation.estimateSize() + 50);
}
@Override public Snapshot snapshot() {
synchronized (mutex) {
return new MemorySnapshot(currentId(), operations.toArray(new Operation[0]));
return new MemorySnapshot(currentId(), operations, operationCounter.get());
}
}
@ -93,19 +94,12 @@ public class MemoryTranslog extends AbstractIndexShardComponent implements Trans
if (currentId() != snapshot.translogId()) {
return snapshot();
}
ArrayList<Operation> retVal = new ArrayList<Operation>();
int counter = 0;
int snapshotSize = memorySnapshot.operations.length;
for (Operation operation : operations) {
if (++counter > snapshotSize) {
retVal.add(operation);
}
}
return new MemorySnapshot(currentId(), retVal.toArray(new Operation[retVal.size()]));
MemorySnapshot newSnapshot = new MemorySnapshot(currentId(), operations, operationCounter.get());
newSnapshot.seekForward(memorySnapshot.length());
return newSnapshot;
}
}
@Override public void close() {
}
}

View File

@ -260,9 +260,10 @@ public abstract class AbstractSimpleEngineTests {
engine.snapshot(new Engine.SnapshotHandler<Void>() {
@Override public Void snapshot(final SnapshotIndexCommit snapshotIndexCommit1, final Translog.Snapshot translogSnapshot1) {
assertThat(snapshotIndexCommit1, snapshotIndexCommitExists());
assertThat(translogSnapshot1, translogSize(1));
Translog.Create create1 = (Translog.Create) translogSnapshot1.iterator().next();
assertThat(translogSnapshot1.hasNext(), equalTo(true));
Translog.Create create1 = (Translog.Create) translogSnapshot1.next();
assertThat(create1.source(), equalTo(B_1));
assertThat(translogSnapshot1.hasNext(), equalTo(false));
Future<Object> future = executorService.submit(new Callable<Object>() {
@Override public Object call() throws Exception {
@ -288,9 +289,10 @@ public abstract class AbstractSimpleEngineTests {
assertThat(snapshotIndexCommit1, snapshotIndexCommitExists());
assertThat(snapshotIndexCommit2, snapshotIndexCommitExists());
assertThat(snapshotIndexCommit2.getSegmentsFileName(), not(equalTo(snapshotIndexCommit1.getSegmentsFileName())));
assertThat(translogSnapshot2, translogSize(1));
Translog.Create create3 = (Translog.Create) translogSnapshot2.iterator().next();
assertThat(translogSnapshot2.hasNext(), equalTo(true));
Translog.Create create3 = (Translog.Create) translogSnapshot2.next();
assertThat(create3.source(), equalTo(B_3));
assertThat(translogSnapshot2.hasNext(), equalTo(false));
return null;
}
});
@ -351,9 +353,10 @@ public abstract class AbstractSimpleEngineTests {
}
@Override public void phase2(Translog.Snapshot snapshot) throws EngineException {
assertThat(snapshot, translogSize(1));
Translog.Create create = (Translog.Create) snapshot.iterator().next();
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next();
assertThat(create.source(), equalTo(B_2));
assertThat(snapshot.hasNext(), equalTo(false));
}
@Override public void phase3(Translog.Snapshot snapshot) throws EngineException {
@ -375,8 +378,9 @@ public abstract class AbstractSimpleEngineTests {
}
@Override public void phase2(Translog.Snapshot snapshot) throws EngineException {
assertThat(snapshot, translogSize(1));
Translog.Create create = (Translog.Create) snapshot.iterator().next();
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next();
assertThat(snapshot.hasNext(), equalTo(false));
assertThat(create.source(), equalTo(B_2));
// add for phase3
@ -384,8 +388,9 @@ public abstract class AbstractSimpleEngineTests {
}
@Override public void phase3(Translog.Snapshot snapshot) throws EngineException {
assertThat(snapshot, translogSize(1));
Translog.Create create = (Translog.Create) snapshot.iterator().next();
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next();
assertThat(snapshot.hasNext(), equalTo(false));
assertThat(create.source(), equalTo(B_3));
}
});

View File

@ -27,8 +27,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.Iterator;
import static org.elasticsearch.index.translog.TranslogSizeMatcher.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@ -55,7 +53,6 @@ public abstract class AbstractSimpleTranslogTests {
@Test public void testSimpleOperations() {
Translog.Snapshot snapshot = translog.snapshot();
assertThat(snapshot, translogSize(0));
snapshot.release();
@ -80,15 +77,25 @@ public abstract class AbstractSimpleTranslogTests {
snapshot.release();
snapshot = translog.snapshot();
Iterator<Translog.Operation> it = snapshot.iterator();
Translog.Create create = (Translog.Create) it.next();
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next();
assertThat(create.source(), equalTo(new byte[]{1}));
Translog.Index index = (Translog.Index) it.next();
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Index index = (Translog.Index) snapshot.next();
assertThat(index.source(), equalTo(new byte[]{2}));
Translog.Delete delete = (Translog.Delete) it.next();
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Delete delete = (Translog.Delete) snapshot.next();
assertThat(delete.uid(), equalTo(newUid("3")));
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) it.next();
assertThat(snapshot.hasNext(), equalTo(true));
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) snapshot.next();
assertThat(deleteByQuery.source(), equalTo(new byte[]{4}));
assertThat(snapshot.hasNext(), equalTo(false));
snapshot.release();
long firstId = translog.currentId();
@ -108,16 +115,27 @@ public abstract class AbstractSimpleTranslogTests {
translog.add(new Translog.Create("test", "1", new byte[]{1}));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(1));
Translog.Create create = (Translog.Create) snapshot.iterator().next();
snapshot.release();
snapshot = translog.snapshot();
Translog.Create create = (Translog.Create) snapshot.next();
assertThat(create.source(), equalTo(new byte[]{1}));
snapshot.release();
Translog.Snapshot snapshot1 = translog.snapshot();
translog.add(new Translog.Index("test", "2", new byte[]{2}));
snapshot = translog.snapshot(snapshot);
snapshot = translog.snapshot(snapshot1);
assertThat(snapshot, translogSize(1));
Translog.Index index = (Translog.Index) snapshot.iterator().next();
assertThat(index.source(), equalTo(new byte[]{2}));
snapshot.release();
snapshot = translog.snapshot(snapshot1);
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Index index = (Translog.Index) snapshot.next();
assertThat(index.source(), equalTo(new byte[]{2}));
assertThat(snapshot.hasNext(), equalTo(false));
snapshot.release();
snapshot1.release();
}
@Test public void testSnapshotWithNewTranslog() {
@ -136,8 +154,13 @@ public abstract class AbstractSimpleTranslogTests {
snapshot = translog.snapshot(actualSnapshot);
assertThat(snapshot, translogSize(1));
Translog.Index index = (Translog.Index) snapshot.iterator().next();
snapshot.release();
snapshot = translog.snapshot(actualSnapshot);
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Index index = (Translog.Index) snapshot.next();
assertThat(index.source(), equalTo(new byte[]{3}));
assertThat(snapshot.hasNext(), equalTo(false));
actualSnapshot.release();
snapshot.release();

View File

@ -36,7 +36,8 @@ public class TranslogSizeMatcher extends TypeSafeMatcher<Translog.Snapshot> {
@Override public boolean matchesSafely(Translog.Snapshot snapshot) {
int count = 0;
for (Translog.Operation op : snapshot) {
while (snapshot.hasNext()) {
snapshot.next();
count++;
}
return size == count;

View File

@ -25,7 +25,7 @@ import org.elasticsearch.index.translog.Translog;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class MemorySimpleTranslogTests extends AbstractSimpleTranslogTests {

View File

@ -0,0 +1,29 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.gateway.fs;
import org.elasticsearch.test.integration.gateway.AbstractSimpleIndexGatewayTests;
/**
* @author kimchy (shay.banon)
*/
public class BlobStoreSmallBufferSizeFsIndexGatewayTests extends AbstractSimpleIndexGatewayTests {
}

View File

@ -0,0 +1,10 @@
cluster:
routing:
schedule: 100ms
gateway:
type: fs
fs:
buffer_size: 3b
index:
number_of_shards: 1
number_of_replicas: 1

View File

@ -24,12 +24,11 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FastByteArrayInputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.lucene.Directories;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@ -251,18 +250,10 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
try {
long time = System.currentTimeMillis();
BytesStreamOutput streamOutput = BytesStreamOutput.Cached.cached();
streamOutput.writeInt(translogSnapshot.size());
for (Translog.Operation operation : translogSnapshot) {
translogNumberOfOperations++;
writeTranslogOperation(streamOutput, operation);
if (true) {
throw new ElasticSearchIllegalStateException("cloud plugin is currently disabled");
}
Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName);
blob.setContentLength(streamOutput.size());
blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size()));
blobStoreContext.getBlobStore().putBlob(container, blob);
currentTranslogPartToWrite++;
translogTime = System.currentTimeMillis() - time;
@ -274,18 +265,10 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
try {
long time = System.currentTimeMillis();
BytesStreamOutput streamOutput = BytesStreamOutput.Cached.cached();
streamOutput.writeInt(translogSnapshot.size() - snapshot.lastTranslogSize());
for (Translog.Operation operation : translogSnapshot.skipTo(snapshot.lastTranslogSize())) {
translogNumberOfOperations++;
writeTranslogOperation(streamOutput, operation);
if (true) {
throw new ElasticSearchIllegalStateException("cloud plugin is currently disabled");
}
Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName);
blob.setContentLength(streamOutput.size());
blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size()));
blobStoreContext.getBlobStore().putBlob(container, blob);
currentTranslogPartToWrite++;
translogTime = System.currentTimeMillis() - time;
@ -433,7 +416,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
if (latestTranslogId == -1) {
// no recovery file found, start the shard and bail
indexShard.start();
return new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES));
return new RecoveryStatus.Translog(-1, 0, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES));
}
@ -462,10 +445,9 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
currentTranslogPartToWrite = index;
indexShard.performRecoveryPrepareForTranslog();
indexShard.performRecoveryOperations(operations);
indexShard.performRecoveryFinalization();
return new RecoveryStatus.Translog(latestTranslogId, operations.size(), new ByteSizeValue(size, ByteSizeUnit.BYTES));
return new RecoveryStatus.Translog(latestTranslogId, operations.size(), 0, new ByteSizeValue(size, ByteSizeUnit.BYTES));
} catch (Exception e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e);
}