diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index ee3664a8ae5..640998a4449 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -72,7 +72,6 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.indices.warmer.IndicesWarmer; import org.elasticsearch.indices.warmer.InternalIndicesWarmer; import org.elasticsearch.threadpool.ThreadPool; @@ -348,14 +347,13 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin if (!get.loadSource()) { return new GetResult(true, versionValue.version(), null); } - byte[] data = translog.read(versionValue.translogLocation()); - if (data != null) { - try { - Translog.Source source = TranslogStreams.readSource(data); + try { + Translog.Source source = translog.readSource(versionValue.translogLocation()); + if (source != null) { return new GetResult(true, versionValue.version(), source); - } catch (IOException e) { - // switched on us, read it from the reader } + } catch (IOException e) { + // switched on us, read it from the reader } } } diff --git a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index 2dd45562bd8..6d628e8b650 100644 --- a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -23,11 +23,11 @@ import com.google.common.collect.Sets; import org.apache.lucene.index.IndexWriter; import 
org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -42,6 +42,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogStream; import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.indices.recovery.RecoveryState; @@ -50,7 +51,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.EOFException; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.util.Arrays; import java.util.Set; @@ -63,6 +63,8 @@ import java.util.concurrent.TimeUnit; */ public class LocalIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway { + private static final int RECOVERY_TRANSLOG_RENAME_RETRIES = 3; + private final ThreadPool threadPool; private final MappingUpdatedAction mappingUpdatedAction; private final IndexService indexService; @@ -198,7 +200,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen if (!tmpRecoveringFile.exists()) { File tmpTranslogFile = new File(translogLocation, translogName); if (tmpTranslogFile.exists()) { - for (int i = 0; i < 3; i++) { + for (int i = 0; i < RECOVERY_TRANSLOG_RENAME_RETRIES; i++) { if (tmpTranslogFile.renameTo(tmpRecoveringFile)) { recoveringTranslogFile 
= tmpRecoveringFile; break; @@ -228,17 +230,15 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen recoveryState.getTranslog().startTime(System.currentTimeMillis()); recoveryState.setStage(RecoveryState.Stage.TRANSLOG); - FileInputStream fs = null; + TranslogStream stream = null; final Set typesToUpdate = Sets.newHashSet(); try { - fs = new FileInputStream(recoveringTranslogFile); - InputStreamStreamInput si = new InputStreamStreamInput(fs); + stream = TranslogStreams.translogStreamFor(recoveringTranslogFile); while (true) { Translog.Operation operation; try { - int opSize = si.readInt(); - operation = TranslogStreams.readTranslogOperation(si); + operation = stream.read(); } catch (EOFException e) { // ignore, not properly written the last op break; @@ -269,7 +269,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen throw new IndexShardGatewayRecoveryException(shardId, "failed to recover shard", e); } finally { try { - fs.close(); + IOUtils.close(stream); } catch (IOException e) { // ignore } diff --git a/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamInput.java b/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamInput.java new file mode 100644 index 00000000000..6c83ed34b6b --- /dev/null +++ b/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamInput.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
package org.elasticsearch.index.translog;

import org.apache.lucene.store.BufferedChecksum;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

/**
 * A {@link StreamInput} decorator that folds every byte handed out through it
 * into a running CRC32 checksum, in the spirit of Lucene's
 * BufferedChecksumIndexInput.
 */
public final class BufferedChecksumStreamInput extends StreamInput {

    /** Stream the bytes are actually pulled from. */
    private final StreamInput delegate;

    /** Running checksum over everything read through this wrapper. */
    private final Checksum checksum = new BufferedChecksum(new CRC32());

    /**
     * @param in the stream to read from; closing this wrapper closes it too
     */
    public BufferedChecksumStreamInput(StreamInput in) {
        this.delegate = in;
    }

    /**
     * @return the checksum of all bytes read so far (since construction or
     *         the last {@link #reset()})
     */
    public long getChecksum() {
        return checksum.getValue();
    }

    @Override
    public byte readByte() throws IOException {
        final byte value = delegate.readByte();
        // only bytes that were successfully read contribute to the checksum
        checksum.update(value);
        return value;
    }

    @Override
    public void readBytes(byte[] b, int offset, int len) throws IOException {
        delegate.readBytes(b, offset, len);
        checksum.update(b, offset, len);
    }

    /**
     * Resets the wrapped stream first and, if that succeeds, clears the
     * checksum as well.
     */
    @Override
    public void reset() throws IOException {
        delegate.reset();
        checksum.reset();
    }

    /**
     * Reads one byte via {@link #readByte()} (so it is checksummed) and
     * returns it as an unsigned value.
     */
    @Override
    public int read() throws IOException {
        final byte next = readByte();
        return next & 0xFF;
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }
}
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.index.translog;

import org.apache.lucene.store.BufferedChecksum;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

/**
 * A {@link StreamOutput} decorator that folds every byte written through it
 * into a running CRC32 checksum, in the spirit of Lucene's
 * BufferedChecksumIndexOutput.
 */
public final class BufferedChecksumStreamOutput extends StreamOutput {

    /** Stream the bytes are actually pushed to. */
    private final StreamOutput delegate;

    /** Running checksum over everything written through this wrapper. */
    private final Checksum checksum = new BufferedChecksum(new CRC32());

    /**
     * @param out the stream to write to; closing this wrapper closes it too
     */
    public BufferedChecksumStreamOutput(StreamOutput out) {
        this.delegate = out;
    }

    /**
     * @return the checksum of all bytes written so far (since construction
     *         or the last {@link #reset()})
     */
    public long getChecksum() {
        return checksum.getValue();
    }

    @Override
    public void writeByte(byte b) throws IOException {
        // write first: a byte that failed to reach the delegate is not checksummed
        delegate.writeByte(b);
        checksum.update(b);
    }

    @Override
    public void writeBytes(byte[] b, int offset, int length) throws IOException {
        delegate.writeBytes(b, offset, length);
        checksum.update(b, offset, length);
    }

    @Override
    public void flush() throws IOException {
        delegate.flush();
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }

    /**
     * Resets the wrapped stream first and, if that succeeds, clears the
     * checksum as well.
     */
    @Override
    public void reset() throws IOException {
        delegate.reset();
        checksum.reset();
    }
}
Writes a header to identify the + * format, also writes checksums for each operation + */ +public class ChecksummedTranslogStream implements TranslogStream { + + public static final int VERSION = 1; + + private final InputStreamStreamInput in; + private final boolean fileExists; + + ChecksummedTranslogStream(InputStreamStreamInput in, boolean fileExists) { + this.in = in; + // This could be a new file, in which case we can ignore reading and + // verifying the header + this.fileExists = fileExists; + if (fileExists) { + // The header must be read to advance the input stream + readAndVerifyHeader(); + } + } + + private void readAndVerifyHeader() { + assert this.in != null : "headers are only for translog files read from disk, not streaming operations"; + try { + CodecUtil.checkHeader(new InputStreamDataInput(this.in), TranslogStreams.TRANSLOG_CODEC, VERSION, VERSION); + } catch (IOException e) { + throw new TranslogCorruptedException("translog header corrupted", e); + } + } + + public Translog.Operation read() throws IOException { + if (this.fileExists == false) { + throw new IOException("translog file does not exist"); + } + assert this.fileExists : "cannot read from a stream for a file that does not exist"; + in.readInt(); // ignored operation size + return this.read(in); + } + + private void verifyChecksum(BufferedChecksumStreamInput in) throws IOException { + // This absolutely must come first, or else reading the checksum becomes part of the checksum + long expectedChecksum = in.getChecksum(); + long readChecksum = in.readInt() & 0xFFFF_FFFFL; + if (readChecksum != expectedChecksum) { + throw new TranslogCorruptedException("translog stream is corrupted, expected: 0x" + + Long.toHexString(expectedChecksum) + ", got: 0x" + Long.toHexString(readChecksum)); + } + } + + @Override + public Translog.Operation read(StreamInput inStream) throws IOException { + // This BufferedChecksumStreamInput remains unclosed on purpose, + // because closing it closes the underlying 
stream, which we don't + // want to do here. + BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(inStream); + Translog.Operation operation; + try { + Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte()); + operation = TranslogStreams.newOperationFromType(type); + operation.readFrom(in); + } catch (AssertionError|Exception e) { + throw new TranslogCorruptedException("translog corruption while reading from stream", e); + } + verifyChecksum(in); + return operation; + } + + @Override + public Translog.Source readSource(byte[] data) throws IOException { + StreamInput nonChecksummingIn = new BytesStreamInput(data, false); + BufferedChecksumStreamInput in; + Translog.Source source; + try { + // the size header, not used and not part of the checksum + // because it is computed after the operation is written + nonChecksummingIn.readInt(); + // This BufferedChecksumStreamInput remains unclosed on purpose, + // because closing it closes the underlying stream, which we don't + // want to do here. + in = new BufferedChecksumStreamInput(nonChecksummingIn); + Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte()); + Translog.Operation operation = TranslogStreams.newOperationFromType(type); + source = operation.readSource(in); + } catch (AssertionError|Exception e) { + throw new TranslogCorruptedException("translog corruption while reading from byte array", e); + } + verifyChecksum(in); + return source; + } + + @Override + public void write(StreamOutput outStream, Translog.Operation op) throws IOException { + // This BufferedChecksumStreamOutput remains unclosed on purpose, + // because closing it closes the underlying stream, which we don't + // want to do here. 
+ BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(outStream); + out.writeByte(op.opType().id()); + op.writeTo(out); + long checksum = out.getChecksum(); + out.writeInt((int)checksum); + } + + @Override + public int writeHeader(FileChannel channel) throws IOException { + // This OutputStreamDataOutput is intentionally not closed because + // closing it will close the FileChannel + OutputStreamDataOutput out = new OutputStreamDataOutput(Channels.newOutputStream(channel)); + CodecUtil.writeHeader(out, TranslogStreams.TRANSLOG_CODEC, VERSION); + return CodecUtil.headerLength(TranslogStreams.TRANSLOG_CODEC); + } + + @Override + public void close() throws IOException { + this.in.close(); + } +} diff --git a/src/main/java/org/elasticsearch/index/translog/LegacyTranslogStream.java b/src/main/java/org/elasticsearch/index/translog/LegacyTranslogStream.java new file mode 100644 index 00000000000..2f87dabcd96 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/translog/LegacyTranslogStream.java @@ -0,0 +1,82 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.elasticsearch.index.translog;

import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.nio.channels.FileChannel;

/**
 * Version 0 of the translog format, there is no header in this file and no
 * per-operation checksum.
 * <p>
 * An instance may be created without a backing file ({@code in == null},
 * {@code fileExists == false}); such an instance only supports the
 * stream/byte-array based methods and {@link #close()} is a no-op for it.
 */
public class LegacyTranslogStream implements TranslogStream {

    private final InputStreamStreamInput in;
    private final boolean fileExists;

    /**
     * @param in         input positioned at the start of the translog file, may be
     *                   {@code null} when {@code fileExists} is {@code false}
     * @param fileExists whether {@code in} is backed by an existing file
     */
    LegacyTranslogStream(InputStreamStreamInput in, boolean fileExists) {
        this.in = in;
        this.fileExists = fileExists;
    }

    /**
     * Reads the next operation from the backing file.
     *
     * @throws IOException if this stream has no backing file, or the read fails
     */
    @Override
    public Translog.Operation read() throws IOException {
        // An explicit check rather than an assert: with assertions disabled the
        // file-less instance would otherwise fail with a NullPointerException.
        if (this.fileExists == false) {
            throw new IOException("translog file does not exist");
        }
        in.readInt(); // ignored operation size
        return this.read(in);
    }

    /** Reads one operation (type byte followed by its payload) from the given stream. */
    @Override
    public Translog.Operation read(StreamInput in) throws IOException {
        Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
        Translog.Operation operation = TranslogStreams.newOperationFromType(type);
        operation.readFrom(in);
        return operation;
    }

    /**
     * Decodes a single serialized operation (size header, type byte, payload)
     * and returns its {@link Translog.Source}.
     */
    @Override
    public Translog.Source readSource(byte[] data) throws IOException {
        BytesStreamInput in = new BytesStreamInput(data, false);
        in.readInt(); // the size header
        Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
        Translog.Operation operation = TranslogStreams.newOperationFromType(type);
        return operation.readSource(in);
    }

    /** Writes the operation's type byte followed by its payload. */
    @Override
    public void write(StreamOutput out, Translog.Operation op) throws IOException {
        out.writeByte(op.opType().id());
        op.writeTo(out);
    }

    /**
     * Version 0 files carry no header, so nothing is written.
     *
     * @return always 0
     */
    @Override
    public int writeHeader(FileChannel channel) {
        // nothing, there is no header for version 0 translog files
        return 0;
    }

    /**
     * Closes the backing file input, if there is one.
     */
    @Override
    public void close() throws IOException {
        // instances created without a file (in == null) own nothing to close
        if (this.in != null) {
            this.in.close();
        }
    }
}
a/src/main/java/org/elasticsearch/index/translog/Translog.java b/src/main/java/org/elasticsearch/index/translog/Translog.java index b0864b0de35..7ad83806c66 100644 --- a/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -23,8 +23,8 @@ import org.apache.lucene.index.Term; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; @@ -41,7 +41,6 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShardComponent; import java.io.IOException; -import java.io.InputStream; /** @@ -106,6 +105,8 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent, byte[] read(Location location); + Translog.Source readSource(Location location) throws IOException; + /** * Snapshots the current transaction log allowing to safely iterate over the snapshot. */ @@ -165,6 +166,9 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent, */ long translogId(); + /** + * Returns the current position in the translog stream + */ long position(); /** @@ -177,11 +181,15 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent, */ int estimatedTotalOperations(); - boolean hasNext(); - + /** + * Returns the next operation, or null when no more operations are found + */ Operation next(); - void seekForward(long length); + /** + * Seek to the specified position in the translog stream + */ + void seekTo(long position); /** * The length in bytes of this stream. 
@@ -221,7 +229,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent, case 4: return DELETE_BY_QUERY; default: - throw new IllegalArgumentException("No type mapped for [" + id + "]"); + throw new ElasticsearchIllegalArgumentException("No type mapped for [" + id + "]"); } } } diff --git a/src/main/java/org/elasticsearch/index/translog/TranslogCorruptedException.java b/src/main/java/org/elasticsearch/index/translog/TranslogCorruptedException.java new file mode 100644 index 00000000000..9196e3b0d49 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/translog/TranslogCorruptedException.java @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.translog; + +import org.elasticsearch.ElasticsearchException; + +public class TranslogCorruptedException extends ElasticsearchException { + public TranslogCorruptedException(String msg) { + super(msg); + } + + public TranslogCorruptedException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/src/main/java/org/elasticsearch/index/translog/TranslogStream.java b/src/main/java/org/elasticsearch/index/translog/TranslogStream.java new file mode 100644 index 00000000000..4a407129ab6 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/translog/TranslogStream.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.elasticsearch.index.translog;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.FileChannel;

/**
 * Reads and writes translog operations in one specific version of the
 * translog format.
 */
public interface TranslogStream extends Closeable {

    /**
     * Reads the next operation from the translog file. Only valid on a stream
     * obtained from {@link TranslogStreams#translogStreamFor(java.io.File)}.
     */
    Translog.Operation read() throws IOException;

    /**
     * Reads the next translog operation from the supplied input stream.
     */
    Translog.Operation read(StreamInput in) throws IOException;

    /**
     * Decodes a translog operation from the given byte array and returns the
     * {@link Translog.Source} of that operation.
     */
    Translog.Source readSource(byte[] data) throws IOException;

    /**
     * Writes the given translog operation to the output stream.
     */
    void write(StreamOutput out, Translog.Operation op) throws IOException;

    /**
     * Writes, if the format has one, a header identifying the translog
     * version to the file channel.
     *
     * @return the length of the header in bytes, 0 when the format has none
     */
    int writeHeader(FileChannel channel) throws IOException;

    /**
     * Closes a stream that was opened with
     * {@link TranslogStreams#translogStreamFor(java.io.File)}.
     */
    void close() throws IOException;
}
org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.InputStreamDataInput; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; /** - * + * Encapsulating class used for operating on translog streams. Static methods + * on this class use the latest version of the stream. */ public class TranslogStreams { - public static Translog.Operation readTranslogOperation(StreamInput in) throws IOException { - Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte()); - Translog.Operation operation; + /** V0, no header, no checksums */ + public static TranslogStream V0 = new LegacyTranslogStream(null, false); + /** V1, header, with per-op checksums */ + public static TranslogStream V1 = new ChecksummedTranslogStream(null, false); + + public static TranslogStream LATEST = V1; + + public static final String TRANSLOG_CODEC = "translog"; + private static final byte LUCENE_CODEC_HEADER_BYTE = 0x3f; + private static final byte UNVERSIONED_TRANSLOG_HEADER_BYTE = 0x00; + + /** + * Returns a new empty translog operation for the given {@link Translog.Operation.Type} + */ + static Translog.Operation newOperationFromType(Translog.Operation.Type type) throws IOException { switch (type) { case CREATE: - operation = new Translog.Create(); - break; + return new Translog.Create(); case DELETE: - operation = new Translog.Delete(); - break; + return new Translog.Delete(); case DELETE_BY_QUERY: - operation = new Translog.DeleteByQuery(); - break; + return new Translog.DeleteByQuery(); case SAVE: - operation = new Translog.Index(); - break; + return new Translog.Index(); default: throw new IOException("No type for [" + type + "]"); } - operation.readFrom(in); - return operation; } + /** + * Read the translog 
operation from the given byte array, returning the + * {@link Translog.Source} from the operation + */ public static Translog.Source readSource(byte[] data) throws IOException { - BytesStreamInput in = new BytesStreamInput(data, false); - in.readInt(); // the size header - Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte()); - Translog.Operation operation; - switch (type) { - case CREATE: - operation = new Translog.Create(); - break; - case DELETE: - operation = new Translog.Delete(); - break; - case DELETE_BY_QUERY: - operation = new Translog.DeleteByQuery(); - break; - case SAVE: - operation = new Translog.Index(); - break; - default: - throw new IOException("No type for [" + type + "]"); - } - return operation.readSource(in); + return LATEST.readSource(data); } + /** + * Read the next {@link Translog.Operation} from the stream using the + * latest translog version + */ + public static Translog.Operation readTranslogOperation(StreamInput in) throws IOException { + return LATEST.read(in); + } + + /** + * Write the {@link Translog.Operation} to the output stream using the + * latest translog version + */ public static void writeTranslogOperation(StreamOutput out, Translog.Operation op) throws IOException { - out.writeByte(op.opType().id()); - op.writeTo(out); + LATEST.write(out, op); + } + + /** + * Given a file, return a VersionedTranslogStream based on an + * optionally-existing header in the file. If the file does not exist, or + * has zero length, returns the latest version. If the header does not + * exist, assumes Version 0 of the translog file format. + * + * The caller is responsible for closing the TranslogStream. + * + * @throws IOException + */ + public static TranslogStream translogStreamFor(File translogFile) throws IOException { + // This stream will be passed to the translog stream, so closing the + // TranslogStream will close this. It is not closed here on purpose. 
+ InputStreamStreamInput in = new InputStreamStreamInput(new FileInputStream(translogFile)); + + boolean success = false; + try (InputStreamStreamInput headerStream = new InputStreamStreamInput(new FileInputStream(translogFile));) { + if (translogFile.exists() == false || translogFile.length() == 0) { + // if it doesn't exist or has no data, use the latest version, + // there aren't any backwards compatibility issues + success = true; + return new ChecksummedTranslogStream(in, false); + } + // Lucene's CodecUtil writes a magic number of 0x3FD76C17 with the + // header, in binary this looks like: + // + // binary: 0011 1111 1101 0111 0110 1100 0001 0111 + // hex : 3 f d 7 6 c 1 7 + // + // With version 0 of the translog, the first byte is the + // Operation.Type, which will always be between 0-4, so we know if + // we grab the first byte, it can be: + // 0x3f => Lucene's magic number, so we can assume it's version 1 or later + // 0x00 => version 0 of the translog + // + // otherwise the first byte of the translog is corrupted and we + // should bail + byte b1 = headerStream.readByte(); + if (b1 == LUCENE_CODEC_HEADER_BYTE) { + // Read 3 more bytes, meaning a whole integer has been read + byte b2 = headerStream.readByte(); + byte b3 = headerStream.readByte(); + byte b4 = headerStream.readByte(); + // Convert the 4 bytes that were read into an integer + int header = ((b1 & 0xFF) << 24) + ((b2 & 0xFF) << 16) + ((b3 & 0xFF) << 8) + ((b4 & 0xFF) << 0); + // We confirm CodecUtil's CODEC_MAGIC number (0x3FD76C17) + // ourselves here, because it allows us to read the first + // byte separately + if (header != CodecUtil.CODEC_MAGIC) { + throw new TranslogCorruptedException("translog looks like version 1 or later, but has corrupted header"); + } + // Confirm the rest of the header using CodecUtil, extracting + // the translog version + int version = CodecUtil.checkHeaderNoMagic(new InputStreamDataInput(headerStream), TRANSLOG_CODEC, 1, Integer.MAX_VALUE); + switch (version) { 
+ case ChecksummedTranslogStream.VERSION: + success = true; + return new ChecksummedTranslogStream(in, true); + default: + throw new TranslogCorruptedException("No known translog stream version: " + version); + } + } else if (b1 == UNVERSIONED_TRANSLOG_HEADER_BYTE) { + success = true; + return new LegacyTranslogStream(in, true); + } else { + throw new TranslogCorruptedException("Invalid first byte in translog file, got: " + Long.toHexString(b1) + ", expected 0x00 or 0x3f"); + } + } catch (CorruptIndexException e) { + throw new TranslogCorruptedException("Translog header corrupted", e); + } finally { + // something happened, we should close the stream just so it's + // not dangling + if (success == false) { + IOUtils.close(in); + } + } } } diff --git a/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java b/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java index aab1061b844..6a803a96ef4 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java @@ -22,6 +22,8 @@ package org.elasticsearch.index.translog.fs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Channels; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.TranslogStream; +import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogException; @@ -38,6 +40,8 @@ public class BufferingFsTranslogFile implements FsTranslogFile { private final long id; private final ShardId shardId; private final RafReference raf; + private final TranslogStream translogStream; + private final int headerSize; private final ReadWriteLock rwl = new ReentrantReadWriteLock(); private final AtomicBoolean closed = new AtomicBoolean(); @@ -59,6 +63,11 @@ public class BufferingFsTranslogFile implements 
FsTranslogFile { this.raf = raf; this.buffer = new byte[bufferSize]; raf.raf().setLength(0); + this.translogStream = TranslogStreams.translogStreamFor(this.raf.file()); + this.headerSize = this.translogStream.writeHeader(raf.channel()); + this.lastPosition += headerSize; + this.lastWrittenPosition += headerSize; + this.lastSyncPosition += headerSize; } public long id() { @@ -137,6 +146,7 @@ public class BufferingFsTranslogFile implements FsTranslogFile { try { flushBuffer(); FsChannelSnapshot snapshot = new FsChannelSnapshot(this.id, raf, lastWrittenPosition, operationCounter); + snapshot.seekTo(this.headerSize); success = true; return snapshot; } catch (Exception e) { @@ -158,6 +168,11 @@ public class BufferingFsTranslogFile implements FsTranslogFile { return lastPosition != lastSyncPosition; } + @Override + public TranslogStream getStream() { + return this.translogStream; + } + @Override public void sync() throws IOException { if (!syncNeeded()) { @@ -182,6 +197,7 @@ public class BufferingFsTranslogFile implements FsTranslogFile { if (!delete) { try { sync(); + translogStream.close(); } catch (Exception e) { throw new TranslogException(shardId, "failed to sync on close", e); } diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java b/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java index 1245f7aa614..2a14fa258c0 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java @@ -22,11 +22,12 @@ package org.elasticsearch.index.translog.fs; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.io.stream.BytesStreamInput; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStreams; +import org.elasticsearch.index.translog.Translog; import java.io.EOFException; import java.io.FileNotFoundException; 
+import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,7 +49,7 @@ public class FsChannelSnapshot implements Translog.Snapshot { private Translog.Operation lastOperationRead = null; - private int position = 0; + private long position = 0; private ByteBuffer cacheBuffer; @@ -92,10 +93,10 @@ public class FsChannelSnapshot implements Translog.Snapshot { } @Override - public boolean hasNext() { + public Translog.Operation next() { try { if (position >= length) { - return false; + return null; } if (cacheBuffer == null) { cacheBuffer = ByteBuffer.allocate(1024); @@ -103,7 +104,8 @@ public class FsChannelSnapshot implements Translog.Snapshot { cacheBuffer.limit(4); int bytesRead = Channels.readFromFileChannel(channel, position, cacheBuffer); if (bytesRead < 0) { - // the snapshot is acquired under a write lock. we should never read beyond the EOF + // the snapshot is acquired under a write lock. we should never + // read beyond the EOF, must be an abrupt EOF throw new EOFException("read past EOF. pos [" + position + "] length: [" + cacheBuffer.limit() + "] end: [" + channel.size() + "]"); } assert bytesRead == 4; @@ -111,7 +113,8 @@ public class FsChannelSnapshot implements Translog.Snapshot { int opSize = cacheBuffer.getInt(); position += 4; if ((position + opSize) > length) { - // the snapshot is acquired under a write lock. we should never read beyond the EOF + // the snapshot is acquired under a write lock. we should never + // read beyond the EOF, must be an abrupt EOF position -= 4; throw new EOFException("opSize of [" + opSize + "] pointed beyond EOF. position [" + position + "] length [" + length + "]"); } @@ -122,25 +125,21 @@ public class FsChannelSnapshot implements Translog.Snapshot { cacheBuffer.limit(opSize); bytesRead = Channels.readFromFileChannel(channel, position, cacheBuffer); if (bytesRead < 0) { + // the snapshot is acquired under a write lock. 
we should never + // read beyond the EOF, must be an abrupt EOF throw new EOFException("tried to read past EOF. opSize [" + opSize + "] position [" + position + "] length [" + length + "]"); } cacheBuffer.flip(); position += opSize; - lastOperationRead = TranslogStreams.readTranslogOperation(new BytesStreamInput(cacheBuffer.array(), 0, opSize, true)); - return true; - } catch (Exception e) { - return false; + return TranslogStreams.readTranslogOperation(new BytesStreamInput(cacheBuffer.array(), 0, opSize, true)); + } catch (IOException e) { + throw new ElasticsearchException("unexpected exception reading from translog snapshot of " + this.raf.file(), e); } } @Override - public Translog.Operation next() { - return this.lastOperationRead; - } - - @Override - public void seekForward(long length) { - this.position += length; + public void seekTo(long position) { + this.position = position; } @Override diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 6f7e2d9f9fb..e8783552530 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -34,10 +34,7 @@ import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.index.translog.TranslogException; -import org.elasticsearch.index.translog.TranslogStats; -import org.elasticsearch.index.translog.TranslogStreams; +import org.elasticsearch.index.translog.*; import java.io.File; import java.io.IOException; @@ -315,30 +312,58 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } } + /** + * Returns the translog that should be read for the specified location. 
If + * the transient or current translog does not match, returns null + */ + private FsTranslogFile translogForLocation(Location location) { + if (trans != null && trans.id() == location.translogId) { + return this.trans; + } + if (current.id() == location.translogId) { + return this.current; + } + return null; + } + + /** + * Reads the raw bytes at the given location from either the transient + * translog (if applicable) or the current translog, under the read lock. + * Returns null when neither translog matches or the read throws. + * @return byte array of read data, or null + */ public byte[] read(Location location) { rwl.readLock().lock(); try { - FsTranslogFile trans = this.trans; - if (trans != null && trans.id() == location.translogId) { + FsTranslogFile trans = translogForLocation(location); + if (trans != null) { try { return trans.read(location); } catch (Exception e) { // ignore } } - if (current.id() == location.translogId) { - try { - return current.read(location); - } catch (Exception e) { - // ignore - } - } return null; } finally { rwl.readLock().unlock(); } } + /** + * Read the Source object from the given location, returns null if the + * source could not be read.
+ * The read lock is held across lookup, read and decode so the bytes are + * always decoded by the stream of the translog file they were read from. + */ + @Override + public Source readSource(Location location) throws IOException { + rwl.readLock().lock(); + try { + FsTranslogFile translog = translogForLocation(location); + if (translog == null) { + return null; + } + byte[] data; + try { + data = translog.read(location); + } catch (Exception e) { + // same best-effort contract as read(Location): report failure as null + return null; + } + if (data == null) { + return null; + } + // decode with the stream belonging to the file resolved above + return translog.getStream().readSource(data); + } finally { + rwl.readLock().unlock(); + } + } + @Override public Location add(Operation operation) throws TranslogException { rwl.readLock().lock(); @@ -401,7 +426,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog public Snapshot snapshot(Snapshot snapshot) { FsChannelSnapshot snap = snapshot(); if (snap.translogId() == snapshot.translogId()) { - snap.seekForward(snapshot.position()); + snap.seekTo(snapshot.position()); } return snap; } diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java index 4e573f6211c..b390f782412 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogException; +import org.elasticsearch.index.translog.TranslogStream; import java.io.IOException; @@ -77,4 +78,6 @@ public interface FsTranslogFile { void sync() throws IOException; boolean syncNeeded(); + + TranslogStream getStream(); } diff --git a/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java b/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java index afcfe86516d..e5f899eb759 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java @@ -22,6 +22,8 @@ package
org.elasticsearch.index.translog.fs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Channels; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.TranslogStream; +import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogException; @@ -38,6 +40,8 @@ public class SimpleFsTranslogFile implements FsTranslogFile { private final RafReference raf; private final AtomicBoolean closed = new AtomicBoolean(); private final ReadWriteLock rwl = new ReentrantReadWriteLock(); + private final TranslogStream translogStream; + private final int headerSize; private volatile int operationCounter = 0; @@ -51,6 +55,11 @@ public class SimpleFsTranslogFile implements FsTranslogFile { this.id = id; this.raf = raf; raf.raf().setLength(0); + this.translogStream = TranslogStreams.translogStreamFor(this.raf.file()); + this.headerSize = this.translogStream.writeHeader(raf.channel()); + this.lastPosition += headerSize; + this.lastWrittenPosition += headerSize; + this.lastSyncPosition += headerSize; } public long id() { @@ -96,6 +105,7 @@ public class SimpleFsTranslogFile implements FsTranslogFile { if (!delete) { try { sync(); + translogStream.close(); } catch (Exception e) { throw new TranslogException(shardId, "failed to sync on close", e); } @@ -115,6 +125,7 @@ public class SimpleFsTranslogFile implements FsTranslogFile { rwl.writeLock().lock(); try { FsChannelSnapshot snapshot = new FsChannelSnapshot(this.id, raf, lastWrittenPosition, operationCounter); + snapshot.seekTo(this.headerSize); success = true; return snapshot; } finally { @@ -136,6 +147,11 @@ public class SimpleFsTranslogFile implements FsTranslogFile { return lastWrittenPosition != lastSyncPosition; } + @Override + public TranslogStream getStream() { + return this.translogStream; + } + public void sync() throws IOException { // check if we really need to sync 
here... if (!syncNeeded()) { diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index 9a8a415de42..805c9c7ac4d 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -384,11 +384,12 @@ public class RecoverySource extends AbstractComponent { long size = 0; int totalOperations = 0; List operations = Lists.newArrayList(); - while (snapshot.hasNext()) { + Translog.Operation operation = snapshot.next(); + while (operation != null) { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } - Translog.Operation operation = snapshot.next(); + operations.add(operation); ops += 1; size += operation.estimateSize(); @@ -409,6 +410,7 @@ public class RecoverySource extends AbstractComponent { size = 0; operations.clear(); } + operation = snapshot.next(); } // send the leftover if (!operations.isEmpty()) { diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index 18567323685..d7e2013ed2e 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -20,11 +20,12 @@ package org.elasticsearch.indices.recovery; import com.google.common.collect.Lists; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStreams; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.transport.TransportRequest; import 
java.io.IOException; @@ -68,7 +69,12 @@ class RecoveryTranslogOperationsRequest extends TransportRequest { int size = in.readVInt(); operations = Lists.newArrayListWithExpectedSize(size); for (int i = 0; i < size; i++) { - operations.add(TranslogStreams.readTranslogOperation(in)); + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + operations.add(TranslogStreams.V1.read(in)); + } else { + operations.add(TranslogStreams.V0.read(in)); + } + } } @@ -79,7 +85,11 @@ class RecoveryTranslogOperationsRequest extends TransportRequest { shardId.writeTo(out); out.writeVInt(operations.size()); for (Translog.Operation operation : operations) { - TranslogStreams.writeTranslogOperation(out, operation); + if (out.getVersion().onOrAfter(Version.V_1_4_0)) { + TranslogStreams.V1.write(out, operation); + } else { + TranslogStreams.V0.write(out, operation); + } } } } diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index fa4adff1707..2017e203fd1 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -746,10 +746,10 @@ public class InternalEngineTests extends ElasticsearchTestCase { @Override public void phase2(Translog.Snapshot snapshot) throws EngineException { - assertThat(snapshot.hasNext(), equalTo(true)); Translog.Create create = (Translog.Create) snapshot.next(); + assertThat("translog snapshot should not read null", create != null, equalTo(true)); assertThat(create.source().toBytesArray(), equalTo(B_2)); - assertThat(snapshot.hasNext(), equalTo(false)); + assertThat(snapshot.next(), equalTo(null)); } @Override @@ -777,9 +777,9 @@ public class InternalEngineTests extends ElasticsearchTestCase { @Override public void phase2(Translog.Snapshot snapshot) throws EngineException { - assertThat(snapshot.hasNext(), equalTo(true)); 
Translog.Create create = (Translog.Create) snapshot.next(); - assertThat(snapshot.hasNext(), equalTo(false)); + assertThat(create != null, equalTo(true)); + assertThat(snapshot.next(), equalTo(null)); assertThat(create.source().toBytesArray(), equalTo(B_2)); // add for phase3 @@ -789,9 +789,9 @@ public class InternalEngineTests extends ElasticsearchTestCase { @Override public void phase3(Translog.Snapshot snapshot) throws EngineException { - assertThat(snapshot.hasNext(), equalTo(true)); Translog.Create create = (Translog.Create) snapshot.next(); - assertThat(snapshot.hasNext(), equalTo(false)); + assertThat(create != null, equalTo(true)); + assertThat(snapshot.next(), equalTo(null)); assertThat(create.source().toBytesArray(), equalTo(B_3)); } }); diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java b/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java new file mode 100644 index 00000000000..b572eb057b3 --- /dev/null +++ b/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java @@ -0,0 +1,145 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.store; + +import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.monitor.fs.FsStats; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportModule; +import org.junit.Test; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Arrays; +import java.util.Set; +import java.util.TreeSet; + +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.notNullValue; + +/** + * Integration test for corrupted translog files + */ +@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.SUITE, numDataNodes =0) +public class CorruptedTranslogTests extends ElasticsearchIntegrationTest { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return ImmutableSettings.builder() + // we really need local GW here since this also checks for corruption etc. 
+ // and we need to make sure primaries are not just trashed if we don't have replicas + .put(super.nodeSettings(nodeOrdinal)).put("gateway.type", "local") + .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()).build(); + } + + @Test + public void testCorruptTranslogFiles() throws Exception { + internalCluster().startNodesAsync(1, ImmutableSettings.EMPTY).get(); + + assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.refresh_interval", "-1") + .put("index.gateway.local.sync", "1s") // fsync the translog every second + )); + ensureYellow(); + + // Index some documents + int numDocs = scaledRandomIntBetween(100, 1000); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar"); + } + indexRandom(false, false, builders); + + // Corrupt the translog file(s) + corruptRandomTranslogFiles(); + + // Restart the single node + internalCluster().fullRestart(); + // node needs time to start recovery and discover the translog corruption + sleep(1000); + + try { + client().prepareSearch("test").setQuery(matchAllQuery()).get(); + fail("all shards should be failed due to a corrupted translog"); + } catch (SearchPhaseExecutionException e) { + // Good, all shards should be failed because there is only a + // single shard and its translog is corrupt + } + } + + + private void corruptRandomTranslogFiles() throws IOException { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false); + ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), shardIterators.iterators()); + ShardRouting shardRouting = shardIterator.nextOrNull(); + 
assertNotNull(shardRouting); + assertTrue(shardRouting.primary()); + assertTrue(shardRouting.assignedToNode()); + String nodeId = shardRouting.currentNodeId(); + NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get(); + Set files = new TreeSet<>(); // treeset makes sure iteration order is deterministic + for (FsStats.Info info : nodeStatses.getNodes()[0].getFs()) { + String path = info.getPath(); + final String relativeDataLocationPath = "indices/test/" + Integer.toString(shardRouting.getId()) + "/translog"; + File file = new File(path, relativeDataLocationPath); + logger.info("--> path: {}", file); + files.addAll(Arrays.asList(file.listFiles(new FileFilter() { + @Override + public boolean accept(File pathname) { + logger.info("--> File: {}", pathname); + return pathname.isFile() && pathname.getName().startsWith("translog-"); + } + }))); + } + File fileToCorrupt = null; + if (!files.isEmpty()) { + int corruptions = randomIntBetween(5, 20); + for (int i = 0; i < corruptions; i++) { + fileToCorrupt = RandomPicks.randomFrom(getRandom(), files); + try (RandomAccessFile raf = new RandomAccessFile(fileToCorrupt, "rw")) { + raf.seek(randomIntBetween(0, (int) Math.min(Integer.MAX_VALUE, raf.length() - 1))); + long filePointer = raf.getFilePointer(); + byte b = raf.readByte(); + raf.seek(filePointer); + raf.writeByte(~b); + raf.getFD().sync(); + logger.info("--> corrupting file {} -- flipping at position {} from {} to {} file: {}", fileToCorrupt.getName(), filePointer, Integer.toHexString(b), Integer.toHexString(~b), fileToCorrupt); + } + } + } + assertThat("no file corrupted", fileToCorrupt, notNullValue()); + } +} diff --git a/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java b/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java index 73786bc9c18..521c7cc5f0c 100644 --- a/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java +++ 
b/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java @@ -34,12 +34,18 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import static com.google.common.collect.Lists.newArrayList; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; /** * @@ -65,6 +71,8 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase protected abstract Translog create(); + protected abstract String translogFileDirectory(); + @Test public void testRead() throws IOException { Translog.Location loc1 = translog.add(new Translog.Create("test", "1", new byte[]{1})); @@ -145,23 +153,23 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase snapshot = translog.snapshot(); - assertThat(snapshot.hasNext(), equalTo(true)); Translog.Create create = (Translog.Create) snapshot.next(); + assertThat(create != null, equalTo(true)); assertThat(create.source().toBytes(), equalTo(new byte[]{1})); - assertThat(snapshot.hasNext(), equalTo(true)); Translog.Index index = (Translog.Index) snapshot.next(); + assertThat(index != null, equalTo(true)); assertThat(index.source().toBytes(), equalTo(new byte[]{2})); - assertThat(snapshot.hasNext(), equalTo(true)); Translog.Delete delete = (Translog.Delete) snapshot.next(); + assertThat(delete != null, equalTo(true)); assertThat(delete.uid(), equalTo(newUid("3"))); - assertThat(snapshot.hasNext(), equalTo(true)); Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) snapshot.next(); + assertThat(deleteByQuery != null, equalTo(true)); assertThat(deleteByQuery.source().toBytes(), equalTo(new byte[]{4})); - 
assertThat(snapshot.hasNext(), equalTo(false)); + assertThat(snapshot.next(), equalTo(null)); snapshot.close(); @@ -188,17 +196,20 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase snapshot.close(); snapshot = translog.snapshot(); - assertThat(snapshot.hasNext(), equalTo(true)); Translog.Create create = (Translog.Create) snapshot.next(); + assertThat(create != null, equalTo(true)); assertThat(create.source().toBytes(), equalTo(new byte[]{1})); snapshot.close(); Translog.Snapshot snapshot1 = translog.snapshot(); - // we use the translogSize to also navigate to the last position on this snapshot - // so snapshot(Snapshot) will work properly MatcherAssert.assertThat(snapshot1, TranslogSizeMatcher.translogSize(1)); assertThat(snapshot1.estimatedTotalOperations(), equalTo(1)); + // seek to the end of the translog snapshot + while (snapshot1.next() != null) { + // spin + } + translog.add(new Translog.Index("test", "2", new byte[]{2})); snapshot = translog.snapshot(snapshot1); MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(1)); @@ -206,10 +217,10 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase snapshot.close(); snapshot = translog.snapshot(snapshot1); - assertThat(snapshot.hasNext(), equalTo(true)); Translog.Index index = (Translog.Index) snapshot.next(); + assertThat(index != null, equalTo(true)); assertThat(index.source().toBytes(), equalTo(new byte[]{2})); - assertThat(snapshot.hasNext(), equalTo(false)); + assertThat(snapshot.next(), equalTo(null)); assertThat(snapshot.estimatedTotalOperations(), equalTo(2)); snapshot.close(); snapshot1.close(); @@ -235,17 +246,17 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase snapshot.close(); snapshot = translog.snapshot(actualSnapshot); - assertThat(snapshot.hasNext(), equalTo(true)); Translog.Index index = (Translog.Index) snapshot.next(); + assertThat(index != null, equalTo(true)); 
assertThat(index.source().toBytes(), equalTo(new byte[]{3})); - assertThat(snapshot.hasNext(), equalTo(false)); + assertThat(snapshot.next(), equalTo(null)); actualSnapshot.close(); snapshot.close(); } @Test - public void testSnapshotWithSeekForward() { + public void testSnapshotWithSeekTo() { Translog.Snapshot snapshot = translog.snapshot(); MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(0)); snapshot.close(); @@ -253,19 +264,23 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase translog.add(new Translog.Create("test", "1", new byte[]{1})); snapshot = translog.snapshot(); MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(1)); + // seek to the end of the translog snapshot + while (snapshot.next() != null) { + // spin + } long lastPosition = snapshot.position(); snapshot.close(); translog.add(new Translog.Create("test", "2", new byte[]{1})); snapshot = translog.snapshot(); - snapshot.seekForward(lastPosition); + snapshot.seekTo(lastPosition); MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(1)); snapshot.close(); snapshot = translog.snapshot(); - snapshot.seekForward(lastPosition); - assertThat(snapshot.hasNext(), equalTo(true)); + snapshot.seekTo(lastPosition); Translog.Create create = (Translog.Create) snapshot.next(); + assertThat(create != null, equalTo(true)); assertThat(create.id(), equalTo("2")); snapshot.close(); } @@ -397,6 +412,50 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase } + @Test + public void testTranslogChecksums() throws Exception { + List locations = newArrayList(); + + int translogOperations = randomIntBetween(10, 100); + for (int op = 0; op < translogOperations; op++) { + String ascii = randomAsciiOfLengthBetween(1, 50); + locations.add(translog.add(new Translog.Create("test", "" + op, ascii.getBytes("UTF-8")))); + } + translog.sync(); + + corruptTranslogs(translogFileDirectory()); + + AtomicInteger corruptionsCaught = 
new AtomicInteger(0); + for (Translog.Location location : locations) { + try { + TranslogStreams.readSource(translog.read(location)); + } catch (TranslogCorruptedException e) { + corruptionsCaught.incrementAndGet(); + } + } + assertThat("at least one corruption was caused and caught", corruptionsCaught.get(), greaterThanOrEqualTo(1)); + } + + /** + * Randomly overwrite some bytes in the translog files + */ + private void corruptTranslogs(String directory) throws Exception { + File[] files = new File(directory).listFiles(); + if (files != null) { + for (File file : files) { + if (file.getName().startsWith("translog-")) { + logger.info("--> corrupting {}...", file.getName()); + RandomAccessFile f = new RandomAccessFile(file, "rw"); + int corruptions = scaledRandomIntBetween(10, 50); + for (int i = 0; i < corruptions; i++) { + f.seek(randomIntBetween(0, (int)f.length())); + f.write(randomByte()); + } + f.close(); + } + } + } + } private Term newUid(String id) { return new Term("_uid", id); diff --git a/src/test/java/org/elasticsearch/index/translog/TranslogSizeMatcher.java b/src/test/java/org/elasticsearch/index/translog/TranslogSizeMatcher.java index 03cf701a82b..2ee0148204c 100644 --- a/src/test/java/org/elasticsearch/index/translog/TranslogSizeMatcher.java +++ b/src/test/java/org/elasticsearch/index/translog/TranslogSizeMatcher.java @@ -37,11 +37,17 @@ public class TranslogSizeMatcher extends TypeSafeMatcher { @Override public boolean matchesSafely(Translog.Snapshot snapshot) { int count = 0; - while (snapshot.hasNext()) { - snapshot.next(); - count++; + long startingPosition = snapshot.position(); + try { + while (snapshot.next() != null) { + count++; + } + return size == count; + } finally { + // Since counting the translog size consumes the stream, reset it + // back to the origin position after reading + snapshot.seekTo(startingPosition); } - return size == count; } @Override diff --git a/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java 
b/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java new file mode 100644 index 00000000000..cb60e953221 --- /dev/null +++ b/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java @@ -0,0 +1,142 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.translog; + +import org.elasticsearch.index.VersionType; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.io.EOFException; +import java.io.File; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests for reading old and new translog files. + * + * Each test loads a binary translog fixture from the test resources, opens it with + * {@link TranslogStreams#translogStreamFor(File)}, and checks either the stream implementation that was selected and + * the operations read back from it, or the {@link TranslogCorruptedException} raised for a damaged file. + */ +public class TranslogVersionTests extends ElasticsearchTestCase { + + /** + * Opens the {@code translog-v0.binary} fixture and asserts that a {@link LegacyTranslogStream} is returned, that the + * first operation is a {@code SAVE} ({@link Translog.Index}) with the expected id, type, source, routing, parent, + * version, timestamp, ttl and version type, and that a second read fails with {@link EOFException}. + * The stream is closed in the {@code finally} of that second read. + */ + @Test + public void testV0LegacyTranslogVersion() throws Exception { + File translogFile = getResource("/org/elasticsearch/index/translog/translog-v0.binary"); + assertThat("test file should exist", translogFile.exists(), equalTo(true)); + TranslogStream stream = TranslogStreams.translogStreamFor(translogFile); + assertThat("a version0 stream is returned", stream instanceof LegacyTranslogStream, equalTo(true)); + + Translog.Operation operation = stream.read(); + + assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.SAVE, equalTo(true)); + Translog.Index op = (Translog.Index) operation; + assertThat(op.id(), equalTo("1")); + assertThat(op.type(), equalTo("doc")); + assertThat(op.source().toUtf8(), equalTo("{\"body\": \"worda wordb wordc wordd \\\"worde\\\" wordf\"}")); + assertThat(op.routing(), equalTo(null)); + assertThat(op.parent(), equalTo(null)); + assertThat(op.version(), equalTo(1L)); + assertThat(op.timestamp(), equalTo(1407312091791L)); + assertThat(op.ttl(), equalTo(-1L)); + assertThat(op.versionType(), equalTo(VersionType.INTERNAL)); + + try { + stream.read(); + fail("should have been the end of the file"); + } catch (EOFException e) { + // success + } finally { + stream.close(); + } + } + + /** + * Opens the {@code translog-v1.binary} fixture and asserts that a {@link ChecksummedTranslogStream} is returned and + * that the first operation is a {@code CREATE} ({@link Translog.Create}) with the expected field values. It then + * keeps reading until {@link EOFException} and asserts that five operations were read in total. + * NOTE(review): the stream is closed only after the final assertion, with no {@code finally}, so a failing + * assertion leaves it open. + */ + @Test + public void testV1ChecksummedTranslogVersion() throws Exception { + File translogFile = getResource("/org/elasticsearch/index/translog/translog-v1.binary"); + assertThat("test file should exist", translogFile.exists(), equalTo(true)); + TranslogStream stream = TranslogStreams.translogStreamFor(translogFile); + 
assertThat("a version1 stream is returned", stream instanceof ChecksummedTranslogStream, equalTo(true)); + + Translog.Operation operation = stream.read(); + + assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.CREATE, equalTo(true)); + Translog.Create op = (Translog.Create) operation; + assertThat(op.id(), equalTo("Bwiq98KFSb6YjJQGeSpeiw")); + assertThat(op.type(), equalTo("doc")); + assertThat(op.source().toUtf8(), equalTo("{\"body\": \"foo\"}")); + assertThat(op.routing(), equalTo(null)); + assertThat(op.parent(), equalTo(null)); + assertThat(op.version(), equalTo(1L)); + assertThat(op.timestamp(), equalTo(1408627184844L)); + assertThat(op.ttl(), equalTo(-1L)); + assertThat(op.versionType(), equalTo(VersionType.INTERNAL)); + + // There are more operations + int opNum = 1; + while (true) { + try { + stream.read(); + opNum++; + } catch (EOFException e) { + break; + } + } + assertThat("there should be 5 translog operations", opNum, equalTo(5)); + stream.close(); + } + + /** + * Checks three damaged fixtures in turn. For {@code translog-v1-corrupted-magic.binary} and + * {@code translog-invalid-first-byte.binary}, opening the stream must throw {@link TranslogCorruptedException} with + * the expected header message. For {@code translog-v1-corrupted-body.binary}, the stream is opened and read until + * {@link EOFException}; a {@link TranslogCorruptedException} whose message contains "translog stream is corrupted" + * must be thrown before the end of the file is reached. + * NOTE(review): the {@code stream} locals in the two header cases are never read, and the stream opened for the + * corrupted-body case is never closed. + */ + @Test + public void testCorruptedTranslogs() throws Exception { + try { + File translogFile = getResource("/org/elasticsearch/index/translog/translog-v1-corrupted-magic.binary"); + assertThat("test file should exist", translogFile.exists(), equalTo(true)); + TranslogStream stream = TranslogStreams.translogStreamFor(translogFile); + fail("should have thrown an exception about the header being corrupt"); + } catch (TranslogCorruptedException e) { + assertThat("translog corruption from header: " + e.getMessage(), + e.getMessage().contains("translog looks like version 1 or later, but has corrupted header"), equalTo(true)); + } + + try { + File translogFile = getResource("/org/elasticsearch/index/translog/translog-invalid-first-byte.binary"); + assertThat("test file should exist", translogFile.exists(), equalTo(true)); + TranslogStream stream = TranslogStreams.translogStreamFor(translogFile); + fail("should have thrown an exception about the header being 
corrupt"); + } catch (TranslogCorruptedException e) { + assertThat("translog corruption from header: " + e.getMessage(), + e.getMessage().contains("Invalid first byte in translog file, got: 1, expected 0x00 or 0x3f"), equalTo(true)); + } + + try { + File translogFile = getResource("/org/elasticsearch/index/translog/translog-v1-corrupted-body.binary"); + assertThat("test file should exist", translogFile.exists(), equalTo(true)); + TranslogStream stream = TranslogStreams.translogStreamFor(translogFile); + while (true) { + try { + stream.read(); + } catch (EOFException e) { + break; + } + } + fail("should have thrown an exception about the body being corrupted"); + } catch (TranslogCorruptedException e) { + assertThat("translog corruption from body: " + e.getMessage(), + e.getMessage().contains("translog stream is corrupted"), equalTo(true)); + } + + } +} diff --git a/src/test/java/org/elasticsearch/index/translog/fs/FsBufferedTranslogTests.java b/src/test/java/org/elasticsearch/index/translog/fs/FsBufferedTranslogTests.java index 93bf8863102..346d8cfd18b 100644 --- a/src/test/java/org/elasticsearch/index/translog/fs/FsBufferedTranslogTests.java +++ b/src/test/java/org/elasticsearch/index/translog/fs/FsBufferedTranslogTests.java @@ -39,10 +39,15 @@ public class FsBufferedTranslogTests extends AbstractSimpleTranslogTests { .put("index.translog.fs.type", FsTranslogFile.Type.BUFFERED.name()) .put("index.translog.fs.buffer_size", 10 + randomInt(128 * 1024)) .build(), - new File("data/fs-buf-translog") + new File(translogFileDirectory()) ); } + @Override + protected String translogFileDirectory() { + return "data/fs-buf-translog"; + } + @AfterClass public static void cleanup() { FileSystemUtils.deleteRecursively(new File("data/fs-buf-translog"), true); diff --git a/src/test/java/org/elasticsearch/index/translog/fs/FsSimpleTranslogTests.java b/src/test/java/org/elasticsearch/index/translog/fs/FsSimpleTranslogTests.java index de1a3d22ef8..ceb02640e5e 100644 --- 
a/src/test/java/org/elasticsearch/index/translog/fs/FsSimpleTranslogTests.java +++ b/src/test/java/org/elasticsearch/index/translog/fs/FsSimpleTranslogTests.java @@ -36,7 +36,12 @@ public class FsSimpleTranslogTests extends AbstractSimpleTranslogTests { protected Translog create() { return new FsTranslog(shardId, ImmutableSettings.settingsBuilder().put("index.translog.fs.type", FsTranslogFile.Type.SIMPLE.name()).build(), - new File("data/fs-simple-translog")); + new File(translogFileDirectory())); + } + + @Override + protected String translogFileDirectory() { + return "data/fs-simple-translog"; } @AfterClass diff --git a/src/test/resources/org/elasticsearch/index/translog/translog-invalid-first-byte.binary b/src/test/resources/org/elasticsearch/index/translog/translog-invalid-first-byte.binary new file mode 100644 index 00000000000..2eb76cf956f Binary files /dev/null and b/src/test/resources/org/elasticsearch/index/translog/translog-invalid-first-byte.binary differ diff --git a/src/test/resources/org/elasticsearch/index/translog/translog-v0.binary b/src/test/resources/org/elasticsearch/index/translog/translog-v0.binary new file mode 100644 index 00000000000..303bb2ef50e Binary files /dev/null and b/src/test/resources/org/elasticsearch/index/translog/translog-v0.binary differ diff --git a/src/test/resources/org/elasticsearch/index/translog/translog-v1-corrupted-body.binary b/src/test/resources/org/elasticsearch/index/translog/translog-v1-corrupted-body.binary new file mode 100644 index 00000000000..d74970f18b2 Binary files /dev/null and b/src/test/resources/org/elasticsearch/index/translog/translog-v1-corrupted-body.binary differ diff --git a/src/test/resources/org/elasticsearch/index/translog/translog-v1-corrupted-magic.binary b/src/test/resources/org/elasticsearch/index/translog/translog-v1-corrupted-magic.binary new file mode 100644 index 00000000000..9f23966a413 Binary files /dev/null and 
b/src/test/resources/org/elasticsearch/index/translog/translog-v1-corrupted-magic.binary differ diff --git a/src/test/resources/org/elasticsearch/index/translog/translog-v1.binary b/src/test/resources/org/elasticsearch/index/translog/translog-v1.binary new file mode 100644 index 00000000000..f166c8a571e Binary files /dev/null and b/src/test/resources/org/elasticsearch/index/translog/translog-v1.binary differ