HBASE-14307 - Incorrect use of positional read api in HFileBlock (Chris
Nauroth)
This commit is contained in:
parent
fda317cebb
commit
c438052cc2
|
@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.util.ChecksumType;
|
||||||
import org.apache.hadoop.hbase.util.ClassSize;
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -720,6 +721,45 @@ public class HFileBlock implements Cacheable {
|
||||||
return bytesRemaining <= 0;
|
return bytesRemaining <= 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read from an input stream. Analogous to
|
||||||
|
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
|
||||||
|
* positional read and specifies a number of "extra" bytes that would be
|
||||||
|
* desirable but not absolutely necessary to read.
|
||||||
|
*
|
||||||
|
* @param in the input stream to read from
|
||||||
|
* @param position the position within the stream from which to start reading
|
||||||
|
* @param buf the buffer to read into
|
||||||
|
* @param bufOffset the destination offset in the buffer
|
||||||
|
* @param necessaryLen the number of bytes that are absolutely necessary to
|
||||||
|
* read
|
||||||
|
* @param extraLen the number of extra bytes that would be nice to read
|
||||||
|
* @return true if and only if extraLen is > 0 and reading those extra bytes
|
||||||
|
* was successful
|
||||||
|
* @throws IOException if failed to read the necessary bytes
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
static boolean positionalReadWithExtra(FSDataInputStream in,
|
||||||
|
long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
|
||||||
|
throws IOException {
|
||||||
|
int bytesRemaining = necessaryLen + extraLen;
|
||||||
|
int bytesRead = 0;
|
||||||
|
while (bytesRead < necessaryLen) {
|
||||||
|
int ret = in.read(position, buf, bufOffset, bytesRemaining);
|
||||||
|
if (ret < 0) {
|
||||||
|
throw new IOException("Premature EOF from inputStream (positional read "
|
||||||
|
+ "returned " + ret + ", was trying to read " + necessaryLen
|
||||||
|
+ " necessary bytes and " + extraLen + " extra bytes, "
|
||||||
|
+ "successfully read " + bytesRead);
|
||||||
|
}
|
||||||
|
position += ret;
|
||||||
|
bufOffset += ret;
|
||||||
|
bytesRemaining -= ret;
|
||||||
|
bytesRead += ret;
|
||||||
|
}
|
||||||
|
return bytesRead != necessaryLen && bytesRemaining <= 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the on-disk size of the next block (including the header size)
|
* @return the on-disk size of the next block (including the header size)
|
||||||
* that was read by peeking into the next block's header
|
* that was read by peeking into the next block's header
|
||||||
|
@ -1443,14 +1483,8 @@ public class HFileBlock implements Cacheable {
|
||||||
} else {
|
} else {
|
||||||
// Positional read. Better for random reads; or when the streamLock is already locked.
|
// Positional read. Better for random reads; or when the streamLock is already locked.
|
||||||
int extraSize = peekIntoNextBlock ? hdrSize : 0;
|
int extraSize = peekIntoNextBlock ? hdrSize : 0;
|
||||||
int ret = istream.read(fileOffset, dest, destOffset, size + extraSize);
|
if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset,
|
||||||
if (ret < size) {
|
size, extraSize)) {
|
||||||
throw new IOException("Positional read of " + size + " bytes " +
|
|
||||||
"failed at offset " + fileOffset + " (returned " + ret + ")");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ret == size || ret < size + extraSize) {
|
|
||||||
// Could not read the next block's header, or did not try.
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,148 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.hfile;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit test suite covering HFileBlock positional read logic.
|
||||||
|
*/
|
||||||
|
@Category({IOTests.class, SmallTests.class})
|
||||||
|
public class TestHFileBlockPositionalRead {
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public ExpectedException exception = ExpectedException.none();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPositionalReadNoExtra() throws IOException {
|
||||||
|
long position = 0;
|
||||||
|
int bufOffset = 0;
|
||||||
|
int necessaryLen = 10;
|
||||||
|
int extraLen = 0;
|
||||||
|
int totalLen = necessaryLen + extraLen;
|
||||||
|
byte[] buf = new byte[totalLen];
|
||||||
|
FSDataInputStream in = mock(FSDataInputStream.class);
|
||||||
|
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
|
||||||
|
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
|
||||||
|
bufOffset, necessaryLen, extraLen);
|
||||||
|
assertFalse("Expect false return when no extra bytes requested", ret);
|
||||||
|
verify(in).read(position, buf, bufOffset, totalLen);
|
||||||
|
verifyNoMoreInteractions(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPositionalReadShortReadOfNecessaryBytes() throws IOException {
|
||||||
|
long position = 0;
|
||||||
|
int bufOffset = 0;
|
||||||
|
int necessaryLen = 10;
|
||||||
|
int extraLen = 0;
|
||||||
|
int totalLen = necessaryLen + extraLen;
|
||||||
|
byte[] buf = new byte[totalLen];
|
||||||
|
FSDataInputStream in = mock(FSDataInputStream.class);
|
||||||
|
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
|
||||||
|
when(in.read(5, buf, 5, 5)).thenReturn(5);
|
||||||
|
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
|
||||||
|
bufOffset, necessaryLen, extraLen);
|
||||||
|
assertFalse("Expect false return when no extra bytes requested", ret);
|
||||||
|
verify(in).read(position, buf, bufOffset, totalLen);
|
||||||
|
verify(in).read(5, buf, 5, 5);
|
||||||
|
verifyNoMoreInteractions(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPositionalReadExtraSucceeded() throws IOException {
|
||||||
|
long position = 0;
|
||||||
|
int bufOffset = 0;
|
||||||
|
int necessaryLen = 10;
|
||||||
|
int extraLen = 5;
|
||||||
|
int totalLen = necessaryLen + extraLen;
|
||||||
|
byte[] buf = new byte[totalLen];
|
||||||
|
FSDataInputStream in = mock(FSDataInputStream.class);
|
||||||
|
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
|
||||||
|
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
|
||||||
|
bufOffset, necessaryLen, extraLen);
|
||||||
|
assertTrue("Expect true return when reading extra bytes succeeds", ret);
|
||||||
|
verify(in).read(position, buf, bufOffset, totalLen);
|
||||||
|
verifyNoMoreInteractions(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPositionalReadExtraFailed() throws IOException {
|
||||||
|
long position = 0;
|
||||||
|
int bufOffset = 0;
|
||||||
|
int necessaryLen = 10;
|
||||||
|
int extraLen = 5;
|
||||||
|
int totalLen = necessaryLen + extraLen;
|
||||||
|
byte[] buf = new byte[totalLen];
|
||||||
|
FSDataInputStream in = mock(FSDataInputStream.class);
|
||||||
|
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen);
|
||||||
|
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
|
||||||
|
bufOffset, necessaryLen, extraLen);
|
||||||
|
assertFalse("Expect false return when reading extra bytes fails", ret);
|
||||||
|
verify(in).read(position, buf, bufOffset, totalLen);
|
||||||
|
verifyNoMoreInteractions(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPositionalReadShortReadCompletesNecessaryAndExtraBytes()
|
||||||
|
throws IOException {
|
||||||
|
long position = 0;
|
||||||
|
int bufOffset = 0;
|
||||||
|
int necessaryLen = 10;
|
||||||
|
int extraLen = 5;
|
||||||
|
int totalLen = necessaryLen + extraLen;
|
||||||
|
byte[] buf = new byte[totalLen];
|
||||||
|
FSDataInputStream in = mock(FSDataInputStream.class);
|
||||||
|
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
|
||||||
|
when(in.read(5, buf, 5, 10)).thenReturn(10);
|
||||||
|
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
|
||||||
|
bufOffset, necessaryLen, extraLen);
|
||||||
|
assertTrue("Expect true return when reading extra bytes succeeds", ret);
|
||||||
|
verify(in).read(position, buf, bufOffset, totalLen);
|
||||||
|
verify(in).read(5, buf, 5, 10);
|
||||||
|
verifyNoMoreInteractions(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPositionalReadPrematureEOF() throws IOException {
|
||||||
|
long position = 0;
|
||||||
|
int bufOffset = 0;
|
||||||
|
int necessaryLen = 10;
|
||||||
|
int extraLen = 0;
|
||||||
|
int totalLen = necessaryLen + extraLen;
|
||||||
|
byte[] buf = new byte[totalLen];
|
||||||
|
FSDataInputStream in = mock(FSDataInputStream.class);
|
||||||
|
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(9);
|
||||||
|
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1);
|
||||||
|
exception.expect(IOException.class);
|
||||||
|
exception.expectMessage("EOF");
|
||||||
|
HFileBlock.positionalReadWithExtra(in, position, buf, bufOffset,
|
||||||
|
necessaryLen, extraLen);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue