HBASE-14307 - Incorrect use of positional read api in HFileBlock (Chris

Nauroth)
This commit is contained in:
ramkrishna 2015-09-11 15:56:09 +05:30
parent d9bc84b1fc
commit 954a6e0a75
2 changed files with 190 additions and 8 deletions

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
/** /**
@ -694,6 +695,45 @@ public class HFileBlock implements Cacheable {
return bytesRemaining <= 0; return bytesRemaining <= 0;
} }
/**
* Read from an input stream. Analogous to
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
* positional read and specifies a number of "extra" bytes that would be
* desirable but not absolutely necessary to read.
*
* @param in the input stream to read from
* @param position the position within the stream from which to start reading
* @param buf the buffer to read into
* @param bufOffset the destination offset in the buffer
* @param necessaryLen the number of bytes that are absolutely necessary to
* read
* @param extraLen the number of extra bytes that would be nice to read
* @return true if and only if extraLen is > 0 and reading those extra bytes
* was successful
* @throws IOException if failed to read the necessary bytes
*/
@VisibleForTesting
static boolean positionalReadWithExtra(FSDataInputStream in,
long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
throws IOException {
int bytesRemaining = necessaryLen + extraLen;
int bytesRead = 0;
while (bytesRead < necessaryLen) {
int ret = in.read(position, buf, bufOffset, bytesRemaining);
if (ret < 0) {
throw new IOException("Premature EOF from inputStream (positional read "
+ "returned " + ret + ", was trying to read " + necessaryLen
+ " necessary bytes and " + extraLen + " extra bytes, "
+ "successfully read " + bytesRead);
}
position += ret;
bufOffset += ret;
bytesRemaining -= ret;
bytesRead += ret;
}
return bytesRead != necessaryLen && bytesRemaining <= 0;
}
/** /**
* @return the on-disk size of the next block (including the header size) * @return the on-disk size of the next block (including the header size)
* that was read by peeking into the next block's header * that was read by peeking into the next block's header
@ -1377,14 +1417,8 @@ public class HFileBlock implements Cacheable {
} else { } else {
// Positional read. Better for random reads; or when the streamLock is already locked. // Positional read. Better for random reads; or when the streamLock is already locked.
int extraSize = peekIntoNextBlock ? hdrSize : 0; int extraSize = peekIntoNextBlock ? hdrSize : 0;
int ret = istream.read(fileOffset, dest, destOffset, size + extraSize); if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset,
if (ret < size) { size, extraSize)) {
throw new IOException("Positional read of " + size + " bytes " +
"failed at offset " + fileOffset + " (returned " + ret + ")");
}
if (ret == size || ret < size + extraSize) {
// Could not read the next block's header, or did not try.
return -1; return -1;
} }
} }

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
/**
* Unit test suite covering HFileBlock positional read logic.
*/
@Category({IOTests.class, SmallTests.class})
public class TestHFileBlockPositionalRead {
@Rule
public ExpectedException exception = ExpectedException.none();
@Test
public void testPositionalReadNoExtra() throws IOException {
long position = 0;
int bufOffset = 0;
int necessaryLen = 10;
int extraLen = 0;
int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen];
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
bufOffset, necessaryLen, extraLen);
assertFalse("Expect false return when no extra bytes requested", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verifyNoMoreInteractions(in);
}
@Test
public void testPositionalReadShortReadOfNecessaryBytes() throws IOException {
long position = 0;
int bufOffset = 0;
int necessaryLen = 10;
int extraLen = 0;
int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen];
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
when(in.read(5, buf, 5, 5)).thenReturn(5);
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
bufOffset, necessaryLen, extraLen);
assertFalse("Expect false return when no extra bytes requested", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).read(5, buf, 5, 5);
verifyNoMoreInteractions(in);
}
@Test
public void testPositionalReadExtraSucceeded() throws IOException {
long position = 0;
int bufOffset = 0;
int necessaryLen = 10;
int extraLen = 5;
int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen];
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
bufOffset, necessaryLen, extraLen);
assertTrue("Expect true return when reading extra bytes succeeds", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verifyNoMoreInteractions(in);
}
@Test
public void testPositionalReadExtraFailed() throws IOException {
long position = 0;
int bufOffset = 0;
int necessaryLen = 10;
int extraLen = 5;
int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen];
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen);
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
bufOffset, necessaryLen, extraLen);
assertFalse("Expect false return when reading extra bytes fails", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verifyNoMoreInteractions(in);
}
@Test
public void testPositionalReadShortReadCompletesNecessaryAndExtraBytes()
throws IOException {
long position = 0;
int bufOffset = 0;
int necessaryLen = 10;
int extraLen = 5;
int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen];
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
when(in.read(5, buf, 5, 10)).thenReturn(10);
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
bufOffset, necessaryLen, extraLen);
assertTrue("Expect true return when reading extra bytes succeeds", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).read(5, buf, 5, 10);
verifyNoMoreInteractions(in);
}
@Test
public void testPositionalReadPrematureEOF() throws IOException {
long position = 0;
int bufOffset = 0;
int necessaryLen = 10;
int extraLen = 0;
int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen];
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(9);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1);
exception.expect(IOException.class);
exception.expectMessage("EOF");
HFileBlock.positionalReadWithExtra(in, position, buf, bufOffset,
necessaryLen, extraLen);
}
}