Rewrite BytesStreamOutput on top of BigArrays/ByteArray.

Fix for #5159
Holger Hoffstätte 2014-03-04 13:55:34 +01:00
parent 5ae1236857
commit 9c7032c427
4 changed files with 380 additions and 88 deletions
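In outline, the rewrite replaces the old grow-and-copy byte[] buffer with a paged ByteArray managed by BigArrays. A minimal sketch of that pattern, using only calls that appear in the diff below (the standalone framing is illustrative, not part of the commit):

    import org.elasticsearch.common.util.BigArrays;
    import org.elasticsearch.common.util.ByteArray;

    // Paged writes: instead of reallocating one big byte[] and copying its
    // contents on overflow, grow() acquires additional fixed-size pages.
    BigArrays bigarrays = BigArrays.NON_RECYCLING_INSTANCE;
    ByteArray bytes = bigarrays.newByteArray(BigArrays.PAGE_SIZE_IN_BYTES);
    int count = 0;

    byte b = 42;
    bytes = bigarrays.grow(bytes, count + 1); // no-op while the current page has room
    bytes.set(count, b);                      // lands in whichever page holds index 'count'
    count++;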

src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java

@@ -19,38 +19,52 @@
 package org.elasticsearch.common.io.stream;
 
-import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.BytesStream;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ByteArray;
 
 import java.io.IOException;
 
 /**
- *
+ * A {@link StreamOutput} that uses {@link BigArrays} to acquire pages of
+ * bytes, which avoids frequent reallocation & copying of the internal data.
  */
 public class BytesStreamOutput extends StreamOutput implements BytesStream {
 
-    public static final int DEFAULT_SIZE = 2 * 1024;
-
-    public static final int OVERSIZE_LIMIT = 256 * 1024;
-
     /**
-     * The buffer where data is stored.
+     * Factory/manager for our ByteArray
+     */
+    private final BigArrays bigarrays;
+
+    /**
+     * The internal list of pages.
      */
-    protected byte buf[];
+    private ByteArray bytes;
 
     /**
      * The number of valid bytes in the buffer.
      */
-    protected int count;
+    private int count;
 
+    /**
+     * Create a nonrecycling {@link BytesStreamOutput} with 1 initial page acquired.
+     */
     public BytesStreamOutput() {
-        this(DEFAULT_SIZE);
+        this(BigArrays.PAGE_SIZE_IN_BYTES);
     }
 
-    public BytesStreamOutput(int size) {
-        this.buf = new byte[size];
+    /**
+     * Create a nonrecycling {@link BytesStreamOutput} with enough initial pages acquired
+     * to satisfy the capacity given by {@code expectedSize}.
+     *
+     * @param expectedSize the expected maximum size of the stream in bytes.
+     */
+    public BytesStreamOutput(int expectedSize) {
+        bigarrays = BigArrays.NON_RECYCLING_INSTANCE;
+        bytes = bigarrays.newByteArray(expectedSize);
     }
 
     @Override
@@ -63,88 +77,90 @@ public class BytesStreamOutput extends StreamOutput implements BytesStream {
         return count;
     }
 
     @Override
-    public void seek(long position) throws IOException {
-        if (position > Integer.MAX_VALUE) {
-            throw new UnsupportedOperationException();
-        }
-        count = (int) position;
-    }
-
-    @Override
     public void writeByte(byte b) throws IOException {
-        int newcount = count + 1;
-        if (newcount > buf.length) {
-            buf = grow(newcount);
-        }
-        buf[count] = b;
-        count = newcount;
-    }
-
-    public void skip(int length) {
-        int newcount = count + length;
-        if (newcount > buf.length) {
-            buf = grow(newcount);
-        }
-        count = newcount;
+        ensureCapacity(count+1);
+        bytes.set(count, b);
+        count++;
     }
 
     @Override
     public void writeBytes(byte[] b, int offset, int length) throws IOException {
+        // nothing to copy
         if (length == 0) {
             return;
         }
-        int newcount = count + length;
-        if (newcount > buf.length) {
-            buf = grow(newcount);
-        }
-        System.arraycopy(b, offset, buf, count, length);
-        count = newcount;
-    }
-
-    private byte[] grow(int newCount) {
-        // try and grow faster while we are small...
-        if (newCount < OVERSIZE_LIMIT) {
-            newCount = Math.max(buf.length << 1, newCount);
-        }
-        return ArrayUtil.grow(buf, newCount);
-    }
-
-    public void seek(int seekTo) {
-        count = seekTo;
+
+        // illegal args: offset and/or length exceed array size
+        if (b.length < (offset + length)) {
+            throw new IllegalArgumentException("Illegal offset " + offset + "/length " + length + " for byte[] of length " + b.length);
+        }
+
+        // get enough pages for new size
+        ensureCapacity(count+length);
+
+        // bulk copy
+        bytes.set(count, b, offset, length);
+
+        // advance
+        count += length;
     }
 
     public void reset() {
-        count = 0;
-    }
+        // shrink list of pages
+        if (bytes.size() > BigArrays.PAGE_SIZE_IN_BYTES) {
+            bytes = bigarrays.resize(bytes, BigArrays.PAGE_SIZE_IN_BYTES);
+        }
 
-    public int bufferSize() {
-        return buf.length;
+        // go back to start
+        count = 0;
     }
 
     @Override
     public void flush() throws IOException {
-        // nothing to do there
+        // nothing to do
     }
 
+    @Override
+    public void seek(long position) throws IOException {
+        if (position > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException("position " + position + " > Integer.MAX_VALUE");
+        }
+
+        count = (int)position;
+        ensureCapacity(count);
+    }
+
+    public void skip(int length) {
+        count += length;
+        ensureCapacity(count);
+    }
 
     @Override
     public void close() throws IOException {
-        // nothing to do here
-    }
-
-    @Override
-    public BytesReference bytes() {
-        return new BytesArray(buf, 0, count);
+        // empty for now.
     }
 
     /**
      * Returns the current size of the buffer.
      *
-     * @return the value of the <code>count</code> field, which is the number
-     *         of valid bytes in this output stream.
+     * @return the value of the <code>count</code> field, which is the number of valid
+     *         bytes in this output stream.
      * @see java.io.ByteArrayOutputStream#count
      */
     public int size() {
         return count;
     }
 
+    @Override
+    public BytesReference bytes() {
+        BytesRef bref = new BytesRef();
+        bytes.get(0, count, bref);
+        return new BytesArray(bref, false);
+    }
+
+    private void ensureCapacity(int offset) {
+        bytes = bigarrays.grow(bytes, offset);
+    }
 }
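Taken together, the rewritten class is used like this (a usage sketch assembled from the methods above; the assertions are illustrative):

    BytesStreamOutput out = new BytesStreamOutput();          // acquires one page
    out.writeBytes(new byte[BigArrays.BYTE_PAGE_SIZE + 10]);  // spills into a second page
    assert out.size() == BigArrays.BYTE_PAGE_SIZE + 10;

    byte[] copy = out.bytes().toBytes();  // materialize the pages into a byte[]
    out.reset();                          // shrink back to a single page, position 0
    out.close();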

src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java

@@ -21,7 +21,7 @@ package org.elasticsearch.index.translog.fs;
 
 import jsr166y.ThreadLocalRandom;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.index.translog.TranslogStats;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -34,6 +34,7 @@ import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogException;
+import org.elasticsearch.index.translog.TranslogStats;
 import org.elasticsearch.index.translog.TranslogStreams;
 
 import java.io.File;
@@ -340,18 +341,25 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
             TranslogStreams.writeTranslogOperation(out, operation);
             out.flush();
 
             // write size to beginning of stream
             int size = out.size();
             out.seek(0);
             out.writeInt(size - 4);
-            Location location = current.add(out.bytes().array(), out.bytes().arrayOffset(), size);
+
+            // seek back to end
+            out.seek(size);
+
+            BytesReference ref = out.bytes();
+            byte[] refBytes = ref.array();
+            int refBytesOffset = ref.arrayOffset();
+            Location location = current.add(refBytes, refBytesOffset, size);
             if (syncOnEachOperation) {
                 current.sync();
             }
 
             FsTranslogFile trans = this.trans;
             if (trans != null) {
                 try {
-                    location = trans.add(out.bytes().array(), out.bytes().arrayOffset(), size);
+                    location = trans.add(refBytes, refBytesOffset, size);
                 } catch (ClosedChannelException e) {
                     // ignore
                 }
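The seek calls above implement a length-prefixed frame: four bytes are reserved at the start of the stream, the operation is serialized, and the size is patched in afterwards. The newly added seek(size) matters because the paged bytes() only returns data up to the current position. A sketch of the pattern (the frame helper is hypothetical, not part of the commit):

    // hypothetical helper showing the seek-back framing used in FsTranslog.add()
    static BytesReference frame(byte[] payload) throws IOException {
        BytesStreamOutput out = new BytesStreamOutput();
        out.writeInt(0);            // reserve 4 bytes for the length prefix
        out.writeBytes(payload);    // the serialized operation
        int size = out.size();
        out.seek(0);                // jump back to the reserved slot
        out.writeInt(size - 4);     // patch in the payload length
        out.seek(size);             // restore the write position before reading back
        return out.bytes();
    }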

src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -32,6 +32,7 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -221,11 +222,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
                 // TODO: Can we make it atomic?
                 throw new InvalidSnapshotNameException(snapshotId, "snapshot with such name already exists");
             }
-            snapshotsBlobContainer.writeBlob(snapshotBlobName, bStream.bytes().streamInput(), bStream.bytes().length());
+            BytesReference bRef = bStream.bytes();
+            snapshotsBlobContainer.writeBlob(snapshotBlobName, bRef.streamInput(), bRef.length());
             // Write Global MetaData
             // TODO: Check if metadata needs to be written
             bStream = writeGlobalMetaData(metaData);
-            snapshotsBlobContainer.writeBlob(metaDataBlobName(snapshotId), bStream.bytes().streamInput(), bStream.bytes().length());
+            bRef = bStream.bytes();
+            snapshotsBlobContainer.writeBlob(metaDataBlobName(snapshotId), bRef.streamInput(), bRef.length());
             for (String index : indices) {
                 IndexMetaData indexMetaData = metaData.index(index);
                 BlobPath indexPath = basePath().add("indices").add(index);
@@ -240,7 +243,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
                 IndexMetaData.Builder.toXContent(indexMetaData, builder, ToXContent.EMPTY_PARAMS);
                 builder.endObject();
                 builder.close();
-                indexMetaDataBlobContainer.writeBlob(snapshotBlobName(snapshotId), bStream.bytes().streamInput(), bStream.bytes().length());
+                bRef = bStream.bytes();
+                indexMetaDataBlobContainer.writeBlob(snapshotBlobName(snapshotId), bRef.streamInput(), bRef.length());
             }
         } catch (IOException ex) {
             throw new SnapshotCreationException(snapshotId, ex);
@@ -314,7 +318,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
             updatedSnapshot.endTime(System.currentTimeMillis());
             snapshot = updatedSnapshot.build();
             BytesStreamOutput bStream = writeSnapshot(snapshot);
-            snapshotsBlobContainer.writeBlob(blobName, bStream.bytes().streamInput(), bStream.bytes().length());
+            BytesReference bRef = bStream.bytes();
+            snapshotsBlobContainer.writeBlob(blobName, bRef.streamInput(), bRef.length());
             ImmutableList<SnapshotId> snapshotIds = snapshots();
             if (!snapshotIds.contains(snapshotId)) {
                 snapshotIds = ImmutableList.<SnapshotId>builder().addAll(snapshotIds).add(snapshotId).build();
@@ -569,7 +574,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
         builder.endArray();
         builder.endObject();
         builder.close();
-        snapshotsBlobContainer.writeBlob(SNAPSHOTS_FILE, bStream.bytes().streamInput(), bStream.bytes().length());
+        BytesReference bRef = bStream.bytes();
+        snapshotsBlobContainer.writeBlob(SNAPSHOTS_FILE, bRef.streamInput(), bRef.length());
     }
 
     /**

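The recurring change in this file: with the paged implementation each bytes() call materializes a fresh view of the pages, so callers that used to invoke bytes() twice per writeBlob now fetch the reference once and reuse it. In isolation (container and name are stand-ins):

    // before: bytes() invoked twice, materializing the paged content twice
    // container.writeBlob(name, bStream.bytes().streamInput(), bStream.bytes().length());

    // after: materialize once, reuse the reference
    BytesReference bRef = bStream.bytes();
    container.writeBlob(name, bRef.streamInput(), bRef.length());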
src/test/java/org/elasticsearch/common/io/streams/BytesStreamsTests.java

@@ -22,17 +22,242 @@ package org.elasticsearch.common.io.streams;
 
 import org.apache.lucene.util.Constants;
 import org.elasticsearch.common.io.stream.BytesStreamInput;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.test.ElasticsearchTestCase;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.equalTo;
 
 /**
- *
+ * Tests for {@link BytesStreamOutput} paging behaviour.
  */
 public class BytesStreamsTests extends ElasticsearchTestCase {
 
+    @Test
+    public void testEmpty() throws Exception {
+        BytesStreamOutput out = new BytesStreamOutput();
+
+        // test empty stream to array
+        assertEquals(0, out.size());
+        assertEquals(0, out.bytes().toBytes().length);
+
+        out.close();
+    }
+
+    @Test
+    public void testSingleByte() throws Exception {
+        BytesStreamOutput out = new BytesStreamOutput();
+        assertEquals(0, out.size());
+
+        int expectedSize = 1;
+        byte[] expectedData = randomizedByteArrayWithSize(expectedSize);
+
+        // write single byte
+        out.writeByte(expectedData[0]);
+        assertEquals(expectedSize, out.size());
+        assertArrayEquals(expectedData, out.bytes().toBytes());
+
+        out.close();
+    }
+
+    @Test
+    public void testSingleShortPage() throws Exception {
+        BytesStreamOutput out = new BytesStreamOutput();
+
+        int expectedSize = 10;
+        byte[] expectedData = randomizedByteArrayWithSize(expectedSize);
+
+        // write byte-by-byte
+        for (int i = 0; i < expectedSize; i++) {
+            out.writeByte(expectedData[i]);
+        }
+
+        assertEquals(expectedSize, out.size());
+        assertArrayEquals(expectedData, out.bytes().toBytes());
+
+        out.close();
+    }
+
+    @Test
+    public void testIllegalBulkWrite() throws Exception {
+        BytesStreamOutput out = new BytesStreamOutput();
+
+        // bulk-write with wrong args
+        try {
+            out.writeBytes(new byte[]{}, 0, 1);
+            fail("expected IllegalArgumentException: length > (size-offset)");
+        }
+        catch (IllegalArgumentException iax1) {
+            // expected
+        }
+
+        out.close();
+    }
+
+    @Test
+    public void testSingleShortPageBulkWrite() throws Exception {
+        BytesStreamOutput out = new BytesStreamOutput();
+
+        // first bulk-write empty array: should not change anything
+        int expectedSize = 0;
+        byte[] expectedData = randomizedByteArrayWithSize(expectedSize);
+        out.writeBytes(expectedData);
+        assertEquals(expectedSize, out.size());
+        assertArrayEquals(expectedData, out.bytes().toBytes());
+
+        // bulk-write again with actual bytes
+        expectedSize = 10;
+        expectedData = randomizedByteArrayWithSize(expectedSize);
+        out.writeBytes(expectedData);
+        assertEquals(expectedSize, out.size());
+        assertArrayEquals(expectedData, out.bytes().toBytes());
+
+        out.close();
+    }
+
+    @Test
+    public void testSingleFullPageBulkWrite() throws Exception {
+        BytesStreamOutput out = new BytesStreamOutput();
+
+        int expectedSize = BigArrays.BYTE_PAGE_SIZE;
+        byte[] expectedData = randomizedByteArrayWithSize(expectedSize);
+
+        // write in bulk
+        out.writeBytes(expectedData);
+
+        assertEquals(expectedSize, out.size());
+        assertArrayEquals(expectedData, out.bytes().toBytes());
+
+        out.close();
+    }
+
+    @Test
+    public void testSingleFullPageBulkWriteWithOffset() throws Exception {
+        BytesStreamOutput out = new BytesStreamOutput();
+
+        int initialOffset = 10;
+        int additionalLength = BigArrays.BYTE_PAGE_SIZE;
+        byte[] expectedData = randomizedByteArrayWithSize(initialOffset + additionalLength);
+
+        // first create initial offset
+        out.writeBytes(expectedData, 0, initialOffset);
+        assertEquals(initialOffset, out.size());
+
+        // now write the rest - more than fits into the remaining first page
+        out.writeBytes(expectedData, initialOffset, additionalLength);
+        assertEquals(expectedData.length, out.size());
+        assertArrayEquals(expectedData, out.bytes().toBytes());
+
+        out.close();
+    }
+
+    @Test
+    public void testSingleFullPageBulkWriteWithOffsetCrossover() throws Exception {
+        BytesStreamOutput out = new BytesStreamOutput();
+
+        int initialOffset = 10;
+        int additionalLength = BigArrays.BYTE_PAGE_SIZE * 2;
+        byte[] expectedData = randomizedByteArrayWithSize(initialOffset + additionalLength);
+        out.writeBytes(expectedData, 0, initialOffset);
+        assertEquals(initialOffset, out.size());
+
+        // now write the rest - more than fits into the remaining page, plus a full
+        // page after that, i.e. we cross over into a third page
+        out.writeBytes(expectedData, initialOffset, additionalLength);
+        assertEquals(expectedData.length, out.size());
+        assertArrayEquals(expectedData, out.bytes().toBytes());
+
+        out.close();
+    }
+
+    @Test
+    public void testSingleFullPage() throws Exception {
+        BytesStreamOutput out = new BytesStreamOutput();
+
+        int expectedSize = BigArrays.BYTE_PAGE_SIZE;
+        byte[] expectedData = randomizedByteArrayWithSize(expectedSize);
+
+        // write byte-by-byte
+        for (int i = 0; i < expectedSize; i++) {
+            out.writeByte(expectedData[i]);
+        }
+
+        assertEquals(expectedSize, out.size());
+        assertArrayEquals(expectedData, out.bytes().toBytes());
+
+        out.close();
+    }
+
+    @Test
+    public void testOneFullOneShortPage() throws Exception {
+        BytesStreamOutput out = new BytesStreamOutput();
+
+        int expectedSize = BigArrays.BYTE_PAGE_SIZE + 10;
+        byte[] expectedData = randomizedByteArrayWithSize(expectedSize);
+
+        // write byte-by-byte
+        for (int i = 0; i < expectedSize; i++) {
+            out.writeByte(expectedData[i]);
+        }
+
+        assertEquals(expectedSize, out.size());
+        assertArrayEquals(expectedData, out.bytes().toBytes());
+
+        out.close();
+    }
+
+    @Test
+    public void testTwoFullOneShortPage() throws Exception {
+        BytesStreamOutput out = new BytesStreamOutput();
+
+        int expectedSize = (BigArrays.BYTE_PAGE_SIZE * 2) + 1;
+        byte[] expectedData = randomizedByteArrayWithSize(expectedSize);
+
+        // write byte-by-byte
+        for (int i = 0; i < expectedSize; i++) {
+            out.writeByte(expectedData[i]);
+        }
+
+        assertEquals(expectedSize, out.size());
+        assertArrayEquals(expectedData, out.bytes().toBytes());
+
+        out.close();
+    }
+
+    @Test
+    public void testSeek() throws Exception {
+        BytesStreamOutput out = new BytesStreamOutput();
+
+        int position = 0;
+        assertEquals(position, out.position());
+
+        out.seek(position += 10);
+        out.seek(position += BigArrays.BYTE_PAGE_SIZE);
+        out.seek(position += BigArrays.BYTE_PAGE_SIZE + 10);
+        out.seek(position += BigArrays.BYTE_PAGE_SIZE * 2);
+        assertEquals(position, out.position());
+        assertEquals(position, out.bytes().toBytes().length);
+
+        out.close();
+    }
+
+    @Test
+    public void testSkip() throws Exception {
+        BytesStreamOutput out = new BytesStreamOutput();
+
+        int position = 0;
+        assertEquals(position, out.position());
+
+        int forward = 100;
+        out.skip(forward);
+        assertEquals(position + forward, out.position());
+
+        out.close();
+    }
+
     @Test
     public void testSimpleStreams() throws Exception {
         assumeTrue(Constants.JRE_IS_64BIT);
@@ -72,19 +297,56 @@ public class BytesStreamsTests extends ElasticsearchTestCase {
         assertThat(in.readGenericValue(), equalTo((Object)doubleArray));
         assertThat(in.readString(), equalTo("hello"));
         assertThat(in.readString(), equalTo("goodbye"));
+
+        in.close();
+        out.close();
     }
 
+    // we ignore this test for now since all existing callers of BytesStreamOutput happily
+    // call bytes() after close().
+    @Ignore
     @Test
-    public void testGrowLogic() throws Exception {
-        assumeTrue(Constants.JRE_IS_64BIT);
+    public void testAccessAfterClose() throws Exception {
         BytesStreamOutput out = new BytesStreamOutput();
-        out.writeBytes(new byte[BytesStreamOutput.DEFAULT_SIZE - 5]);
-        assertThat(out.bufferSize(), equalTo(2048)); // remains the default
-        out.writeBytes(new byte[1 * 1024]);
-        assertThat(out.bufferSize(), equalTo(4608));
-        out.writeBytes(new byte[32 * 1024]);
-        assertThat(out.bufferSize(), equalTo(40320));
-        out.writeBytes(new byte[32 * 1024]);
-        assertThat(out.bufferSize(), equalTo(90720));
+
+        // immediately close
         out.close();
+
+        assertEquals(-1, out.size());
+        assertEquals(-1, out.position());
+
+        // writing a single byte must fail
+        try {
+            out.writeByte((byte)0);
+            fail("expected IllegalStateException: stream closed");
+        }
+        catch (IllegalStateException iex1) {
+            // expected
+        }
+
+        // writing in bulk must fail
+        try {
+            out.writeBytes(new byte[0], 0, 0);
+            fail("expected IllegalStateException: stream closed");
+        }
+        catch (IllegalStateException iex1) {
+            // expected
+        }
+
+        // toByteArray() must fail
+        try {
+            out.bytes().toBytes();
+            fail("expected IllegalStateException: stream closed");
+        }
+        catch (IllegalStateException iex1) {
+            // expected
+        }
+    }
+
+    // create & fill byte[] with randomized data
+    protected byte[] randomizedByteArrayWithSize(int size) {
+        byte[] data = new byte[size];
+        getRandom().nextBytes(data);
+        return data;
+    }
 }
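For reference, the page arithmetic the crossover tests exercise (a sketch; ceil-division over the backing page size):

    // a stream of (BYTE_PAGE_SIZE * 2) + 1 bytes must span three pages,
    // which is what testTwoFullOneShortPage walks through byte-by-byte
    int size = BigArrays.BYTE_PAGE_SIZE * 2 + 1;
    int pages = (size + BigArrays.BYTE_PAGE_SIZE - 1) / BigArrays.BYTE_PAGE_SIZE;
    assert pages == 3;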