Add translog checksums

Switches TranslogStreams to check a header in the file to determine the
translog format, delegating to the version-specific stream.

Version 1 of the translog format writes a header using Lucene's
CodecUtil at the beginning of the file and appends a checksum for each
translog operation written.

Also refactors much of the translog operations, such as merging
.hasNext() and .next() in FsChannelSnapshot

Relates to #6554
This commit is contained in:
Lee Hinman 2014-07-25 15:23:37 +02:00
parent b745b0151c
commit eaf392163c
29 changed files with 1127 additions and 124 deletions

View File

@ -72,7 +72,6 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.indices.warmer.IndicesWarmer; import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.indices.warmer.InternalIndicesWarmer; import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -348,14 +347,13 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
if (!get.loadSource()) { if (!get.loadSource()) {
return new GetResult(true, versionValue.version(), null); return new GetResult(true, versionValue.version(), null);
} }
byte[] data = translog.read(versionValue.translogLocation()); try {
if (data != null) { Translog.Source source = translog.readSource(versionValue.translogLocation());
try { if (source != null) {
Translog.Source source = TranslogStreams.readSource(data);
return new GetResult(true, versionValue.version(), source); return new GetResult(true, versionValue.version(), source);
} catch (IOException e) {
// switched on us, read it from the reader
} }
} catch (IOException e) {
// switched on us, read it from the reader
} }
} }
} }

View File

