From 2259ef671b2fd9656d039440cd6525e30f6abe67 Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 18 Aug 2010 14:29:46 +0300 Subject: [PATCH] Gateway: Failure to read full translog from the gateway, closes #328. --- .../fs/FsAppendableBlobContainer.java | 2 +- .../common/io/ByteArrayDataInputStream.java | 41 -- .../common/io/ByteArrayDataOutputStream.java | 70 --- .../common/io/FastDataOutputStream.java | 405 ------------------ .../common/io/stream/BytesStreamOutput.java | 4 + .../io/stream/DataInputStreamInput.java | 58 --- .../io/stream/DataOutputStreamOutput.java | 2 +- .../io/stream/InputStreamStreamInput.java | 70 --- .../io/stream/OutputStreamStreamOutput.java | 55 --- .../blobstore/BlobStoreIndexShardGateway.java | 40 +- .../index/translog/fs/FsTranslog.java | 48 +-- 11 files changed, 52 insertions(+), 743 deletions(-) delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/io/ByteArrayDataInputStream.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/io/ByteArrayDataOutputStream.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FastDataOutputStream.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/DataInputStreamInput.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/OutputStreamStreamOutput.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java index d958617db6d..32b37a28b26 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java @@ -57,9 +57,9 @@ public class FsAppendableBlobContainer extends AbstractFsBlobContainer implement raf = new RandomAccessFile(file, "rw"); raf.seek(raf.length()); listener.withStream(new DataOutputStreamOutput(raf)); - listener.onCompleted(); raf.close(); FileSystemUtils.syncFile(file); + listener.onCompleted(); } catch (IOException e) { listener.onFailure(e); } finally { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/ByteArrayDataInputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/ByteArrayDataInputStream.java deleted file mode 100644 index f37a6f8c9d6..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/ByteArrayDataInputStream.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io; - -import org.elasticsearch.common.util.concurrent.NotThreadSafe; - -import java.io.DataInputStream; - -/** - * @author kimchy (Shay Banon) - */ -@NotThreadSafe -public class ByteArrayDataInputStream extends DataInputStream { - - /** - * Creates a DataInputStream that uses the specified - * underlying InputStream. - * - * @param source the specified source - */ - public ByteArrayDataInputStream(byte[] source) { - super(new FastByteArrayInputStream(source)); - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/ByteArrayDataOutputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/ByteArrayDataOutputStream.java deleted file mode 100644 index 65838558035..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/ByteArrayDataOutputStream.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io; - -import org.elasticsearch.common.thread.ThreadLocals; -import org.elasticsearch.common.util.concurrent.NotThreadSafe; - -import java.io.DataOutputStream; - -/** - * @author kimchy (Shay Banon) - */ -@NotThreadSafe -public class ByteArrayDataOutputStream extends DataOutputStream { - - /** - * A thread local based cache of {@link ByteArrayDataOutputStream}. - */ - public static class Cached { - - private static final ThreadLocal> cache = new ThreadLocal>() { - @Override protected ThreadLocals.CleanableValue initialValue() { - return new ThreadLocals.CleanableValue(new ByteArrayDataOutputStream()); - } - }; - - /** - * Returns the cached thread local byte strean, with its internal stream cleared. - */ - public static ByteArrayDataOutputStream cached() { - ByteArrayDataOutputStream os = cache.get().get(); - ((FastByteArrayOutputStream) os.out).reset(); - return os; - } - } - - - public ByteArrayDataOutputStream() { - super(new FastByteArrayOutputStream()); - } - - public byte[] copiedByteArray() { - return outputStream().copiedByteArray(); - } - - public byte[] unsafeByteArray() { - return outputStream().unsafeByteArray(); - } - - private FastByteArrayOutputStream outputStream() { - return (FastByteArrayOutputStream) out; - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FastDataOutputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FastDataOutputStream.java deleted file mode 100644 index b36bec3b3ab..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FastDataOutputStream.java +++ /dev/null @@ -1,405 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io; - -import java.io.*; - -/** - * @author kimchy (Shay Banon) - */ -public class FastDataOutputStream extends FilterOutputStream implements DataOutput { - /** - * The number of bytes written to the data output stream so far. - * If this counter overflows, it will be wrapped to Integer.MAX_VALUE. - */ - protected int written; - - /** - * bytearr is initialized on demand by writeUTF - */ - private byte[] bytearr = null; - - /** - * Creates a new data output stream to write data to the specified - * underlying output stream. The counter written is - * set to zero. - * - * @param out the underlying output stream, to be saved for later - * use. - * @see java.io.FilterOutputStream#out - */ - public FastDataOutputStream(OutputStream out) { - super(out); - } - - /** - * Increases the written counter by the specified value - * until it reaches Integer.MAX_VALUE. - */ - private void incCount(int value) { - int temp = written + value; - if (temp < 0) { - temp = Integer.MAX_VALUE; - } - written = temp; - } - - /** - * Writes the specified byte (the low eight bits of the argument - * b) to the underlying output stream. If no exception - * is thrown, the counter written is incremented by - * 1. - *

