HBASE-15077 Support OffheapKV write in compaction with out copying data on heap.

anoopsjohn 2016-01-12 10:02:39 +05:30
parent ec47a811a2
commit da932ee38d
10 changed files with 245 additions and 55 deletions

View File

@@ -254,7 +254,7 @@ public class OffheapKeyValue extends ByteBufferedCell implements HeapSize, Clone
      length = keyLen + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
    }
    ByteBufferUtils.putInt(out, length);
-   ByteBufferUtils.writeByteBuffer(out, this.buf, this.offset, length);
+   ByteBufferUtils.copyBufferToStream(out, this.buf, this.offset, length);
    return length + Bytes.SIZEOF_INT;
  }

View File

@@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Our own implementation of ByteArrayOutputStream in which the methods are NOT synchronized
 * and which supports writing a ByteBuffer directly into it.
*/
@InterfaceAudience.Private
public class ByteArrayOutputStream extends OutputStream implements ByteBufferSupportOutputStream {
// Borrowed from openJDK:
// http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private byte[] buf;
private int pos = 0;
public ByteArrayOutputStream() {
this(32);
}
public ByteArrayOutputStream(int capacity) {
this.buf = new byte[capacity];
}
@Override
public void write(ByteBuffer b, int off, int len) throws IOException {
checkSizeAndGrow(len);
ByteBufferUtils.copyFromBufferToArray(this.buf, b, off, this.pos, len);
this.pos += len;
}
@Override
public void writeInt(int i) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_INT);
Bytes.putInt(this.buf, this.pos, i);
this.pos += Bytes.SIZEOF_INT;
}
@Override
public void write(int b) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_BYTE);
buf[this.pos] = (byte) b;
this.pos++;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkSizeAndGrow(len);
System.arraycopy(b, off, this.buf, this.pos, len);
this.pos += len;
}
private void checkSizeAndGrow(int extra) {
long capacityNeeded = this.pos + (long) extra;
if (capacityNeeded > this.buf.length) {
// guarantee it's possible to fit
if (capacityNeeded > MAX_ARRAY_SIZE) {
throw new BufferOverflowException();
}
// double until hit the cap
long nextCapacity = Math.min(this.buf.length << 1, MAX_ARRAY_SIZE);
// but make sure there is enough if twice the existing capacity is still too small
nextCapacity = Math.max(nextCapacity, capacityNeeded);
if (nextCapacity > MAX_ARRAY_SIZE) {
throw new BufferOverflowException();
}
byte[] newBuf = new byte[(int) nextCapacity];
System.arraycopy(buf, 0, newBuf, 0, buf.length);
buf = newBuf;
}
}
/**
* Resets the <code>pos</code> field of this byte array output stream to zero. The output stream
* can be used again.
*/
public void reset() {
this.pos = 0;
}
/**
 * Copies the content of this stream into a new byte array.
 * @return the contents of this output stream, as a new byte array.
 */
public byte[] toByteArray() {
  return Arrays.copyOf(buf, pos);
}
/**
* @return the underlying array where the data gets accumulated
*/
public byte[] getBuffer() {
return this.buf;
}
/**
* @return The current size of the buffer.
*/
public int size() {
return this.pos;
}
}
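
For context, a minimal usage sketch (not part of the patch) of how this stream takes bytes straight from a direct ByteBuffer without a temporary on-heap copy. The class and methods above are real; the example class name and values below are illustrative only.

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;

public class OffheapWriteSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative off-heap payload: 8 bytes written into a direct buffer.
    ByteBuffer offheap = ByteBuffer.allocateDirect(64);
    offheap.putLong(42L);

    ByteArrayOutputStream out = new ByteArrayOutputStream(32);
    out.writeInt(8);           // length prefix written directly into the backing array
    out.write(offheap, 0, 8);  // bulk copy from the direct buffer, no temp byte[]
    byte[] serialized = out.toByteArray();
    System.out.println(serialized.length);  // 4-byte length + 8-byte payload = 12
  }
}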

View File

@@ -37,7 +37,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class ByteBufferOutputStream extends OutputStream {
+public class ByteBufferOutputStream extends OutputStream
+    implements ByteBufferSupportOutputStream {
  // Borrowed from openJDK:
  // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221

View File

@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
/**
* Our extension of DataOutputStream which implements ByteBufferSupportOutputStream
*/
@InterfaceAudience.Private
public class ByteBufferSupportDataOutputStream extends DataOutputStream
implements ByteBufferSupportOutputStream {
public ByteBufferSupportDataOutputStream(OutputStream out) {
super(out);
}
@Override
public void write(ByteBuffer b, int off, int len) throws IOException {
ByteBufferUtils.copyBufferToStream(out, b, off, len);
written += len;
}
}
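
A short sketch of how this wrapper might be used (mirroring the HFileBlock.Writer change later in this commit): wrap the unsynchronized ByteArrayOutputStream so that regular DataOutputStream callers can also hand over a ByteBuffer directly. The example class name and sizes are illustrative, not part of the patch.

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream;

public class DataOutputSketch {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ByteBufferSupportDataOutputStream dos = new ByteBufferSupportDataOutputStream(baos);

    ByteBuffer cellPayload = ByteBuffer.allocateDirect(16);  // stands in for an off-heap cell
    dos.writeShort(16);             // ordinary DataOutputStream methods still work
    dos.write(cellPayload, 0, 16);  // routed through ByteBufferUtils.copyBufferToStream
    System.out.println(dos.size() + " " + baos.size());  // both report 18 bytes written
  }
}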

View File

@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Interface adds support for writing {@link ByteBuffer} into OutputStream.
*/
@InterfaceAudience.Private
public interface ByteBufferSupportOutputStream {
/**
* Writes <code>len</code> bytes from the specified ByteBuffer starting at offset <code>off</code>
* to this output stream.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
* @exception IOException
* if an I/O error occurs. In particular, an <code>IOException</code> is thrown if
* the output stream is closed.
*/
void write(ByteBuffer b, int off, int len) throws IOException;
/**
* Writes an <code>int</code> to the underlying output stream as four
* bytes, high byte first.
* @param i the <code>int</code> to write
* @throws IOException if an I/O error occurs.
*/
void writeInt(int i) throws IOException;
}

View File

@@ -673,14 +673,14 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
      // Write key
      out.write(keyBuffer.array());
      // Write value
-     ByteBufferUtils.writeByteBuffer(out, this.valueBuffer, this.valueOffset, this.valueLength);
+     ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags) {
        // 2 bytes tags length followed by tags bytes
        // tags length is serialized with 2 bytes only(short way) even if the type is int.
        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
-       ByteBufferUtils.writeByteBuffer(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
+       ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite + Bytes.SIZEOF_INT;
    }

View File

@@ -27,7 +27,7 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
@@ -141,8 +141,8 @@ public final class ByteBufferUtils {
    // We have writeInt in ByteBufferOutputStream so that it can directly write
    // int to underlying
    // ByteBuffer in one step.
-   if (out instanceof ByteBufferOutputStream) {
-     ((ByteBufferOutputStream) out).writeInt(value);
+   if (out instanceof ByteBufferSupportOutputStream) {
+     ((ByteBufferSupportOutputStream) out).writeInt(value);
    } else {
      StreamUtils.writeInt(out, value);
    }
@@ -179,9 +179,10 @@ public final class ByteBufferUtils {
   */
  public static void copyBufferToStream(OutputStream out, ByteBuffer in,
      int offset, int length) throws IOException {
-   if (in.hasArray()) {
-     out.write(in.array(), in.arrayOffset() + offset,
-       length);
+   if (out instanceof ByteBufferSupportOutputStream) {
+     ((ByteBufferSupportOutputStream) out).write(in, offset, length);
+   } else if (in.hasArray()) {
+     out.write(in.array(), in.arrayOffset() + offset, length);
    } else {
      for (int i = 0; i < length; ++i) {
        out.write(toByte(in, offset + i));
@@ -904,19 +905,6 @@ public final class ByteBufferUtils {
      }
    }
- public static void writeByteBuffer(OutputStream out, ByteBuffer b, int offset, int length)
-     throws IOException {
-   // We have write which takes ByteBuffer in ByteBufferOutputStream so that it
-   // can directly write
-   // bytes from the src ByteBuffer to the destination ByteBuffer. This avoid
-   // need for temp array
-   // creation and copy
-   if (out instanceof ByteBufferOutputStream) {
-     ((ByteBufferOutputStream) out).write(b, offset, length);
-   } else {
-     ByteBufferUtils.copyBufferToStream(out, b, offset, length);
-   }
- }
  // For testing purpose
  public static String toStringBinary(final ByteBuffer b, int off, int len) {
    StringBuilder result = new StringBuilder();
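
A brief sketch of the new dispatch (illustrative, not part of the patch): the same copyBufferToStream call now takes the bulk path when the sink implements ByteBufferSupportOutputStream, and only falls back to the per-byte loop for a plain OutputStream over a direct buffer. The example class name is hypothetical.

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.util.ByteBufferUtils;

public class CopyDispatchSketch {
  public static void main(String[] args) throws Exception {
    ByteBuffer offheap = ByteBuffer.allocateDirect(32);  // direct buffer, no backing array

    // Sink implements ByteBufferSupportOutputStream: one bulk copy, no temp byte[].
    ByteArrayOutputStream bbAware = new ByteArrayOutputStream();
    ByteBufferUtils.copyBufferToStream(bbAware, offheap, 0, 32);

    // Plain JDK stream plus a direct buffer: falls back to the byte-by-byte loop.
    java.io.ByteArrayOutputStream plain = new java.io.ByteArrayOutputStream();
    ByteBufferUtils.copyBufferToStream(plain, offheap, 0, 32);

    System.out.println(bbAware.size() + " " + plain.size());  // 32 32
  }
}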

View File

@@ -17,7 +17,6 @@
 */
package org.apache.hadoop.hbase.io.hfile;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
@@ -35,7 +34,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
@@ -915,7 +916,7 @@ public class HFileBlock implements Cacheable {
      state = State.WRITING;
      // We will compress it later in finishBlock()
-     userDataStream = new DataOutputStream(baosInMemory);
+     userDataStream = new ByteBufferSupportDataOutputStream(baosInMemory);
      if (newBlockType == BlockType.DATA) {
        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
      }
@@ -969,11 +970,8 @@ public class HFileBlock implements Cacheable {
     */
    private void finishBlock() throws IOException {
      if (blockType == BlockType.DATA) {
-       BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
-           new BufferGrabbingByteArrayOutputStream();
-       baosInMemory.writeTo(baosInMemoryCopy);
        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
-           baosInMemoryCopy.buf, blockType);
+           baosInMemory.getBuffer(), blockType);
        blockType = dataBlockEncodingCtx.getBlockType();
      }
      userDataStream.flush();
@@ -1011,19 +1009,6 @@ public class HFileBlock implements Cacheable {
        onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
    }
-   public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
-     private byte[] buf;
-     @Override
-     public void write(byte[] b, int off, int len) {
-       this.buf = b;
-     }
-     public byte[] getBuffer() {
-       return this.buf;
-     }
-   }
  /**
   * Put the header into the given byte array at the given offset.
   * @param onDiskSize size of the block on disk header + data + checksum

View File

@@ -21,7 +21,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -45,8 +44,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeSeeker;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
@@ -256,9 +255,7 @@ public class TestDataBlockEncoders {
    for (KeyValue kv : kvs) {
      encoder.encode(kv, encodingContext, dos);
    }
-   BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
-   baos.writeTo(stream);
-   encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer());
+   encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
    byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
    System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length);
    if (useOffheapData) {
@@ -398,9 +395,7 @@ public class TestDataBlockEncoders {
    for (KeyValue kv : kvList) {
      encoder.encode(kv, encodingContext, dos);
    }
-   BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
-   baos.writeTo(stream);
-   encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer());
+   encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
    byte[] encodedData = baos.toByteArray();
    testAlgorithm(encodedData, unencodedDataBuf, encoder);

View File

@@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -31,12 +30,12 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
-import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
@@ -217,9 +216,7 @@ public class TestHFileDataBlockEncoder {
    for (KeyValue kv : kvs) {
      blockEncoder.encode(kv, context, dos);
    }
-   BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
-   baos.writeTo(stream);
-   blockEncoder.endBlockEncoding(context, dos, stream.getBuffer(), BlockType.DATA);
+   blockEncoder.endBlockEncoding(context, dos, baos.getBuffer(), BlockType.DATA);
    byte[] encodedBytes = baos.toByteArray();
    size = encodedBytes.length - block.getDummyHeaderForVersion().length;
    return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),