HBASE-12202 Support DirectByteBuffer usage in HFileBlock.
parent c7f51db135
commit c01d9981d8
New file: ByteBufferInputStream.java (org.apache.hadoop.hbase.io)
@@ -0,0 +1,101 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.classification.InterfaceAudience;

/**
 * Not thread safe!
 * <p>
 * Please note that the reads will cause position movement on wrapped ByteBuffer.
 */
@InterfaceAudience.Private
public class ByteBufferInputStream extends InputStream {

  private ByteBuffer buf;

  public ByteBufferInputStream(ByteBuffer buf) {
    this.buf = buf;
  }

  /**
   * Reads the next byte of data from this input stream. The value byte is returned as an
   * <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available
   * because the end of the stream has been reached, the value <code>-1</code> is returned.
   * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
   */
  public int read() {
    if (this.buf.hasRemaining()) {
      return (this.buf.get() & 0xff);
    }
    return -1;
  }

  /**
   * Reads up to the next <code>len</code> bytes of data from the buffer into the passed array
   * (starting from the given offset).
   * @param b the array into which the data is read.
   * @param off the start offset in the destination array <code>b</code>
   * @param len the maximum number of bytes to read.
   * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not
   *         even 1 byte can be read because the end of the stream has been reached.
   */
  public int read(byte b[], int off, int len) {
    int avail = available();
    if (avail <= 0) {
      return -1;
    }

    if (len > avail) {
      len = avail;
    }
    if (len <= 0) {
      return 0;
    }

    this.buf.get(b, off, len);
    return len;
  }

  /**
   * Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if
   * the end of the input stream is reached. The actual number <code>k</code> of bytes skipped is
   * equal to the smaller of <code>n</code> and the number of bytes remaining in the stream.
   * @param n the number of bytes to be skipped.
   * @return the actual number of bytes skipped.
   */
  public long skip(long n) {
    long k = Math.min(n, available());
    if (k < 0) {
      k = 0;
    }
    this.buf.position((int) (this.buf.position() + k));
    return k;
  }

  /**
   * @return the number of remaining bytes that can be read (or skipped over) from this input
   *         stream.
   */
  public int available() {
    return this.buf.remaining();
  }
}
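For illustration only (not part of the commit): a minimal usage sketch, assuming just the class above and the JDK. It shows what the new stream buys: reading a direct (off-heap) ByteBuffer through the familiar DataInputStream API without first copying the bytes into an array, which array()-based code cannot do.

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.ByteBufferInputStream;

    public class ByteBufferInputStreamExample {
      public static void main(String[] args) throws IOException {
        // A direct buffer has no backing array; direct.array() would throw.
        ByteBuffer direct = ByteBuffer.allocateDirect(12);
        direct.putInt(7);
        direct.putLong(42L);
        direct.flip();

        // Reads advance the wrapped buffer's position, as the class javadoc warns.
        DataInputStream in = new DataInputStream(new ByteBufferInputStream(direct));
        System.out.println(in.readInt());  // 7
        System.out.println(in.readLong()); // 42
        in.close();
      }
    }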
HFileBlockDecodingContext.java
@@ -40,16 +40,14 @@ public interface HFileBlockDecodingContext {
    * @param uncompressedSizeWithoutHeader numBytes without header required to store the block after
    *          decompressing (not decoding)
    * @param blockBufferWithoutHeader ByteBuffer pointed after the header but before the data
-   * @param onDiskBlock on disk bytes to be decoded
-   * @param offset data start offset in onDiskBlock
+   * @param onDiskBlock on disk data to be decoded
    * @throws IOException
    */
   void prepareDecoding(
     int onDiskSizeWithoutHeader,
     int uncompressedSizeWithoutHeader,
     ByteBuffer blockBufferWithoutHeader,
-    byte[] onDiskBlock,
-    int offset
+    ByteBuffer onDiskBlock
   ) throws IOException;

   /**
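For illustration only (not part of the commit; the helper and its parameter names are hypothetical): a sketch of how a caller adapts to the revised signature, handing over a sliced ByteBuffer view positioned after the block header instead of a byte[] plus offset. The view can just as well be backed by a DirectByteBuffer.

    // Sketch only; hypothetical helper. Assumes imports of java.io.IOException,
    // java.nio.ByteBuffer and org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext.
    static void decode(HFileBlockDecodingContext decodingCtx, ByteBuffer onDiskBlockWithHeader,
        int headerSize, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
        ByteBuffer blockBufferWithoutHeader) throws IOException {
      // Before: prepareDecoding(..., byte[] onDiskBlock, int offset). Now a ByteBuffer view is
      // handed over instead, so the bytes may equally live off-heap.
      ByteBuffer dup = onDiskBlockWithHeader.duplicate(); // shared bytes, own position/limit
      dup.position(headerSize);                           // what used to be the offset argument
      decodingCtx.prepareDecoding(onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
          blockBufferWithoutHeader, dup.slice());         // view starting at the on-disk data
    }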
HFileBlockDefaultDecodingContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.hadoop.hbase.io.encoding;

-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -24,13 +23,13 @@ import java.nio.ByteBuffer;

 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Cipher;
 import org.apache.hadoop.hbase.io.crypto.Decryptor;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.util.Bytes;

 /**
@@ -52,9 +51,8 @@ public class HFileBlockDefaultDecodingContext implements

   @Override
   public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
-      ByteBuffer blockBufferWithoutHeader, byte[] onDiskBlock, int offset) throws IOException {
-    InputStream in = new DataInputStream(new ByteArrayInputStream(onDiskBlock, offset,
-        onDiskSizeWithoutHeader));
+      ByteBuffer blockBufferWithoutHeader, ByteBuffer onDiskBlock) throws IOException {
+    InputStream in = new DataInputStream(new ByteBufferInputStream(onDiskBlock));

     Encryption.Context cryptoContext = fileContext.getEncryptionContext();
     if (cryptoContext != Encryption.Context.NONE) {
BlockType.java
@@ -162,12 +162,10 @@ public enum BlockType {
   }

   public static BlockType read(ByteBuffer buf) throws IOException {
-    BlockType blockType = parse(buf.array(),
-      buf.arrayOffset() + buf.position(),
-      Math.min(buf.limit() - buf.position(), MAGIC_LENGTH));
+    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), MAGIC_LENGTH)];
+    buf.get(magicBuf);
+    BlockType blockType = parse(magicBuf, 0, magicBuf.length);

     // If we got here, we have read exactly MAGIC_LENGTH bytes.
-    buf.position(buf.position() + MAGIC_LENGTH);
     return blockType;
   }

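For illustration only (not part of the commit): a small sketch of the constraint this hunk works around. array() and arrayOffset() exist only for heap buffers, while a relative bulk get() works for heap and direct buffers alike and already moves the position past the magic, which is why the explicit position() bump is dropped. The block bytes here are made up.

    import java.nio.ByteBuffer;

    public class MagicBytesExample {
      public static void main(String[] args) {
        // Illustrative block bytes; "DATABLK*" is the 8-byte data-block magic.
        ByteBuffer heap = ByteBuffer.wrap("DATABLK*rest-of-block".getBytes());
        ByteBuffer direct = ByteBuffer.allocateDirect(heap.remaining());
        direct.put(heap.duplicate());
        direct.flip();

        // heap.array() works; direct.array() would throw UnsupportedOperationException.
        byte[] magic = new byte[8];
        direct.get(magic); // relative read: works for heap and direct buffers alike, and it
                           // advances the position past the magic as a side effect
        System.out.println(new String(magic)); // DATABLK*
      }
    }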
ByteBufferUtils.java
@@ -351,6 +351,27 @@ public final class ByteBufferUtils {
     }
   }
+
+  /**
+   * Copy from one buffer to another from given offset. This will be absolute positional copying
+   * and won't affect the position of any of the buffers.
+   * @param out
+   * @param in
+   * @param sourceOffset
+   * @param destinationOffset
+   * @param length
+   */
+  public static void copyFromBufferToBuffer(ByteBuffer out, ByteBuffer in, int sourceOffset,
+      int destinationOffset, int length) {
+    if (in.hasArray() && out.hasArray()) {
+      System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out.array(), out.arrayOffset()
+          + destinationOffset, length);
+    } else {
+      for (int i = 0; i < length; ++i) {
+        out.put((destinationOffset + i), in.get(sourceOffset + i));
+      }
+    }
+  }

   /**
    * Find length of common prefix of two parts in the buffer
    * @param buffer Where parts are located.
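For illustration only (not part of the commit): a usage sketch of the new absolute copy. Both the System.arraycopy fast path and the put(int, byte) fallback address bytes by index, so neither buffer's position or limit moves, and the fallback makes the copy legal when either side is a DirectByteBuffer.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.util.ByteBufferUtils;

    public class CopyFromBufferToBufferExample {
      public static void main(String[] args) {
        ByteBuffer src = ByteBuffer.allocateDirect(16); // no backing array
        for (int i = 0; i < 16; i++) {
          src.put(i, (byte) i);                         // absolute puts; position stays at 0
        }
        ByteBuffer dst = ByteBuffer.allocate(16);       // heap buffer

        // Copy src[4..11] into dst[0..7] without touching either buffer's position.
        ByteBufferUtils.copyFromBufferToBuffer(dst, src, 4, 0, 8);

        System.out.println(dst.get(0));                            // 4
        System.out.println(src.position() + " " + dst.position()); // 0 0
      }
    }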
@@ -454,4 +475,20 @@
     return output;
   }

+  public static int compareTo(ByteBuffer buf1, int o1, int len1, ByteBuffer buf2, int o2, int len2) {
+    if (buf1.hasArray() && buf2.hasArray()) {
+      return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(),
+          buf2.arrayOffset() + o2, len2);
+    }
+    int end1 = o1 + len1;
+    int end2 = o2 + len2;
+    for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
+      byte a = buf1.get(i);
+      byte b = buf2.get(j);
+      if (a != b) {
+        return a - b;
+      }
+    }
+    return len1 - len2;
+  }
 }
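For illustration only (not part of the commit): a usage sketch of the new comparator. It falls back to an index-based loop whenever either buffer has no accessible array, so heap and direct buffers can be compared against each other.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.util.ByteBufferUtils;

    public class ByteBufferCompareExample {
      public static void main(String[] args) {
        byte[] bytes = { 10, 20, 30, 40 };
        ByteBuffer heap = ByteBuffer.wrap(bytes);
        ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length);
        direct.put(bytes);
        direct.flip();

        // 0: equal contents, even though one buffer is on-heap and the other is off-heap.
        System.out.println(ByteBufferUtils.compareTo(heap, 0, 4, direct, 0, 4));

        // A shorter prefix of the same bytes compares as smaller (len1 - len2 < 0).
        System.out.println(ByteBufferUtils.compareTo(heap, 0, 3, direct, 0, 4));
      }
    }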
New file: TestByteBufferInputStream.java (org.apache.hadoop.hbase.io)
@@ -0,0 +1,82 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import static org.junit.Assert.assertEquals;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ IOTests.class, SmallTests.class })
public class TestByteBufferInputStream {

  @Test
  public void testReads() throws Exception {
    ByteArrayOutputStream bos = new ByteArrayOutputStream(100);
    DataOutputStream dos = new DataOutputStream(bos);
    String s = "test";
    int i = 128;
    dos.write(1);
    dos.writeInt(i);
    dos.writeBytes(s);
    dos.writeLong(12345L);
    dos.writeShort(2);
    dos.flush();
    ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());

    // bbis contains 19 bytes:
    // 1 byte, 4 bytes int, 4 bytes string, 8 bytes long and 2 bytes short
    ByteBufferInputStream bbis = new ByteBufferInputStream(bb);
    assertEquals(15 + s.length(), bbis.available());
    assertEquals(1, bbis.read());
    byte[] ib = new byte[4];
    bbis.read(ib);
    assertEquals(i, Bytes.toInt(ib));
    byte[] sb = new byte[s.length()];
    bbis.read(sb);
    assertEquals(s, Bytes.toString(sb));
    byte[] lb = new byte[8];
    bbis.read(lb);
    assertEquals(12345, Bytes.toLong(lb));
    assertEquals(2, bbis.available());
    ib = new byte[4];
    int read = bbis.read(ib, 0, ib.length);
    // We don't have 4 bytes remaining, only 2, so only those should be returned.
    assertEquals(2, read);
    assertEquals(2, Bytes.toShort(ib));
    assertEquals(0, bbis.available());
    // At the end, read() should return -1.
    assertEquals(-1, bbis.read());
    bbis.close();

    bb = ByteBuffer.wrap(bos.toByteArray());
    bbis = new ByteBufferInputStream(bb);
    DataInputStream dis = new DataInputStream(bbis);
    dis.read();
    assertEquals(i, dis.readInt());
    dis.close();
  }
}
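The test above exercises only a heap buffer obtained via ByteBuffer.wrap(). For illustration only (not part of the commit): a hypothetical companion test, reusing the same imports as the file above, that runs the stream over a DirectByteBuffer, which is the case HBASE-12202 is about; nothing else needs to change because the class never calls array().

    @Test
    public void testReadsFromDirectBuffer() throws Exception {
      // Hypothetical extra test, mirroring testReads() with an off-heap buffer.
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      DataOutputStream dos = new DataOutputStream(bos);
      dos.writeInt(128);
      dos.writeLong(12345L);
      dos.flush();

      byte[] bytes = bos.toByteArray();
      ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length);
      direct.put(bytes);
      direct.flip();

      DataInputStream dis = new DataInputStream(new ByteBufferInputStream(direct));
      assertEquals(128, dis.readInt());
      assertEquals(12345L, dis.readLong());
      assertEquals(-1, dis.read());
      dis.close();
    }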
HFileBlock.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.hfile;

-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutput;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -331,8 +332,10 @@ public class HFileBlock implements Cacheable {
    * @return the buffer with header skipped and checksum omitted.
    */
   public ByteBuffer getBufferWithoutHeader() {
-    return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(),
-        buf.limit() - headerSize() - totalChecksumBytes()).slice();
+    ByteBuffer dup = this.buf.duplicate();
+    dup.position(headerSize());
+    dup.limit(buf.limit() - totalChecksumBytes());
+    return dup.slice();
   }

   /**
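For illustration only (not part of the commit): a self-contained sketch of the duplicate()/position()/limit()/slice() pattern that this and the following hunks switch to. It builds the same zero-copy sub-view that ByteBuffer.wrap(array, offset, length).slice() used to build, but without requiring a backing array, so it also works when buf is a DirectByteBuffer. The sizes here are illustrative.

    import java.nio.ByteBuffer;

    public class DuplicateSliceExample {
      public static void main(String[] args) {
        int headerSize = 33, checksumBytes = 4;            // illustrative numbers
        ByteBuffer block = ByteBuffer.allocateDirect(128); // header + data + checksum

        ByteBuffer dup = block.duplicate();                // shares bytes, own position/limit
        dup.position(headerSize);                          // skip the header
        dup.limit(block.limit() - checksumBytes);          // drop the checksum
        ByteBuffer withoutHeader = dup.slice();            // zero-based view of just the data

        System.out.println(withoutHeader.capacity());                // 128 - 33 - 4 = 91
        System.out.println(block.position() + " " + block.limit());  // 0 128: original untouched

        // The view writes through to the same memory.
        withoutHeader.put(0, (byte) 42);
        System.out.println(block.get(headerSize));                   // 42
      }
    }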
@@ -345,8 +348,9 @@
    * @return the buffer of this block for read-only operations
    */
   public ByteBuffer getBufferReadOnly() {
-    return ByteBuffer.wrap(buf.array(), buf.arrayOffset(),
-        buf.limit() - totalChecksumBytes()).slice();
+    ByteBuffer dup = this.buf.duplicate();
+    dup.limit(buf.limit() - totalChecksumBytes());
+    return dup.slice();
   }

   /**
@@ -357,7 +361,8 @@
    * @return the buffer with header and checksum included for read-only operations
    */
   public ByteBuffer getBufferReadOnlyWithHeader() {
-    return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice();
+    ByteBuffer dup = this.buf.duplicate();
+    return dup.slice();
   }

   /**
@@ -450,17 +455,22 @@
           .append("(").append(onDiskSizeWithoutHeader)
           .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
     }
+    String dataBegin = null;
+    if (buf.hasArray()) {
+      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
+          Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
+    } else {
+      ByteBuffer bufWithoutHeader = getBufferWithoutHeader();
+      byte[] dataBeginBytes = new byte[Math.min(32,
+          bufWithoutHeader.limit() - bufWithoutHeader.position())];
+      bufWithoutHeader.get(dataBeginBytes);
+      dataBegin = Bytes.toStringBinary(dataBeginBytes);
+    }
     sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
       .append(" totalChecksumBytes()=").append(totalChecksumBytes())
       .append(" isUnpacked()=").append(isUnpacked())
-      .append(" buf=[ ")
-      .append(buf)
-      .append(", array().length=").append(buf.array().length)
-      .append(", arrayOffset()=").append(buf.arrayOffset())
-      .append(" ]")
-      .append(" dataBeginsWith=")
-      .append(Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
-        Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())))
+      .append(" buf=[ ").append(buf).append(" ]")
+      .append(" dataBeginsWith=").append(dataBegin)
       .append(" fileContext=").append(fileContext)
       .append(" ]");
     return sb.toString();
@@ -472,10 +482,17 @@
   private void validateOnDiskSizeWithoutHeader(
       int expectedOnDiskSizeWithoutHeader) throws IOException {
     if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
+      String dataBegin = null;
+      if (buf.hasArray()) {
+        dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset(), Math.min(32, buf.limit()));
+      } else {
+        ByteBuffer bufDup = getBufferReadOnly();
+        byte[] dataBeginBytes = new byte[Math.min(32, bufDup.limit() - bufDup.position())];
+        bufDup.get(dataBeginBytes);
+        dataBegin = Bytes.toStringBinary(dataBeginBytes);
+      }
       String blockInfoMsg =
-        "Block offset: " + offset + ", data starts with: "
-        + Bytes.toStringBinary(buf.array(), buf.arrayOffset(),
-          buf.arrayOffset() + Math.min(32, buf.limit()));
+        "Block offset: " + offset + ", data starts with: " + dataBegin;
       throw new IOException("On-disk size without header provided is "
           + expectedOnDiskSizeWithoutHeader + ", but block "
           + "header contains " + onDiskSizeWithoutHeader + ". " +
@@ -500,16 +517,30 @@

     HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
       reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();

+    ByteBuffer dup = this.buf.duplicate();
+    dup.position(this.headerSize());
+    dup = dup.slice();
     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
       unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
-      this.getBufferReadOnlyWithHeader().array(), this.headerSize());
+      dup);

     // Preserve the next block's header bytes in the new block if we have them.
     if (unpacked.hasNextBlockHeader()) {
-      System.arraycopy(this.buf.array(), this.buf.arrayOffset() + this.onDiskDataSizeWithHeader,
-        unpacked.buf.array(), unpacked.buf.arrayOffset() + unpacked.headerSize() +
-          unpacked.uncompressedSizeWithoutHeader + unpacked.totalChecksumBytes(),
-        unpacked.headerSize());
+      // Both buffers are limited at the checksum bytes and so exclude the next block's header.
+      // The copyFromBufferToBuffer() call below does absolute (positional) reads/writes when
+      // either buffer is a DBB, so we only change the limit on duplicate buffers: no copying,
+      // just new BB objects.
+      ByteBuffer inDup = this.buf.duplicate();
+      inDup.limit(inDup.limit() + headerSize());
+      ByteBuffer outDup = unpacked.buf.duplicate();
+      outDup.limit(outDup.limit() + unpacked.headerSize());
+      ByteBufferUtils.copyFromBufferToBuffer(
+          outDup,
+          inDup,
+          this.onDiskDataSizeWithHeader,
+          unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
+              + unpacked.totalChecksumBytes(), unpacked.headerSize());
     }
     return unpacked;
   }
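For illustration only (not part of the commit; sizes and contents are made up): a sketch of the limit-raising trick used above. The cached next-block header sits just past the working limit; a duplicate() may raise its own limit (up to capacity) to expose those trailing bytes, and the absolute copyFromBufferToBuffer() then moves them without disturbing either original view.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.util.ByteBufferUtils;

    public class NextBlockHeaderCopyExample {
      public static void main(String[] args) {
        int headerSize = 16;                             // illustrative
        ByteBuffer packed = ByteBuffer.allocateDirect(64);
        for (int i = 0; i < 64; i++) {
          packed.put(i, (byte) i);
        }
        packed.limit(48);                                // working view stops before the
                                                         // cached next-block header

        ByteBuffer in = packed.duplicate();
        in.limit(in.limit() + headerSize);               // expose the trailing bytes on the dup

        ByteBuffer unpacked = ByteBuffer.allocate(64);
        ByteBufferUtils.copyFromBufferToBuffer(unpacked, in, 48, 32, headerSize);

        System.out.println(packed.limit());              // still 48
        System.out.println(unpacked.get(32));            // 48: first byte of the copied header
      }
    }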
@@ -532,11 +563,14 @@
     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
         cksumBytes + (hasNextBlockHeader() ? headerSize : 0);

+    // TODO we need consider allocating offheap here?
     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);

-    // Copy header bytes.
-    System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(),
-        newBuf.arrayOffset(), headerSize);
+    // Copy header bytes into newBuf.
+    // newBuf is HBB so no issue in calling array()
+    ByteBuffer dup = buf.duplicate();
+    dup.position(0);
+    dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize);

     buf = newBuf;
     // set limit to exclude next block's header
@@ -590,8 +624,9 @@
    * @return a byte stream reading the data + checksum of this block
    */
   public DataInputStream getByteStream() {
-    return new DataInputStream(new ByteArrayInputStream(buf.array(),
-        buf.arrayOffset() + headerSize(), buf.limit() - headerSize()));
+    ByteBuffer dup = this.buf.duplicate();
+    dup.position(this.headerSize());
+    return new DataInputStream(new ByteBufferInputStream(dup));
   }

   @Override
@@ -1360,7 +1395,7 @@
   private static class PrefetchedHeader {
     long offset = -1;
     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
-    ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
+    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
   }

   /** Reads version 2 blocks from the filesystem. */
@@ -1547,6 +1582,7 @@
         if (headerBuf != null) {
           // the header has been read when reading the previous block, copy
           // to this block's header
+          // headerBuf is HBB
           System.arraycopy(headerBuf.array(),
               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
         } else {
@@ -1591,11 +1627,13 @@
         // in a series of reads or a random read, and we don't have access
         // to the block index. This is costly and should happen very rarely.
         headerBuf = ByteBuffer.allocate(hdrSize);
+        // headerBuf is HBB
         readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
             hdrSize, false, offset, pread);
       }
       b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
       onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
+      // headerBuf is HBB
       System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
       nextBlockOnDiskSize =
           readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
@@ -1685,9 +1723,8 @@

   @Override
   public void serialize(ByteBuffer destination) {
-    // assumes HeapByteBuffer
-    destination.put(this.buf.array(), this.buf.arrayOffset(),
-        getSerializedLength() - EXTRA_SERIALIZATION_SPACE);
+    ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength()
+        - EXTRA_SERIALIZATION_SPACE);
     serializeExtraInfo(destination);
   }

@@ -1735,8 +1772,7 @@
     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
       return false;
     }
-    if (Bytes.compareTo(this.buf.array(), this.buf.arrayOffset(), this.buf.limit(),
-        castedComparison.buf.array(), castedComparison.buf.arrayOffset(),
+    if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
         castedComparison.buf.limit()) != 0) {
       return false;
     }
@@ -1826,24 +1862,16 @@
    * has minor version > 0.
    */
   static String toStringHeader(ByteBuffer buf) throws IOException {
-    int offset = buf.arrayOffset();
-    byte[] b = buf.array();
-    long magic = Bytes.toLong(b, offset);
-    BlockType bt = BlockType.read(buf);
-    offset += Bytes.SIZEOF_LONG;
-    int compressedBlockSizeNoHeader = Bytes.toInt(b, offset);
-    offset += Bytes.SIZEOF_INT;
-    int uncompressedBlockSizeNoHeader = Bytes.toInt(b, offset);
-    offset += Bytes.SIZEOF_INT;
-    long prevBlockOffset = Bytes.toLong(b, offset);
-    offset += Bytes.SIZEOF_LONG;
-    byte cksumtype = b[offset];
-    offset += Bytes.SIZEOF_BYTE;
-    long bytesPerChecksum = Bytes.toInt(b, offset);
-    offset += Bytes.SIZEOF_INT;
-    long onDiskDataSizeWithHeader = Bytes.toInt(b, offset);
-    offset += Bytes.SIZEOF_INT;
-    return " Header dump: magic: " + magic +
+    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
+    buf.get(magicBuf);
+    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
+    int compressedBlockSizeNoHeader = buf.getInt();
+    int uncompressedBlockSizeNoHeader = buf.getInt();
+    long prevBlockOffset = buf.getLong();
+    byte cksumtype = buf.get();
+    long bytesPerChecksum = buf.getInt();
+    long onDiskDataSizeWithHeader = buf.getInt();
+    return " Header dump: magic: " + Bytes.toString(magicBuf) +
       " blockType " + bt +
       " compressedBlockSizeNoHeader " +
       compressedBlockSizeNoHeader +