Better support for partial buffer reads/writes in translog infrastructure

Some I/O APIs can return after reading or writing only part of the requested data. On these rare occasions the methods must be called again to transfer the rest of the data. Failing to do so has caused rare translog corruption when writing huge documents on Windows.
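
For illustration, not part of the commit: the fix boils down to wrapping each channel call in a retry loop, since a single read()/write() may transfer only part of a buffer. A minimal sketch of the write side:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;

    final class WriteFully {
        // A single write() may consume only part of the buffer; keep calling
        // it until position() reaches limit().
        static void writeFully(ByteBuffer buffer, WritableByteChannel channel) throws IOException {
            while (buffer.hasRemaining()) {
                channel.write(buffer); // advances buffer.position() by the bytes written
            }
        }
    }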

Notable parts of the commit:
- A new Channels class with utility methods for reading from and writing to channels (see the usage sketch after this list)
- Direct channel reads and writes are added to the forbidden APIs list
- Added locking to SimpleFsTranslogFile
- Removed FileChannelInputStream, which was unused
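
A usage sketch of the new utility class; it relies only on method signatures that appear in the diff below, while the wrapper class and method names are made up for illustration:

    import org.elasticsearch.common.io.Channels;

    import java.io.IOException;
    import java.nio.channels.FileChannel;

    final class ChannelsUsage {
        // Both helpers loop internally until the requested number of bytes has
        // been transferred, so callers no longer need to handle partial I/O.
        static byte[] readAndEcho(FileChannel channel, long position, int size) throws IOException {
            byte[] data = Channels.readFromFileChannel(channel, position, size); // throws EOFException past EOF
            Channels.writeToChannel(data, channel); // writes at the channel's current position
            return data;
        }
    }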

Closes #6441, #6576
Boaz Leskes, 2014-06-20 15:33:17 +02:00
commit 72d2ac1328 (parent 5668b1cfc5)
19 changed files with 855 additions and 271 deletions

File: forbidden API signatures list

@ -56,3 +56,13 @@ java.lang.System#gc()
@defaultMessage Use Long.compare instead we are on Java7
com.google.common.primitives.Longs#compare(long,long)
@defaultMessage Use Channels.* methods to write to channels. Do not write directly.
java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
java.nio.channels.FileChannel#write(java.nio.ByteBuffer, long)
java.nio.channels.GatheringByteChannel#write(java.nio.ByteBuffer[], int, int)
java.nio.channels.GatheringByteChannel#write(java.nio.ByteBuffer[])
java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
java.nio.channels.ScatteringByteChannel#read(java.nio.ByteBuffer[])
java.nio.channels.ScatteringByteChannel.read(java.nio.ByteBuffer[], int, int)
java.nio.channels.FileChannel#read(java.nio.ByteBuffer, long)

File: pom.xml (forbidden-apis plugin configuration)

@ -1155,6 +1155,9 @@
<!-- start exclude for FilteredQuery -->
<exclude>org/elasticsearch/common/lucene/search/XFilteredQuery.class</exclude>
<!-- end exclude for FilteredQuery -->
<!-- start exclude for Channels utility class -->
<exclude>org/elasticsearch/common/io/Channels.class</exclude>
<!-- end exclude for Channels -->
</excludes>
<bundledSignatures>
<!-- This will automatically choose the right signatures based on 'targetVersion': -->

File: org/elasticsearch/common/bytes/ByteBufferBytesReference.java

@ -19,6 +19,15 @@
package org.elasticsearch.common.bytes;
import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@ -28,15 +37,6 @@ import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;
import com.google.common.base.Charsets;
/**
*/
public class ByteBufferBytesReference implements BytesReference {
@ -86,7 +86,7 @@ public class ByteBufferBytesReference implements BytesReference {
@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
channel.write(buffer);
Channels.writeToChannel(buffer, channel);
}
@Override

File: org/elasticsearch/common/bytes/BytesArray.java

@ -23,6 +23,7 @@ import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
@ -30,7 +31,6 @@ import org.jboss.netty.buffer.ChannelBuffers;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;
@ -109,7 +109,7 @@ public class BytesArray implements BytesReference {
@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
channel.write(ByteBuffer.wrap(bytes, offset, length()));
Channels.writeToChannel(bytes, offset, length(), channel);
}
@Override

File: org/elasticsearch/common/bytes/ChannelBufferBytesReference.java

@ -20,6 +20,7 @@ package org.elasticsearch.common.bytes;
import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.transport.netty.ChannelBufferStreamInputFactory;
import org.jboss.netty.buffer.ChannelBuffer;
@ -27,7 +28,6 @@ import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.WritableByteChannel;
/**
*/
@ -66,7 +66,7 @@ public class ChannelBufferBytesReference implements BytesReference {
@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
buffer.getBytes(buffer.readerIndex(), channel, length());
Channels.writeToChannel(buffer, buffer.readerIndex(), length(), channel);
}
@Override

File: org/elasticsearch/common/bytes/HashedBytesArray.java

@ -21,6 +21,7 @@ package org.elasticsearch.common.bytes;
import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
@ -28,7 +29,6 @@ import org.jboss.netty.buffer.ChannelBuffers;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
/**
@ -77,7 +77,7 @@ public class HashedBytesArray implements BytesReference {
@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
channel.write(ByteBuffer.wrap(bytes));
Channels.writeToChannel(bytes, 0, bytes.length, channel);
}
@Override

File: org/elasticsearch/common/bytes/PagedBytesReference.java

@ -23,6 +23,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
@ -32,7 +33,6 @@ import org.jboss.netty.buffer.ChannelBuffers;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;
@ -95,7 +95,7 @@ public class PagedBytesReference implements BytesReference {
BytesRef ref = new BytesRef();
int written = 0;
// are we a slice?
if (offset != 0) {
// remaining size of page fragment at offset
@ -122,53 +122,21 @@ public class PagedBytesReference implements BytesReference {
return;
}
ByteBuffer[] buffers;
ByteBuffer currentBuffer = null;
int currentLength = length;
int currentOffset = offset;
BytesRef ref = new BytesRef();
int pos = 0;
// are we a slice?
if (offset != 0) {
// remaining size of page fragment at offset
int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
bytearray.get(offset, fragmentSize, ref);
currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, fragmentSize);
pos += fragmentSize;
while (currentLength > 0) {
// try to align to the underlying pages while writing, so no new arrays will be created.
int fragmentSize = Math.min(currentLength, PAGE_SIZE - (currentOffset % PAGE_SIZE));
boolean newArray = bytearray.get(currentOffset, fragmentSize, ref);
assert !newArray : "PagedBytesReference failed to align with underlying bytearray. offset [" + currentOffset + "], size [" + fragmentSize + "]";
Channels.writeToChannel(ref.bytes, ref.offset, ref.length, channel);
currentLength -= ref.length;
currentOffset += ref.length;
}
// we only have a single page
if (pos == length && currentBuffer != null) {
channel.write(currentBuffer);
return;
}
// a slice > pagesize will likely require extra buffers for initial/trailing fragments
int numBuffers = countRequiredBuffers((currentBuffer != null ? 1 : 0), length - pos);
buffers = new ByteBuffer[numBuffers];
int bufferSlot = 0;
if (currentBuffer != null) {
buffers[bufferSlot] = currentBuffer;
bufferSlot++;
}
// handle remainder of pages + trailing fragment
while (pos < length) {
int remaining = length - pos;
int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
bytearray.get(offset + pos, bulkSize, ref);
currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, bulkSize);
buffers[bufferSlot] = currentBuffer;
bufferSlot++;
pos += bulkSize;
}
// this would indicate that our numBuffer calculation is off by one.
assert (numBuffers == bufferSlot);
// finally write all buffers
channel.write(buffers);
assert currentLength == 0;
}
@Override
@ -205,8 +173,7 @@ public class PagedBytesReference implements BytesReference {
if (copied) {
// BigArray has materialized for us, no need to do it again
return new BytesArray(ref.bytes, ref.offset, ref.length);
}
else {
} else {
// here we need to copy the bytes even when shared
byte[] copy = Arrays.copyOfRange(ref.bytes, ref.offset, ref.offset + ref.length);
return new BytesArray(copy);
@ -223,7 +190,7 @@ public class PagedBytesReference implements BytesReference {
ChannelBuffer[] buffers;
ChannelBuffer currentBuffer = null;
BytesRef ref = new BytesRef();
int pos = 0;
int pos = 0;
// are we a slice?
if (offset != 0) {
@ -349,10 +316,10 @@ public class PagedBytesReference implements BytesReference {
}
if (!(obj instanceof PagedBytesReference)) {
return BytesReference.Helper.bytesEqual(this, (BytesReference)obj);
return BytesReference.Helper.bytesEqual(this, (BytesReference) obj);
}
PagedBytesReference other = (PagedBytesReference)obj;
PagedBytesReference other = (PagedBytesReference) obj;
if (length != other.length) {
return false;
}
@ -422,7 +389,7 @@ public class PagedBytesReference implements BytesReference {
@Override
public int read() throws IOException {
return (pos < length) ? bytearray.get(offset + pos++) : -1;
return (pos < length) ? bytearray.get(offset + pos++) : -1;
}
@Override
@ -445,7 +412,7 @@ public class PagedBytesReference implements BytesReference {
while (copiedBytes < numBytesToCopy) {
long pageFragment = PAGE_SIZE - (byteArrayOffset % PAGE_SIZE); // how much can we read until hitting N*PAGE_SIZE?
int bulkSize = (int)Math.min(pageFragment, numBytesToCopy - copiedBytes); // we cannot copy more than a page fragment
int bulkSize = (int) Math.min(pageFragment, numBytesToCopy - copiedBytes); // we cannot copy more than a page fragment
boolean copied = bytearray.get(byteArrayOffset, bulkSize, ref); // get the fragment
assert (copied == false); // we should never ever get back a materialized byte[]
System.arraycopy(ref.bytes, ref.offset, b, bOffset + copiedBytes, bulkSize); // copy fragment contents

File: org/elasticsearch/common/io/Channels.java (new file)

@ -0,0 +1,228 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.WritableByteChannel;
public final class Channels {
private Channels() {
}
/**
* The maximum chunk size for reads in bytes
*/
private static final int READ_CHUNK_SIZE = 16384;
/**
* The maximum chunk size for writes in bytes
*/
private static final int WRITE_CHUNK_SIZE = 8192;
/**
* read <i>length</i> bytes from <i>position</i> of a file channel
*/
public static byte[] readFromFileChannel(FileChannel channel, long position, int length) throws IOException {
byte[] res = new byte[length];
readFromFileChannelWithEofException(channel, position, res, 0, length);
return res;
}
/**
* read <i>length</i> bytes from <i>position</i> of a file channel. An EOFException will be thrown if you
* attempt to read beyond the end of file.
*
* @param channel channel to read from
* @param channelPosition position to read from
* @param dest destination byte array to put data in
* @param destOffset offset in dest to read into
* @param length number of bytes to read
*/
public static void readFromFileChannelWithEofException(FileChannel channel, long channelPosition, byte[] dest, int destOffset, int length) throws IOException {
int read = readFromFileChannel(channel, channelPosition, dest, destOffset, length);
if (read < 0) {
throw new EOFException("read past EOF. pos [" + channelPosition + "] length: [" + length + "] end: [" + channel.size() + "]");
}
}
/**
* read <i>length</i> bytes from <i>position</i> of a file channel.
*
* @param channel channel to read from
* @param channelPosition position to read from
* @param dest destination byte array to put data in
* @param destOffset offset in dest to read into
* @param length number of bytes to read
* @return total bytes read or -1 if an attempt was made to read past EOF. The method always tries to read all the bytes
* that will fit in the destination byte buffer.
*/
public static int readFromFileChannel(FileChannel channel, long channelPosition, byte[] dest, int destOffset, int length) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(dest, destOffset, length);
return readFromFileChannel(channel, channelPosition, buffer);
}
/**
* read from a file channel into a byte buffer, starting at a certain position.
*
* @param channel channel to read from
* @param channelPosition position to read from
* @param dest destination {@link java.nio.ByteBuffer} to put data in
* @return total bytes read or -1 if an attempt was made to read past EOF. The method always tries to read all the bytes
* that will fit in the destination byte buffer.
*/
public static int readFromFileChannel(FileChannel channel, long channelPosition, ByteBuffer dest) throws IOException {
if (dest.isDirect() || (dest.remaining() < READ_CHUNK_SIZE)) {
return readSingleChunk(channel, channelPosition, dest);
} else {
int bytesRead = 0;
int bytesToRead = dest.remaining();
// duplicate the buffer in order to be able to change the limit
ByteBuffer tmpBuffer = dest.duplicate();
try {
while (dest.hasRemaining()) {
tmpBuffer.limit(Math.min(dest.limit(), tmpBuffer.position() + READ_CHUNK_SIZE));
int read = readSingleChunk(channel, channelPosition, tmpBuffer);
if (read < 0) {
return read;
}
bytesRead += read;
channelPosition += read;
dest.position(tmpBuffer.position());
}
} finally {
// make sure we update byteBuffer to indicate how far we came..
dest.position(tmpBuffer.position());
}
assert bytesRead == bytesToRead : "failed to read an entire buffer but also didn't get an EOF (read [" + bytesRead + "] needed [" + bytesToRead + "])";
return bytesRead;
}
}
private static int readSingleChunk(FileChannel channel, long channelPosition, ByteBuffer dest) throws IOException {
int bytesRead = 0;
while (dest.hasRemaining()) {
int read = channel.read(dest, channelPosition);
if (read < 0) {
return read;
}
assert read > 0 : "FileChannel.read with non zero-length bb.remaining() must always read at least one byte (FileChannel is in blocking mode, see spec of ReadableByteChannel)";
bytesRead += read;
channelPosition += read;
}
return bytesRead;
}
/**
* Copies bytes from source {@link org.jboss.netty.buffer.ChannelBuffer} to a {@link java.nio.channels.GatheringByteChannel}
*
* @param source ChannelBuffer to copy from
* @param sourceIndex index in <i>source</i> to start copying from
* @param length how many bytes to copy
* @param channel target GatheringByteChannel
* @throws IOException
*/
public static void writeToChannel(ChannelBuffer source, int sourceIndex, int length, GatheringByteChannel channel) throws IOException {
while (length > 0) {
int written = source.getBytes(sourceIndex, channel, length);
sourceIndex += written;
length -= written;
}
assert length == 0;
}
/**
* Writes a byte array to a {@link java.nio.channels.WritableByteChannel}
*
* @param source byte array to copy from
* @param channel target WritableByteChannel
* @throws IOException
*/
public static void writeToChannel(byte[] source, WritableByteChannel channel) throws IOException {
writeToChannel(source, 0, source.length, channel);
}
/**
* Writes part of a byte array to a {@link java.nio.channels.WritableByteChannel}
*
* @param source byte array to copy from
* @param offset start copying from this offset
* @param length how many bytes to copy
* @param channel target WritableByteChannel
* @throws IOException
*/
public static void writeToChannel(byte[] source, int offset, int length, WritableByteChannel channel) throws IOException {
int toWrite = Math.min(length, WRITE_CHUNK_SIZE);
ByteBuffer buffer = ByteBuffer.wrap(source, offset, toWrite);
int written = channel.write(buffer);
length -= written;
while (length > 0) {
toWrite = Math.min(length, WRITE_CHUNK_SIZE);
buffer.limit(buffer.position() + toWrite);
written = channel.write(buffer);
length -= written;
}
assert length == 0 : "wrote more than expected bytes (length=" + length + ")";
}
/**
* Writes a {@link java.nio.ByteBuffer} to a {@link java.nio.channels.WritableByteChannel}
*
* @param byteBuffer source buffer
* @param channel channel to write to
* @throws IOException
*/
public static void writeToChannel(ByteBuffer byteBuffer, WritableByteChannel channel) throws IOException {
if (byteBuffer.isDirect() || (byteBuffer.remaining() <= WRITE_CHUNK_SIZE)) {
while (byteBuffer.hasRemaining()) {
channel.write(byteBuffer);
}
} else {
// duplicate the buffer in order to be able to change the limit
ByteBuffer tmpBuffer = byteBuffer.duplicate();
try {
while (byteBuffer.hasRemaining()) {
tmpBuffer.limit(Math.min(byteBuffer.limit(), tmpBuffer.position() + WRITE_CHUNK_SIZE));
while (tmpBuffer.hasRemaining()) {
channel.write(tmpBuffer);
}
byteBuffer.position(tmpBuffer.position());
}
} finally {
// make sure we update byteBuffer to indicate how far we came..
byteBuffer.position(tmpBuffer.position());
}
}
}
}
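
A note on the ByteBuffer path above: duplicate() yields a buffer that shares the bytes but has an independent position and limit, which is what lets the code cap each write at WRITE_CHUNK_SIZE without disturbing the caller's limit until the end. The same pattern in isolation, with the chunk size as a parameter:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;

    final class ChunkedWrite {
        // Write a large heap buffer in bounded chunks, retrying partial writes,
        // while keeping the caller's buffer position in sync with progress.
        static void writeChunked(ByteBuffer src, WritableByteChannel channel, int chunkSize) throws IOException {
            ByteBuffer tmp = src.duplicate(); // shared bytes, independent position/limit
            try {
                while (src.hasRemaining()) {
                    tmp.limit(Math.min(src.limit(), tmp.position() + chunkSize));
                    while (tmp.hasRemaining()) {
                        channel.write(tmp); // may be partial; loop until the chunk is drained
                    }
                    src.position(tmp.position());
                }
            } finally {
                src.position(tmp.position()); // record progress even if a write failed
            }
        }
    }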

File: org/elasticsearch/common/io/FileChannelInputStream.java (deleted)

@ -1,109 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
*
*/
public class FileChannelInputStream extends InputStream {
private final FileChannel channel;
private long position;
private long length;
private ByteBuffer bb = null;
private byte[] bs = null; // Invoker's previous array
private byte[] b1 = null;
private long markPosition;
/**
* @param channel The channel to read from
* @param position The position to start reading from
* @param length The length to read
*/
public FileChannelInputStream(FileChannel channel, long position, long length) {
this.channel = channel;
this.position = position;
this.markPosition = position;
this.length = position + length; // easier to work with total length
}
@Override
public int read() throws IOException {
if (b1 == null) {
b1 = new byte[1];
}
int n = read(b1);
if (n == 1) {
return b1[0] & 0xff;
}
return -1;
}
@Override
public int read(byte[] bs, int off, int len) throws IOException {
if (len == 0) {
return 0;
}
if ((length - position) < len) {
len = (int) (length - position);
}
if (len == 0) {
return -1;
}
ByteBuffer bb = ((this.bs == bs) ? this.bb : ByteBuffer.wrap(bs));
bb.limit(Math.min(off + len, bb.capacity()));
bb.position(off);
this.bb = bb;
this.bs = bs;
int read = channel.read(bb, position);
if (read > 0) {
position += read;
}
return read;
}
@Override
public boolean markSupported() {
return true;
}
@Override
public void mark(int readlimit) {
this.markPosition = position;
}
@Override
public void reset() throws IOException {
position = markPosition;
}
}

File: org/elasticsearch/index/translog/Translog.java

@ -21,6 +21,7 @@ package org.elasticsearch.index.translog;
import org.apache.lucene.index.Term;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -38,7 +39,6 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShardComponent;
import java.io.IOException;
import java.io.InputStream;
/**
*
@ -173,11 +173,6 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent,
void seekForward(long length);
/**
* Returns a stream of this snapshot.
*/
InputStream stream() throws IOException;
/**
* The length in bytes of this stream.
*/
@ -479,27 +474,31 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent,
id = in.readString();
type = in.readString();
source = in.readBytesReference();
if (version >= 1) {
if (in.readBoolean()) {
routing = in.readString();
try {
if (version >= 1) {
if (in.readBoolean()) {
routing = in.readString();
}
}
}
if (version >= 2) {
if (in.readBoolean()) {
parent = in.readString();
if (version >= 2) {
if (in.readBoolean()) {
parent = in.readString();
}
}
}
if (version >= 3) {
this.version = in.readLong();
}
if (version >= 4) {
this.timestamp = in.readLong();
}
if (version >= 5) {
this.ttl = in.readLong();
}
if (version >= 6) {
this.versionType = VersionType.fromValue(in.readByte());
if (version >= 3) {
this.version = in.readLong();
}
if (version >= 4) {
this.timestamp = in.readLong();
}
if (version >= 5) {
this.ttl = in.readLong();
}
if (version >= 6) {
this.versionType = VersionType.fromValue(in.readByte());
}
} catch (Exception e) {
throw new ElasticsearchException("failed to read [" + type + "][" + id + "]", e);
}
assert versionType.validateVersionForWrites(version);
@ -550,6 +549,12 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent,
this.uid = uid;
}
public Delete(Term uid, long version, VersionType versionType) {
this.uid = uid;
this.version = version;
this.versionType = versionType;
}
@Override
public Type opType() {
return Type.DELETE;

File: org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java

@ -20,13 +20,13 @@
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -104,7 +104,8 @@ public class BufferingFsTranslogFile implements FsTranslogFile {
if (bufferCount > 0) {
// we use the channel to write, since on windows, writing to the RAF might not be reflected
// when reading through the channel
raf.channel().write(ByteBuffer.wrap(buffer, 0, bufferCount));
Channels.writeToChannel(buffer, 0, bufferCount, raf.channel());
lastWrittenPosition += bufferCount;
bufferCount = 0;
}
@ -122,25 +123,34 @@ public class BufferingFsTranslogFile implements FsTranslogFile {
} finally {
rwl.readLock().unlock();
}
ByteBuffer buffer = ByteBuffer.allocate(location.size);
raf.channel().read(buffer, location.translogLocation);
return buffer.array();
// we don't need a read lock here because we only write ahead in the file, so all writes have been completed
// for the requested location.
return Channels.readFromFileChannel(raf.channel(), location.translogLocation, location.size);
}
@Override
public FsChannelSnapshot snapshot() throws TranslogException {
rwl.writeLock().lock();
try {
flushBuffer();
if (!raf.increaseRefCount()) {
return null;
if (raf.increaseRefCount()) {
boolean success = false;
try {
rwl.writeLock().lock();
try {
flushBuffer();
FsChannelSnapshot snapshot = new FsChannelSnapshot(this.id, raf, lastWrittenPosition, operationCounter);
success = true;
return snapshot;
} catch (Exception e) {
throw new TranslogException(shardId, "exception while creating snapshot", e);
} finally {
rwl.writeLock().unlock();
}
} finally {
if (!success) {
raf.decreaseRefCount(false);
}
}
return new FsChannelSnapshot(this.id, raf, lastWrittenPosition, operationCounter);
} catch (IOException e) {
throw new TranslogException(shardId, "failed to flush", e);
} finally {
rwl.writeLock().unlock();
}
return null;
}
@Override
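
The reworked snapshot() above (and its twin in SimpleFsTranslogFile below) follows an acquire-then-release-on-failure idiom around the RAF reference count. A generic sketch of the idiom; the Ref interface and the helper are made-up stand-ins for RafReference and the locked section:

    final class SnapshotIdiom {
        interface Ref {
            boolean increaseRefCount();
            void decreaseRefCount(boolean delete);
        }

        // Take a reference first; if anything fails before the result is
        // handed to the caller, give the reference back.
        static <T> T acquireOrNull(Ref ref, java.util.function.Supplier<T> build) {
            if (ref.increaseRefCount()) {
                boolean success = false;
                try {
                    T result = build.get(); // stand-in for flushing + creating the snapshot
                    success = true;
                    return result;
                } finally {
                    if (!success) {
                        ref.decreaseRefCount(false); // undo the acquire on failure
                    }
                }
            }
            return null; // no reference available: the file is being closed
        }
    }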

File: org/elasticsearch/index/translog/fs/FsChannelSnapshot.java

@ -20,16 +20,16 @@
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.FileChannelInputStream;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@ -52,6 +52,12 @@ public class FsChannelSnapshot implements Translog.Snapshot {
private ByteBuffer cacheBuffer;
private AtomicBoolean closed = new AtomicBoolean(false);
/**
* Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point
* at the end of the last operation in this snapshot.
*/
public FsChannelSnapshot(long id, RafReference raf, long length, int totalOperations) throws FileNotFoundException {
this.id = id;
this.raf = raf;
@ -80,11 +86,6 @@ public class FsChannelSnapshot implements Translog.Snapshot {
return this.totalOperations;
}
@Override
public InputStream stream() throws IOException {
return new FileChannelInputStream(channel, position, lengthInBytes());
}
@Override
public long lengthInBytes() {
return length - position;
@ -93,31 +94,36 @@ public class FsChannelSnapshot implements Translog.Snapshot {
@Override
public boolean hasNext() {
try {
if (position > length) {
if (position >= length) {
return false;
}
if (cacheBuffer == null) {
cacheBuffer = ByteBuffer.allocate(1024);
}
cacheBuffer.limit(4);
int bytesRead = channel.read(cacheBuffer, position);
if (bytesRead < 4) {
return false;
int bytesRead = Channels.readFromFileChannel(channel, position, cacheBuffer);
if (bytesRead < 0) {
// the snapshot is acquired under a write lock. we should never read beyond the EOF
throw new EOFException("read past EOF. pos [" + position + "] length: [" + cacheBuffer.limit() + "] end: [" + channel.size() + "]");
}
assert bytesRead == 4;
cacheBuffer.flip();
int opSize = cacheBuffer.getInt();
position += 4;
if ((position + opSize) > length) {
// restore the position to before we read the opSize
// the snapshot is acquired under a write lock. we should never read beyond the EOF
position -= 4;
return false;
throw new EOFException("opSize of [" + opSize + "] pointed beyond EOF. position [" + position + "] length [" + length + "]");
}
if (cacheBuffer.capacity() < opSize) {
cacheBuffer = ByteBuffer.allocate(opSize);
}
cacheBuffer.clear();
cacheBuffer.limit(opSize);
channel.read(cacheBuffer, position);
bytesRead = Channels.readFromFileChannel(channel, position, cacheBuffer);
if (bytesRead < 0) {
throw new EOFException("tried to read past EOF. opSize [" + opSize + "] position [" + position + "] length [" + length + "]");
}
cacheBuffer.flip();
position += opSize;
lastOperationRead = TranslogStreams.readTranslogOperation(new BytesStreamInput(cacheBuffer.array(), 0, opSize, true));
@ -139,6 +145,9 @@ public class FsChannelSnapshot implements Translog.Snapshot {
@Override
public void close() throws ElasticsearchException {
if (!closed.compareAndSet(false, true)) {
return;
}
raf.decreaseRefCount(true);
}
}
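
hasNext() above implies the on-disk framing of a translog record: a 4-byte operation size followed by that many bytes of serialized operation. A hedged sketch of reading one such record with the new helper; the class and method names are invented, but the Channels calls match signatures from this commit:

    import org.elasticsearch.common.io.Channels;

    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    final class TranslogRecord {
        // Read one [int size][payload] record starting at `position`; `end` is
        // the last valid byte, as tracked by the snapshot's `length` field.
        static byte[] read(FileChannel channel, long position, long end) throws IOException {
            ByteBuffer header = ByteBuffer.allocate(4);
            if (Channels.readFromFileChannel(channel, position, header) < 0) {
                throw new EOFException("no record header at [" + position + "]");
            }
            header.flip();
            int opSize = header.getInt(); // big-endian, the ByteBuffer default
            position += 4;
            if (position + opSize > end) {
                throw new EOFException("opSize [" + opSize + "] points past end [" + end + "]");
            }
            ByteBuffer payload = ByteBuffer.allocate(opSize);
            if (Channels.readFromFileChannel(channel, position, payload) < 0) {
                throw new EOFException("truncated record at [" + position + "]");
            }
            return payload.array();
        }
    }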

File: org/elasticsearch/index/translog/fs/FsTranslog.java

@ -20,6 +20,7 @@
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
@ -108,6 +109,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
this.bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
this.type = FsTranslogFile.Type.fromString(componentSettings.get("type", FsTranslogFile.Type.BUFFERED.name()));
this.bufferSize = (int) componentSettings.getAsBytesSize("buffer_size", ByteSizeValue.parseBytesSizeValue("64k")).bytes();
}
@Override
@ -360,6 +362,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
if (syncOnEachOperation) {
current.sync();
}
assert new BytesArray(current.read(location)).equals(bytes);
FsTranslogFile trans = this.trans;
if (trans != null) {
try {

File: org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java

@ -20,15 +20,16 @@
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SimpleFsTranslogFile implements FsTranslogFile {
@ -36,11 +37,12 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
private final ShardId shardId;
private final RafReference raf;
private final AtomicBoolean closed = new AtomicBoolean();
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
private final AtomicInteger operationCounter = new AtomicInteger();
private volatile int operationCounter = 0;
private final AtomicLong lastPosition = new AtomicLong(0);
private final AtomicLong lastWrittenPosition = new AtomicLong(0);
private volatile long lastPosition = 0;
private volatile long lastWrittenPosition = 0;
private volatile long lastSyncPosition = 0;
@ -56,25 +58,34 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
}
public int estimatedNumberOfOperations() {
return operationCounter.get();
return operationCounter;
}
public long translogSizeInBytes() {
return lastWrittenPosition.get();
return lastWrittenPosition;
}
public Translog.Location add(BytesReference data) throws IOException {
long position = lastPosition.getAndAdd(data.length());
data.writeTo(raf.channel());
lastWrittenPosition.getAndAdd(data.length());
operationCounter.incrementAndGet();
return new Translog.Location(id, position, data.length());
rwl.writeLock().lock();
try {
long position = lastPosition;
data.writeTo(raf.channel());
lastPosition = lastPosition + data.length();
lastWrittenPosition = lastWrittenPosition + data.length();
operationCounter = operationCounter + 1;
return new Translog.Location(id, position, data.length());
} finally {
rwl.writeLock().unlock();
}
}
public byte[] read(Translog.Location location) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(location.size);
raf.channel().read(buffer, location.translogLocation);
return buffer.array();
rwl.readLock().lock();
try {
return Channels.readFromFileChannel(raf.channel(), location.translogLocation, location.size);
} finally {
rwl.readLock().unlock();
}
}
public void close(boolean delete) {
@ -98,29 +109,45 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
* Returns a snapshot on this file, <tt>null</tt> if it failed to snapshot.
*/
public FsChannelSnapshot snapshot() throws TranslogException {
try {
if (!raf.increaseRefCount()) {
return null;
if (raf.increaseRefCount()) {
boolean success = false;
try {
rwl.writeLock().lock();
try {
FsChannelSnapshot snapshot = new FsChannelSnapshot(this.id, raf, lastWrittenPosition, operationCounter);
success = true;
return snapshot;
} finally {
rwl.writeLock().unlock();
}
} catch (FileNotFoundException e) {
throw new TranslogException(shardId, "failed to create snapshot", e);
} finally {
if (!success) {
raf.decreaseRefCount(false);
}
}
return new FsChannelSnapshot(this.id, raf, lastWrittenPosition.get(), operationCounter.get());
} catch (Exception e) {
throw new TranslogException(shardId, "Failed to snapshot", e);
}
return null;
}
@Override
public boolean syncNeeded() {
return lastWrittenPosition.get() != lastSyncPosition;
return lastWrittenPosition != lastSyncPosition;
}
public void sync() throws IOException {
// check if we really need to sync here...
long last = lastWrittenPosition.get();
if (last == lastSyncPosition) {
if (!syncNeeded()) {
return;
}
lastSyncPosition = last;
raf.channel().force(false);
rwl.writeLock().lock();
try {
lastSyncPosition = lastWrittenPosition;
raf.channel().force(false);
} finally {
rwl.writeLock().unlock();
}
}
@Override

File: org/elasticsearch/common/ChannelsTests.java (new file)

@ -0,0 +1,289 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import org.elasticsearch.common.bytes.ByteBufferBytesReference;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.Matchers;
import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
public class ChannelsTests extends ElasticsearchTestCase {
byte[] randomBytes;
FileChannel fileChannel;
RandomAccessFile randomAccessFile;
@Before
public void setUp() throws Exception {
super.setUp();
File tmpFile = newTempFile();
randomAccessFile = new RandomAccessFile(tmpFile, "rw");
fileChannel = new MockFileChannel(randomAccessFile.getChannel());
randomBytes = randomUnicodeOfLength(scaledRandomIntBetween(10, 100000)).getBytes("UTF-8");
}
@After
public void tearDown() throws Exception {
fileChannel.close();
randomAccessFile.close();
super.tearDown();
}
@Test
public void testReadWriteThroughArrays() throws Exception {
Channels.writeToChannel(randomBytes, fileChannel);
byte[] readBytes = Channels.readFromFileChannel(fileChannel, 0, randomBytes.length);
assertThat("read bytes didn't match written bytes", randomBytes, Matchers.equalTo(readBytes));
}
@Test
public void testPartialReadWriteThroughArrays() throws Exception {
int length = randomIntBetween(1, randomBytes.length / 2);
int offset = randomIntBetween(0, randomBytes.length - length);
Channels.writeToChannel(randomBytes, offset, length, fileChannel);
int lengthToRead = randomIntBetween(1, length);
int offsetToRead = randomIntBetween(0, length - lengthToRead);
byte[] readBytes = new byte[randomBytes.length];
Channels.readFromFileChannel(fileChannel, offsetToRead, readBytes, offset + offsetToRead, lengthToRead);
BytesReference source = new BytesArray(randomBytes, offset + offsetToRead, lengthToRead);
BytesReference read = new BytesArray(readBytes, offset + offsetToRead, lengthToRead);
assertThat("read bytes didn't match written bytes", source.toBytes(), Matchers.equalTo(read.toBytes()));
}
@Test(expected = EOFException.class)
public void testBufferReadPastEOFWithException() throws Exception {
int bytesToWrite = randomIntBetween(0, randomBytes.length - 1);
Channels.writeToChannel(randomBytes, 0, bytesToWrite, fileChannel);
Channels.readFromFileChannel(fileChannel, 0, bytesToWrite + 1 + randomInt(1000));
}
@Test
public void testBufferReadPastEOFWithoutException() throws Exception {
int bytesToWrite = randomIntBetween(0, randomBytes.length - 1);
Channels.writeToChannel(randomBytes, 0, bytesToWrite, fileChannel);
byte[] bytes = new byte[bytesToWrite + 1 + randomInt(1000)];
int read = Channels.readFromFileChannel(fileChannel, 0, bytes, 0, bytes.length);
assertThat(read, Matchers.lessThan(0));
}
@Test
public void testReadWriteThroughBuffers() throws IOException {
ByteBuffer source;
if (randomBoolean()) {
source = ByteBuffer.wrap(randomBytes);
} else {
source = ByteBuffer.allocateDirect(randomBytes.length);
source.put(randomBytes);
source.flip();
}
Channels.writeToChannel(source, fileChannel);
ByteBuffer copy;
if (randomBoolean()) {
copy = ByteBuffer.allocate(randomBytes.length);
} else {
copy = ByteBuffer.allocateDirect(randomBytes.length);
}
int read = Channels.readFromFileChannel(fileChannel, 0, copy);
assertThat(read, Matchers.equalTo(randomBytes.length));
byte[] copyBytes = new byte[read];
copy.flip();
copy.get(copyBytes);
assertThat("read bytes didn't match written bytes", randomBytes, Matchers.equalTo(copyBytes));
}
@Test
public void testPartialReadWriteThroughBuffers() throws IOException {
int length = randomIntBetween(1, randomBytes.length / 2);
int offset = randomIntBetween(0, randomBytes.length - length);
ByteBuffer source;
if (randomBoolean()) {
source = ByteBuffer.wrap(randomBytes, offset, length);
} else {
source = ByteBuffer.allocateDirect(length);
source.put(randomBytes, offset, length);
source.flip();
}
Channels.writeToChannel(source, fileChannel);
int lengthToRead = randomIntBetween(1, length);
int offsetToRead = randomIntBetween(0, length - lengthToRead);
ByteBuffer copy;
if (randomBoolean()) {
copy = ByteBuffer.allocate(lengthToRead);
} else {
copy = ByteBuffer.allocateDirect(lengthToRead);
}
int read = Channels.readFromFileChannel(fileChannel, offsetToRead, copy);
assertThat(read, Matchers.equalTo(lengthToRead));
copy.flip();
BytesReference sourceRef = new BytesArray(randomBytes, offset + offsetToRead, lengthToRead);
BytesReference copyRef = new ByteBufferBytesReference(copy);
assertTrue("read bytes didn't match written bytes", sourceRef.equals(copyRef));
}
@Test
public void testWriteFromChannel() throws IOException {
int length = randomIntBetween(1, randomBytes.length / 2);
int offset = randomIntBetween(0, randomBytes.length - length);
ByteBuffer byteBuffer = ByteBuffer.wrap(randomBytes);
ChannelBuffer source = new ByteBufferBackedChannelBuffer(byteBuffer);
Channels.writeToChannel(source, offset, length, fileChannel);
BytesReference copyRef = new BytesArray(Channels.readFromFileChannel(fileChannel, 0, length));
BytesReference sourceRef = new BytesArray(randomBytes, offset, length);
assertTrue("read bytes didn't match written bytes", sourceRef.equals(copyRef));
}
class MockFileChannel extends FileChannel {
FileChannel delegate;
public MockFileChannel(FileChannel delegate) {
this.delegate = delegate;
}
@Override
public int read(ByteBuffer dst) throws IOException {
// delay buffer read..
int willActuallyRead = randomInt(dst.remaining());
ByteBuffer mockDst = dst.duplicate();
mockDst.limit(mockDst.position() + willActuallyRead);
try {
return delegate.read(mockDst);
} finally {
dst.position(mockDst.position());
}
}
@Override
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
return delegate.read(dsts, offset, length);
}
@Override
public int write(ByteBuffer src) throws IOException {
// delay buffer write..
int willActuallyWrite = randomInt(src.remaining());
ByteBuffer mockSrc = src.duplicate();
mockSrc.limit(mockSrc.position() + willActuallyWrite);
try {
return delegate.write(mockSrc);
} finally {
src.position(mockSrc.position());
}
}
@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
return delegate.write(srcs, offset, length);
}
@Override
public long position() throws IOException {
return delegate.position();
}
@Override
public FileChannel position(long newPosition) throws IOException {
return delegate.position(newPosition);
}
@Override
public long size() throws IOException {
return delegate.size();
}
@Override
public FileChannel truncate(long size) throws IOException {
return delegate.truncate(size);
}
@Override
public void force(boolean metaData) throws IOException {
delegate.force(metaData);
}
@Override
public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
return delegate.transferTo(position, count, target);
}
@Override
public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
return delegate.transferFrom(src, position, count);
}
@Override
public int read(ByteBuffer dst, long position) throws IOException {
return delegate.read(dst, position);
}
@Override
public int write(ByteBuffer src, long position) throws IOException {
return delegate.write(src, position);
}
@Override
public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
return delegate.map(mode, position, size);
}
@Override
public FileLock lock(long position, long size, boolean shared) throws IOException {
return delegate.lock(position, size, shared);
}
@Override
public FileLock tryLock(long position, long size, boolean shared) throws IOException {
return delegate.tryLock(position, size, shared);
}
@Override
protected void implCloseChannel() throws IOException {
delegate.close();
}
}
}

File: test in org/elasticsearch/common/bytes/ (class name not shown in this hunk)

@ -19,7 +19,6 @@
package org.elasticsearch.common.bytes;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;

File: org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java

@ -20,9 +20,14 @@
package org.elasticsearch.index.translog;
import org.apache.lucene.index.Term;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
@ -30,28 +35,32 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
*
*/
public abstract class AbstractSimpleTranslogTests {
public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase {
protected final ShardId shardId = new ShardId(new Index("index"), 1);
protected Translog translog;
@Before
public void setUp() {
public void setUp() throws Exception {
super.setUp();
translog = create();
translog.newTranslog(1);
}
@After
public void tearDown() {
public void tearDown() throws Exception {
translog.closeWithDelete();
super.tearDown();
}
protected abstract Translog create();
@ -261,6 +270,134 @@ public abstract class AbstractSimpleTranslogTests {
snapshot.close();
}
static class LocationOperation {
final Translog.Operation operation;
final Translog.Location location;
public LocationOperation(Translog.Operation operation, Translog.Location location) {
this.operation = operation;
this.location = location;
}
}
@Test
public void testConcurrentWritesWithVaryingSize() throws Throwable {
final int opsPerThread = randomIntBetween(10, 200);
int threadCount = 2 + randomInt(5);
logger.info("testing with [{}] threads, each doing [{}] ops", threadCount, opsPerThread);
final BlockingQueue<LocationOperation> writtenOperations = new ArrayBlockingQueue<>(threadCount * opsPerThread);
Thread[] threads = new Thread[threadCount];
final Throwable[] threadExceptions = new Throwable[threadCount];
final CountDownLatch downLatch = new CountDownLatch(1);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
downLatch.await();
for (int opCount = 0; opCount < opsPerThread; opCount++) {
Translog.Operation op;
switch (randomFrom(Translog.Operation.Type.values())) {
case CREATE:
op = new Translog.Create("test", threadId + "_" + opCount,
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
break;
case SAVE:
op = new Translog.Index("test", threadId + "_" + opCount,
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
break;
case DELETE:
op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount),
1 + randomInt(100000),
randomFrom(VersionType.values()));
break;
case DELETE_BY_QUERY:
op = new Translog.DeleteByQuery(
new BytesArray(randomRealisticUnicodeOfLengthBetween(10, 400).getBytes("UTF-8")),
new String[]{randomRealisticUnicodeOfLengthBetween(10, 400)},
"test");
break;
default:
throw new ElasticsearchException("not supported op type");
}
Translog.Location loc = translog.add(op);
writtenOperations.add(new LocationOperation(op, loc));
}
} catch (Throwable t) {
threadExceptions[threadId] = t;
}
}
});
threads[i].setDaemon(true);
threads[i].start();
}
downLatch.countDown();
for (int i = 0; i < threadCount; i++) {
if (threadExceptions[i] != null) {
throw threadExceptions[i];
}
threads[i].join(60 * 1000);
}
for (LocationOperation locationOperation : writtenOperations) {
byte[] data = translog.read(locationOperation.location);
StreamInput streamInput = new BytesStreamInput(data, false);
streamInput.readInt(); // size
Translog.Operation op = TranslogStreams.readTranslogOperation(streamInput);
Translog.Operation expectedOp = locationOperation.operation;
assertEquals(expectedOp.opType(), op.opType());
switch (op.opType()) {
case SAVE:
Translog.Index indexOp = (Translog.Index) op;
Translog.Index expIndexOp = (Translog.Index) expectedOp;
assertEquals(expIndexOp.id(), indexOp.id());
assertEquals(expIndexOp.routing(), indexOp.routing());
assertEquals(expIndexOp.type(), indexOp.type());
assertEquals(expIndexOp.source(), indexOp.source());
assertEquals(expIndexOp.version(), indexOp.version());
assertEquals(expIndexOp.versionType(), indexOp.versionType());
break;
case CREATE:
Translog.Create createOp = (Translog.Create) op;
Translog.Create expCreateOp = (Translog.Create) expectedOp;
assertEquals(expCreateOp.id(), createOp.id());
assertEquals(expCreateOp.routing(), createOp.routing());
assertEquals(expCreateOp.type(), createOp.type());
assertEquals(expCreateOp.source(), createOp.source());
assertEquals(expCreateOp.version(), createOp.version());
assertEquals(expCreateOp.versionType(), createOp.versionType());
break;
case DELETE:
Translog.Delete delOp = (Translog.Delete) op;
Translog.Delete expDelOp = (Translog.Delete) expectedOp;
assertEquals(expDelOp.uid(), delOp.uid());
assertEquals(expDelOp.version(), delOp.version());
assertEquals(expDelOp.versionType(), delOp.versionType());
break;
case DELETE_BY_QUERY:
Translog.DeleteByQuery delQueryOp = (Translog.DeleteByQuery) op;
Translog.DeleteByQuery expDelQueryOp = (Translog.DeleteByQuery) expectedOp;
assertThat(expDelQueryOp.source(), equalTo(delQueryOp.source()));
assertThat(expDelQueryOp.filteringAliases(), equalTo(delQueryOp.filteringAliases()));
assertThat(expDelQueryOp.types(), equalTo(delQueryOp.types()));
break;
default:
throw new ElasticsearchException("unsupported opType");
}
}
}
private Term newUid(String id) {
return new Term("_uid", id);
}

File: org/elasticsearch/index/translog/fs/FsBufferedTranslogTests.java

@ -21,8 +21,8 @@ package org.elasticsearch.index.translog.fs;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.AbstractSimpleTranslogTests;
import org.elasticsearch.index.translog.Translog;
import org.junit.AfterClass;
import java.io.File;
@ -35,12 +35,16 @@ public class FsBufferedTranslogTests extends AbstractSimpleTranslogTests {
@Override
protected Translog create() {
return new FsTranslog(shardId,
ImmutableSettings.settingsBuilder().put("index.translog.fs.type", FsTranslogFile.Type.BUFFERED.name()).build(),
new File("data/fs-translog"));
ImmutableSettings.settingsBuilder()
.put("index.translog.fs.type", FsTranslogFile.Type.BUFFERED.name())
.put("index.translog.fs.buffer_size", 10 + randomInt(128 * 1024))
.build(),
new File("data/fs-buf-translog")
);
}
@AfterClass
public static void cleanup() {
FileSystemUtils.deleteRecursively(new File("data/fs-translog"), true);
FileSystemUtils.deleteRecursively(new File("data/fs-buf-translog"), true);
}
}

File: org/elasticsearch/index/translog/fs/FsSimpleTranslogTests.java

@ -21,8 +21,8 @@ package org.elasticsearch.index.translog.fs;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.AbstractSimpleTranslogTests;
import org.elasticsearch.index.translog.Translog;
import org.junit.AfterClass;
import java.io.File;
@ -36,11 +36,11 @@ public class FsSimpleTranslogTests extends AbstractSimpleTranslogTests {
protected Translog create() {
return new FsTranslog(shardId,
ImmutableSettings.settingsBuilder().put("index.translog.fs.type", FsTranslogFile.Type.SIMPLE.name()).build(),
new File("data/fs-translog"));
new File("data/fs-simple-translog"));
}
@AfterClass
public static void cleanup() {
FileSystemUtils.deleteRecursively(new File("data/fs-translog"), true);
FileSystemUtils.deleteRecursively(new File("data/fs-simple-translog"), true);
}
}