Gateway: Failure to read full translog from the gateway, closes #328.

This commit is contained in:
kimchy 2010-08-18 14:29:46 +03:00
parent 3f9034b41c
commit 2259ef671b
11 changed files with 52 additions and 743 deletions

View File

@ -57,9 +57,9 @@ public class FsAppendableBlobContainer extends AbstractFsBlobContainer implement
raf = new RandomAccessFile(file, "rw");
raf.seek(raf.length());
listener.withStream(new DataOutputStreamOutput(raf));
listener.onCompleted();
raf.close();
FileSystemUtils.syncFile(file);
listener.onCompleted();
} catch (IOException e) {
listener.onFailure(e);
} finally {

View File

@ -1,41 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io;
import org.elasticsearch.common.util.concurrent.NotThreadSafe;
import java.io.DataInputStream;
/**
* @author kimchy (Shay Banon)
*/
@NotThreadSafe
public class ByteArrayDataInputStream extends DataInputStream {
/**
* Creates a DataInputStream that uses the specified
* underlying InputStream.
*
* @param source the specified source
*/
public ByteArrayDataInputStream(byte[] source) {
super(new FastByteArrayInputStream(source));
}
}

View File

@ -1,70 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.NotThreadSafe;
import java.io.DataOutputStream;
/**
* @author kimchy (Shay Banon)
*/
@NotThreadSafe
public class ByteArrayDataOutputStream extends DataOutputStream {
/**
* A thread local based cache of {@link ByteArrayDataOutputStream}.
*/
public static class Cached {
private static final ThreadLocal<ThreadLocals.CleanableValue<ByteArrayDataOutputStream>> cache = new ThreadLocal<ThreadLocals.CleanableValue<ByteArrayDataOutputStream>>() {
@Override protected ThreadLocals.CleanableValue<ByteArrayDataOutputStream> initialValue() {
return new ThreadLocals.CleanableValue<ByteArrayDataOutputStream>(new ByteArrayDataOutputStream());
}
};
/**
* Returns the cached thread local byte strean, with its internal stream cleared.
*/
public static ByteArrayDataOutputStream cached() {
ByteArrayDataOutputStream os = cache.get().get();
((FastByteArrayOutputStream) os.out).reset();
return os;
}
}
public ByteArrayDataOutputStream() {
super(new FastByteArrayOutputStream());
}
public byte[] copiedByteArray() {
return outputStream().copiedByteArray();
}
public byte[] unsafeByteArray() {
return outputStream().unsafeByteArray();
}
private FastByteArrayOutputStream outputStream() {
return (FastByteArrayOutputStream) out;
}
}

View File

@ -1,405 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io;
import java.io.*;
/**
* @author kimchy (Shay Banon)
*/
public class FastDataOutputStream extends FilterOutputStream implements DataOutput {
/**
* The number of bytes written to the data output stream so far.
* If this counter overflows, it will be wrapped to Integer.MAX_VALUE.
*/
protected int written;
/**
* bytearr is initialized on demand by writeUTF
*/
private byte[] bytearr = null;
/**
* Creates a new data output stream to write data to the specified
* underlying output stream. The counter <code>written</code> is
* set to zero.
*
* @param out the underlying output stream, to be saved for later
* use.
* @see java.io.FilterOutputStream#out
*/
public FastDataOutputStream(OutputStream out) {
super(out);
}
/**
* Increases the written counter by the specified value
* until it reaches Integer.MAX_VALUE.
*/
private void incCount(int value) {
int temp = written + value;
if (temp < 0) {
temp = Integer.MAX_VALUE;
}
written = temp;
}
/**
* Writes the specified byte (the low eight bits of the argument
* <code>b</code>) to the underlying output stream. If no exception
* is thrown, the counter <code>written</code> is incremented by
* <code>1</code>.
* <p/>
* Implements the <code>write</code> method of <code>OutputStream</code>.
*
* @param b the <code>byte</code> to be written.
* @throws java.io.IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
public void write(int b) throws IOException {
out.write(b);
incCount(1);
}
/**
* Writes <code>len</code> bytes from the specified byte array
* starting at offset <code>off</code> to the underlying output stream.
* If no exception is thrown, the counter <code>written</code> is
* incremented by <code>len</code>.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
* @throws IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
public void write(byte b[], int off, int len)
throws IOException {
out.write(b, off, len);
incCount(len);
}
/**
* Flushes this data output stream. This forces any buffered output
* bytes to be written out to the stream.
* <p/>
* The <code>flush</code> method of <code>DataOutputStream</code>
* calls the <code>flush</code> method of its underlying output stream.
*
* @throws IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
* @see java.io.OutputStream#flush()
*/
public void flush() throws IOException {
out.flush();
}
/**
* Writes a <code>boolean</code> to the underlying output stream as
* a 1-byte value. The value <code>true</code> is written out as the
* value <code>(byte)1</code>; the value <code>false</code> is
* written out as the value <code>(byte)0</code>. If no exception is
* thrown, the counter <code>written</code> is incremented by
* <code>1</code>.
*
* @param v a <code>boolean</code> value to be written.
* @throws IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
public final void writeBoolean(boolean v) throws IOException {
out.write(v ? 1 : 0);
incCount(1);
}
/**
* Writes out a <code>byte</code> to the underlying output stream as
* a 1-byte value. If no exception is thrown, the counter
* <code>written</code> is incremented by <code>1</code>.
*
* @param v a <code>byte</code> value to be written.
* @throws IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
public final void writeByte(int v) throws IOException {
out.write(v);
incCount(1);
}
/**
* Writes a <code>short</code> to the underlying output stream as two
* bytes, high byte first. If no exception is thrown, the counter
* <code>written</code> is incremented by <code>2</code>.
*
* @param v a <code>short</code> to be written.
* @throws IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
public final void writeShort(int v) throws IOException {
out.write((v >>> 8) & 0xFF);
out.write((v >>> 0) & 0xFF);
incCount(2);
}
/**
* Writes a <code>char</code> to the underlying output stream as a
* 2-byte value, high byte first. If no exception is thrown, the
* counter <code>written</code> is incremented by <code>2</code>.
*
* @param v a <code>char</code> value to be written.
* @throws IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
public final void writeChar(int v) throws IOException {
out.write((v >>> 8) & 0xFF);
out.write((v >>> 0) & 0xFF);
incCount(2);
}
/**
* Writes an <code>int</code> to the underlying output stream as four
* bytes, high byte first. If no exception is thrown, the counter
* <code>written</code> is incremented by <code>4</code>.
*
* @param v an <code>int</code> to be written.
* @throws IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
public final void writeInt(int v) throws IOException {
out.write((v >>> 24) & 0xFF);
out.write((v >>> 16) & 0xFF);
out.write((v >>> 8) & 0xFF);
out.write((v >>> 0) & 0xFF);
incCount(4);
}
private byte writeBuffer[] = new byte[8];
/**
* Writes a <code>long</code> to the underlying output stream as eight
* bytes, high byte first. In no exception is thrown, the counter
* <code>written</code> is incremented by <code>8</code>.
*
* @param v a <code>long</code> to be written.
* @throws IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
public final void writeLong(long v) throws IOException {
writeBuffer[0] = (byte) (v >>> 56);
writeBuffer[1] = (byte) (v >>> 48);
writeBuffer[2] = (byte) (v >>> 40);
writeBuffer[3] = (byte) (v >>> 32);
writeBuffer[4] = (byte) (v >>> 24);
writeBuffer[5] = (byte) (v >>> 16);
writeBuffer[6] = (byte) (v >>> 8);
writeBuffer[7] = (byte) (v >>> 0);
out.write(writeBuffer, 0, 8);
incCount(8);
}
/**
* Converts the float argument to an <code>int</code> using the
* <code>floatToIntBits</code> method in class <code>Float</code>,
* and then writes that <code>int</code> value to the underlying
* output stream as a 4-byte quantity, high byte first. If no
* exception is thrown, the counter <code>written</code> is
* incremented by <code>4</code>.
*
* @param v a <code>float</code> value to be written.
* @throws IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
* @see java.lang.Float#floatToIntBits(float)
*/
public final void writeFloat(float v) throws IOException {
writeInt(Float.floatToIntBits(v));
}
/**
* Converts the double argument to a <code>long</code> using the
* <code>doubleToLongBits</code> method in class <code>Double</code>,
* and then writes that <code>long</code> value to the underlying
* output stream as an 8-byte quantity, high byte first. If no
* exception is thrown, the counter <code>written</code> is
* incremented by <code>8</code>.
*
* @param v a <code>double</code> value to be written.
* @throws IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
* @see java.lang.Double#doubleToLongBits(double)
*/
public final void writeDouble(double v) throws IOException {
writeLong(Double.doubleToLongBits(v));
}
/**
* Writes out the string to the underlying output stream as a
* sequence of bytes. Each character in the string is written out, in
* sequence, by discarding its high eight bits. If no exception is
* thrown, the counter <code>written</code> is incremented by the
* length of <code>s</code>.
*
* @param s a string of bytes to be written.
* @throws IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
public final void writeBytes(String s) throws IOException {
int len = s.length();
for (int i = 0; i < len; i++) {
out.write((byte) s.charAt(i));
}
incCount(len);
}
/**
* Writes a string to the underlying output stream as a sequence of
* characters. Each character is written to the data output stream as
* if by the <code>writeChar</code> method. If no exception is
* thrown, the counter <code>written</code> is incremented by twice
* the length of <code>s</code>.
*
* @param s a <code>String</code> value to be written.
* @throws IOException if an I/O error occurs.
* @see java.io.DataOutputStream#writeChar(int)
* @see java.io.FilterOutputStream#out
*/
public final void writeChars(String s) throws IOException {
int len = s.length();
for (int i = 0; i < len; i++) {
int v = s.charAt(i);
out.write((v >>> 8) & 0xFF);
out.write((v >>> 0) & 0xFF);
}
incCount(len * 2);
}
/**
* Writes a string to the underlying output stream using
* <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
* encoding in a machine-independent manner.
* <p/>
* First, two bytes are written to the output stream as if by the
* <code>writeShort</code> method giving the number of bytes to
* follow. This value is the number of bytes actually written out,
* not the length of the string. Following the length, each character
* of the string is output, in sequence, using the modified UTF-8 encoding
* for the character. If no exception is thrown, the counter
* <code>written</code> is incremented by the total number of
* bytes written to the output stream. This will be at least two
* plus the length of <code>str</code>, and at most two plus
* thrice the length of <code>str</code>.
*
* @param str a string to be written.
* @throws IOException if an I/O error occurs.
*/
public final void writeUTF(String str) throws IOException {
writeUTF(str, this);
}
/**
* Writes a string to the specified DataOutput using
* <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
* encoding in a machine-independent manner.
* <p/>
* First, two bytes are written to out as if by the <code>writeShort</code>
* method giving the number of bytes to follow. This value is the number of
* bytes actually written out, not the length of the string. Following the
* length, each character of the string is output, in sequence, using the
* modified UTF-8 encoding for the character. If no exception is thrown, the
* counter <code>written</code> is incremented by the total number of
* bytes written to the output stream. This will be at least two
* plus the length of <code>str</code>, and at most two plus
* thrice the length of <code>str</code>.
*
* @param str a string to be written.
* @param out destination to write to
* @return The number of bytes written out.
* @throws IOException if an I/O error occurs.
*/
static int writeUTF(String str, DataOutput out) throws IOException {
int strlen = str.length();
int utflen = 0;
int c, count = 0;
/* use charAt instead of copying String to char array */
for (int i = 0; i < strlen; i++) {
c = str.charAt(i);
if ((c >= 0x0001) && (c <= 0x007F)) {
utflen++;
} else if (c > 0x07FF) {
utflen += 3;
} else {
utflen += 2;
}
}
if (utflen > 65535)
throw new UTFDataFormatException(
"encoded string too long: " + utflen + " bytes");
byte[] bytearr = null;
if (out instanceof FastDataOutputStream) {
FastDataOutputStream dos = (FastDataOutputStream) out;
if (dos.bytearr == null || (dos.bytearr.length < (utflen + 2)))
dos.bytearr = new byte[(utflen * 2) + 2];
bytearr = dos.bytearr;
} else {
bytearr = new byte[utflen + 2];
}
bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
int i = 0;
for (i = 0; i < strlen; i++) {
c = str.charAt(i);
if (!((c >= 0x0001) && (c <= 0x007F))) break;
bytearr[count++] = (byte) c;
}
for (; i < strlen; i++) {
c = str.charAt(i);
if ((c >= 0x0001) && (c <= 0x007F)) {
bytearr[count++] = (byte) c;
} else if (c > 0x07FF) {
bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
} else {
bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
}
}
out.write(bytearr, 0, utflen + 2);
return utflen + 2;
}
/**
* Returns the current value of the counter <code>written</code>,
* the number of bytes written to this data output stream so far.
* If the counter overflows, it will be wrapped to Integer.MAX_VALUE.
*
* @return the value of the <code>written</code> field.
* @see java.io.DataOutputStream#written
*/
public final int size() {
return written;
}
}

View File

@ -66,6 +66,10 @@ public class BytesStreamOutput extends StreamOutput {
count = newcount;
}
public void seek(int seekTo) {
count = seekTo;
}
public void reset() {
count = 0;
}

View File

@ -1,58 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import java.io.Closeable;
import java.io.DataInput;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class DataInputStreamInput extends StreamInput {
private final DataInput in;
public DataInputStreamInput(DataInput in) {
this.in = in;
}
@Override public int read() throws IOException {
return in.readByte() & 0xFF;
}
@Override public byte readByte() throws IOException {
return in.readByte();
}
@Override public void readBytes(byte[] b, int offset, int len) throws IOException {
in.readFully(b, offset, len);
}
@Override public void reset() throws IOException {
throw new UnsupportedOperationException();
}
@Override public void close() throws IOException {
if (in instanceof Closeable) {
((Closeable) in).close();
}
}
}

View File

@ -35,7 +35,7 @@ public class DataOutputStreamOutput extends StreamOutput {
}
@Override public void writeByte(byte b) throws IOException {
out.write(b);
out.writeByte(b);
}
@Override public void writeBytes(byte[] b, int offset, int length) throws IOException {

View File

@ -1,70 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
/**
* @author kimchy (shay.banon)
*/
public class InputStreamStreamInput extends StreamInput {
private final InputStream is;
public InputStreamStreamInput(InputStream is) {
this.is = is;
}
@Override public int read() throws IOException {
return is.read();
}
@Override public int read(byte[] b) throws IOException {
return is.read(b);
}
@Override public int read(byte[] b, int off, int len) throws IOException {
return is.read(b, off, len);
}
@Override public byte readByte() throws IOException {
return (byte) is.read();
}
@Override public void readBytes(byte[] b, int offset, int len) throws IOException {
int n = 0;
while (n < len) {
int count = is.read(b, offset + n, len - n);
if (count < 0)
throw new EOFException();
n += count;
}
}
@Override public void reset() throws IOException {
is.reset();
}
@Override public void close() throws IOException {
is.close();
}
}

View File

@ -1,55 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import java.io.IOException;
import java.io.OutputStream;
/**
* @author kimchy (shay.banon)
*/
public class OutputStreamStreamOutput extends StreamOutput {
private final OutputStream os;
public OutputStreamStreamOutput(OutputStream os) {
this.os = os;
}
@Override public void writeByte(byte b) throws IOException {
os.write(b);
}
@Override public void writeBytes(byte[] b, int offset, int length) throws IOException {
os.write(b, offset, length);
}
@Override public void reset() throws IOException {
// nothing to do
}
@Override public void flush() throws IOException {
os.flush();
}
@Override public void close() throws IOException {
os.close();
}
}

View File

@ -26,14 +26,14 @@ import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.Digest;
import org.elasticsearch.common.Hex;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.blobstore.*;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput;
@ -58,7 +58,6 @@ import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
@ -96,8 +95,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
protected final ConcurrentMap<String, String> cachedMd5 = ConcurrentCollections.newConcurrentMap();
private volatile SoftReference<FastByteArrayOutputStream> cachedBos = new SoftReference<FastByteArrayOutputStream>(new FastByteArrayOutputStream());
private volatile AppendableBlobContainer.AppendableBlob translogBlob;
private volatile RecoveryStatus recoveryStatus;
@ -291,18 +288,19 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
if (!snapshot.newTranslogCreated()) {
translogSnapshot.seekForward(snapshot.lastTranslogPosition());
}
FastByteArrayOutputStream bos = cachedBos.get();
if (bos == null) {
bos = new FastByteArrayOutputStream();
cachedBos = new SoftReference<FastByteArrayOutputStream>(bos);
}
OutputStreamStreamOutput bosOs = new OutputStreamStreamOutput(bos);
BytesStreamOutput bout = CachedStreamOutput.cachedBytes();
while (translogSnapshot.hasNext()) {
bos.reset();
TranslogStreams.writeTranslogOperation(bosOs, translogSnapshot.next());
bosOs.flush();
os.writeVInt(bos.size());
os.writeBytes(bos.unsafeByteArray(), bos.size());
bout.reset();
bout.writeInt(0);
TranslogStreams.writeTranslogOperation(bout, translogSnapshot.next());
bout.flush();
int size = bout.size();
bout.seek(0);
bout.writeInt(size - 4);
os.writeBytes(bout.unsafeByteArray(), size);
currentSnapshotStatus.translog().addTranslogOperations(1);
}
}
@ -435,7 +433,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
StopWatch timer = new StopWatch().start();
try {
indexShard.performRecoveryPrepareForTranslog();
@ -451,12 +448,19 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
return;
}
bos.write(data, offset, size);
// if we don't have enough to read the header size of the first translog, bail and wait for the next one
if (bos.size() < 4) {
return;
}
BytesStreamInput si = new BytesStreamInput(bos.unsafeByteArray(), 0, bos.size());
int position;
while (true) {
try {
position = si.position();
int opSize = si.readVInt();
if (position + 4 > bos.size()) {
break;
}
int opSize = si.readInt();
int curPos = si.position();
if ((si.position() + opSize) > bos.size()) {
break;

View File

@ -20,8 +20,8 @@
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -36,7 +36,6 @@ import org.elasticsearch.index.translog.TranslogStreams;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -54,9 +53,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
private final AtomicInteger operationCounter = new AtomicInteger();
private RafReference raf;
private long lastPosition = 0;
private volatile SoftReference<FastByteArrayOutputStream> cachedBos = new SoftReference<FastByteArrayOutputStream>(new FastByteArrayOutputStream());
private RafReference raf;
@Inject public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) {
super(shardId, indexSettings);
@ -91,6 +90,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
@Override public void newTranslog(long id) throws TranslogException {
synchronized (mutex) {
operationCounter.set(0);
lastPosition = 0;
this.id = id;
if (raf != null) {
raf.decreaseRefCount();
@ -105,23 +105,23 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
}
@Override public void add(Operation operation) throws TranslogException {
synchronized (mutex) {
FastByteArrayOutputStream bos = cachedBos.get();
if (bos == null) {
bos = new FastByteArrayOutputStream();
cachedBos = new SoftReference<FastByteArrayOutputStream>(bos);
}
try {
bos.reset();
OutputStreamStreamOutput bosOs = new OutputStreamStreamOutput(bos);
TranslogStreams.writeTranslogOperation(bosOs, operation);
bosOs.flush();
raf.raf().writeInt(bos.size());
raf.raf().write(bos.unsafeByteArray(), 0, bos.size());
try {
BytesStreamOutput out = CachedStreamOutput.cachedBytes();
out.writeInt(0); // marker for the size...
TranslogStreams.writeTranslogOperation(out, operation);
out.flush();
int size = out.size();
out.seek(0);
out.writeInt(size - 4);
synchronized (mutex) {
raf.raf().write(out.unsafeByteArray(), 0, size);
lastPosition += size;
operationCounter.incrementAndGet();
} catch (Exception e) {
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
}
} catch (Exception e) {
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
}
}
@ -130,9 +130,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
try {
raf.increaseRefCount();
if (useStream) {
return new FsStreamSnapshot(shardId, this.id, raf, raf.raf().getFilePointer());
return new FsStreamSnapshot(shardId, this.id, raf, lastPosition);
} else {
return new FsChannelSnapshot(shardId, this.id, raf, raf.raf().getFilePointer());
return new FsChannelSnapshot(shardId, this.id, raf, lastPosition);
}
} catch (IOException e) {
throw new TranslogException(shardId, "Failed to snapshot", e);
@ -148,11 +148,11 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
try {
raf.increaseRefCount();
if (useStream) {
FsStreamSnapshot newSnapshot = new FsStreamSnapshot(shardId, id, raf, raf.raf().getFilePointer());
FsStreamSnapshot newSnapshot = new FsStreamSnapshot(shardId, id, raf, lastPosition);
newSnapshot.seekForward(snapshot.position());
return newSnapshot;
} else {
FsChannelSnapshot newSnapshot = new FsChannelSnapshot(shardId, id, raf, raf.raf().getFilePointer());
FsChannelSnapshot newSnapshot = new FsChannelSnapshot(shardId, id, raf, lastPosition);
newSnapshot.seekForward(snapshot.position());
return newSnapshot;
}