- * Implements the write method of OutputStream. - * - * @param b the byte to be written. - * @throws java.io.IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - public void write(int b) throws IOException { - out.write(b); - incCount(1); - } - - /** - * Writes len bytes from the specified byte array - * starting at offset off to the underlying output stream. - * If no exception is thrown, the counter written is - * incremented by len. - * - * @param b the data. - * @param off the start offset in the data. - * @param len the number of bytes to write. - * @throws IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - public void write(byte b[], int off, int len) - throws IOException { - out.write(b, off, len); - incCount(len); - } - - /** - * Flushes this data output stream. This forces any buffered output - * bytes to be written out to the stream. - *

- * The flush method of DataOutputStream - * calls the flush method of its underlying output stream. - * - * @throws IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - * @see java.io.OutputStream#flush() - */ - public void flush() throws IOException { - out.flush(); - } - - /** - * Writes a boolean to the underlying output stream as - * a 1-byte value. The value true is written out as the - * value (byte)1; the value false is - * written out as the value (byte)0. If no exception is - * thrown, the counter written is incremented by - * 1. - * - * @param v a boolean value to be written. - * @throws IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - public final void writeBoolean(boolean v) throws IOException { - out.write(v ? 1 : 0); - incCount(1); - } - - /** - * Writes out a byte to the underlying output stream as - * a 1-byte value. If no exception is thrown, the counter - * written is incremented by 1. - * - * @param v a byte value to be written. - * @throws IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - public final void writeByte(int v) throws IOException { - out.write(v); - incCount(1); - } - - /** - * Writes a short to the underlying output stream as two - * bytes, high byte first. If no exception is thrown, the counter - * written is incremented by 2. - * - * @param v a short to be written. - * @throws IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - public final void writeShort(int v) throws IOException { - out.write((v >>> 8) & 0xFF); - out.write((v >>> 0) & 0xFF); - incCount(2); - } - - /** - * Writes a char to the underlying output stream as a - * 2-byte value, high byte first. If no exception is thrown, the - * counter written is incremented by 2. - * - * @param v a char value to be written. - * @throws IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - public final void writeChar(int v) throws IOException { - out.write((v >>> 8) & 0xFF); - out.write((v >>> 0) & 0xFF); - incCount(2); - } - - /** - * Writes an int to the underlying output stream as four - * bytes, high byte first. If no exception is thrown, the counter - * written is incremented by 4. - * - * @param v an int to be written. - * @throws IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - public final void writeInt(int v) throws IOException { - out.write((v >>> 24) & 0xFF); - out.write((v >>> 16) & 0xFF); - out.write((v >>> 8) & 0xFF); - out.write((v >>> 0) & 0xFF); - incCount(4); - } - - private byte writeBuffer[] = new byte[8]; - - /** - * Writes a long to the underlying output stream as eight - * bytes, high byte first. In no exception is thrown, the counter - * written is incremented by 8. - * - * @param v a long to be written. - * @throws IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - public final void writeLong(long v) throws IOException { - writeBuffer[0] = (byte) (v >>> 56); - writeBuffer[1] = (byte) (v >>> 48); - writeBuffer[2] = (byte) (v >>> 40); - writeBuffer[3] = (byte) (v >>> 32); - writeBuffer[4] = (byte) (v >>> 24); - writeBuffer[5] = (byte) (v >>> 16); - writeBuffer[6] = (byte) (v >>> 8); - writeBuffer[7] = (byte) (v >>> 0); - out.write(writeBuffer, 0, 8); - incCount(8); - } - - /** - * Converts the float argument to an int using the - * floatToIntBits method in class Float, - * and then writes that int value to the underlying - * output stream as a 4-byte quantity, high byte first. If no - * exception is thrown, the counter written is - * incremented by 4. - * - * @param v a float value to be written. - * @throws IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - * @see java.lang.Float#floatToIntBits(float) - */ - public final void writeFloat(float v) throws IOException { - writeInt(Float.floatToIntBits(v)); - } - - /** - * Converts the double argument to a long using the - * doubleToLongBits method in class Double, - * and then writes that long value to the underlying - * output stream as an 8-byte quantity, high byte first. If no - * exception is thrown, the counter written is - * incremented by 8. - * - * @param v a double value to be written. - * @throws IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - * @see java.lang.Double#doubleToLongBits(double) - */ - public final void writeDouble(double v) throws IOException { - writeLong(Double.doubleToLongBits(v)); - } - - /** - * Writes out the string to the underlying output stream as a - * sequence of bytes. Each character in the string is written out, in - * sequence, by discarding its high eight bits. If no exception is - * thrown, the counter written is incremented by the - * length of s. - * - * @param s a string of bytes to be written. - * @throws IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - public final void writeBytes(String s) throws IOException { - int len = s.length(); - for (int i = 0; i < len; i++) { - out.write((byte) s.charAt(i)); - } - incCount(len); - } - - /** - * Writes a string to the underlying output stream as a sequence of - * characters. Each character is written to the data output stream as - * if by the writeChar method. If no exception is - * thrown, the counter written is incremented by twice - * the length of s. - * - * @param s a String value to be written. - * @throws IOException if an I/O error occurs. - * @see java.io.DataOutputStream#writeChar(int) - * @see java.io.FilterOutputStream#out - */ - public final void writeChars(String s) throws IOException { - int len = s.length(); - for (int i = 0; i < len; i++) { - int v = s.charAt(i); - out.write((v >>> 8) & 0xFF); - out.write((v >>> 0) & 0xFF); - } - incCount(len * 2); - } - - /** - * Writes a string to the underlying output stream using - * modified UTF-8 - * encoding in a machine-independent manner. - *

- * First, two bytes are written to the output stream as if by the - * writeShort method giving the number of bytes to - * follow. This value is the number of bytes actually written out, - * not the length of the string. Following the length, each character - * of the string is output, in sequence, using the modified UTF-8 encoding - * for the character. If no exception is thrown, the counter - * written is incremented by the total number of - * bytes written to the output stream. This will be at least two - * plus the length of str, and at most two plus - * thrice the length of str. - * - * @param str a string to be written. - * @throws IOException if an I/O error occurs. - */ - public final void writeUTF(String str) throws IOException { - writeUTF(str, this); - } - - /** - * Writes a string to the specified DataOutput using - * modified UTF-8 - * encoding in a machine-independent manner. - *

- * First, two bytes are written to out as if by the writeShort - * method giving the number of bytes to follow. This value is the number of - * bytes actually written out, not the length of the string. Following the - * length, each character of the string is output, in sequence, using the - * modified UTF-8 encoding for the character. If no exception is thrown, the - * counter written is incremented by the total number of - * bytes written to the output stream. This will be at least two - * plus the length of str, and at most two plus - * thrice the length of str. - * - * @param str a string to be written. - * @param out destination to write to - * @return The number of bytes written out. - * @throws IOException if an I/O error occurs. - */ - static int writeUTF(String str, DataOutput out) throws IOException { - int strlen = str.length(); - int utflen = 0; - int c, count = 0; - - /* use charAt instead of copying String to char array */ - for (int i = 0; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - } else { - utflen += 2; - } - } - - if (utflen > 65535) - throw new UTFDataFormatException( - "encoded string too long: " + utflen + " bytes"); - - byte[] bytearr = null; - if (out instanceof FastDataOutputStream) { - FastDataOutputStream dos = (FastDataOutputStream) out; - if (dos.bytearr == null || (dos.bytearr.length < (utflen + 2))) - dos.bytearr = new byte[(utflen * 2) + 2]; - bytearr = dos.bytearr; - } else { - bytearr = new byte[utflen + 2]; - } - - bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); - bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF); - - int i = 0; - for (i = 0; i < strlen; i++) { - c = str.charAt(i); - if (!((c >= 0x0001) && (c <= 0x007F))) break; - bytearr[count++] = (byte) c; - } - - for (; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - bytearr[count++] = (byte) c; - - } else if (c > 0x07FF) { - bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); - } else { - bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); - } - } - out.write(bytearr, 0, utflen + 2); - return utflen + 2; - } - - /** - * Returns the current value of the counter written, - * the number of bytes written to this data output stream so far. - * If the counter overflows, it will be wrapped to Integer.MAX_VALUE. - * - * @return the value of the written field. - * @see java.io.DataOutputStream#written - */ - public final int size() { - return written; - } -} - diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java index a15183e55cb..96498b1f0fd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java @@ -66,6 +66,10 @@ public class BytesStreamOutput extends StreamOutput { count = newcount; } + public void seek(int seekTo) { + count = seekTo; + } + public void reset() { count = 0; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/DataInputStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/DataInputStreamInput.java deleted file mode 100644 index d35dbeff3b5..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/DataInputStreamInput.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import java.io.Closeable; -import java.io.DataInput; -import java.io.IOException; - -/** - * @author kimchy (shay.banon) - */ -public class DataInputStreamInput extends StreamInput { - - private final DataInput in; - - public DataInputStreamInput(DataInput in) { - this.in = in; - } - - @Override public int read() throws IOException { - return in.readByte() & 0xFF; - } - - @Override public byte readByte() throws IOException { - return in.readByte(); - } - - @Override public void readBytes(byte[] b, int offset, int len) throws IOException { - in.readFully(b, offset, len); - } - - @Override public void reset() throws IOException { - throw new UnsupportedOperationException(); - } - - @Override public void close() throws IOException { - if (in instanceof Closeable) { - ((Closeable) in).close(); - } - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/DataOutputStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/DataOutputStreamOutput.java index 741beac817f..6f48528628e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/DataOutputStreamOutput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/DataOutputStreamOutput.java @@ -35,7 +35,7 @@ public class DataOutputStreamOutput extends StreamOutput { } @Override public void writeByte(byte b) throws IOException { - out.write(b); + out.writeByte(b); } @Override public void writeBytes(byte[] b, int offset, int length) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java deleted file mode 100644 index ec50480e138..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; - -/** - * @author kimchy (shay.banon) - */ -public class InputStreamStreamInput extends StreamInput { - - private final InputStream is; - - public InputStreamStreamInput(InputStream is) { - this.is = is; - } - - @Override public int read() throws IOException { - return is.read(); - } - - @Override public int read(byte[] b) throws IOException { - return is.read(b); - } - - @Override public int read(byte[] b, int off, int len) throws IOException { - return is.read(b, off, len); - } - - @Override public byte readByte() throws IOException { - return (byte) is.read(); - } - - @Override public void readBytes(byte[] b, int offset, int len) throws IOException { - int n = 0; - while (n < len) { - int count = is.read(b, offset + n, len - n); - if (count < 0) - throw new EOFException(); - n += count; - } - } - - @Override public void reset() throws IOException { - is.reset(); - } - - @Override public void close() throws IOException { - is.close(); - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/OutputStreamStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/OutputStreamStreamOutput.java deleted file mode 100644 index c4afeea9a77..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/OutputStreamStreamOutput.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * @author kimchy (shay.banon) - */ -public class OutputStreamStreamOutput extends StreamOutput { - - private final OutputStream os; - - public OutputStreamStreamOutput(OutputStream os) { - this.os = os; - } - - @Override public void writeByte(byte b) throws IOException { - os.write(b); - } - - @Override public void writeBytes(byte[] b, int offset, int length) throws IOException { - os.write(b, offset, length); - } - - @Override public void reset() throws IOException { - // nothing to do - } - - @Override public void flush() throws IOException { - os.flush(); - } - - @Override public void close() throws IOException { - os.close(); - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index 0ba6018e75c..49722cdef44 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -26,14 +26,14 @@ import org.apache.lucene.store.IndexOutput; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.common.Digest; import org.elasticsearch.common.Hex; -import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.io.FastByteArrayOutputStream; import org.elasticsearch.common.io.stream.BytesStreamInput; -import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput; @@ -58,7 +58,6 @@ import javax.annotation.Nullable; import java.io.ByteArrayInputStream; import java.io.FileNotFoundException; import java.io.IOException; -import java.lang.ref.SoftReference; import java.security.MessageDigest; import java.util.ArrayList; import java.util.List; @@ -96,8 +95,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo protected final ConcurrentMap cachedMd5 = ConcurrentCollections.newConcurrentMap(); - private volatile SoftReference cachedBos = new SoftReference(new FastByteArrayOutputStream()); - private volatile AppendableBlobContainer.AppendableBlob translogBlob; private volatile RecoveryStatus recoveryStatus; @@ -291,18 +288,19 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo if (!snapshot.newTranslogCreated()) { translogSnapshot.seekForward(snapshot.lastTranslogPosition()); } - FastByteArrayOutputStream bos = cachedBos.get(); - if (bos == null) { - bos = new FastByteArrayOutputStream(); - cachedBos = new SoftReference(bos); - } - OutputStreamStreamOutput bosOs = new OutputStreamStreamOutput(bos); + BytesStreamOutput bout = CachedStreamOutput.cachedBytes(); while (translogSnapshot.hasNext()) { - bos.reset(); - TranslogStreams.writeTranslogOperation(bosOs, translogSnapshot.next()); - bosOs.flush(); - os.writeVInt(bos.size()); - os.writeBytes(bos.unsafeByteArray(), bos.size()); + bout.reset(); + + bout.writeInt(0); + TranslogStreams.writeTranslogOperation(bout, translogSnapshot.next()); + bout.flush(); + + int size = bout.size(); + bout.seek(0); + bout.writeInt(size - 4); + + os.writeBytes(bout.unsafeByteArray(), size); currentSnapshotStatus.translog().addTranslogOperations(1); } } @@ -435,7 +433,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo } - StopWatch timer = new StopWatch().start(); try { indexShard.performRecoveryPrepareForTranslog(); @@ -451,12 +448,19 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo return; } bos.write(data, offset, size); + // if we don't have enough to read the header size of the first translog, bail and wait for the next one + if (bos.size() < 4) { + return; + } BytesStreamInput si = new BytesStreamInput(bos.unsafeByteArray(), 0, bos.size()); int position; while (true) { try { position = si.position(); - int opSize = si.readVInt(); + if (position + 4 > bos.size()) { + break; + } + int opSize = si.readInt(); int curPos = si.position(); if ((si.position() + opSize) > bos.size()) { break; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 41047c4df62..c51703f6769 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -20,8 +20,8 @@ package org.elasticsearch.index.translog.fs; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.FastByteArrayOutputStream; -import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -36,7 +36,6 @@ import org.elasticsearch.index.translog.TranslogStreams; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.lang.ref.SoftReference; import java.util.concurrent.atomic.AtomicInteger; /** @@ -54,9 +53,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog private final AtomicInteger operationCounter = new AtomicInteger(); - private RafReference raf; + private long lastPosition = 0; - private volatile SoftReference cachedBos = new SoftReference(new FastByteArrayOutputStream()); + private RafReference raf; @Inject public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) { super(shardId, indexSettings); @@ -91,6 +90,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog @Override public void newTranslog(long id) throws TranslogException { synchronized (mutex) { operationCounter.set(0); + lastPosition = 0; this.id = id; if (raf != null) { raf.decreaseRefCount(); @@ -105,23 +105,23 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } @Override public void add(Operation operation) throws TranslogException { - synchronized (mutex) { - FastByteArrayOutputStream bos = cachedBos.get(); - if (bos == null) { - bos = new FastByteArrayOutputStream(); - cachedBos = new SoftReference(bos); - } - try { - bos.reset(); - OutputStreamStreamOutput bosOs = new OutputStreamStreamOutput(bos); - TranslogStreams.writeTranslogOperation(bosOs, operation); - bosOs.flush(); - raf.raf().writeInt(bos.size()); - raf.raf().write(bos.unsafeByteArray(), 0, bos.size()); + try { + BytesStreamOutput out = CachedStreamOutput.cachedBytes(); + out.writeInt(0); // marker for the size... + TranslogStreams.writeTranslogOperation(out, operation); + out.flush(); + + int size = out.size(); + out.seek(0); + out.writeInt(size - 4); + + synchronized (mutex) { + raf.raf().write(out.unsafeByteArray(), 0, size); + lastPosition += size; operationCounter.incrementAndGet(); - } catch (Exception e) { - throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); } + } catch (Exception e) { + throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); } } @@ -130,9 +130,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog try { raf.increaseRefCount(); if (useStream) { - return new FsStreamSnapshot(shardId, this.id, raf, raf.raf().getFilePointer()); + return new FsStreamSnapshot(shardId, this.id, raf, lastPosition); } else { - return new FsChannelSnapshot(shardId, this.id, raf, raf.raf().getFilePointer()); + return new FsChannelSnapshot(shardId, this.id, raf, lastPosition); } } catch (IOException e) { throw new TranslogException(shardId, "Failed to snapshot", e); @@ -148,11 +148,11 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog try { raf.increaseRefCount(); if (useStream) { - FsStreamSnapshot newSnapshot = new FsStreamSnapshot(shardId, id, raf, raf.raf().getFilePointer()); + FsStreamSnapshot newSnapshot = new FsStreamSnapshot(shardId, id, raf, lastPosition); newSnapshot.seekForward(snapshot.position()); return newSnapshot; } else { - FsChannelSnapshot newSnapshot = new FsChannelSnapshot(shardId, id, raf, raf.raf().getFilePointer()); + FsChannelSnapshot newSnapshot = new FsChannelSnapshot(shardId, id, raf, lastPosition); newSnapshot.seekForward(snapshot.position()); return newSnapshot; }