Translog: remove ChannelReference and simplify Views

Currently we use ref counting to manage the life cycle of a translog file. This was done to allow the creation of views and snapshots while making sure that the underlying files stay available. This commit takes a simpler route based on the observation that a snapshot doesn't need its own life cycle but can instead live on the life cycle of its parent (translog or view). If code fails to adhere to this assumption it will get a channel-already-closed exception. As such, each file is now owned by a single owner and there is no need for reference counting. As part of the rewrite, TranslogReader is renamed to BaseTranslogReader and ImmutableTranslogReader to TranslogReader.

Also, I took the opportunity to clean up legacy translog readers we don't need in master.
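As a minimal sketch of the new contract (OwnedReader and its nested Snapshot are hypothetical names for illustration, not the Elasticsearch classes): the parent is the single owner of the FileChannel, a snapshot merely borrows it, and a read attempted after the parent closes fails with a ClosedChannelException instead of being kept alive by a reference count.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of single ownership: the reader owns the channel outright,
// snapshots borrow it and fail with ClosedChannelException once the owner closed it.
class OwnedReader implements AutoCloseable {
    private final FileChannel channel; // single owner, no ref counting

    OwnedReader(Path path) throws IOException {
        this.channel = FileChannel.open(path, StandardOpenOption.READ);
    }

    /** A snapshot borrows the parent's channel; it has no life cycle of its own. */
    Snapshot newSnapshot() {
        return new Snapshot();
    }

    class Snapshot {
        private long position = 0;

        /** Reads the next chunk at the current position, or fails if the parent is closed. */
        int readInto(ByteBuffer buffer) throws IOException {
            int read = channel.read(buffer, position);
            if (read > 0) {
                position += read;
            }
            return read;
        }
    }

    @Override
    public void close() throws IOException {
        channel.close(); // invalidates every outstanding snapshot at once
    }
}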

Closes #15898
Boaz Leskes 2016-01-11 15:30:51 +01:00
parent 8bf8ca6222
commit a90b551e36
15 changed files with 634 additions and 1093 deletions


@@ -231,7 +231,8 @@ public class InternalEngine extends Engine {
protected void recoverFromTranslog(EngineConfig engineConfig, Translog.TranslogGeneration translogGeneration) throws IOException {
int opsRecovered = 0;
final TranslogRecoveryPerformer handler = engineConfig.getTranslogRecoveryPerformer();
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
try {
Translog.Snapshot snapshot = translog.newSnapshot();
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
try {
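Because Translog.Snapshot no longer implements Releasable (see the interface change in the Translog.java diff below), recovery no longer wraps the snapshot in try-with-resources, as the hunk above shows. A sketch of the resulting loop shape, where Op and Snapshot are hypothetical stand-ins for Translog.Operation and Translog.Snapshot:

import java.io.IOException;
import java.util.function.Consumer;

// Hypothetical stand-ins for Translog.Operation / Translog.Snapshot.
interface Op {}

interface Snapshot {
    Op next() throws IOException; // returns null when the snapshot is exhausted
}

final class RecoveryLoop {
    /** Replays all operations; no snapshot.close() is needed since its lifetime is the parent's. */
    static int replay(Snapshot snapshot, Consumer<Op> handler) throws IOException {
        int opsRecovered = 0;
        Op operation;
        while ((operation = snapshot.next()) != null) {
            handler.accept(operation);
            opsRecovered++;
        }
        return opsRecovered;
    }
}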


@@ -0,0 +1,136 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
/**
* A base class for all classes that allow reading ops from translog files
*/
public abstract class BaseTranslogReader implements Comparable<BaseTranslogReader> {
protected final long generation;
protected final FileChannel channel;
protected final Path path;
protected final long firstOperationOffset;
public BaseTranslogReader(long generation, FileChannel channel, Path path, long firstOperationOffset) {
assert Translog.parseIdFromFileName(path) == generation : "generation mismatch. Path: " + Translog.parseIdFromFileName(path) + " but generation: " + generation;
this.generation = generation;
this.path = path;
this.channel = channel;
this.firstOperationOffset = firstOperationOffset;
}
public long getGeneration() {
return this.generation;
}
public abstract long sizeInBytes();
abstract public int totalOperations();
public final long getFirstOperationOffset() {
return firstOperationOffset;
}
public Translog.Operation read(Translog.Location location) throws IOException {
assert location.generation == generation : "read location's translog generation [" + location.generation + "] is not [" + generation + "]";
ByteBuffer buffer = ByteBuffer.allocate(location.size);
try (BufferedChecksumStreamInput checksumStreamInput = checksummedStream(buffer, location.translogLocation, location.size, null)) {
return read(checksumStreamInput);
}
}
/** read the size of the op (i.e., number of bytes, including the 4-byte size prefix itself) written at the given position */
protected final int readSize(ByteBuffer reusableBuffer, long position) {
// read op size from disk
assert reusableBuffer.capacity() >= 4 : "reusable buffer must have capacity >=4 when reading opSize. got [" + reusableBuffer.capacity() + "]";
try {
reusableBuffer.clear();
reusableBuffer.limit(4);
readBytes(reusableBuffer, position);
reusableBuffer.flip();
// Add an extra 4 to account for the operation size integer itself
final int size = reusableBuffer.getInt() + 4;
final long maxSize = sizeInBytes() - position;
if (size < 0 || size > maxSize) {
throw new TranslogCorruptedException("operation size is corrupted, must be [0.." + maxSize + "] but was: " + size);
}
return size;
} catch (IOException e) {
throw new ElasticsearchException("unexpected exception reading from translog snapshot of " + this.path, e);
}
}
public Translog.Snapshot newSnapshot() {
return new TranslogSnapshot(generation, channel, path, firstOperationOffset, sizeInBytes(), totalOperations());
}
/**
* reads opSize bytes at the given position and returns a checksummed stream over them,
* reusing the given buffer when it is large enough.
*/
protected final BufferedChecksumStreamInput checksummedStream(ByteBuffer reusableBuffer, long position, int opSize, BufferedChecksumStreamInput reuse) throws IOException {
final ByteBuffer buffer;
if (reusableBuffer.capacity() >= opSize) {
buffer = reusableBuffer;
} else {
buffer = ByteBuffer.allocate(opSize);
}
buffer.clear();
buffer.limit(opSize);
readBytes(buffer, position);
buffer.flip();
return new BufferedChecksumStreamInput(new ByteBufferStreamInput(buffer), reuse);
}
protected Translog.Operation read(BufferedChecksumStreamInput inStream) throws IOException {
return Translog.readOperation(inStream);
}
/**
* reads bytes at position into the given buffer, filling it.
*/
abstract protected void readBytes(ByteBuffer buffer, long position) throws IOException;
@Override
public String toString() {
return "translog [" + generation + "][" + path + "]";
}
@Override
public int compareTo(BaseTranslogReader o) {
return Long.compare(getGeneration(), o.getGeneration());
}
public Path path() {
return path;
}
}
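readSize above relies on the on-disk framing of an operation: a 4-byte length prefix followed by the operation bytes, which is why 4 is added to the value read from disk. A self-contained sketch of that bounds-checked read under the same framing assumption (OpFraming.recordSize is a hypothetical helper, not part of the commit):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical sketch of the framing readSize() assumes: [int payloadLength][payload].
final class OpFraming {
    /** Total record size at position, including the 4-byte length prefix, bounds-checked like readSize(). */
    static int recordSize(FileChannel channel, long position, long fileLength) throws IOException {
        ByteBuffer header = ByteBuffer.allocate(4);
        while (header.hasRemaining()) {
            if (channel.read(header, position + header.position()) < 0) {
                throw new IOException("premature EOF reading op size at " + position);
            }
        }
        header.flip();
        int size = header.getInt() + 4; // + 4 accounts for the length prefix itself
        long maxSize = fileLength - position;
        if (size < 0 || size > maxSize) {
            throw new IOException("operation size is corrupted, must be in [0.." + maxSize + "] but was: " + size);
        }
        return size;
    }
}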


@@ -1,71 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
final class ChannelReference extends AbstractRefCounted {
private final Path file;
private final FileChannel channel;
protected final long generation;
private final Callback<ChannelReference> onClose;
ChannelReference(Path file, long generation, FileChannel channel, Callback<ChannelReference> onClose) throws IOException {
super(file.toString());
this.generation = generation;
this.file = file;
this.channel = channel;
this.onClose = onClose;
}
public long getGeneration() {
return generation;
}
public Path getPath() {
return this.file;
}
public FileChannel getChannel() {
return this.channel;
}
@Override
public String toString() {
return "channel: file [" + file + "], ref count [" + refCount() + "]";
}
@Override
protected void closeInternal() {
try {
IOUtils.closeWhileHandlingException(channel);
} finally {
if (onClose != null) {
onClose.handle(this);
}
}
}
}
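For contrast with the single-owner model, the life cycle the removed class implemented looks roughly like this (a hedged sketch of an AbstractRefCounted-style counter, not the removed code verbatim): every borrower had to take and release a reference, and the underlying channel closed only when the count reached zero.

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the ref-counted life cycle this commit removes:
// each borrower incRef()s, and the resource closes only on the last decRef().
final class RefCountedResource {
    private final AtomicInteger refCount = new AtomicInteger(1); // owner holds the initial ref
    private final Runnable closeAction;

    RefCountedResource(Runnable closeAction) {
        this.closeAction = closeAction;
    }

    boolean tryIncRef() {
        int current;
        do {
            current = refCount.get();
            if (current <= 0) {
                return false; // already closed
            }
        } while (refCount.compareAndSet(current, current + 1) == false);
        return true;
    }

    void decRef() {
        if (refCount.decrementAndGet() == 0) {
            closeAction.run(); // last reference gone: actually release the channel
        }
    }
}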


@@ -1,58 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import java.io.IOException;
/**
* Version 0 of the translog format; there is no header in this file
*/
@Deprecated
public final class LegacyTranslogReader extends LegacyTranslogReaderBase {
/**
* Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point
* at the end of the last operation in this snapshot.
*/
LegacyTranslogReader(long generation, ChannelReference channelReference, long fileLength) {
super(generation, channelReference, 0, fileLength);
}
@Override
protected Translog.Operation read(BufferedChecksumStreamInput in) throws IOException {
// read the opsize before an operation.
// Note that this was written & read out side of the stream when this class was used, but it makes things more consistent
// to read this here
in.readInt();
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
Translog.Operation operation = Translog.newOperationFromType(type);
operation.readFrom(in);
return operation;
}
@Override
protected ImmutableTranslogReader newReader(long generation, ChannelReference channelReference, long firstOperationOffset, long length, int totalOperations) {
assert totalOperations == -1 : "expected unknown but was: " + totalOperations;
assert firstOperationOffset == 0;
return new LegacyTranslogReader(generation, channelReference, length);
}
}


@@ -1,64 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* Version 1 of the translog format; there is no checkpoint and therefore no notion of op count
*/
@Deprecated
class LegacyTranslogReaderBase extends ImmutableTranslogReader {
/**
* Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point
* at the end of the last operation in this snapshot.
*
*/
LegacyTranslogReaderBase(long generation, ChannelReference channelReference, long firstOperationOffset, long fileLength) {
super(generation, channelReference, firstOperationOffset, fileLength, TranslogReader.UNKNOWN_OP_COUNT);
}
@Override
protected Translog.Snapshot newReaderSnapshot(final int totalOperations, ByteBuffer reusableBuffer) {
assert totalOperations == -1 : "legacy we had no idea how many ops: " + totalOperations;
return new ReaderSnapshot(totalOperations, reusableBuffer) {
@Override
public Translog.Operation next() throws IOException {
if (position >= sizeInBytes()) { // this is the legacy case....
return null;
}
try {
return readOperation();
} catch (TruncatedTranslogException ex) {
return null; // legacy case
}
}
};
}
@Override
protected ImmutableTranslogReader newReader(long generation, ChannelReference channelReference, long firstOperationOffset, long length, int totalOperations) {
assert totalOperations == -1 : "expected unknown but was: " + totalOperations;
return new LegacyTranslogReaderBase(generation, channelReference, firstOperationOffset, length);
}
}
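Both removed classes handled pre-checkpoint on-disk formats; after this commit, TranslogReader.open (shown further down) fails fast on them instead. A sketch of the resulting version dispatch; VERSION_CHECKSUMS = 1 appears later in this diff, while the exact value of VERSION_CHECKPOINTS is an assumption here:

// Sketch of the version handling after this commit. The constants mirror
// TranslogWriter.VERSION_CHECKSUMS/VERSION_CHECKPOINTS; the value 2 is assumed.
final class TranslogVersions {
    static final int VERSION_CHECKSUMS = 1;   // pre-2.0: no per-generation checkpoint, op count unknown
    static final int VERSION_CHECKPOINTS = 2; // current: the checkpoint carries the op count

    static void ensureSupported(int version, String path) {
        switch (version) {
            case VERSION_CHECKSUMS:
                throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
            case VERSION_CHECKPOINTS:
                return; // readable by TranslogReader.open
            default:
                throw new IllegalArgumentException("no known translog stream version: " + version + " path:" + path);
        }
    }
}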


@@ -19,12 +19,8 @@
package org.elasticsearch.index.translog;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.lease.Releasables;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Arrays;
/**
* A snapshot composed out of multiple snapshots
@@ -32,8 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
final class MultiSnapshot implements Translog.Snapshot {
private final Translog.Snapshot[] translogs;
private AtomicBoolean closed = new AtomicBoolean(false);
private final int estimatedTotalOperations;
private final int totalOperations;
private int index;
/**
@@ -41,30 +36,18 @@ final class MultiSnapshot implements Translog.Snapshot {
*/
MultiSnapshot(Translog.Snapshot[] translogs) {
this.translogs = translogs;
int ops = 0;
for (Translog.Snapshot translog : translogs) {
final int tops = translog.estimatedTotalOperations();
if (tops == TranslogReader.UNKNOWN_OP_COUNT) {
ops = TranslogReader.UNKNOWN_OP_COUNT;
break;
}
assert tops >= 0 : "tops must be positive but was: " + tops;
ops += tops;
}
estimatedTotalOperations = ops;
totalOperations = Arrays.stream(translogs).mapToInt(Translog.Snapshot::totalOperations).sum();
index = 0;
}
@Override
public int estimatedTotalOperations() {
return estimatedTotalOperations;
public int totalOperations() {
return totalOperations;
}
@Override
public Translog.Operation next() throws IOException {
ensureOpen();
for (; index < translogs.length; index++) {
final Translog.Snapshot current = translogs[index];
Translog.Operation op = current.next();
@@ -74,17 +57,4 @@ final class MultiSnapshot implements Translog.Snapshot {
}
return null;
}
protected void ensureOpen() {
if (closed.get()) {
throw new AlreadyClosedException("snapshot already closed");
}
}
@Override
public void close() throws ElasticsearchException {
if (closed.compareAndSet(false, true)) {
Releasables.close(translogs);
}
}
}
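The reworked MultiSnapshot is now a plain forward iterator: the total op count is summed eagerly in the constructor and next() drains the per-generation snapshots in order, with no close() to manage. A standalone sketch of that behavior (GenSnapshot and the String op type are stand-ins for the Elasticsearch types):

import java.io.IOException;
import java.util.Arrays;

// Stand-in for a single generation's Translog.Snapshot.
interface GenSnapshot {
    int totalOperations();
    String next() throws IOException; // null when this snapshot is exhausted
}

// Sketch of MultiSnapshot: sum op counts up front, then drain snapshots in order.
final class ConcatenatedSnapshot {
    private final GenSnapshot[] snapshots;
    private final int totalOperations;
    private int index = 0;

    ConcatenatedSnapshot(GenSnapshot[] snapshots) {
        this.snapshots = snapshots;
        this.totalOperations = Arrays.stream(snapshots).mapToInt(GenSnapshot::totalOperations).sum();
    }

    int totalOperations() {
        return totalOperations;
    }

    String next() throws IOException {
        for (; index < snapshots.length; index++) {
            String op = snapshots[index].next();
            if (op != null) {
                return op; // keep returning from the current snapshot until it is drained
            }
        }
        return null; // all snapshots exhausted
    }
}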


@@ -34,12 +34,9 @@ import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
@@ -53,7 +50,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
@@ -69,6 +65,8 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* A Translog is a per index shard component that records all non-committed index operations in a durable manner.
@@ -112,29 +110,25 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$");
private final List<ImmutableTranslogReader> recoveredTranslogs;
// the list of translog readers is guaranteed to be in order of translog generation
private final List<TranslogReader> readers = new ArrayList<>();
private volatile ScheduledFuture<?> syncScheduler;
// this is a concurrent set and is not protected by any of the locks. The main reason
// is that is being accessed by two separate classes (additions & reading are done by FsTranslog, remove by FsView when closed)
// is that is being accessed by two separate classes (additions & reading are done by Translog, remove by View when closed)
private final Set<View> outstandingViews = ConcurrentCollections.newConcurrentSet();
private BigArrays bigArrays;
protected final ReleasableLock readLock;
protected final ReleasableLock writeLock;
private final Path location;
private TranslogWriter current;
private volatile ImmutableTranslogReader currentCommittingTranslog;
private volatile long lastCommittedTranslogFileGeneration = -1; // -1 is safe as it will not cause an translog deletion.
private final static long NOT_SET_GENERATION = -1; // -1 is safe as it will not cause a translog deletion.
private volatile long currentCommittingGeneration = NOT_SET_GENERATION;
private volatile long lastCommittedTranslogFileGeneration = NOT_SET_GENERATION;
private final AtomicBoolean closed = new AtomicBoolean();
private final TranslogConfig config;
private final String translogUUID;
private Callback<View> onViewClose = new Callback<View>() {
@Override
public void handle(View view) {
logger.trace("closing view starting at translog [{}]", view.minTranslogGeneration());
boolean removed = outstandingViews.remove(view);
assert removed : "View was never set but was supposed to be removed";
}
};
/**
@@ -176,11 +170,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
// if not we don't even try to clean it up and wait until we fail creating it
assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(translogUUID) : "unexpected translog file: [" + nextTranslogFile + "]";
if (Files.exists(currentCheckpointFile) // current checkpoint is already copied
&& Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning
&& Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning
logger.warn("deleted previously created, but not yet committed, next generation [{}]. This can happen due to a tragic exception when creating a new generation", nextTranslogFile.getFileName());
}
this.recoveredTranslogs = recoverFromFiles(translogGeneration, checkpoint);
if (recoveredTranslogs.isEmpty()) {
this.readers.addAll(recoverFromFiles(translogGeneration, checkpoint));
if (readers.isEmpty()) {
throw new IllegalStateException("at least one reader must be recovered");
}
boolean success = false;
@@ -193,11 +187,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
// for instance if we have a lot of tlog and we can't create the writer we keep on holding
// on to all the uncommitted tlog files if we don't close
if (success == false) {
IOUtils.closeWhileHandlingException(recoveredTranslogs);
IOUtils.closeWhileHandlingException(readers);
}
}
} else {
this.recoveredTranslogs = Collections.emptyList();
IOUtils.rm(location);
logger.debug("wipe translog location - creating new translog");
Files.createDirectories(location);
@@ -205,21 +198,22 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
Checkpoint checkpoint = new Checkpoint(0, 0, generation);
Checkpoint.write(location.resolve(CHECKPOINT_FILE_NAME), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
current = createWriter(generation);
this.lastCommittedTranslogFileGeneration = -1; // playing safe
this.lastCommittedTranslogFileGeneration = NOT_SET_GENERATION;
}
// now that we know which files are there, create a new current one.
} catch (Throwable t) {
// close the opened translog files if we fail to create a new translog...
IOUtils.closeWhileHandlingException(currentCommittingTranslog, current);
IOUtils.closeWhileHandlingException(current);
IOUtils.closeWhileHandlingException(readers);
throw t;
}
}
/** recover all translog files found on disk */
private final ArrayList<ImmutableTranslogReader> recoverFromFiles(TranslogGeneration translogGeneration, Checkpoint checkpoint) throws IOException {
private final ArrayList<TranslogReader> recoverFromFiles(TranslogGeneration translogGeneration, Checkpoint checkpoint) throws IOException {
boolean success = false;
ArrayList<ImmutableTranslogReader> foundTranslogs = new ArrayList<>();
ArrayList<TranslogReader> foundTranslogs = new ArrayList<>();
final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, TRANSLOG_FILE_SUFFIX); // a temp file to copy checkpoint to - note it must be on the same FS otherwise atomic move won't work
boolean tempFileRenamed = false;
try (ReleasableLock lock = writeLock.acquire()) {
@@ -230,7 +224,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
if (Files.exists(committedTranslogFile) == false) {
throw new IllegalStateException("translog file doesn't exist with generation: " + i + " lastCommitted: " + lastCommittedTranslogFileGeneration + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
}
final ImmutableTranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitCheckpointFileName(i))));
final TranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitCheckpointFileName(i))));
foundTranslogs.add(reader);
logger.debug("recovered local translog from checkpoint {}", checkpoint);
}
@@ -267,17 +261,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return foundTranslogs;
}
ImmutableTranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
final long generation;
try {
generation = parseIdFromFileName(path);
} catch (IllegalArgumentException ex) {
throw new TranslogException(shardId, "failed to parse generation from file name matching pattern " + path, ex);
}
TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
try {
final ChannelReference raf = new ChannelReference(path, generation, channel, new OnCloseRunnable());
ImmutableTranslogReader reader = ImmutableTranslogReader.open(raf, checkpoint, translogUUID);
assert Translog.parseIdFromFileName(path) == checkpoint.generation : "expected generation: " + Translog.parseIdFromFileName(path) + " but got: " + checkpoint.generation;
TranslogReader reader = TranslogReader.open(channel, path, checkpoint, translogUUID);
channel = null;
return reader;
} finally {
@@ -315,12 +303,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
try {
current.sync();
} finally {
try {
IOUtils.close(current, currentCommittingTranslog);
} finally {
IOUtils.close(recoveredTranslogs);
recoveredTranslogs.clear();
}
closeFilesIfNoPendingViews();
}
} finally {
FutureUtils.cancel(syncScheduler);
@@ -349,41 +332,49 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
/**
* Returns the number of operations in the transaction files that aren't committed to Lucene.
* Note: may return -1 if unknown
*/
public int totalOperations() {
int ops = 0;
try (ReleasableLock lock = readLock.acquire()) {
ops += current.totalOperations();
if (currentCommittingTranslog != null) {
int tops = currentCommittingTranslog.totalOperations();
assert tops != TranslogReader.UNKNOWN_OP_COUNT;
assert tops >= 0;
ops += tops;
}
}
return ops;
return totalOperations(lastCommittedTranslogFileGeneration);
}
/**
* Returns the size in bytes of the translog files that aren't committed to Lucene.
*/
public long sizeInBytes() {
long size = 0;
try (ReleasableLock lock = readLock.acquire()) {
size += current.sizeInBytes();
if (currentCommittingTranslog != null) {
size += currentCommittingTranslog.sizeInBytes();
}
return sizeInBytes(lastCommittedTranslogFileGeneration);
}
/**
* Returns the number of operations in the transaction files that aren't committed to Lucene.
*/
private int totalOperations(long minGeneration) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
return Stream.concat(readers.stream(), Stream.of(current))
.filter(r -> r.getGeneration() >= minGeneration)
.mapToInt(BaseTranslogReader::totalOperations)
.sum();
}
}
/**
* Returns the size in bytes of the translog files that aren't committed to Lucene.
*/
private long sizeInBytes(long minGeneration) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
return Stream.concat(readers.stream(), Stream.of(current))
.filter(r -> r.getGeneration() >= minGeneration)
.mapToLong(BaseTranslogReader::sizeInBytes)
.sum();
}
return size;
}
TranslogWriter createWriter(long fileGeneration) throws IOException {
TranslogWriter newFile;
try {
newFile = TranslogWriter.create(shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), getChannelFactory(), config.getBufferSize());
newFile = TranslogWriter.create(shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), getChannelFactory(), config.getBufferSize());
} catch (IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
}
@@ -398,12 +389,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
*/
public Translog.Operation read(Location location) {
try (ReleasableLock lock = readLock.acquire()) {
final TranslogReader reader;
final BaseTranslogReader reader;
final long currentGeneration = current.getGeneration();
if (currentGeneration == location.generation) {
reader = current;
} else if (currentCommittingTranslog != null && currentCommittingTranslog.getGeneration() == location.generation) {
reader = currentCommittingTranslog;
} else if (readers.isEmpty() == false && readers.get(readers.size() - 1).getGeneration() == location.generation) {
reader = readers.get(readers.size() - 1);
} else if (currentGeneration < location.generation) {
throw new IllegalStateException("location generation [" + location.generation + "] is greater than the current generation [" + currentGeneration + "]");
} else {
@@ -467,33 +458,16 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
* Snapshots are fixed in time and will not be updated with future operations.
*/
public Snapshot newSnapshot() {
ensureOpen();
try (ReleasableLock lock = readLock.acquire()) {
ArrayList<TranslogReader> toOpen = new ArrayList<>();
toOpen.addAll(recoveredTranslogs);
if (currentCommittingTranslog != null) {
toOpen.add(currentCommittingTranslog);
}
toOpen.add(current);
return createSnapshot(toOpen.toArray(new TranslogReader[toOpen.size()]));
}
return createSnapshot(Long.MIN_VALUE);
}
private static Snapshot createSnapshot(TranslogReader... translogs) {
Snapshot[] snapshots = new Snapshot[translogs.length];
boolean success = false;
try {
for (int i = 0; i < translogs.length; i++) {
snapshots[i] = translogs[i].newSnapshot();
}
Snapshot snapshot = new MultiSnapshot(snapshots);
success = true;
return snapshot;
} finally {
if (success == false) {
Releasables.close(snapshots);
}
private Snapshot createSnapshot(long minGeneration) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
Snapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current))
.filter(reader -> reader.getGeneration() >= minGeneration)
.map(BaseTranslogReader::newSnapshot).toArray(Snapshot[]::new);
return new MultiSnapshot(snapshots);
}
}
@@ -502,25 +476,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
* while receiving future ones as well
*/
public Translog.View newView() {
// we need to acquire the read lock to make sure no new translog is created
// and will be missed by the view we're making
try (ReleasableLock lock = readLock.acquire()) {
ArrayList<TranslogReader> translogs = new ArrayList<>();
try {
if (currentCommittingTranslog != null) {
translogs.add(currentCommittingTranslog.clone());
}
translogs.add(current.newReaderFromWriter());
View view = new View(translogs, onViewClose);
// this is safe as we know that no new translog is being made at the moment
// (we hold a read lock) and the view will be notified of any future one
outstandingViews.add(view);
translogs.clear();
return view;
} finally {
// close if anything happend and we didn't reach the clear
IOUtils.closeWhileHandlingException(translogs);
}
ensureOpen();
View view = new View(lastCommittedTranslogFileGeneration);
outstandingViews.add(view);
return view;
}
}
@@ -561,7 +521,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
*/
public boolean ensureSynced(Location location) throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
if (location.generation == current.generation) { // if we have a new one it's already synced
if (location.generation == current.getGeneration()) { // if we have a new one it's already synced
ensureOpen();
return current.syncUpTo(location.translogLocation + location.size);
}
@@ -604,151 +564,67 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return config;
}
private final class OnCloseRunnable implements Callback<ChannelReference> {
@Override
public void handle(ChannelReference channelReference) {
if (isReferencedGeneration(channelReference.getGeneration()) == false) {
Path translogPath = channelReference.getPath();
assert channelReference.getPath().getParent().equals(location) : "translog files must be in the location folder: " + location + " but was: " + translogPath;
// if the given translogPath is not the current we can safely delete the file since all references are released
logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
IOUtils.deleteFilesIgnoringExceptions(translogPath);
IOUtils.deleteFilesIgnoringExceptions(translogPath.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration())));
}
try (DirectoryStream<Path> stream = Files.newDirectoryStream(location)) {
for (Path path : stream) {
Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(path.getFileName().toString());
if (matcher.matches()) {
long generation = Long.parseLong(matcher.group(1));
if (isReferencedGeneration(generation) == false) {
logger.trace("delete translog file - not referenced and not current anymore {}", path);
IOUtils.deleteFilesIgnoringExceptions(path);
IOUtils.deleteFilesIgnoringExceptions(path.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration())));
}
}
}
} catch (IOException e) {
logger.warn("failed to delete unreferenced translog files", e);
}
}
}
/**
* a view into the translog, capturing all translog file at the moment of creation
* and updated with any future translog.
*/
public static final class View implements Closeable {
public static final Translog.View EMPTY_VIEW = new View(Collections.emptyList(), null);
/**
* a view into the translog, capturing all translog files at the moment of creation
* and updated with any future translog.
*/
public class View implements Closeable {
boolean closed;
// last in this list is always FsTranslog.current
final List<TranslogReader> orderedTranslogs;
private final Callback<View> onClose;
AtomicBoolean closed = new AtomicBoolean();
final long minGeneration;
View(List<TranslogReader> orderedTranslogs, Callback<View> onClose) {
// clone so we can safely mutate..
this.orderedTranslogs = new ArrayList<>(orderedTranslogs);
this.onClose = onClose;
}
/**
* Called by the parent class when ever the current translog changes
*
* @param oldCurrent a new read only reader for the old current (should replace the previous reference)
* @param newCurrent a reader into the new current.
*/
synchronized void onNewTranslog(TranslogReader oldCurrent, TranslogReader newCurrent) throws IOException {
// even though the close method removes this view from outstandingViews, there is no synchronisation in place
// between that operation and an ongoing addition of a new translog, already having an iterator.
// As such, this method can be called despite of the fact that we are closed. We need to check and ignore.
if (closed) {
// we have to close the new references created for as as we will not hold them
IOUtils.close(oldCurrent, newCurrent);
return;
}
orderedTranslogs.remove(orderedTranslogs.size() - 1).close();
orderedTranslogs.add(oldCurrent);
orderedTranslogs.add(newCurrent);
View(long minGeneration) {
this.minGeneration = minGeneration;
}
/** the smallest translog generation in this view */
public synchronized long minTranslogGeneration() {
ensureOpen();
return orderedTranslogs.get(0).getGeneration();
public long minTranslogGeneration() {
return minGeneration;
}
/**
* The total number of operations in the view.
*/
public synchronized int totalOperations() {
int ops = 0;
for (TranslogReader translog : orderedTranslogs) {
int tops = translog.totalOperations();
if (tops == TranslogReader.UNKNOWN_OP_COUNT) {
return -1;
}
assert tops >= 0;
ops += tops;
}
return ops;
public int totalOperations() {
return Translog.this.totalOperations(minGeneration);
}
/**
* Returns the size in bytes of the files behind the view.
*/
public synchronized long sizeInBytes() {
long size = 0;
for (TranslogReader translog : orderedTranslogs) {
size += translog.sizeInBytes();
}
return size;
public long sizeInBytes() {
return Translog.this.sizeInBytes(minGeneration);
}
/** create a snapshot from this view */
public synchronized Snapshot snapshot() {
public Snapshot snapshot() {
ensureOpen();
return createSnapshot(orderedTranslogs.toArray(new TranslogReader[orderedTranslogs.size()]));
return Translog.this.createSnapshot(minGeneration);
}
void ensureOpen() {
if (closed) {
throw new ElasticsearchException("View is already closed");
if (closed.get()) {
throw new AlreadyClosedException("View is already closed");
}
}
@Override
public void close() {
final List<TranslogReader> toClose = new ArrayList<>();
try {
synchronized (this) {
if (closed == false) {
try {
if (onClose != null) {
onClose.handle(this);
}
} finally {
closed = true;
toClose.addAll(orderedTranslogs);
orderedTranslogs.clear();
}
}
}
} finally {
try {
// Close out of lock to prevent deadlocks between channel close which checks for
// references in InternalChannelReference.closeInternal (waiting on a read lock)
// and other FsTranslog#newTranslog calling FsView.onNewTranslog (while having a write lock)
IOUtils.close(toClose);
} catch (Exception e) {
throw new ElasticsearchException("failed to close view", e);
}
public void close() throws IOException {
if (closed.getAndSet(true) == false) {
logger.trace("closing view starting at translog [{}]", minTranslogGeneration());
boolean removed = outstandingViews.remove(this);
assert removed : "View was never set but was supposed to be removed";
trimUnreferencedReaders();
closeFilesIfNoPendingViews();
}
}
}
public static class Location implements Accountable, Comparable<Location> {
public final long generation;
@@ -817,12 +693,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
/**
* A snapshot of the transaction log that allows iterating over all transaction log operations.
*/
public interface Snapshot extends Releasable {
public interface Snapshot {
/**
* The total number of operations in the translog.
*/
int estimatedTotalOperations();
int totalOperations();
/**
* Returns the next operation in the snapshot or <code>null</code> if we reached the end.
@@ -1320,13 +1196,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public void prepareCommit() throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
if (currentCommittingTranslog != null) {
throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration());
if (currentCommittingGeneration != NOT_SET_GENERATION) {
throw new IllegalStateException("already committing a translog with generation: " + currentCommittingGeneration);
}
final TranslogWriter oldCurrent = current;
oldCurrent.ensureOpen();
oldCurrent.sync();
currentCommittingTranslog = current.immutableReader();
currentCommittingGeneration = current.getGeneration();
TranslogReader currentCommittingTranslog = current.closeIntoReader();
readers.add(currentCommittingTranslog);
Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration();
Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(currentCommittingTranslog.getGeneration()));
@@ -1335,14 +1210,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
IOUtils.fsync(commitCheckpoint.getParent(), true);
// create a new translog file - this will sync it and update the checkpoint data;
current = createWriter(current.getGeneration() + 1);
// notify all outstanding views of the new translog (no views are created now as
// we hold a write lock).
for (View view : outstandingViews) {
view.onNewTranslog(currentCommittingTranslog.clone(), current.newReaderFromWriter());
}
IOUtils.close(oldCurrent);
logger.trace("current translog set to [{}]", current.getGeneration());
assert oldCurrent.syncNeeded() == false : "old translog oldCurrent must not need a sync";
} catch (Throwable t) {
IOUtils.closeWhileHandlingException(this); // tragic event
@@ -1352,24 +1220,53 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
@Override
public void commit() throws IOException {
ImmutableTranslogReader toClose = null;
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
if (currentCommittingTranslog == null) {
if (currentCommittingGeneration == NOT_SET_GENERATION) {
prepareCommit();
}
assert currentCommittingGeneration != NOT_SET_GENERATION;
assert readers.stream().filter(r -> r.getGeneration() == currentCommittingGeneration).findFirst().isPresent()
: "reader list doesn't contain committing generation [" + currentCommittingGeneration + "]";
lastCommittedTranslogFileGeneration = current.getGeneration(); // this is important - otherwise old files will not be cleaned up
if (recoveredTranslogs.isEmpty() == false) {
IOUtils.close(recoveredTranslogs);
recoveredTranslogs.clear();
}
toClose = this.currentCommittingTranslog;
this.currentCommittingTranslog = null;
} finally {
IOUtils.close(toClose);
currentCommittingGeneration = NOT_SET_GENERATION;
trimUnreferencedReaders();
}
}
void trimUnreferencedReaders() {
try (ReleasableLock ignored = writeLock.acquire()) {
if (closed.get()) {
// we're shut down, potentially on some tragic event - don't delete anything
return;
}
long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE);
minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen);
final long finalMinReferencedGen = minReferencedGen;
List<TranslogReader> unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList());
for (final TranslogReader unreferencedReader : unreferenced) {
Path translogPath = unreferencedReader.path();
logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
IOUtils.closeWhileHandlingException(unreferencedReader);
IOUtils.deleteFilesIgnoringExceptions(translogPath,
translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
}
readers.removeAll(unreferenced);
}
}
void closeFilesIfNoPendingViews() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
if (closed.get() && outstandingViews.isEmpty()) {
logger.trace("closing files. translog is closed and there are no pending views");
ArrayList<Closeable> toClose = new ArrayList<>(readers);
toClose.add(current);
IOUtils.close(toClose);
}
}
}
@Override
public void rollback() throws IOException {
ensureOpen();
@@ -1435,9 +1332,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return TranslogWriter.ChannelFactory.DEFAULT;
}
/** If this {@code Translog} was closed as a side-effect of a tragic exception,
* e.g. disk full while flushing a new segment, this returns the root cause exception.
* Otherwise (no tragic exception has occurred) it returns null. */
/**
* If this {@code Translog} was closed as a side-effect of a tragic exception,
* e.g. disk full while flushing a new segment, this returns the root cause exception.
* Otherwise (no tragic exception has occurred) it returns null.
*/
public Throwable getTragicException() {
return current.getTragicException();
}
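The pruning in trimUnreferencedReaders above boils down to one piece of generation arithmetic: a reader may be deleted only when every open view and the last commit point are past its generation. A standalone sketch of just that computation (GenerationTrimmer is a hypothetical helper, operating on plain generation numbers):

import java.util.ArrayList;
import java.util.List;

// Sketch of the generation arithmetic in trimUnreferencedReaders(): a reader may be
// deleted only if every open view and the last commit are past its generation.
final class GenerationTrimmer {
    /** Returns the generations safe to delete, given the open views' minimum generations. */
    static List<Long> trimmable(List<Long> readerGenerations, List<Long> viewMinGenerations, long lastCommittedGeneration) {
        long minReferencedGen = viewMinGenerations.stream().mapToLong(Long::longValue).min().orElse(Long.MAX_VALUE);
        minReferencedGen = Math.min(lastCommittedGeneration, minReferencedGen);
        List<Long> result = new ArrayList<>();
        for (long gen : readerGenerations) {
            if (gen < minReferencedGen) {
                result.add(gen); // older than anything referenced: file + checkpoint can go
            }
        }
        return result;
    }
}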


@@ -27,161 +27,46 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A base class for all classes that allows reading ops from translog files
* an immutable translog file reader
*/
public abstract class TranslogReader implements Closeable, Comparable<TranslogReader> {
public static final int UNKNOWN_OP_COUNT = -1;
public class TranslogReader extends BaseTranslogReader implements Closeable {
private static final byte LUCENE_CODEC_HEADER_BYTE = 0x3f;
private static final byte UNVERSIONED_TRANSLOG_HEADER_BYTE = 0x00;
protected final long generation;
protected final ChannelReference channelReference;
protected final FileChannel channel;
private final int totalOperations;
protected final long length;
protected final AtomicBoolean closed = new AtomicBoolean(false);
protected final long firstOperationOffset;
public TranslogReader(long generation, ChannelReference channelReference, long firstOperationOffset) {
this.generation = generation;
this.channelReference = channelReference;
this.channel = channelReference.getChannel();
this.firstOperationOffset = firstOperationOffset;
}
public long getGeneration() {
return this.generation;
}
public abstract long sizeInBytes();
abstract public int totalOperations();
public final long getFirstOperationOffset() {
return firstOperationOffset;
}
public Translog.Operation read(Translog.Location location) throws IOException {
assert location.generation == generation : "read location's translog generation [" + location.generation + "] is not [" + generation + "]";
ByteBuffer buffer = ByteBuffer.allocate(location.size);
try (BufferedChecksumStreamInput checksumStreamInput = checksummedStream(buffer, location.translogLocation, location.size, null)) {
return read(checksumStreamInput);
}
}
/** read the size of the op (i.e., number of bytes, including the op size) written at the given position */
private final int readSize(ByteBuffer reusableBuffer, long position) {
// read op size from disk
assert reusableBuffer.capacity() >= 4 : "reusable buffer must have capacity >=4 when reading opSize. got [" + reusableBuffer.capacity() + "]";
try {
reusableBuffer.clear();
reusableBuffer.limit(4);
readBytes(reusableBuffer, position);
reusableBuffer.flip();
// Add an extra 4 to account for the operation size integer itself
final int size = reusableBuffer.getInt() + 4;
final long maxSize = sizeInBytes() - position;
if (size < 0 || size > maxSize) {
throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size);
}
return size;
} catch (IOException e) {
throw new ElasticsearchException("unexpected exception reading from translog snapshot of " + this.channelReference.getPath(), e);
}
}
public Translog.Snapshot newSnapshot() {
final ByteBuffer reusableBuffer = ByteBuffer.allocate(1024);
final int totalOperations = totalOperations();
channelReference.incRef();
return newReaderSnapshot(totalOperations, reusableBuffer);
/**
* Create a reader of translog file channel. The length parameter should be consistent with totalOperations and point
* at the end of the last operation in this snapshot.
*/
public TranslogReader(long generation, FileChannel channel, Path path, long firstOperationOffset, long length, int totalOperations) {
super(generation, channel, path, firstOperationOffset);
this.length = length;
this.totalOperations = totalOperations;
}
/**
* reads an operation at the given position and returns it. The buffer length is equal to the number
* of bytes reads.
* Given a file, opens a {@link TranslogReader}, taking care of checking and validating the file header.
*/
private final BufferedChecksumStreamInput checksummedStream(ByteBuffer reusableBuffer, long position, int opSize, BufferedChecksumStreamInput reuse) throws IOException {
final ByteBuffer buffer;
if (reusableBuffer.capacity() >= opSize) {
buffer = reusableBuffer;
} else {
buffer = ByteBuffer.allocate(opSize);
}
buffer.clear();
buffer.limit(opSize);
readBytes(buffer, position);
buffer.flip();
return new BufferedChecksumStreamInput(new ByteBufferStreamInput(buffer), reuse);
}
protected Translog.Operation read(BufferedChecksumStreamInput inStream) throws IOException {
return Translog.readOperation(inStream);
}
/**
* reads bytes at position into the given buffer, filling it.
*/
abstract protected void readBytes(ByteBuffer buffer, long position) throws IOException;
@Override
public final void close() throws IOException {
if (closed.compareAndSet(false, true)) {
channelReference.decRef();
}
}
protected final boolean isClosed() {
return closed.get();
}
protected void ensureOpen() {
if (isClosed()) {
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed");
}
}
@Override
public String toString() {
return "translog [" + generation + "][" + channelReference.getPath() + "]";
}
@Override
public int compareTo(TranslogReader o) {
return Long.compare(getGeneration(), o.getGeneration());
}
/**
* Given a file, return a VersionedTranslogStream based on an
* optionally-existing header in the file. If the file does not exist, or
* has zero length, returns the latest version. If the header does not
* exist, assumes Version 0 of the translog file format.
*/
public static ImmutableTranslogReader open(ChannelReference channelReference, Checkpoint checkpoint, String translogUUID) throws IOException {
final FileChannel channel = channelReference.getChannel();
final Path path = channelReference.getPath();
assert channelReference.getGeneration() == checkpoint.generation : "expected generation: " + channelReference.getGeneration() + " but got: " + checkpoint.generation;
public static TranslogReader open(FileChannel channel, Path path, Checkpoint checkpoint, String translogUUID) throws IOException {
try {
if (checkpoint.offset == 0 && checkpoint.numOps == TranslogReader.UNKNOWN_OP_COUNT) { // only old files can be empty
return new LegacyTranslogReader(channelReference.getGeneration(), channelReference, 0);
}
InputStreamStreamInput headerStream = new InputStreamStreamInput(Channels.newInputStream(channel)); // don't close
InputStreamStreamInput headerStream = new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel)); // don't close
// Lucene's CodecUtil writes a magic number of 0x3FD76C17 with the
// header, in binary this looks like:
//
@@ -208,20 +93,17 @@ public abstract class TranslogReader implements Closeable, Comparable<TranslogRe
// ourselves here, because it allows us to read the first
// byte separately
if (header != CodecUtil.CODEC_MAGIC) {
throw new TranslogCorruptedException("translog looks like version 1 or later, but has corrupted header");
throw new TranslogCorruptedException("translog looks like version 1 or later, but has corrupted header. path:" + path);
}
// Confirm the rest of the header using CodecUtil, extracting
// the translog version
int version = CodecUtil.checkHeaderNoMagic(new InputStreamDataInput(headerStream), TranslogWriter.TRANSLOG_CODEC, 1, Integer.MAX_VALUE);
switch (version) {
case TranslogWriter.VERSION_CHECKSUMS:
assert checkpoint.numOps == TranslogReader.UNKNOWN_OP_COUNT : "expected unknown op count but got: " + checkpoint.numOps;
assert checkpoint.offset == Files.size(path) : "offset(" + checkpoint.offset + ") != file_size(" + Files.size(path) + ") for: " + path;
// legacy - we still have to support it somehow
return new LegacyTranslogReaderBase(channelReference.getGeneration(), channelReference, CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC), checkpoint.offset);
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
case TranslogWriter.VERSION_CHECKPOINTS:
assert path.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX) : "new file ends with old suffix: " + path;
assert checkpoint.numOps > TranslogReader.UNKNOWN_OP_COUNT: "expected at least 0 operatin but got: " + checkpoint.numOps;
assert checkpoint.numOps >= 0 : "expected at least 0 operations but got: " + checkpoint.numOps;
assert checkpoint.offset <= channel.size() : "checkpoint is inconsistent with channel length: " + channel.size() + " " + checkpoint;
int len = headerStream.readInt();
if (len > channel.size()) {
@@ -232,78 +114,61 @@ public abstract class TranslogReader implements Closeable, Comparable<TranslogRe
headerStream.read(ref.bytes, ref.offset, ref.length);
BytesRef uuidBytes = new BytesRef(translogUUID);
if (uuidBytes.bytesEquals(ref) == false) {
throw new TranslogCorruptedException("expected shard UUID [" + uuidBytes + "] but got: [" + ref + "] this translog file belongs to a different translog");
throw new TranslogCorruptedException("expected shard UUID [" + uuidBytes + "] but got: [" + ref + "] this translog file belongs to a different translog. path:" + path);
}
return new ImmutableTranslogReader(channelReference.getGeneration(), channelReference, ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + RamUsageEstimator.NUM_BYTES_INT, checkpoint.offset, checkpoint.numOps);
return new TranslogReader(checkpoint.generation, channel, path, ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + RamUsageEstimator.NUM_BYTES_INT, checkpoint.offset, checkpoint.numOps);
default:
throw new TranslogCorruptedException("No known translog stream version: " + version + " path:" + path);
}
} else if (b1 == UNVERSIONED_TRANSLOG_HEADER_BYTE) {
assert checkpoint.numOps == TranslogReader.UNKNOWN_OP_COUNT : "expected unknown op count but got: " + checkpoint.numOps;
assert checkpoint.offset == Files.size(path) : "offset(" + checkpoint.offset + ") != file_size(" + Files.size(path) + ") for: " + path;
return new LegacyTranslogReader(channelReference.getGeneration(), channelReference, checkpoint.offset);
throw new IllegalStateException("pre-1.4 translog found [" + path + "]");
} else {
throw new TranslogCorruptedException("Invalid first byte in translog file, got: " + Long.toHexString(b1) + ", expected 0x00 or 0x3f");
throw new TranslogCorruptedException("Invalid first byte in translog file, got: " + Long.toHexString(b1) + ", expected 0x00 or 0x3f. path:" + path);
}
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
throw new TranslogCorruptedException("Translog header corrupted", e);
throw new TranslogCorruptedException("Translog header corrupted. path:" + path, e);
}
}
public Path path() {
return channelReference.getPath();
public long sizeInBytes() {
return length;
}
protected Translog.Snapshot newReaderSnapshot(int totalOperations, ByteBuffer reusableBuffer) {
return new ReaderSnapshot(totalOperations, reusableBuffer);
public int totalOperations() {
return totalOperations;
}
class ReaderSnapshot implements Translog.Snapshot {
private final AtomicBoolean closed;
private final int totalOperations;
private final ByteBuffer reusableBuffer;
long position;
int readOperations;
private BufferedChecksumStreamInput reuse;
public ReaderSnapshot(int totalOperations, ByteBuffer reusableBuffer) {
this.totalOperations = totalOperations;
this.reusableBuffer = reusableBuffer;
closed = new AtomicBoolean(false);
position = firstOperationOffset;
readOperations = 0;
reuse = null;
/**
* reads bytes at the given position into the given buffer, filling it.
*/
protected void readBytes(ByteBuffer buffer, long position) throws IOException {
if (position >= length) {
throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "]");
}
@Override
public final int estimatedTotalOperations() {
return totalOperations;
if (position < firstOperationOffset) {
throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" + firstOperationOffset + "]");
}
Channels.readFromFileChannelWithEofException(channel, position, buffer);
}
@Override
public Translog.Operation next() throws IOException {
if (readOperations < totalOperations) {
assert readOperations < totalOperations : "readOpeartions must be less than totalOperations";
return readOperation();
} else {
return null;
}
public Checkpoint getInfo() {
return new Checkpoint(length, totalOperations, getGeneration());
}
@Override
public final void close() throws IOException {
if (closed.compareAndSet(false, true)) {
channel.close();
}
}
protected final Translog.Operation readOperation() throws IOException {
final int opSize = readSize(reusableBuffer, position);
reuse = checksummedStream(reusableBuffer, position, opSize, reuse);
Translog.Operation op = read(reuse);
position += opSize;
readOperations++;
return op;
}
protected final boolean isClosed() {
return closed.get();
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
channelReference.decRef();
}
protected void ensureOpen() {
if (isClosed()) {
throw new AlreadyClosedException(toString() + " is already closed");
}
}
}
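open() dispatches on the first header byte, as the hunks above show: 0x3f is the first byte of Lucene's codec magic (a versioned file), while 0x00 marks an unversioned pre-1.4 file, which this commit now rejects outright. A reduced sketch of that dispatch (HeaderDispatch is a hypothetical helper; the full method goes on to verify the codec header and translog UUID):

// Sketch of the first-byte dispatch in TranslogReader.open(): 0x3f starts the Lucene
// codec magic, 0x00 is a pre-1.4 unversioned file, anything else is corruption.
final class HeaderDispatch {
    static final byte LUCENE_CODEC_HEADER_BYTE = 0x3f;
    static final byte UNVERSIONED_TRANSLOG_HEADER_BYTE = 0x00;

    static void dispatch(byte firstByte, String path) {
        if (firstByte == LUCENE_CODEC_HEADER_BYTE) {
            // read the remaining magic + codec header, then switch on the version
        } else if (firstByte == UNVERSIONED_TRANSLOG_HEADER_BYTE) {
            throw new IllegalStateException("pre-1.4 translog found [" + path + "]");
        } else {
            throw new IllegalArgumentException("invalid first byte in translog file, got: "
                + Integer.toHexString(firstByte & 0xff) + ", expected 0x00 or 0x3f. path:" + path);
        }
    }
}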


@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.elasticsearch.common.io.Channels;
@@ -24,68 +23,82 @@ import org.elasticsearch.common.io.Channels;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
/**
* a translog reader which is fixed in length
*/
public class ImmutableTranslogReader extends TranslogReader {
public class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot {
private final int totalOperations;
protected final long length;
private final ByteBuffer reusableBuffer;
private long position;
private int readOperations;
private BufferedChecksumStreamInput reuse;
/**
* Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point
* at the end of the last operation in this snapshot.
*/
public ImmutableTranslogReader(long generation, ChannelReference channelReference, long firstOperationOffset, long length, int totalOperations) {
super(generation, channelReference, firstOperationOffset);
public TranslogSnapshot(long generation, FileChannel channel, Path path, long firstOperationOffset, long length, int totalOperations) {
super(generation, channel, path, firstOperationOffset);
this.length = length;
this.totalOperations = totalOperations;
this.reusableBuffer = ByteBuffer.allocate(1024);
readOperations = 0;
position = firstOperationOffset;
reuse = null;
}
@Override
public final TranslogReader clone() {
if (channelReference.tryIncRef()) {
try {
ImmutableTranslogReader reader = newReader(generation, channelReference, firstOperationOffset, length, totalOperations);
channelReference.incRef(); // for the new object
return reader;
} finally {
channelReference.decRef();
}
public final int totalOperations() {
return totalOperations;
}
@Override
public Translog.Operation next() throws IOException {
if (readOperations < totalOperations) {
return readOperation();
} else {
throw new IllegalStateException("can't increment translog [" + generation + "] channel ref count");
return null;
}
}
protected ImmutableTranslogReader newReader(long generation, ChannelReference channelReference, long offset, long length, int totalOperations) {
return new ImmutableTranslogReader(generation, channelReference, offset, length, totalOperations);
protected final Translog.Operation readOperation() throws IOException {
final int opSize = readSize(reusableBuffer, position);
reuse = checksummedStream(reusableBuffer, position, opSize, reuse);
Translog.Operation op = read(reuse);
position += opSize;
readOperations++;
return op;
}
public long sizeInBytes() {
return length;
}
public int totalOperations() {
return totalOperations;
}
/**
* reads bytes at the given position into the given buffer.
*/
protected void readBytes(ByteBuffer buffer, long position) throws IOException {
if (position >= length) {
throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "]");
throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "], generation: [" + getGeneration() + "], path: [" + path + "]");
}
if (position < firstOperationOffset) {
throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" + firstOperationOffset + "]");
if (position < getFirstOperationOffset()) {
throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" + getFirstOperationOffset() + "], generation: [" + getGeneration() + "], path: [" + path + "]");
}
Channels.readFromFileChannelWithEofException(channel, position, buffer);
}
public Checkpoint getInfo() {
return new Checkpoint(length, totalOperations, getGeneration());
@Override
public String toString() {
return "TranslogSnapshot{" +
"readOperations=" + readOperations +
", position=" + position +
", totalOperations=" + totalOperations +
", length=" + length +
", reusableBuffer=" + reusableBuffer +
'}';
}
}
}
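Since getInfo() above mirrors the generation's on-disk checkpoint, a snapshot over a finished generation can be rebuilt from the checkpoint file alone. A sketch, assuming channel and path point at that generation's translog file, firstOperationOffset is its header size, and the Checkpoint field names follow the constructor order (offset, numOps, generation):

Checkpoint checkpoint = Checkpoint.read(translogDir.resolve(Translog.CHECKPOINT_FILE_NAME));
TranslogSnapshot snapshot = new TranslogSnapshot(checkpoint.generation, channel, path,
        firstOperationOffset, checkpoint.offset, checkpoint.numOps);
// getInfo() on this snapshot round-trips to an equivalent Checkpoint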

View File

@ -25,24 +25,23 @@ import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.index.shard.ShardId;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean;
public class TranslogWriter extends TranslogReader {
public class TranslogWriter extends BaseTranslogReader implements Closeable {
public static final String TRANSLOG_CODEC = "translog";
public static final int VERSION_CHECKSUMS = 1;
@ -61,11 +60,14 @@ public class TranslogWriter extends TranslogReader {
/* the total offset of this file including the bytes written to the file as well as into the buffer */
private volatile long totalOffset;
public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference, ByteSizeValue bufferSize) throws IOException {
super(generation, channelReference, channelReference.getChannel().position());
protected final AtomicBoolean closed = new AtomicBoolean(false);
public TranslogWriter(ShardId shardId, long generation, FileChannel channel, Path path, ByteSizeValue bufferSize) throws IOException {
super(generation, channel, path, channel.position());
this.shardId = shardId;
this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channelReference.getChannel()), bufferSize.bytesAsInt());
this.lastSyncedOffset = channelReference.getChannel().position();
this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt());
this.lastSyncedOffset = channel.position();
totalOffset = lastSyncedOffset;
}
@ -74,10 +76,10 @@ public class TranslogWriter extends TranslogReader {
}
private static int getHeaderLength(int uuidLength) {
return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + RamUsageEstimator.NUM_BYTES_INT;
return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + RamUsageEstimator.NUM_BYTES_INT;
}
public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback<ChannelReference> onClose, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException {
public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException {
final BytesRef ref = new BytesRef(translogUUID);
final int headerLength = getHeaderLength(ref.length);
final FileChannel channel = channelFactory.open(file);
@ -90,7 +92,7 @@ public class TranslogWriter extends TranslogReader {
out.writeBytes(ref.bytes, ref.offset, ref.length);
channel.force(true);
writeCheckpoint(headerLength, 0, file.getParent(), fileGeneration, StandardOpenOption.WRITE);
final TranslogWriter writer = new TranslogWriter(shardId, fileGeneration, new ChannelReference(file, fileGeneration, channel, onClose), bufferSize);
final TranslogWriter writer = new TranslogWriter(shardId, fileGeneration, channel, file, bufferSize);
return writer;
} catch (Throwable throwable) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
@ -99,9 +101,12 @@ public class TranslogWriter extends TranslogReader {
throw throwable;
}
}
/** If this {@code TranslogWriter} was closed as a side-effect of a tragic exception,
* e.g. disk full while flushing a new segment, this returns the root cause exception.
* Otherwise (no tragic exception has occurred) it returns null. */
/**
* If this {@code TranslogWriter} was closed as a side-effect of a tragic exception,
* e.g. disk full while flushing a new segment, this returns the root cause exception.
* Otherwise (no tragic exception has occurred) it returns null.
*/
public Throwable getTragicException() {
return tragedy;
}
@ -110,7 +115,9 @@ public class TranslogWriter extends TranslogReader {
assert throwable != null : "throwable must not be null in a tragic event";
if (tragedy == null) {
tragedy = throwable;
} else {
} else if (tragedy != throwable) {
// it should be safe to call closeWithTragicEvent on multiple layers without
// worrying about self suppression.
tragedy.addSuppressed(throwable);
}
close();
@ -134,29 +141,27 @@ public class TranslogWriter extends TranslogReader {
}
/**
* write all buffered ops to disk and fsync file
* write all buffered ops to disk and fsync file.
*
* Note: any exception during the sync process will be interpreted as a tragic exception and the writer will be closed before
* raising the exception.
*/
public void sync() throws IOException {
if (syncNeeded()) {
synchronized (this) {
ensureOpen(); // this call gives a better exception that the incRef if we are closed by a tragic event
channelReference.incRef();
ensureOpen();
final long offsetToSync;
final int opsCounter;
try {
final long offsetToSync;
final int opsCounter;
outputStream.flush();
offsetToSync = totalOffset;
opsCounter = operationCounter;
try {
checkpoint(offsetToSync, opsCounter, channelReference);
} catch (Throwable ex) {
closeWithTragicEvent(ex);
throw ex;
}
lastSyncedOffset = offsetToSync;
} finally {
channelReference.decRef();
checkpoint(offsetToSync, opsCounter, generation, channel, path);
} catch (Throwable ex) {
closeWithTragicEvent(ex);
throw ex;
}
lastSyncedOffset = offsetToSync;
}
}
}
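Any failure inside sync() now permanently closes the writer via the tragic-event path before the exception surfaces. A caller-side sketch (the writer variable and rethrow policy are illustrative):

try {
    writer.sync(); // flush buffered ops, fsync the channel, then publish the checkpoint
} catch (IOException e) {
    // sync() already closed the writer with a tragic event before rethrowing,
    // so the root cause remains inspectable
    assert writer.getTragicException() != null;
    throw e;
}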
@ -177,76 +182,36 @@ public class TranslogWriter extends TranslogReader {
}
/**
* returns a new reader that follows the current writes (most importantly allows making
 * repeated snapshots that include new content)
 * closes this writer and transfers its underlying file channel to a new immutable reader
*/
public TranslogReader newReaderFromWriter() {
ensureOpen();
channelReference.incRef();
boolean success = false;
public synchronized TranslogReader closeIntoReader() throws IOException {
try {
final TranslogReader reader = new InnerReader(this.generation, firstOperationOffset, channelReference);
success = true;
return reader;
} finally {
if (!success) {
channelReference.decRef();
}
sync(); // sync before we close..
} catch (IOException e) {
closeWithTragicEvent(e);
throw e;
}
}
/**
 * returns a new immutable reader which only exposes the currently written operations
*/
public ImmutableTranslogReader immutableReader() throws TranslogException {
if (channelReference.tryIncRef()) {
synchronized (this) {
try {
ensureOpen();
outputStream.flush();
ImmutableTranslogReader reader = new ImmutableTranslogReader(this.generation, channelReference, firstOperationOffset, getWrittenOffset(), operationCounter);
channelReference.incRef(); // for new reader
return reader;
} catch (Exception e) {
throw new TranslogException(shardId, "exception while creating an immutable reader", e);
} finally {
channelReference.decRef();
}
}
if (closed.compareAndSet(false, true)) {
return new TranslogReader(generation, channel, path, firstOperationOffset, getWrittenOffset(), operationCounter);
} else {
throw new TranslogException(shardId, "can't increment channel [" + channelReference + "] ref count");
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "])", tragedy);
}
}
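closeIntoReader() preserves the single-owner invariant: exactly one hand-off succeeds, and the channel moves from writer to reader without any reference counting. A usage sketch:

TranslogReader reader = writer.closeIntoReader(); // syncs, then the writer is closed for good
try {
    int ops = reader.totalOperations(); // the reader is now the sole owner of the file channel
} finally {
    reader.close(); // closing the single owner closes the underlying channel
}
// a second closeIntoReader() on the same writer would throw AlreadyClosedException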
@Override
public synchronized Translog.Snapshot newSnapshot() {
ensureOpen();
try {
sync();
} catch (IOException e) {
throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e);
}
return super.newSnapshot();
}
private long getWrittenOffset() throws IOException {
return channelReference.getChannel().position();
}
/**
* this class is used when one wants a reference to this file which exposes all recently written operation.
* as such it needs access to the internals of the current reader
*/
final class InnerReader extends TranslogReader {
public InnerReader(long generation, long firstOperationOffset, ChannelReference channelReference) {
    super(generation, channelReference, firstOperationOffset);
}
@Override
public long sizeInBytes() {
return TranslogWriter.this.sizeInBytes();
}
@Override
public int totalOperations() {
return TranslogWriter.this.totalOperations();
}
@Override
protected void readBytes(ByteBuffer buffer, long position) throws IOException {
TranslogWriter.this.readBytes(buffer, position);
}
return channel.position();
}
/**
@ -264,13 +229,13 @@ public class TranslogWriter extends TranslogReader {
@Override
protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException {
if (position+targetBuffer.remaining() > getWrittenOffset()) {
if (position + targetBuffer.remaining() > getWrittenOffset()) {
synchronized (this) {
// we only flush here if it's really needed - try to minimize the impact of the read operation
// in some cases, i.e. a tragic event, we might still be able to read the relevant value,
// which is not really important in production but lets tests make stricter assumptions,
// as long as we don't fail in this call unless absolutely necessary.
if (position+targetBuffer.remaining() > getWrittenOffset()) {
if (position + targetBuffer.remaining() > getWrittenOffset()) {
outputStream.flush();
}
}
@ -280,9 +245,9 @@ public class TranslogWriter extends TranslogReader {
Channels.readFromFileChannelWithEofException(channel, position, targetBuffer);
}
private synchronized void checkpoint(long lastSyncPosition, int operationCounter, ChannelReference channelReference) throws IOException {
channelReference.getChannel().force(false);
writeCheckpoint(lastSyncPosition, operationCounter, channelReference.getPath().getParent(), channelReference.getGeneration(), StandardOpenOption.WRITE);
private synchronized void checkpoint(long lastSyncPosition, int operationCounter, long generation, FileChannel translogFileChannel, Path translogFilePath) throws IOException {
translogFileChannel.force(false);
writeCheckpoint(lastSyncPosition, operationCounter, translogFilePath.getParent(), generation, StandardOpenOption.WRITE);
}
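The ordering in checkpoint() is what downstream readers rely on: the translog data is fsynced before the checkpoint is published, so a published checkpoint is always a safe upper bound. A reader-side sketch of that invariant (Checkpoint field names assumed from the constructor order):

Checkpoint checkpoint = Checkpoint.read(translogDir.resolve(Translog.CHECKPOINT_FILE_NAME));
long fileSize = Files.size(translogDir.resolve(Translog.getFilename(checkpoint.generation)));
assert checkpoint.offset <= fileSize : "a published checkpoint never points past fsynced data";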
private static void writeCheckpoint(long syncPosition, int numOperations, Path translogFile, long generation, OpenOption... options) throws IOException {
@ -307,6 +272,17 @@ public class TranslogWriter extends TranslogReader {
}
}
@Override
public final void close() throws IOException {
if (closed.compareAndSet(false, true)) {
channel.close();
}
}
protected final boolean isClosed() {
return closed.get();
}
private final class BufferedChannelOutputStream extends BufferedOutputStream {

View File

@ -37,6 +37,7 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -82,7 +83,7 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe
}
}
private RecoveryResponse recover(final StartRecoveryRequest request) {
private RecoveryResponse recover(final StartRecoveryRequest request) throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().index().name());
final IndexShard shard = indexService.getShard(request.shardId().id());

View File

@ -120,7 +120,7 @@ public class RecoverySourceHandler {
/**
* performs the recovery from the local engine to the target
*/
public RecoveryResponse recoverToTarget() {
public RecoveryResponse recoverToTarget() throws IOException {
try (Translog.View translogView = shard.acquireTranslogView()) {
logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
final IndexCommit phase1Snapshot;
@ -144,8 +144,8 @@ public class RecoverySourceHandler {
}
logger.trace("snapshot translog for recovery. current size is [{}]", translogView.totalOperations());
try (Translog.Snapshot phase2Snapshot = translogView.snapshot()) {
phase2(phase2Snapshot);
try {
phase2(translogView.snapshot());
} catch (Throwable e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
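With snapshots no longer closeable, the view becomes the unit of resource management during recovery; the pattern in this file reduces to a sketch like:

try (Translog.View translogView = shard.acquireTranslogView()) {
    // the snapshot borrows the view's life cycle; it needs no separate close()
    phase2(translogView.snapshot());
} // closing the view releases the retained translog generations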
@ -308,7 +308,7 @@ public class RecoverySourceHandler {
});
}
prepareTargetForTranslog(translogView);
prepareTargetForTranslog(translogView.totalOperations());
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime());
response.phase1Time = stopWatch.totalTime().millis();
@ -320,8 +320,7 @@ public class RecoverySourceHandler {
}
protected void prepareTargetForTranslog(final Translog.View translogView) {
protected void prepareTargetForTranslog(final int totalTranslogOps) {
StopWatch stopWatch = new StopWatch().start();
logger.trace("{} recovery [phase1] to {}: prepare remote engine for translog", request.shardId(), request.targetNode());
final long startEngineStart = stopWatch.totalTime().millis();
@ -332,7 +331,7 @@ public class RecoverySourceHandler {
// operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId(), translogView.totalOperations()),
new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId(), totalTranslogOps),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
@ -463,14 +462,14 @@ public class RecoverySourceHandler {
// make sense to re-enable throttling in this phase
cancellableThreads.execute(() -> {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
request.recoveryId(), request.shardId(), operations, snapshot.totalOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] sent batch of [{}][{}] (total: [{}]) translog operations to {}",
indexName, shardId, ops, new ByteSizeValue(size),
snapshot.estimatedTotalOperations(),
snapshot.totalOperations(),
request.targetNode());
}
@ -488,7 +487,7 @@ public class RecoverySourceHandler {
if (!operations.isEmpty()) {
cancellableThreads.execute(() -> {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
request.recoveryId(), request.shardId(), operations, snapshot.totalOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});
@ -497,7 +496,7 @@ public class RecoverySourceHandler {
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] sent final batch of [{}][{}] (total: [{}]) translog operations to {}",
indexName, shardId, ops, new ByteSizeValue(size),
snapshot.estimatedTotalOperations(),
snapshot.totalOperations(),
request.targetNode());
}
return totalOperations;

View File

@ -58,7 +58,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
shard.failShard("failed to close engine (phase1)", e);
}
}
prepareTargetForTranslog(Translog.View.EMPTY_VIEW);
prepareTargetForTranslog(0);
finalizeRecovery();
return response;
} catch (Throwable t) {

View File

@ -138,8 +138,8 @@ public class TranslogTests extends ESTestCase {
private TranslogConfig getTranslogConfig(Path path) {
Settings build = Settings.settingsBuilder()
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build();
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build();
ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize);
}
@ -234,19 +234,16 @@ public class TranslogTests extends ESTestCase {
ArrayList<Translog.Operation> ops = new ArrayList<>();
Translog.Snapshot snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(0));
snapshot.close();
addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1}));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.estimatedTotalOperations(), equalTo(ops.size()));
snapshot.close();
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
addToTranslogAndList(translog, ops, new Translog.Delete(newUid("2")));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.estimatedTotalOperations(), equalTo(ops.size()));
snapshot.close();
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
snapshot = translog.newSnapshot();
@ -260,22 +257,18 @@ public class TranslogTests extends ESTestCase {
assertThat(snapshot.next(), equalTo(null));
snapshot.close();
long firstId = translog.currentFileGeneration();
translog.prepareCommit();
assertThat(translog.currentFileGeneration(), Matchers.not(equalTo(firstId)));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.estimatedTotalOperations(), equalTo(ops.size()));
snapshot.close();
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
translog.commit();
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(0));
assertThat(snapshot.estimatedTotalOperations(), equalTo(0));
snapshot.close();
assertThat(snapshot.totalOperations(), equalTo(0));
}
protected TranslogStats stats() throws IOException {
@ -337,9 +330,9 @@ public class TranslogTests extends ESTestCase {
assertEquals(6, copy.estimatedNumberOfOperations());
assertEquals(431, copy.getTranslogSizeInBytes());
assertEquals("\"translog\"{\n" +
" \"operations\" : 6,\n" +
" \"size_in_bytes\" : 431\n" +
"}", copy.toString().trim());
" \"operations\" : 6,\n" +
" \"size_in_bytes\" : 431\n" +
"}", copy.toString().trim());
try {
new TranslogStats(1, -1);
@ -359,51 +352,43 @@ public class TranslogTests extends ESTestCase {
ArrayList<Translog.Operation> ops = new ArrayList<>();
Translog.Snapshot snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(0));
snapshot.close();
addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1}));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.estimatedTotalOperations(), equalTo(1));
snapshot.close();
assertThat(snapshot.totalOperations(), equalTo(1));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.estimatedTotalOperations(), equalTo(1));
// snapshot while another is open
Translog.Snapshot snapshot1 = translog.newSnapshot();
assertThat(snapshot1, SnapshotMatchers.size(1));
assertThat(snapshot1.estimatedTotalOperations(), equalTo(1));
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.totalOperations(), equalTo(1));
snapshot.close();
snapshot1.close();
assertThat(snapshot1, SnapshotMatchers.size(1));
assertThat(snapshot1.totalOperations(), equalTo(1));
}
public void testSnapshotWithNewTranslog() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
Translog.Snapshot snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(0));
snapshot.close();
addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1}));
Translog.Snapshot snapshot1 = translog.newSnapshot();
addToTranslogAndList(translog, ops, new Translog.Index("test", "2", new byte[]{2}));
assertThat(snapshot1, SnapshotMatchers.equalsTo(ops.get(0)));
translog.prepareCommit();
addToTranslogAndList(translog, ops, new Translog.Index("test", "3", new byte[]{3}));
Translog.Snapshot snapshot2 = translog.newSnapshot();
translog.commit();
assertThat(snapshot2, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot2.estimatedTotalOperations(), equalTo(ops.size()));
assertThat(snapshot1, SnapshotMatchers.equalsTo(ops.get(0)));
snapshot1.close();
snapshot2.close();
try (Translog.View view = translog.newView()) {
Translog.Snapshot snapshot2 = translog.newSnapshot();
translog.commit();
assertThat(snapshot2, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot2.totalOperations(), equalTo(ops.size()));
}
}
public void testSnapshotOnClosedTranslog() throws IOException {
@ -418,39 +403,6 @@ public class TranslogTests extends ESTestCase {
}
}
public void testDeleteOnSnapshotRelease() throws Exception {
ArrayList<Translog.Operation> firstOps = new ArrayList<>();
addToTranslogAndList(translog, firstOps, new Translog.Index("test", "1", new byte[]{1}));
Translog.Snapshot firstSnapshot = translog.newSnapshot();
assertThat(firstSnapshot.estimatedTotalOperations(), equalTo(1));
translog.commit();
assertFileIsPresent(translog, 1);
ArrayList<Translog.Operation> secOps = new ArrayList<>();
addToTranslogAndList(translog, secOps, new Translog.Index("test", "2", new byte[]{2}));
assertThat(firstSnapshot.estimatedTotalOperations(), equalTo(1));
Translog.Snapshot secondSnapshot = translog.newSnapshot();
translog.add(new Translog.Index("test", "3", new byte[]{3}));
assertThat(secondSnapshot, SnapshotMatchers.equalsTo(secOps));
assertThat(secondSnapshot.estimatedTotalOperations(), equalTo(1));
assertFileIsPresent(translog, 1);
assertFileIsPresent(translog, 2);
firstSnapshot.close();
assertFileDeleted(translog, 1);
assertFileIsPresent(translog, 2);
secondSnapshot.close();
assertFileIsPresent(translog, 2); // it's the current nothing should be deleted
translog.commit();
assertFileIsPresent(translog, 3); // it's the current nothing should be deleted
assertFileDeleted(translog, 2);
}
public void assertFileIsPresent(Translog translog, long id) {
if (Files.exists(translogDir.resolve(Translog.getFilename(id)))) {
return;
@ -624,14 +576,8 @@ public class TranslogTests extends ESTestCase {
Translog.Snapshot snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(1));
assertFileIsPresent(translog, 1);
assertThat(snapshot.estimatedTotalOperations(), equalTo(1));
if (randomBoolean()) {
translog.close();
snapshot.close();
} else {
snapshot.close();
translog.close();
}
assertThat(snapshot.totalOperations(), equalTo(1));
translog.close();
assertFileIsPresent(translog, 1);
}
@ -708,16 +654,21 @@ public class TranslogTests extends ESTestCase {
public void onFailure(Throwable t) {
logger.error("--> reader [{}] had an error", t, threadId);
errors.add(t);
closeView();
try {
closeView();
} catch (IOException e) {
logger.error("unexpected error while closing view after failure", e);
t.addSuppressed(e);
}
}
void closeView() {
void closeView() throws IOException {
if (view != null) {
view.close();
}
}
void newView() {
void newView() throws IOException {
closeView();
view = translog.newView();
// captures the currently written ops so we know what to expect from the view
@ -738,17 +689,16 @@ public class TranslogTests extends ESTestCase {
// these are what we expect the snapshot to return (and potentially some more).
Set<Translog.Operation> expectedOps = new HashSet<>(writtenOps.keySet());
expectedOps.removeAll(writtenOpsAtView);
try (Translog.Snapshot snapshot = view.snapshot()) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
expectedOps.remove(op);
}
Translog.Snapshot snapshot = view.snapshot();
Translog.Operation op;
while ((op = snapshot.next()) != null) {
expectedOps.remove(op);
}
if (expectedOps.isEmpty() == false) {
StringBuilder missed = new StringBuilder("missed ").append(expectedOps.size()).append(" operations");
boolean failed = false;
for (Translog.Operation op : expectedOps) {
final Translog.Location loc = writtenOps.get(op);
for (Translog.Operation expectedOp : expectedOps) {
final Translog.Location loc = writtenOps.get(expectedOp);
if (loc.generation < view.minTranslogGeneration()) {
// writtenOps is only updated after the op was written to the translog. This mean
// that ops written to the translog before the view was taken (and will be missing from the view)
@ -756,7 +706,7 @@ public class TranslogTests extends ESTestCase {
continue;
}
failed = true;
missed.append("\n --> [").append(op).append("] written at ").append(loc);
missed.append("\n --> [").append(expectedOp).append("] written at ").append(loc);
}
if (failed) {
fail(missed.toString());
@ -803,7 +753,6 @@ public class TranslogTests extends ESTestCase {
}
}
public void testSyncUpTo() throws IOException {
int translogOperations = randomIntBetween(10, 100);
int count = 0;
@ -875,7 +824,7 @@ public class TranslogTests extends ESTestCase {
final Translog.Location lastLocation = translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME));
try (final ImmutableTranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) {
try (final TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) {
assertEquals(lastSynced + 1, reader.totalOperations());
for (int op = 0; op < translogOperations; op++) {
Translog.Location location = locations.get(op);
@ -913,7 +862,7 @@ public class TranslogTests extends ESTestCase {
}
writer.sync();
final TranslogReader reader = randomBoolean() ? writer : translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)));
final BaseTranslogReader reader = randomBoolean() ? writer : translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)));
for (int i = 0; i < numOps; i++) {
ByteBuffer buffer = ByteBuffer.allocate(4);
reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i);
@ -926,7 +875,7 @@ public class TranslogTests extends ESTestCase {
out.writeInt(2048);
writer.add(new BytesArray(bytes));
if (reader instanceof ImmutableTranslogReader) {
if (reader instanceof TranslogReader) {
ByteBuffer buffer = ByteBuffer.allocate(4);
try {
reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * numOps);
@ -934,6 +883,7 @@ public class TranslogTests extends ESTestCase {
} catch (EOFException ex) {
// expected
}
((TranslogReader) reader).close();
} else {
// live reader!
ByteBuffer buffer = ByteBuffer.allocate(4);
@ -943,7 +893,7 @@ public class TranslogTests extends ESTestCase {
final int value = buffer.getInt();
assertEquals(2048, value);
}
IOUtils.close(writer, reader);
IOUtils.close(writer);
}
public void testBasicRecovery() throws IOException {
@ -971,19 +921,17 @@ public class TranslogTests extends ESTestCase {
assertEquals(0, translog.stats().estimatedNumberOfOperations());
assertEquals(1, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
assertNull(snapshot.next());
}
Translog.Snapshot snapshot = translog.newSnapshot();
assertNull(snapshot.next());
} else {
assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
for (int i = minUncommittedOp; i < translogOperations; i++) {
assertEquals("expected operation" + i + " to be in the previous translog but wasn't", translog.currentFileGeneration() - 1, locations.get(i).generation);
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals(i, Integer.parseInt(next.getSource().source.toUtf8()));
}
Translog.Snapshot snapshot = translog.newSnapshot();
for (int i = minUncommittedOp; i < translogOperations; i++) {
assertEquals("expected operation" + i + " to be in the previous translog but wasn't", translog.currentFileGeneration() - 1, locations.get(i).generation);
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals(i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
}
@ -1014,13 +962,12 @@ public class TranslogTests extends ESTestCase {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
}
Translog.Snapshot snapshot = translog.newSnapshot();
int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
if (randomBoolean()) { // recover twice
@ -1028,13 +975,12 @@ public class TranslogTests extends ESTestCase {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
}
Translog.Snapshot snapshot = translog.newSnapshot();
int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
}
@ -1071,14 +1017,14 @@ public class TranslogTests extends ESTestCase {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
}
Translog.Snapshot snapshot = translog.newSnapshot();
int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
if (randomBoolean()) { // recover twice
@ -1086,13 +1032,12 @@ public class TranslogTests extends ESTestCase {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
}
Translog.Snapshot snapshot = translog.newSnapshot();
int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
}
@ -1132,13 +1077,12 @@ public class TranslogTests extends ESTestCase {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
}
Translog.Snapshot snapshot = translog.newSnapshot();
int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
}
@ -1209,14 +1153,13 @@ public class TranslogTests extends ESTestCase {
}
config.setTranslogGeneration(translogGeneration);
this.translog = new Translog(config);
try (Translog.Snapshot snapshot = this.translog.newSnapshot()) {
for (int i = firstUncommitted; i < translogOperations; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("" + i, next);
assertEquals(Integer.parseInt(next.getSource().source.toUtf8()), i);
}
assertNull(snapshot.next());
Translog.Snapshot snapshot = this.translog.newSnapshot();
for (int i = firstUncommitted; i < translogOperations; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("" + i, next);
assertEquals(Integer.parseInt(next.getSource().source.toUtf8()), i);
}
assertNull(snapshot.next());
}
public void testFailOnClosedWrite() throws IOException {
@ -1287,12 +1230,12 @@ public class TranslogTests extends ESTestCase {
case CREATE:
case INDEX:
op = new Translog.Index("test", threadId + "_" + opCount,
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
break;
case DELETE:
op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount),
1 + randomInt(100000),
randomFrom(VersionType.values()));
1 + randomInt(100000),
randomFrom(VersionType.values()));
break;
default:
throw new ElasticsearchException("not supported op type");
@ -1383,14 +1326,13 @@ public class TranslogTests extends ESTestCase {
assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());
assertFalse(tlog.syncNeeded());
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
assertEquals(opsSynced, snapshot.estimatedTotalOperations());
for (int i = 0; i < opsSynced; i++) {
assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, locations.get(i).generation);
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals(i, Integer.parseInt(next.getSource().source.toUtf8()));
}
Translog.Snapshot snapshot = tlog.newSnapshot();
assertEquals(opsSynced, snapshot.totalOperations());
for (int i = 0; i < opsSynced; i++) {
assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, locations.get(i).generation);
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals(i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
}
@ -1401,13 +1343,12 @@ public class TranslogTests extends ESTestCase {
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer borders regularly
for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
locations.add(translog.add(new Translog.Index("test", "" + opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))));
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
assertEquals(opsAdded + 1, snapshot.estimatedTotalOperations());
for (int i = 0; i < opsAdded; i++) {
assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(), locations.get(i).generation);
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
}
Translog.Snapshot snapshot = this.translog.newSnapshot();
assertEquals(opsAdded + 1, snapshot.totalOperations());
for (int i = 0; i < opsAdded; i++) {
assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(), locations.get(i).generation);
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
}
}
}
@ -1511,20 +1452,20 @@ public class TranslogTests extends ESTestCase {
}
config.setTranslogGeneration(translog.getGeneration());
try (Translog tlog = new Translog(config)) {
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
if (writtenOperations.size() != snapshot.estimatedTotalOperations()) {
for (int i = 0; i < threadCount; i++) {
if (threadExceptions[i] != null)
threadExceptions[i].printStackTrace();
Translog.Snapshot snapshot = tlog.newSnapshot();
if (writtenOperations.size() != snapshot.totalOperations()) {
for (int i = 0; i < threadCount; i++) {
if (threadExceptions[i] != null) {
threadExceptions[i].printStackTrace();
}
}
assertEquals(writtenOperations.size(), snapshot.estimatedTotalOperations());
for (int i = 0; i < writtenOperations.size(); i++) {
assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, writtenOperations.get(i).location.generation);
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals(next, writtenOperations.get(i).operation);
}
}
assertEquals(writtenOperations.size(), snapshot.totalOperations());
for (int i = 0; i < writtenOperations.size(); i++) {
assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, writtenOperations.get(i).location.generation);
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals(next, writtenOperations.get(i).operation);
}
}
}
@ -1537,6 +1478,7 @@ public class TranslogTests extends ESTestCase {
private static class FailSwitch {
private volatile int failRate;
private volatile boolean onceFailedFailAlways = false;
public boolean fail() {
boolean fail = randomIntBetween(1, 100) <= failRate;
if (fail && onceFailedFailAlways) {
@ -1716,24 +1658,22 @@ public class TranslogTests extends ESTestCase {
try (Translog tlog = new Translog(config)) {
assertNotNull(translogGeneration);
assertFalse(tlog.syncNeeded());
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
for (int i = 0; i < 1; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8()));
}
Translog.Snapshot snapshot = tlog.newSnapshot();
for (int i = 0; i < 1; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8()));
}
tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
}
try (Translog tlog = new Translog(config)) {
assertNotNull(translogGeneration);
assertFalse(tlog.syncNeeded());
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
for (int i = 0; i < 2; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8()));
}
Translog.Snapshot snapshot = tlog.newSnapshot();
for (int i = 0; i < 2; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
}
@ -1749,7 +1689,7 @@ public class TranslogTests extends ESTestCase {
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
config.setTranslogGeneration(translogGeneration);
try {
try {
Translog tlog = new Translog(config);
fail("file already exists?");
} catch (TranslogException ex) {
@ -1758,6 +1698,7 @@ public class TranslogTests extends ESTestCase {
assertEquals(ex.getCause().getClass(), FileAlreadyExistsException.class);
}
}
public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException {
translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
@ -1774,17 +1715,16 @@ public class TranslogTests extends ESTestCase {
try (Translog tlog = new Translog(config)) {
assertNotNull(translogGeneration);
assertFalse(tlog.syncNeeded());
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
for (int i = 0; i < 1; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8()));
}
Translog.Snapshot snapshot = tlog.newSnapshot();
for (int i = 0; i < 1; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8()));
}
tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
}
try {
try {
Translog tlog = new Translog(config);
fail("file already exists?");
} catch (TranslogException ex) {
@ -1863,13 +1803,12 @@ public class TranslogTests extends ESTestCase {
}
try (Translog translog = new Translog(config)) {
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
assertEquals(syncedDocs.size(), snapshot.estimatedTotalOperations());
for (int i = 0; i < syncedDocs.size(); i++) {
Translog.Operation next = snapshot.next();
assertEquals(syncedDocs.get(i), next.getSource().source.toUtf8());
assertNotNull("operation " + i + " must be non-null", next);
}
Translog.Snapshot snapshot = translog.newSnapshot();
assertEquals(syncedDocs.size(), snapshot.totalOperations());
for (int i = 0; i < syncedDocs.size(); i++) {
Translog.Operation next = snapshot.next();
assertEquals(syncedDocs.get(i), next.getSource().source.toUtf8());
assertNotNull("operation " + i + " must be non-null", next);
}
}
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.index.translog;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
@ -29,66 +28,32 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
/**
* Tests for reading old and new translog files
*/
public class TranslogVersionTests extends ESTestCase {
public void testV0LegacyTranslogVersion() throws Exception {
Path translogFile = getDataPath("/org/elasticsearch/index/translog/translog-v0.binary");
assertThat("test file should exist", Files.exists(translogFile), equalTo(true));
try (ImmutableTranslogReader reader = openReader(translogFile, 0)) {
assertThat("a version0 stream is returned", reader instanceof LegacyTranslogReader, equalTo(true));
try (final Translog.Snapshot snapshot = reader.newSnapshot()) {
final Translog.Operation operation = snapshot.next();
assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.INDEX, equalTo(true));
Translog.Index op = (Translog.Index) operation;
assertThat(op.id(), equalTo("1"));
assertThat(op.type(), equalTo("doc"));
assertThat(op.source().toUtf8(), equalTo("{\"body\": \"worda wordb wordc wordd \\\"worde\\\" wordf\"}"));
assertThat(op.routing(), equalTo(null));
assertThat(op.parent(), equalTo(null));
assertThat(op.version(), equalTo(1L));
assertThat(op.timestamp(), equalTo(1407312091791L));
assertThat(op.ttl(), equalTo(-1L));
assertThat(op.versionType(), equalTo(VersionType.INTERNAL));
assertNull(snapshot.next());
}
private void checkFailsToOpen(String file, String expectedMessage) throws IOException {
Path translogFile = getDataPath(file);
assertThat("test file should exist", Files.exists(translogFile), equalTo(true));
try {
openReader(translogFile, 0);
fail("should be able to open an old translog");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), containsString(expectedMessage));
}
}
public void testV0LegacyTranslogVersion() throws Exception {
checkFailsToOpen("/org/elasticsearch/index/translog/translog-v0.binary", "pre-1.4 translog");
}
public void testV1ChecksummedTranslogVersion() throws Exception {
Path translogFile = getDataPath("/org/elasticsearch/index/translog/translog-v1.binary");
assertThat("test file should exist", Files.exists(translogFile), equalTo(true));
try (ImmutableTranslogReader reader = openReader(translogFile, 0)) {
try (final Translog.Snapshot snapshot = reader.newSnapshot()) {
assertThat("a version1 stream is returned", reader instanceof ImmutableTranslogReader, equalTo(true));
Translog.Operation operation = snapshot.next();
assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.INDEX, equalTo(true));
Translog.Index op = (Translog.Index) operation;
assertThat(op.id(), equalTo("Bwiq98KFSb6YjJQGeSpeiw"));
assertThat(op.type(), equalTo("doc"));
assertThat(op.source().toUtf8(), equalTo("{\"body\": \"foo\"}"));
assertThat(op.routing(), equalTo(null));
assertThat(op.parent(), equalTo(null));
assertThat(op.version(), equalTo(1L));
assertThat(op.timestamp(), equalTo(1408627184844L));
assertThat(op.ttl(), equalTo(-1L));
assertThat(op.versionType(), equalTo(VersionType.INTERNAL));
// There are more operations
int opNum = 1;
while (snapshot.next() != null) {
opNum++;
}
assertThat("there should be 5 translog operations", opNum, equalTo(5));
}
}
checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1.binary", "pre-2.0 translog");
}
public void testCorruptedTranslogs() throws Exception {
@ -112,47 +77,17 @@ public class TranslogVersionTests extends ESTestCase {
e.getMessage().contains("Invalid first byte in translog file, got: 1, expected 0x00 or 0x3f"), equalTo(true));
}
try {
Path translogFile = getDataPath("/org/elasticsearch/index/translog/translog-v1-corrupted-body.binary");
assertThat("test file should exist", Files.exists(translogFile), equalTo(true));
try (ImmutableTranslogReader reader = openReader(translogFile, 0)) {
try (final Translog.Snapshot snapshot = reader.newSnapshot()) {
while(snapshot.next() != null) {
}
}
}
fail("should have thrown an exception about the body being corrupted");
} catch (TranslogCorruptedException e) {
assertThat("translog corruption from body: " + e.getMessage(),
e.getMessage().contains("translog corruption while reading from stream"), equalTo(true));
}
checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1-corrupted-body.binary", "pre-2.0 translog");
}
public void testTruncatedTranslog() throws Exception {
try {
Path translogFile = getDataPath("/org/elasticsearch/index/translog/translog-v1-truncated.binary");
assertThat("test file should exist", Files.exists(translogFile), equalTo(true));
try (ImmutableTranslogReader reader = openReader(translogFile, 0)) {
try (final Translog.Snapshot snapshot = reader.newSnapshot()) {
while(snapshot.next() != null) {
}
}
}
fail("should have thrown an exception about the body being truncated");
} catch (TranslogCorruptedException e) {
assertThat("translog truncated: " + e.getMessage(),
e.getMessage().contains("operation size is corrupted must be"), equalTo(true));
}
checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1-truncated.binary", "pre-2.0 translog");
}
public ImmutableTranslogReader openReader(Path path, long id) throws IOException {
public TranslogReader openReader(Path path, long id) throws IOException {
FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
try {
final ChannelReference raf = new ChannelReference(path, id, channel, null);
ImmutableTranslogReader reader = ImmutableTranslogReader.open(raf, new Checkpoint(Files.size(path), TranslogReader.UNKNOWN_OP_COUNT, id), null);
TranslogReader reader = TranslogReader.open(channel, path, new Checkpoint(Files.size(path), 1, id), null);
channel = null;
return reader;
} finally {