HBASE-13817 ByteBufferOuputStream - add writeInt support.

This commit is contained in:
anoopsjohn 2015-06-04 10:22:33 +05:30
parent e8e5a9f639
commit bb62d5b2e8
5 changed files with 44 additions and 11 deletions

View File

@ -37,6 +37,7 @@ import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -2532,12 +2533,22 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
if (!withTags) { if (!withTags) {
length = this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE; length = this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
} }
// This does same as DataOuput#writeInt (big-endian, etc.) writeInt(out, length);
StreamUtils.writeInt(out, length);
out.write(this.bytes, this.offset, length); out.write(this.bytes, this.offset, length);
return length + Bytes.SIZEOF_INT; return length + Bytes.SIZEOF_INT;
} }
// This does same as DataOuput#writeInt (big-endian, etc.)
public static void writeInt(OutputStream out, int v) throws IOException {
// We have writeInt in ByteBufferOutputStream so that it can directly write int to underlying
// ByteBuffer in one step.
if (out instanceof ByteBufferOutputStream) {
((ByteBufferOutputStream) out).writeInt(v);
} else {
StreamUtils.writeInt(out, v);
}
}
/** /**
* Comparator that compares row component only of a KeyValue. * Comparator that compares row component only of a KeyValue.
*/ */

View File

@ -679,11 +679,11 @@ public class KeyValueUtil {
int tlen = cell.getTagsLength(); int tlen = cell.getTagsLength();
// write total length // write total length
StreamUtils.writeInt(out, length(rlen, flen, qlen, vlen, tlen, withTags)); KeyValue.writeInt(out, length(rlen, flen, qlen, vlen, tlen, withTags));
// write key length // write key length
StreamUtils.writeInt(out, keyLength(rlen, flen, qlen)); KeyValue.writeInt(out, keyLength(rlen, flen, qlen));
// write value length // write value length
StreamUtils.writeInt(out, vlen); KeyValue.writeInt(out, vlen);
// Write rowkey - 2 bytes rk length followed by rowkey bytes // Write rowkey - 2 bytes rk length followed by rowkey bytes
StreamUtils.writeShort(out, rlen); StreamUtils.writeShort(out, rlen);
out.write(cell.getRowArray(), cell.getRowOffset(), rlen); out.write(cell.getRowArray(), cell.getRowOffset(), rlen);

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
/** /**
@ -44,8 +43,7 @@ public class NoTagsKeyValue extends KeyValue {
public int write(OutputStream out, boolean withTags) throws IOException { public int write(OutputStream out, boolean withTags) throws IOException {
// In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls
// check KeyValueUtil#oswrite also and do necessary changes. // check KeyValueUtil#oswrite also and do necessary changes.
// This does same as DataOuput#writeInt (big-endian, etc.) writeInt(out, this.length);
StreamUtils.writeInt(out, this.length);
out.write(this.bytes, this.offset, this.length); out.write(this.bytes, this.offset, this.length);
return this.length + Bytes.SIZEOF_INT; return this.length + Bytes.SIZEOF_INT;
} }

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.io;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels; import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
@ -57,6 +58,7 @@ public class ByteBufferOutputStream extends OutputStream {
* @see #getByteBuffer() * @see #getByteBuffer()
*/ */
public ByteBufferOutputStream(final ByteBuffer bb) { public ByteBufferOutputStream(final ByteBuffer bb) {
assert bb.order() == ByteOrder.BIG_ENDIAN;
this.buf = bb; this.buf = bb;
this.buf.clear(); this.buf.clear();
} }
@ -128,6 +130,17 @@ public class ByteBufferOutputStream extends OutputStream {
buf.put(b, off, len); buf.put(b, off, len);
} }
/**
* Writes an <code>int</code> to the underlying output stream as four
* bytes, high byte first.
* @param i the <code>int</code> to write
* @throws IOException if an I/O error occurs.
*/
public void writeInt(int i) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_INT);
this.buf.putInt(i);
}
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {
// noop // noop

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Streamable; import org.apache.hadoop.hbase.Streamable;
import org.apache.hadoop.hbase.SettableSequenceId; import org.apache.hadoop.hbase.SettableSequenceId;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TagCompressionContext; import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.BlockType;
@ -548,9 +549,9 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
public int write(OutputStream out, boolean withTags) throws IOException { public int write(OutputStream out, boolean withTags) throws IOException {
int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
tagsLength, withTags); tagsLength, withTags);
StreamUtils.writeInt(out, lenToWrite); writeInt(out, lenToWrite);
StreamUtils.writeInt(out, keyOnlyBuffer.length); writeInt(out, keyOnlyBuffer.length);
StreamUtils.writeInt(out, valueLength); writeInt(out, valueLength);
// Write key // Write key
out.write(keyOnlyBuffer); out.write(keyOnlyBuffer);
// Write value // Write value
@ -574,6 +575,16 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
} }
} }
private static void writeInt(OutputStream out, int v) throws IOException {
// We have writeInt in ByteBufferOutputStream so that it can directly write int to underlying
// ByteBuffer in one step.
if (out instanceof ByteBufferOutputStream) {
((ByteBufferOutputStream) out).writeInt(v);
} else {
StreamUtils.writeInt(out, v);
}
}
protected abstract static class protected abstract static class
BufferedEncodedSeeker<STATE extends SeekerState> BufferedEncodedSeeker<STATE extends SeekerState>
implements EncodedSeeker { implements EncodedSeeker {