@ -23,11 +23,11 @@ import com.google.common.collect.Sets;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -42,6 +42,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStream;
import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryState;
@ -50,7 +51,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.EOFException; import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Set; import java.util.Set;
@ -63,6 +63,8 @@ import java.util.concurrent.TimeUnit;
*/ */
public class LocalIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway { public class LocalIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {
private static final int RECOVERY_TRANSLOG_RENAME_RETRIES = 3;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final MappingUpdatedAction mappingUpdatedAction; private final MappingUpdatedAction mappingUpdatedAction;
private final IndexService indexService; private final IndexService indexService;
@ -198,7 +200,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
if (!tmpRecoveringFile.exists()) { if (!tmpRecoveringFile.exists()) {
File tmpTranslogFile = new File(translogLocation, translogName); File tmpTranslogFile = new File(translogLocation, translogName);
if (tmpTranslogFile.exists()) { if (tmpTranslogFile.exists()) {
for (int i = 0; i < 3; i++) { for (int i = 0; i < RECOVERY_TRANSLOG_RENAME_RETRIES; i++) {
if (tmpTranslogFile.renameTo(tmpRecoveringFile)) { if (tmpTranslogFile.renameTo(tmpRecoveringFile)) {
recoveringTranslogFile = tmpRecoveringFile; recoveringTranslogFile = tmpRecoveringFile;
break; break;
@ -228,17 +230,15 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
recoveryState.getTranslog().startTime(System.currentTimeMillis()); recoveryState.getTranslog().startTime(System.currentTimeMillis());
recoveryState.setStage(RecoveryState.Stage.TRANSLOG); recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
FileInputStream fs = null; TranslogStream stream = null;
final Set<String> typesToUpdate = Sets.newHashSet(); final Set<String> typesToUpdate = Sets.newHashSet();
try { try {
fs = new FileInputStream(recoveringTranslogFile); stream = TranslogStreams.translogStreamFor(recoveringTranslogFile);
InputStreamStreamInput si = new InputStreamStreamInput(fs);
while (true) { while (true) {
Translog.Operation operation; Translog.Operation operation;
try { try {
int opSize = si.readInt(); operation = stream.read();
operation = TranslogStreams.readTranslogOperation(si);
} catch (EOFException e) { } catch (EOFException e) {
// ignore, not properly written the last op // ignore, not properly written the last op
break; break;
@ -269,7 +269,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
throw new IndexShardGatewayRecoveryException(shardId, "failed to recover shard", e); throw new IndexShardGatewayRecoveryException(shardId, "failed to recover shard", e);
} finally { } finally {
try { try {
fs.close(); IOUtils.close(stream);
} catch (IOException e) { } catch (IOException e) {
// ignore // ignore
} }

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.apache.lucene.store.BufferedChecksum;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
/**
* Similar to Lucene's BufferedChecksumIndexInput, however this wraps a
* {@link StreamInput} so anything read will update the checksum
*/
public final class BufferedChecksumStreamInput extends StreamInput {
private final StreamInput in;
private final Checksum digest;
public BufferedChecksumStreamInput(StreamInput in) {
this.in = in;
this.digest = new BufferedChecksum(new CRC32());
}
public long getChecksum() {
return this.digest.getValue();
}
@Override
public byte readByte() throws IOException {
final byte b = in.readByte();
digest.update(b);
return b;
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
in.readBytes(b, offset, len);
digest.update(b, offset, len);
}
@Override
public void reset() throws IOException {
in.reset();
digest.reset();
}
@Override
public int read() throws IOException {
return readByte() & 0xFF;
}
@Override
public void close() throws IOException {
in.close();
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.apache.lucene.store.BufferedChecksum;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
/**
* Similar to Lucene's BufferedChecksumIndexOutput, however this wraps a
* {@link StreamOutput} so anything written will update the checksum
*/
public final class BufferedChecksumStreamOutput extends StreamOutput {
private final StreamOutput out;
private final Checksum digest;
public BufferedChecksumStreamOutput(StreamOutput out) {
this.out = out;
this.digest = new BufferedChecksum(new CRC32());
}
public long getChecksum() {
return this.digest.getValue();
}
@Override
public void writeByte(byte b) throws IOException {
out.writeByte(b);
digest.update(b);
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
out.writeBytes(b, offset, length);
digest.update(b, offset, length);
}
@Override
public void flush() throws IOException {
out.flush();
}
@Override
public void close() throws IOException {
out.close();
}
@Override
public void reset() throws IOException {
out.reset();
digest.reset();
}
}

View File

@ -0,0 +1,150 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
/**
* Version 1 of the translog file format. Writes a header to identify the
* format, also writes checksums for each operation
*/
public class ChecksummedTranslogStream implements TranslogStream {
public static final int VERSION = 1;
private final InputStreamStreamInput in;
private final boolean fileExists;
ChecksummedTranslogStream(InputStreamStreamInput in, boolean fileExists) {
this.in = in;
// This could be a new file, in which case we can ignore reading and
// verifying the header
this.fileExists = fileExists;
if (fileExists) {
// The header must be read to advance the input stream
readAndVerifyHeader();
}
}
private void readAndVerifyHeader() {
assert this.in != null : "headers are only for translog files read from disk, not streaming operations";
try {
CodecUtil.checkHeader(new InputStreamDataInput(this.in), TranslogStreams.TRANSLOG_CODEC, VERSION, VERSION);
} catch (IOException e) {
throw new TranslogCorruptedException("translog header corrupted", e);
}
}
public Translog.Operation read() throws IOException {
if (this.fileExists == false) {
throw new IOException("translog file does not exist");
}
assert this.fileExists : "cannot read from a stream for a file that does not exist";
in.readInt(); // ignored operation size
return this.read(in);
}
private void verifyChecksum(BufferedChecksumStreamInput in) throws IOException {
// This absolutely must come first, or else reading the checksum becomes part of the checksum
long expectedChecksum = in.getChecksum();
long readChecksum = in.readInt() & 0xFFFF_FFFFL;
if (readChecksum != expectedChecksum) {
throw new TranslogCorruptedException("translog stream is corrupted, expected: 0x" +
Long.toHexString(expectedChecksum) + ", got: 0x" + Long.toHexString(readChecksum));
}
}
@Override
public Translog.Operation read(StreamInput inStream) throws IOException {
// This BufferedChecksumStreamInput remains unclosed on purpose,
// because closing it closes the underlying stream, which we don't
// want to do here.
BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(inStream);
Translog.Operation operation;
try {
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
operation = TranslogStreams.newOperationFromType(type);
operation.readFrom(in);
} catch (AssertionError|Exception e) {
throw new TranslogCorruptedException("translog corruption while reading from stream", e);
}
verifyChecksum(in);
return operation;
}
@Override
public Translog.Source readSource(byte[] data) throws IOException {
StreamInput nonChecksummingIn = new BytesStreamInput(data, false);
BufferedChecksumStreamInput in;
Translog.Source source;
try {
// the size header, not used and not part of the checksum
// because it is computed after the operation is written
nonChecksummingIn.readInt();
// This BufferedChecksumStreamInput remains unclosed on purpose,
// because closing it closes the underlying stream, which we don't
// want to do here.
in = new BufferedChecksumStreamInput(nonChecksummingIn);
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
Translog.Operation operation = TranslogStreams.newOperationFromType(type);
source = operation.readSource(in);
} catch (AssertionError|Exception e) {
throw new TranslogCorruptedException("translog corruption while reading from byte array", e);
}
verifyChecksum(in);
return source;
}
@Override
public void write(StreamOutput outStream, Translog.Operation op) throws IOException {
// This BufferedChecksumStreamOutput remains unclosed on purpose,
// because closing it closes the underlying stream, which we don't
// want to do here.
BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(outStream);
out.writeByte(op.opType().id());
op.writeTo(out);
long checksum = out.getChecksum();
out.writeInt((int)checksum);
}
@Override
public int writeHeader(FileChannel channel) throws IOException {
// This OutputStreamDataOutput is intentionally not closed because
// closing it will close the FileChannel
OutputStreamDataOutput out = new OutputStreamDataOutput(Channels.newOutputStream(channel));
CodecUtil.writeHeader(out, TranslogStreams.TRANSLOG_CODEC, VERSION);
return CodecUtil.headerLength(TranslogStreams.TRANSLOG_CODEC);
}
@Override
public void close() throws IOException {
this.in.close();
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.nio.channels.FileChannel;
/**
* Version 0 of the translog format, there is no header in this file
*/
public class LegacyTranslogStream implements TranslogStream {
private final InputStreamStreamInput in;
private final boolean fileExists;
LegacyTranslogStream(InputStreamStreamInput in, boolean fileExists) {
this.in = in;
this.fileExists = fileExists;
}
public Translog.Operation read() throws IOException {
assert this.fileExists : "cannot read from a stream for a file that does not exist";
in.readInt(); // ignored operation size
return this.read(in);
}
@Override
public Translog.Operation read(StreamInput in) throws IOException {
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
Translog.Operation operation = TranslogStreams.newOperationFromType(type);
operation.readFrom(in);
return operation;
}
@Override
public Translog.Source readSource(byte[] data) throws IOException {
BytesStreamInput in = new BytesStreamInput(data, false);
in.readInt(); // the size header
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
Translog.Operation operation = TranslogStreams.newOperationFromType(type);
return operation.readSource(in);
}
@Override
public void write(StreamOutput out, Translog.Operation op) throws IOException {
out.writeByte(op.opType().id());
op.writeTo(out);
}
@Override
public int writeHeader(FileChannel channel) {
// nothing, there is no header for version 0 translog files
return 0;
}
@Override
public void close() throws IOException {
this.in.close();
}
}

View File

@ -23,8 +23,8 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator; import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
@ -41,7 +41,6 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShardComponent; import org.elasticsearch.index.shard.IndexShardComponent;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
/** /**
@ -106,6 +105,8 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent,
byte[] read(Location location); byte[] read(Location location);
Translog.Source readSource(Location location) throws IOException;
/** /**
* Snapshots the current transaction log allowing to safely iterate over the snapshot. * Snapshots the current transaction log allowing to safely iterate over the snapshot.
*/ */
@ -165,6 +166,9 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent,
*/ */
long translogId(); long translogId();
/**
* Returns the current position in the translog stream
*/
long position(); long position();
/** /**
@ -177,11 +181,15 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent,
*/ */
int estimatedTotalOperations(); int estimatedTotalOperations();
boolean hasNext(); /**
* Returns the next operation, or null when no more operations are found
*/
Operation next(); Operation next();
void seekForward(long length); /**
* Seek to the specified position in the translog stream
*/
void seekTo(long position);
/** /**
* The length in bytes of this stream. * The length in bytes of this stream.
@ -221,7 +229,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent,
case 4: case 4:
return DELETE_BY_QUERY; return DELETE_BY_QUERY;
default: default:
throw new IllegalArgumentException("No type mapped for [" + id + "]"); throw new ElasticsearchIllegalArgumentException("No type mapped for [" + id + "]");
} }
} }
} }

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.elasticsearch.ElasticsearchException;
public class TranslogCorruptedException extends ElasticsearchException {
public TranslogCorruptedException(String msg) {
super(msg);
}
public TranslogCorruptedException(String msg, Throwable cause) {
super(msg, cause);
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.FileChannel;
/**
* A translog stream that will read and write operations in the
* version-specific format
*/
public interface TranslogStream extends Closeable {
/**
* Read the next operation from the translog file, the stream <b>must</b>
* have been created through {@link TranslogStreams#translogStreamFor(java.io.File)}
*/
public Translog.Operation read() throws IOException;
/**
* Read the next translog operation from the input stream
*/
public Translog.Operation read(StreamInput in) throws IOException;
/**
* Read a translog operation from the given byte array, returning the
* {@link Translog.Source} object from the Operation
*/
public Translog.Source readSource(byte[] data) throws IOException;
/**
* Write the given translog operation to the output stream
*/
public void write(StreamOutput out, Translog.Operation op) throws IOException;
/**
* Optionally write a header identifying the translog version to the
* file channel
*/
public int writeHeader(FileChannel channel) throws IOException;
/**
* Close the stream opened with {@link TranslogStreams#translogStreamFor(java.io.File)}
*/
public void close() throws IOException;
}

View File

@ -19,66 +19,152 @@
package org.elasticsearch.index.translog; package org.elasticsearch.index.translog;
import org.elasticsearch.common.io.stream.BytesStreamInput; import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
/** /**
* * Encapsulating class used for operating on translog streams. Static methods
* on this class use the latest version of the stream.
*/ */
public class TranslogStreams { public class TranslogStreams {
public static Translog.Operation readTranslogOperation(StreamInput in) throws IOException { /** V0, no header, no checksums */
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte()); public static TranslogStream V0 = new LegacyTranslogStream(null, false);
Translog.Operation operation; /** V1, header, with per-op checksums */
public static TranslogStream V1 = new ChecksummedTranslogStream(null, false);
public static TranslogStream LATEST = V1;
public static final String TRANSLOG_CODEC = "translog";
private static final byte LUCENE_CODEC_HEADER_BYTE = 0x3f;
private static final byte UNVERSIONED_TRANSLOG_HEADER_BYTE = 0x00;
/**
* Returns a new empty translog operation for the given {@link Translog.Operation.Type}
*/
static Translog.Operation newOperationFromType(Translog.Operation.Type type) throws IOException {
switch (type) { switch (type) {
case CREATE: case CREATE:
operation = new Translog.Create(); return new Translog.Create();
break;
case DELETE: case DELETE:
operation = new Translog.Delete(); return new Translog.Delete();
break;
case DELETE_BY_QUERY: case DELETE_BY_QUERY:
operation = new Translog.DeleteByQuery(); return new Translog.DeleteByQuery();
break;
case SAVE: case SAVE:
operation = new Translog.Index(); return new Translog.Index();
break;
default: default:
throw new IOException("No type for [" + type + "]"); throw new IOException("No type for [" + type + "]");
} }
operation.readFrom(in);
return operation;
} }
/**
* Read the translog operation from the given byte array, returning the
* {@link Translog.Source} from the operation
*/
public static Translog.Source readSource(byte[] data) throws IOException { public static Translog.Source readSource(byte[] data) throws IOException {
BytesStreamInput in = new BytesStreamInput(data, false); return LATEST.readSource(data);
in.readInt(); // the size header
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
Translog.Operation operation;
switch (type) {
case CREATE:
operation = new Translog.Create();
break;
case DELETE:
operation = new Translog.Delete();
break;
case DELETE_BY_QUERY:
operation = new Translog.DeleteByQuery();
break;
case SAVE:
operation = new Translog.Index();
break;
default:
throw new IOException("No type for [" + type + "]");
}
return operation.readSource(in);
} }
/**
* Read the next {@link Translog.Operation} from the stream using the
* latest translog version
*/
public static Translog.Operation readTranslogOperation(StreamInput in) throws IOException {
return LATEST.read(in);
}
/**
* Write the {@link Translog.Operation} to the output stream using the
* latest translog version
*/
public static void writeTranslogOperation(StreamOutput out, Translog.Operation op) throws IOException { public static void writeTranslogOperation(StreamOutput out, Translog.Operation op) throws IOException {
out.writeByte(op.opType().id()); LATEST.write(out, op);
op.writeTo(out); }
/**
* Given a file, return a VersionedTranslogStream based on an
* optionally-existing header in the file. If the file does not exist, or
* has zero length, returns the latest version. If the header does not
* exist, assumes Version 0 of the translog file format.
*
* The caller is responsible for closing the TranslogStream.
*
* @throws IOException
*/
public static TranslogStream translogStreamFor(File translogFile) throws IOException {
// This stream will be passed to the translog stream, so closing the
// TranslogStream will close this. It is not closed here on purpose.
InputStreamStreamInput in = new InputStreamStreamInput(new FileInputStream(translogFile));
boolean success = false;
try (InputStreamStreamInput headerStream = new InputStreamStreamInput(new FileInputStream(translogFile));) {
if (translogFile.exists() == false || translogFile.length() == 0) {
// if it doesn't exist or has no data, use the latest version,
// there aren't any backwards compatibility issues
success = true;
return new ChecksummedTranslogStream(in, false);
}
// Lucene's CodecUtil writes a magic number of 0x3FD76C17 with the
// header, in binary this looks like:
//
// binary: 0011 1111 1101 0111 0110 1100 0001 0111
// hex : 3 f d 7 6 c 1 7
//
// With version 0 of the translog, the first byte is the
// Operation.Type, which will always be between 0-4, so we know if
// we grab the first byte, it can be:
// 0x3f => Lucene's magic number, so we can assume it's version 1 or later
// 0x00 => version 0 of the translog
//
// otherwise the first byte of the translog is corrupted and we
// should bail
byte b1 = headerStream.readByte();
if (b1 == LUCENE_CODEC_HEADER_BYTE) {
// Read 3 more bytes, meaning a whole integer has been read
byte b2 = headerStream.readByte();
byte b3 = headerStream.readByte();
byte b4 = headerStream.readByte();
// Convert the 4 bytes that were read into an integer
int header = ((b1 & 0xFF) << 24) + ((b2 & 0xFF) << 16) + ((b3 & 0xFF) << 8) + ((b4 & 0xFF) << 0);
// We confirm CodecUtil's CODEC_MAGIC number (0x3FD76C17)
// ourselves here, because it allows us to read the first
// byte separately
if (header != CodecUtil.CODEC_MAGIC) {
throw new TranslogCorruptedException("translog looks like version 1 or later, but has corrupted header");
}
// Confirm the rest of the header using CodecUtil, extracting
// the translog version
int version = CodecUtil.checkHeaderNoMagic(new InputStreamDataInput(headerStream), TRANSLOG_CODEC, 1, Integer.MAX_VALUE);
switch (version) {
case ChecksummedTranslogStream.VERSION:
success = true;
return new ChecksummedTranslogStream(in, true);
default:
throw new TranslogCorruptedException("No known translog stream version: " + version);
}
} else if (b1 == UNVERSIONED_TRANSLOG_HEADER_BYTE) {
success = true;
return new LegacyTranslogStream(in, true);
} else {
throw new TranslogCorruptedException("Invalid first byte in translog file, got: " + Long.toHexString(b1) + ", expected 0x00 or 0x3f");
}
} catch (CorruptIndexException e) {
throw new TranslogCorruptedException("Translog header corrupted", e);
} finally {
// something happened, we should close the stream just so it's
// not dangling
if (success == false) {
IOUtils.close(in);
}
}
} }
} }

View File

@ -22,6 +22,8 @@ package org.elasticsearch.index.translog.fs;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.io.Channels;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.TranslogStream;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException; import org.elasticsearch.index.translog.TranslogException;
@ -38,6 +40,8 @@ public class BufferingFsTranslogFile implements FsTranslogFile {
private final long id; private final long id;
private final ShardId shardId; private final ShardId shardId;
private final RafReference raf; private final RafReference raf;
private final TranslogStream translogStream;
private final int headerSize;
private final ReadWriteLock rwl = new ReentrantReadWriteLock(); private final ReadWriteLock rwl = new ReentrantReadWriteLock();
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
@ -59,6 +63,11 @@ public class BufferingFsTranslogFile implements FsTranslogFile {
this.raf = raf; this.raf = raf;
this.buffer = new byte[bufferSize]; this.buffer = new byte[bufferSize];
raf.raf().setLength(0); raf.raf().setLength(0);
this.translogStream = TranslogStreams.translogStreamFor(this.raf.file());
this.headerSize = this.translogStream.writeHeader(raf.channel());
this.lastPosition += headerSize;
this.lastWrittenPosition += headerSize;
this.lastSyncPosition += headerSize;
} }
public long id() { public long id() {
@ -137,6 +146,7 @@ public class BufferingFsTranslogFile implements FsTranslogFile {
try { try {
flushBuffer(); flushBuffer();
FsChannelSnapshot snapshot = new FsChannelSnapshot(this.id, raf, lastWrittenPosition, operationCounter); FsChannelSnapshot snapshot = new FsChannelSnapshot(this.id, raf, lastWrittenPosition, operationCounter);
snapshot.seekTo(this.headerSize);
success = true; success = true;
return snapshot; return snapshot;
} catch (Exception e) { } catch (Exception e) {
@ -158,6 +168,11 @@ public class BufferingFsTranslogFile implements FsTranslogFile {
return lastPosition != lastSyncPosition; return lastPosition != lastSyncPosition;
} }
@Override
public TranslogStream getStream() {
return this.translogStream;
}
@Override @Override
public void sync() throws IOException { public void sync() throws IOException {
if (!syncNeeded()) { if (!syncNeeded()) {
@ -182,6 +197,7 @@ public class BufferingFsTranslogFile implements FsTranslogFile {
if (!delete) { if (!delete) {
try { try {
sync(); sync();
translogStream.close();
} catch (Exception e) { } catch (Exception e) {
throw new TranslogException(shardId, "failed to sync on close", e); throw new TranslogException(shardId, "failed to sync on close", e);
} }

View File

@ -22,11 +22,12 @@ package org.elasticsearch.index.translog.fs;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.index.translog.Translog;
import java.io.EOFException; import java.io.EOFException;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -48,7 +49,7 @@ public class FsChannelSnapshot implements Translog.Snapshot {
private Translog.Operation lastOperationRead = null; private Translog.Operation lastOperationRead = null;
private int position = 0; private long position = 0;
private ByteBuffer cacheBuffer; private ByteBuffer cacheBuffer;
@ -92,10 +93,10 @@ public class FsChannelSnapshot implements Translog.Snapshot {
} }
@Override @Override
public boolean hasNext() { public Translog.Operation next() {
try { try {
if (position >= length) { if (position >= length) {
return false; return null;
} }
if (cacheBuffer == null) { if (cacheBuffer == null) {
cacheBuffer = ByteBuffer.allocate(1024); cacheBuffer = ByteBuffer.allocate(1024);
@ -103,7 +104,8 @@ public class FsChannelSnapshot implements Translog.Snapshot {
cacheBuffer.limit(4); cacheBuffer.limit(4);
int bytesRead = Channels.readFromFileChannel(channel, position, cacheBuffer); int bytesRead = Channels.readFromFileChannel(channel, position, cacheBuffer);
if (bytesRead < 0) { if (bytesRead < 0) {
// the snapshot is acquired under a write lock. we should never read beyond the EOF // the snapshot is acquired under a write lock. we should never
// read beyond the EOF, must be an abrupt EOF
throw new EOFException("read past EOF. pos [" + position + "] length: [" + cacheBuffer.limit() + "] end: [" + channel.size() + "]"); throw new EOFException("read past EOF. pos [" + position + "] length: [" + cacheBuffer.limit() + "] end: [" + channel.size() + "]");
} }
assert bytesRead == 4; assert bytesRead == 4;
@ -111,7 +113,8 @@ public class FsChannelSnapshot implements Translog.Snapshot {
int opSize = cacheBuffer.getInt(); int opSize = cacheBuffer.getInt();
position += 4; position += 4;
if ((position + opSize) > length) { if ((position + opSize) > length) {
// the snapshot is acquired under a write lock. we should never read beyond the EOF // the snapshot is acquired under a write lock. we should never
// read beyond the EOF, must be an abrupt EOF
position -= 4; position -= 4;
throw new EOFException("opSize of [" + opSize + "] pointed beyond EOF. position [" + position + "] length [" + length + "]"); throw new EOFException("opSize of [" + opSize + "] pointed beyond EOF. position [" + position + "] length [" + length + "]");
} }
@ -122,25 +125,21 @@ public class FsChannelSnapshot implements Translog.Snapshot {
cacheBuffer.limit(opSize); cacheBuffer.limit(opSize);
bytesRead = Channels.readFromFileChannel(channel, position, cacheBuffer); bytesRead = Channels.readFromFileChannel(channel, position, cacheBuffer);
if (bytesRead < 0) { if (bytesRead < 0) {
// the snapshot is acquired under a write lock. we should never
// read beyond the EOF, must be an abrupt EOF
throw new EOFException("tried to read past EOF. opSize [" + opSize + "] position [" + position + "] length [" + length + "]"); throw new EOFException("tried to read past EOF. opSize [" + opSize + "] position [" + position + "] length [" + length + "]");
} }
cacheBuffer.flip(); cacheBuffer.flip();
position += opSize; position += opSize;
lastOperationRead = TranslogStreams.readTranslogOperation(new BytesStreamInput(cacheBuffer.array(), 0, opSize, true)); return TranslogStreams.readTranslogOperation(new BytesStreamInput(cacheBuffer.array(), 0, opSize, true));
return true; } catch (IOException e) {
} catch (Exception e) { throw new ElasticsearchException("unexpected exception reading from translog snapshot of " + this.raf.file(), e);
return false;
} }
} }
@Override @Override
public Translog.Operation next() { public void seekTo(long position) {
return this.lastOperationRead; this.position = position;
}
@Override
public void seekForward(long length) {
this.position += length;
} }
@Override @Override

View File

@ -34,10 +34,7 @@ import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.*;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.index.translog.TranslogStreams;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -315,30 +312,58 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
} }
} }
/**
* Returns the translog that should be read for the specified location. If
* the transient or current translog does not match, returns null
*/
private FsTranslogFile translogForLocation(Location location) {
if (trans != null && trans.id() == location.translogId) {
return this.trans;
}
if (current.id() == location.translogId) {
return this.current;
}
return null;
}
/**
* Private read method that reads from either the transient translog (if
* applicable), or the current translog. Acquires the read lock
* before reading.
* @return byte array of read data
*/
public byte[] read(Location location) { public byte[] read(Location location) {
rwl.readLock().lock(); rwl.readLock().lock();
try { try {
FsTranslogFile trans = this.trans; FsTranslogFile trans = translogForLocation(location);
if (trans != null && trans.id() == location.translogId) { if (trans != null) {
try { try {
return trans.read(location); return trans.read(location);
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore
} }
} }
if (current.id() == location.translogId) {
try {
return current.read(location);
} catch (Exception e) {
// ignore
}
}
return null; return null;
} finally { } finally {
rwl.readLock().unlock(); rwl.readLock().unlock();
} }
} }
/**
* Read the Source object from the given location, returns null if the
* source could not be read.
*/
@Override
public Source readSource(Location location) throws IOException {
byte[] data = this.read(location);
if (data == null) {
return null;
}
// Return the source using the current version of the stream based on
// which translog is being read
return this.translogForLocation(location).getStream().readSource(data);
}
@Override @Override
public Location add(Operation operation) throws TranslogException { public Location add(Operation operation) throws TranslogException {
rwl.readLock().lock(); rwl.readLock().lock();
@ -401,7 +426,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
public Snapshot snapshot(Snapshot snapshot) { public Snapshot snapshot(Snapshot snapshot) {
FsChannelSnapshot snap = snapshot(); FsChannelSnapshot snap = snapshot();
if (snap.translogId() == snapshot.translogId()) { if (snap.translogId() == snapshot.translogId()) {
snap.seekForward(snapshot.position()); snap.seekTo(snapshot.position());
} }
return snap; return snap;
} }

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException; import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.index.translog.TranslogStream;
import java.io.IOException; import java.io.IOException;
@ -77,4 +78,6 @@ public interface FsTranslogFile {
void sync() throws IOException; void sync() throws IOException;
boolean syncNeeded(); boolean syncNeeded();
TranslogStream getStream();
} }

View File

@ -22,6 +22,8 @@ package org.elasticsearch.index.translog.fs;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.io.Channels;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.TranslogStream;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException; import org.elasticsearch.index.translog.TranslogException;
@ -38,6 +40,8 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
private final RafReference raf; private final RafReference raf;
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
private final ReadWriteLock rwl = new ReentrantReadWriteLock(); private final ReadWriteLock rwl = new ReentrantReadWriteLock();
private final TranslogStream translogStream;
private final int headerSize;
private volatile int operationCounter = 0; private volatile int operationCounter = 0;
@ -51,6 +55,11 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
this.id = id; this.id = id;
this.raf = raf; this.raf = raf;
raf.raf().setLength(0); raf.raf().setLength(0);
this.translogStream = TranslogStreams.translogStreamFor(this.raf.file());
this.headerSize = this.translogStream.writeHeader(raf.channel());
this.lastPosition += headerSize;
this.lastWrittenPosition += headerSize;
this.lastSyncPosition += headerSize;
} }
public long id() { public long id() {
@ -96,6 +105,7 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
if (!delete) { if (!delete) {
try { try {
sync(); sync();
translogStream.close();
} catch (Exception e) { } catch (Exception e) {
throw new TranslogException(shardId, "failed to sync on close", e); throw new TranslogException(shardId, "failed to sync on close", e);
} }
@ -115,6 +125,7 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
rwl.writeLock().lock(); rwl.writeLock().lock();
try { try {
FsChannelSnapshot snapshot = new FsChannelSnapshot(this.id, raf, lastWrittenPosition, operationCounter); FsChannelSnapshot snapshot = new FsChannelSnapshot(this.id, raf, lastWrittenPosition, operationCounter);
snapshot.seekTo(this.headerSize);
success = true; success = true;
return snapshot; return snapshot;
} finally { } finally {
@ -136,6 +147,11 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
return lastWrittenPosition != lastSyncPosition; return lastWrittenPosition != lastSyncPosition;
} }
@Override
public TranslogStream getStream() {
return this.translogStream;
}
public void sync() throws IOException { public void sync() throws IOException {
// check if we really need to sync here... // check if we really need to sync here...
if (!syncNeeded()) { if (!syncNeeded()) {

View File

@ -384,11 +384,12 @@ public class RecoverySource extends AbstractComponent {
long size = 0; long size = 0;
int totalOperations = 0; int totalOperations = 0;
List<Translog.Operation> operations = Lists.newArrayList(); List<Translog.Operation> operations = Lists.newArrayList();
while (snapshot.hasNext()) { Translog.Operation operation = snapshot.next();
while (operation != null) {
if (shard.state() == IndexShardState.CLOSED) { if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId()); throw new IndexShardClosedException(request.shardId());
} }
Translog.Operation operation = snapshot.next();
operations.add(operation); operations.add(operation);
ops += 1; ops += 1;
size += operation.estimateSize(); size += operation.estimateSize();
@ -409,6 +410,7 @@ public class RecoverySource extends AbstractComponent {
size = 0; size = 0;
operations.clear(); operations.clear();
} }
operation = snapshot.next();
} }
// send the leftover // send the leftover
if (!operations.isEmpty()) { if (!operations.isEmpty()) {

View File

@ -20,11 +20,12 @@
package org.elasticsearch.indices.recovery; package org.elasticsearch.indices.recovery;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequest;
import java.io.IOException; import java.io.IOException;
@ -68,7 +69,12 @@ class RecoveryTranslogOperationsRequest extends TransportRequest {
int size = in.readVInt(); int size = in.readVInt();
operations = Lists.newArrayListWithExpectedSize(size); operations = Lists.newArrayListWithExpectedSize(size);
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
operations.add(TranslogStreams.readTranslogOperation(in)); if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
operations.add(TranslogStreams.V1.read(in));
} else {
operations.add(TranslogStreams.V0.read(in));
}
} }
} }
@ -79,7 +85,11 @@ class RecoveryTranslogOperationsRequest extends TransportRequest {
shardId.writeTo(out); shardId.writeTo(out);
out.writeVInt(operations.size()); out.writeVInt(operations.size());
for (Translog.Operation operation : operations) { for (Translog.Operation operation : operations) {
TranslogStreams.writeTranslogOperation(out, operation); if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
TranslogStreams.V1.write(out, operation);
} else {
TranslogStreams.V0.write(out, operation);
}
} }
} }
} }

View File

@ -746,10 +746,10 @@ public class InternalEngineTests extends ElasticsearchTestCase {
@Override @Override
public void phase2(Translog.Snapshot snapshot) throws EngineException { public void phase2(Translog.Snapshot snapshot) throws EngineException {
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next(); Translog.Create create = (Translog.Create) snapshot.next();
assertThat("translog snapshot should not read null", create != null, equalTo(true));
assertThat(create.source().toBytesArray(), equalTo(B_2)); assertThat(create.source().toBytesArray(), equalTo(B_2));
assertThat(snapshot.hasNext(), equalTo(false)); assertThat(snapshot.next(), equalTo(null));
} }
@Override @Override
@ -777,9 +777,9 @@ public class InternalEngineTests extends ElasticsearchTestCase {
@Override @Override
public void phase2(Translog.Snapshot snapshot) throws EngineException { public void phase2(Translog.Snapshot snapshot) throws EngineException {
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next(); Translog.Create create = (Translog.Create) snapshot.next();
assertThat(snapshot.hasNext(), equalTo(false)); assertThat(create != null, equalTo(true));
assertThat(snapshot.next(), equalTo(null));
assertThat(create.source().toBytesArray(), equalTo(B_2)); assertThat(create.source().toBytesArray(), equalTo(B_2));
// add for phase3 // add for phase3
@ -789,9 +789,9 @@ public class InternalEngineTests extends ElasticsearchTestCase {
@Override @Override
public void phase3(Translog.Snapshot snapshot) throws EngineException { public void phase3(Translog.Snapshot snapshot) throws EngineException {
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next(); Translog.Create create = (Translog.Create) snapshot.next();
assertThat(snapshot.hasNext(), equalTo(false)); assertThat(create != null, equalTo(true));
assertThat(snapshot.next(), equalTo(null));
assertThat(create.source().toBytesArray(), equalTo(B_3)); assertThat(create.source().toBytesArray(), equalTo(B_3));
} }
}); });

View File

@ -0,0 +1,145 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportModule;
import org.junit.Test;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.notNullValue;
/**
* Integration test for corrupted translog files
*/
@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.SUITE, numDataNodes =0)
public class CorruptedTranslogTests extends ElasticsearchIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder()
// we really need local GW here since this also checks for corruption etc.
// and we need to make sure primaries are not just trashed if we don't have replicas
.put(super.nodeSettings(nodeOrdinal)).put("gateway.type", "local")
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()).build();
}
@Test
public void testCorruptTranslogFiles() throws Exception {
internalCluster().startNodesAsync(1, ImmutableSettings.EMPTY).get();
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.refresh_interval", "-1")
.put("index.gateway.local.sync", "1s") // fsync the translog every second
));
ensureYellow();
// Index some documents
int numDocs = scaledRandomIntBetween(100, 1000);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar");
}
indexRandom(false, false, builders);
// Corrupt the translog file(s)
corruptRandomTranslogFiles();
// Restart the single node
internalCluster().fullRestart();
// node needs time to start recovery and discover the translog corruption
sleep(1000);
try {
client().prepareSearch("test").setQuery(matchAllQuery()).get();
fail("all shards should be failed due to a corrupted translog");
} catch (SearchPhaseExecutionException e) {
// Good, all shards should be failed because there is only a
// single shard and its translog is corrupt
}
}
private void corruptRandomTranslogFiles() throws IOException {
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false);
ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), shardIterators.iterators());
ShardRouting shardRouting = shardIterator.nextOrNull();
assertNotNull(shardRouting);
assertTrue(shardRouting.primary());
assertTrue(shardRouting.assignedToNode());
String nodeId = shardRouting.currentNodeId();
NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get();
Set<File> files = new TreeSet<>(); // treeset makes sure iteration order is deterministic
for (FsStats.Info info : nodeStatses.getNodes()[0].getFs()) {
String path = info.getPath();
final String relativeDataLocationPath = "indices/test/" + Integer.toString(shardRouting.getId()) + "/translog";
File file = new File(path, relativeDataLocationPath);
logger.info("--> path: {}", file);
files.addAll(Arrays.asList(file.listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
logger.info("--> File: {}", pathname);
return pathname.isFile() && pathname.getName().startsWith("translog-");
}
})));
}
File fileToCorrupt = null;
if (!files.isEmpty()) {
int corruptions = randomIntBetween(5, 20);
for (int i = 0; i < corruptions; i++) {
fileToCorrupt = RandomPicks.randomFrom(getRandom(), files);
try (RandomAccessFile raf = new RandomAccessFile(fileToCorrupt, "rw")) {
raf.seek(randomIntBetween(0, (int) Math.min(Integer.MAX_VALUE, raf.length() - 1)));
long filePointer = raf.getFilePointer();
byte b = raf.readByte();
raf.seek(filePointer);
raf.writeByte(~b);
raf.getFD().sync();
logger.info("--> corrupting file {} -- flipping at position {} from {} to {} file: {}", fileToCorrupt.getName(), filePointer, Integer.toHexString(b), Integer.toHexString(~b), fileToCorrupt);
}
}
}
assertThat("no file corrupted", fileToCorrupt, notNullValue());
}
}

View File

@ -34,12 +34,18 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Lists.newArrayList;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
/** /**
* *
@ -65,6 +71,8 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase
protected abstract Translog create(); protected abstract Translog create();
protected abstract String translogFileDirectory();
@Test @Test
public void testRead() throws IOException { public void testRead() throws IOException {
Translog.Location loc1 = translog.add(new Translog.Create("test", "1", new byte[]{1})); Translog.Location loc1 = translog.add(new Translog.Create("test", "1", new byte[]{1}));
@ -145,23 +153,23 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase
snapshot = translog.snapshot(); snapshot = translog.snapshot();
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next(); Translog.Create create = (Translog.Create) snapshot.next();
assertThat(create != null, equalTo(true));
assertThat(create.source().toBytes(), equalTo(new byte[]{1})); assertThat(create.source().toBytes(), equalTo(new byte[]{1}));
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Index index = (Translog.Index) snapshot.next(); Translog.Index index = (Translog.Index) snapshot.next();
assertThat(index != null, equalTo(true));
assertThat(index.source().toBytes(), equalTo(new byte[]{2})); assertThat(index.source().toBytes(), equalTo(new byte[]{2}));
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Delete delete = (Translog.Delete) snapshot.next(); Translog.Delete delete = (Translog.Delete) snapshot.next();
assertThat(delete != null, equalTo(true));
assertThat(delete.uid(), equalTo(newUid("3"))); assertThat(delete.uid(), equalTo(newUid("3")));
assertThat(snapshot.hasNext(), equalTo(true));
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) snapshot.next(); Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) snapshot.next();
assertThat(deleteByQuery != null, equalTo(true));
assertThat(deleteByQuery.source().toBytes(), equalTo(new byte[]{4})); assertThat(deleteByQuery.source().toBytes(), equalTo(new byte[]{4}));
assertThat(snapshot.hasNext(), equalTo(false)); assertThat(snapshot.next(), equalTo(null));
snapshot.close(); snapshot.close();
@ -188,17 +196,20 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase
snapshot.close(); snapshot.close();
snapshot = translog.snapshot(); snapshot = translog.snapshot();
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next(); Translog.Create create = (Translog.Create) snapshot.next();
assertThat(create != null, equalTo(true));
assertThat(create.source().toBytes(), equalTo(new byte[]{1})); assertThat(create.source().toBytes(), equalTo(new byte[]{1}));
snapshot.close(); snapshot.close();
Translog.Snapshot snapshot1 = translog.snapshot(); Translog.Snapshot snapshot1 = translog.snapshot();
// we use the translogSize to also navigate to the last position on this snapshot
// so snapshot(Snapshot) will work properly
MatcherAssert.assertThat(snapshot1, TranslogSizeMatcher.translogSize(1)); MatcherAssert.assertThat(snapshot1, TranslogSizeMatcher.translogSize(1));
assertThat(snapshot1.estimatedTotalOperations(), equalTo(1)); assertThat(snapshot1.estimatedTotalOperations(), equalTo(1));
// seek to the end of the translog snapshot
while (snapshot1.next() != null) {
// spin
}
translog.add(new Translog.Index("test", "2", new byte[]{2})); translog.add(new Translog.Index("test", "2", new byte[]{2}));
snapshot = translog.snapshot(snapshot1); snapshot = translog.snapshot(snapshot1);
MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(1)); MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(1));
@ -206,10 +217,10 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase
snapshot.close(); snapshot.close();
snapshot = translog.snapshot(snapshot1); snapshot = translog.snapshot(snapshot1);
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Index index = (Translog.Index) snapshot.next(); Translog.Index index = (Translog.Index) snapshot.next();
assertThat(index != null, equalTo(true));
assertThat(index.source().toBytes(), equalTo(new byte[]{2})); assertThat(index.source().toBytes(), equalTo(new byte[]{2}));
assertThat(snapshot.hasNext(), equalTo(false)); assertThat(snapshot.next(), equalTo(null));
assertThat(snapshot.estimatedTotalOperations(), equalTo(2)); assertThat(snapshot.estimatedTotalOperations(), equalTo(2));
snapshot.close(); snapshot.close();
snapshot1.close(); snapshot1.close();
@ -235,17 +246,17 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase
snapshot.close(); snapshot.close();
snapshot = translog.snapshot(actualSnapshot); snapshot = translog.snapshot(actualSnapshot);
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Index index = (Translog.Index) snapshot.next(); Translog.Index index = (Translog.Index) snapshot.next();
assertThat(index != null, equalTo(true));
assertThat(index.source().toBytes(), equalTo(new byte[]{3})); assertThat(index.source().toBytes(), equalTo(new byte[]{3}));
assertThat(snapshot.hasNext(), equalTo(false)); assertThat(snapshot.next(), equalTo(null));
actualSnapshot.close(); actualSnapshot.close();
snapshot.close(); snapshot.close();
} }
@Test @Test
public void testSnapshotWithSeekForward() { public void testSnapshotWithSeekTo() {
Translog.Snapshot snapshot = translog.snapshot(); Translog.Snapshot snapshot = translog.snapshot();
MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(0)); MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(0));
snapshot.close(); snapshot.close();
@ -253,19 +264,23 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase
translog.add(new Translog.Create("test", "1", new byte[]{1})); translog.add(new Translog.Create("test", "1", new byte[]{1}));
snapshot = translog.snapshot(); snapshot = translog.snapshot();
MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(1)); MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(1));
// seek to the end of the translog snapshot
while (snapshot.next() != null) {
// spin
}
long lastPosition = snapshot.position(); long lastPosition = snapshot.position();
snapshot.close(); snapshot.close();
translog.add(new Translog.Create("test", "2", new byte[]{1})); translog.add(new Translog.Create("test", "2", new byte[]{1}));
snapshot = translog.snapshot(); snapshot = translog.snapshot();
snapshot.seekForward(lastPosition); snapshot.seekTo(lastPosition);
MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(1)); MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(1));
snapshot.close(); snapshot.close();
snapshot = translog.snapshot(); snapshot = translog.snapshot();
snapshot.seekForward(lastPosition); snapshot.seekTo(lastPosition);
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next(); Translog.Create create = (Translog.Create) snapshot.next();
assertThat(create != null, equalTo(true));
assertThat(create.id(), equalTo("2")); assertThat(create.id(), equalTo("2"));
snapshot.close(); snapshot.close();
} }
@ -397,6 +412,50 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase
} }
@Test
public void testTranslogChecksums() throws Exception {
List<Translog.Location> locations = newArrayList();
int translogOperations = randomIntBetween(10, 100);
for (int op = 0; op < translogOperations; op++) {
String ascii = randomAsciiOfLengthBetween(1, 50);
locations.add(translog.add(new Translog.Create("test", "" + op, ascii.getBytes("UTF-8"))));
}
translog.sync();
corruptTranslogs(translogFileDirectory());
AtomicInteger corruptionsCaught = new AtomicInteger(0);
for (Translog.Location location : locations) {
try {
TranslogStreams.readSource(translog.read(location));
} catch (TranslogCorruptedException e) {
corruptionsCaught.incrementAndGet();
}
}
assertThat("at least one corruption was caused and caught", corruptionsCaught.get(), greaterThanOrEqualTo(1));
}
/**
* Randomly overwrite some bytes in the translog files
*/
private void corruptTranslogs(String directory) throws Exception {
File[] files = new File(directory).listFiles();
if (files != null) {
for (File file : files) {
if (file.getName().startsWith("translog-")) {
logger.info("--> corrupting {}...", file.getName());
RandomAccessFile f = new RandomAccessFile(file, "rw");
int corruptions = scaledRandomIntBetween(10, 50);
for (int i = 0; i < corruptions; i++) {
f.seek(randomIntBetween(0, (int)f.length()));
f.write(randomByte());
}
f.close();
}
}
}
}
private Term newUid(String id) { private Term newUid(String id) {
return new Term("_uid", id); return new Term("_uid", id);

View File

@ -37,11 +37,17 @@ public class TranslogSizeMatcher extends TypeSafeMatcher<Translog.Snapshot> {
@Override @Override
public boolean matchesSafely(Translog.Snapshot snapshot) { public boolean matchesSafely(Translog.Snapshot snapshot) {
int count = 0; int count = 0;
while (snapshot.hasNext()) { long startingPosition = snapshot.position();
snapshot.next(); try {
count++; while (snapshot.next() != null) {
count++;
}
return size == count;
} finally {
// Since counting the translog size consumes the stream, reset it
// back to the origin position after reading
snapshot.seekTo(startingPosition);
} }
return size == count;
} }
@Override @Override

View File

@ -0,0 +1,142 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.EOFException;
import java.io.File;
import static org.hamcrest.Matchers.equalTo;
/**
* Tests for reading old and new translog files
*/
public class TranslogVersionTests extends ElasticsearchTestCase {
@Test
public void testV0LegacyTranslogVersion() throws Exception {
File translogFile = getResource("/org/elasticsearch/index/translog/translog-v0.binary");
assertThat("test file should exist", translogFile.exists(), equalTo(true));
TranslogStream stream = TranslogStreams.translogStreamFor(translogFile);
assertThat("a version0 stream is returned", stream instanceof LegacyTranslogStream, equalTo(true));
Translog.Operation operation = stream.read();
assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.SAVE, equalTo(true));
Translog.Index op = (Translog.Index) operation;
assertThat(op.id(), equalTo("1"));
assertThat(op.type(), equalTo("doc"));
assertThat(op.source().toUtf8(), equalTo("{\"body\": \"worda wordb wordc wordd \\\"worde\\\" wordf\"}"));
assertThat(op.routing(), equalTo(null));
assertThat(op.parent(), equalTo(null));
assertThat(op.version(), equalTo(1L));
assertThat(op.timestamp(), equalTo(1407312091791L));
assertThat(op.ttl(), equalTo(-1L));
assertThat(op.versionType(), equalTo(VersionType.INTERNAL));
try {
stream.read();
fail("should have been the end of the file");
} catch (EOFException e) {
// success
} finally {
stream.close();
}
}
@Test
public void testV1ChecksummedTranslogVersion() throws Exception {
File translogFile = getResource("/org/elasticsearch/index/translog/translog-v1.binary");
assertThat("test file should exist", translogFile.exists(), equalTo(true));
TranslogStream stream = TranslogStreams.translogStreamFor(translogFile);
assertThat("a version1 stream is returned", stream instanceof ChecksummedTranslogStream, equalTo(true));
Translog.Operation operation = stream.read();
assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.CREATE, equalTo(true));
Translog.Create op = (Translog.Create) operation;
assertThat(op.id(), equalTo("Bwiq98KFSb6YjJQGeSpeiw"));
assertThat(op.type(), equalTo("doc"));
assertThat(op.source().toUtf8(), equalTo("{\"body\": \"foo\"}"));
assertThat(op.routing(), equalTo(null));
assertThat(op.parent(), equalTo(null));
assertThat(op.version(), equalTo(1L));
assertThat(op.timestamp(), equalTo(1408627184844L));
assertThat(op.ttl(), equalTo(-1L));
assertThat(op.versionType(), equalTo(VersionType.INTERNAL));
// There are more operations
int opNum = 1;
while (true) {
try {
stream.read();
opNum++;
} catch (EOFException e) {
break;
}
}
assertThat("there should be 5 translog operations", opNum, equalTo(5));
stream.close();
}
@Test
public void testCorruptedTranslogs() throws Exception {
try {
File translogFile = getResource("/org/elasticsearch/index/translog/translog-v1-corrupted-magic.binary");
assertThat("test file should exist", translogFile.exists(), equalTo(true));
TranslogStream stream = TranslogStreams.translogStreamFor(translogFile);
fail("should have thrown an exception about the header being corrupt");
} catch (TranslogCorruptedException e) {
assertThat("translog corruption from header: " + e.getMessage(),
e.getMessage().contains("translog looks like version 1 or later, but has corrupted header"), equalTo(true));
}
try {
File translogFile = getResource("/org/elasticsearch/index/translog/translog-invalid-first-byte.binary");
assertThat("test file should exist", translogFile.exists(), equalTo(true));
TranslogStream stream = TranslogStreams.translogStreamFor(translogFile);
fail("should have thrown an exception about the header being corrupt");
} catch (TranslogCorruptedException e) {
assertThat("translog corruption from header: " + e.getMessage(),
e.getMessage().contains("Invalid first byte in translog file, got: 1, expected 0x00 or 0x3f"), equalTo(true));
}
try {
File translogFile = getResource("/org/elasticsearch/index/translog/translog-v1-corrupted-body.binary");
assertThat("test file should exist", translogFile.exists(), equalTo(true));
TranslogStream stream = TranslogStreams.translogStreamFor(translogFile);
while (true) {
try {
stream.read();
} catch (EOFException e) {
break;
}
}
fail("should have thrown an exception about the body being corrupted");
} catch (TranslogCorruptedException e) {
assertThat("translog corruption from body: " + e.getMessage(),
e.getMessage().contains("translog stream is corrupted"), equalTo(true));
}
}
}

View File

@ -39,10 +39,15 @@ public class FsBufferedTranslogTests extends AbstractSimpleTranslogTests {
.put("index.translog.fs.type", FsTranslogFile.Type.BUFFERED.name()) .put("index.translog.fs.type", FsTranslogFile.Type.BUFFERED.name())
.put("index.translog.fs.buffer_size", 10 + randomInt(128 * 1024)) .put("index.translog.fs.buffer_size", 10 + randomInt(128 * 1024))
.build(), .build(),
new File("data/fs-buf-translog") new File(translogFileDirectory())
); );
} }
@Override
protected String translogFileDirectory() {
return "data/fs-buf-translog";
}
@AfterClass @AfterClass
public static void cleanup() { public static void cleanup() {
FileSystemUtils.deleteRecursively(new File("data/fs-buf-translog"), true); FileSystemUtils.deleteRecursively(new File("data/fs-buf-translog"), true);

View File

@ -36,7 +36,12 @@ public class FsSimpleTranslogTests extends AbstractSimpleTranslogTests {
protected Translog create() { protected Translog create() {
return new FsTranslog(shardId, return new FsTranslog(shardId,
ImmutableSettings.settingsBuilder().put("index.translog.fs.type", FsTranslogFile.Type.SIMPLE.name()).build(), ImmutableSettings.settingsBuilder().put("index.translog.fs.type", FsTranslogFile.Type.SIMPLE.name()).build(),
new File("data/fs-simple-translog")); new File(translogFileDirectory()));
}
@Override
protected String translogFileDirectory() {
return "data/fs-simple-translog";
} }
@AfterClass @AfterClass