HBASE-15160 Put back HFile's HDFS op latency sampling code and add metrics for monitoring (Yu Li and Enis Soztutar)

This commit is contained in:
Enis Soztutar 2017-06-06 14:40:50 -07:00
parent 356d4e9187
commit ea3075e7fd
17 changed files with 435 additions and 46 deletions

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import org.apache.hadoop.hbase.metrics.BaseSource;
public interface MetricsIOSource extends BaseSource {
/**
* The name of the metrics
*/
String METRICS_NAME = "IO";
/**
* The name of the metrics context that metrics will be under.
*/
String METRICS_CONTEXT = "regionserver";
/**
* Description
*/
String METRICS_DESCRIPTION = "Metrics about FileSystem IO";
/**
* The name of the metrics context that metrics will be under in jmx
*/
String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
String FS_READ_TIME_HISTO_KEY = "fsReadTime";
String FS_PREAD_TIME_HISTO_KEY = "fsPReadTime";
String FS_WRITE_HISTO_KEY = "fsWriteTime";
String CHECKSUM_FAILURES_KEY = "fsChecksumFailureCount";
String FS_READ_TIME_HISTO_DESC
= "Latency of HFile's sequential reads on this region server in milliseconds";
String FS_PREAD_TIME_HISTO_DESC
= "Latency of HFile's positional reads on this region server in milliseconds";
String FS_WRITE_TIME_HISTO_DESC
= "Latency of HFile's writes on this region server in milliseconds";
String CHECKSUM_FAILURES_DESC = "Number of checksum failures for the HBase HFile checksums at the"
+ " HBase level (separate from HDFS checksums)";
/**
* Update the fs sequential read time histogram
* @param t time it took, in milliseconds
*/
void updateFsReadTime(long t);
/**
* Update the fs positional read time histogram
* @param t time it took, in milliseconds
*/
void updateFsPReadTime(long t);
/**
* Update the fs write time histogram
* @param t time it took, in milliseconds
*/
void updateFsWriteTime(long t);
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
public interface MetricsIOWrapper {
long getChecksumFailures();
}

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.io.MetricsIOSource;
import org.apache.hadoop.hbase.io.MetricsIOWrapper;
/**
* Interface of a factory to create Metrics Sources used inside of regionservers.
*/
@ -54,4 +57,11 @@ public interface MetricsRegionServerSourceFactory {
* @return A metrics table aggregate source
*/
MetricsTableAggregateSource getTableAggregate();
/**
* Create a MetricsIOSource from a MetricsIOWrapper.
*
* @return A metrics IO source
*/
MetricsIOSource createIO(MetricsIOWrapper wrapper);
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.Interns;
public class MetricsIOSourceImpl extends BaseSourceImpl implements MetricsIOSource {
private final MetricsIOWrapper wrapper;
private final MetricHistogram fsReadTimeHisto;
private final MetricHistogram fsPReadTimeHisto;
private final MetricHistogram fsWriteTimeHisto;
public MetricsIOSourceImpl(MetricsIOWrapper wrapper) {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, wrapper);
}
public MetricsIOSourceImpl(String metricsName,
String metricsDescription,
String metricsContext,
String metricsJmxContext,
MetricsIOWrapper wrapper) {
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
this.wrapper = wrapper;
fsReadTimeHisto = getMetricsRegistry()
.newTimeHistogram(FS_READ_TIME_HISTO_KEY, FS_READ_TIME_HISTO_DESC);
fsPReadTimeHisto = getMetricsRegistry()
.newTimeHistogram(FS_PREAD_TIME_HISTO_KEY, FS_PREAD_TIME_HISTO_DESC);
fsWriteTimeHisto = getMetricsRegistry()
.newTimeHistogram(FS_WRITE_HISTO_KEY, FS_WRITE_TIME_HISTO_DESC);
}
@Override
public void updateFsReadTime(long t) {
fsReadTimeHisto.add(t);
};
@Override
public void updateFsPReadTime(long t) {
fsPReadTimeHisto.add(t);
};
@Override
public void updateFsWriteTime(long t) {
fsWriteTimeHisto.add(t);
}
@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName);
// wrapper can be null because this function is called inside of init.
if (wrapper != null) {
mrb.addCounter(Interns.info(CHECKSUM_FAILURES_KEY, CHECKSUM_FAILURES_DESC),
wrapper.getChecksumFailures());
}
metricsRegistry.snapshot(mrb, all);
}
}

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.MetricsIOSource;
import org.apache.hadoop.hbase.io.MetricsIOSourceImpl;
import org.apache.hadoop.hbase.io.MetricsIOWrapper;
/**
* Factory to create MetricsRegionServerSource when given a MetricsRegionServerWrapper
@ -65,4 +68,8 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
public MetricsTableSource createTable(String table, MetricsTableWrapperAggregate wrapper) {
return new MetricsTableSourceImpl(table, getTableAggregate(), wrapper);
}
public MetricsIOSource createIO(MetricsIOWrapper wrapper) {
return new MetricsIOSourceImpl(wrapper);
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceFactory;
import com.google.common.annotations.VisibleForTesting;
public class MetricsIO {
private final MetricsIOSource source;
private final MetricsIOWrapper wrapper;
public MetricsIO(MetricsIOWrapper wrapper) {
this(CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
.createIO(wrapper), wrapper);
}
MetricsIO(MetricsIOSource source, MetricsIOWrapper wrapper) {
this.source = source;
this.wrapper = wrapper;
}
@VisibleForTesting
public MetricsIOSource getMetricsSource() {
return source;
}
@VisibleForTesting
public MetricsIOWrapper getWrapper() {
return wrapper;
}
public void updateFsReadTime(long t) {
source.updateFsReadTime(t);
}
public void updateFsPreadTime(long t) {
source.updateFsPReadTime(t);
}
public void updateFsWriteTime(long t) {
source.updateFsWriteTime(t);
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import org.apache.hadoop.hbase.io.hfile.HFile;
public class MetricsIOWrapperImpl implements MetricsIOWrapper {
@Override
public long getChecksumFailures() {
return HFile.getAndResetChecksumFailuresCount();
}
}

View File

@ -52,6 +52,8 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.MetricsIO;
import org.apache.hadoop.hbase.io.MetricsIOWrapperImpl;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -185,16 +187,39 @@ public class HFile {
// For tests. Gets incremented when we read a block whether from HDFS or from Cache.
public static final Counter DATABLOCK_READ_COUNT = new Counter();
/** Static instance for the metrics so that HFileReaders access the same instance */
static final MetricsIO metrics = new MetricsIO(new MetricsIOWrapperImpl());
/**
* Number of checksum verification failures. It also
* clears the counter.
*/
public static final long getChecksumFailuresCount() {
public static final long getAndResetChecksumFailuresCount() {
long count = CHECKSUM_FAILURES.get();
CHECKSUM_FAILURES.set(0);
return count;
}
/**
* Number of checksum verification failures.
*/
public static final long getChecksumFailuresCount() {
long count = CHECKSUM_FAILURES.get();
return count;
}
public static final void updateReadLatency(long latencyMillis, boolean pread) {
if (pread) {
metrics.updateFsPreadTime(latencyMillis);
} else {
metrics.updateFsReadTime(latencyMillis);
}
}
public static final void updateWriteLatency(long latencyMillis) {
metrics.updateFsWriteTime(latencyMillis);
}
/** API required to write an {@link HFile} */
public interface Writer extends Closeable {

View File

@ -1090,8 +1090,10 @@ public class HFileBlock implements Cacheable {
protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
throws IOException {
ensureBlockReady();
long startTime = System.currentTimeMillis();
out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
out.write(onDiskChecksum);
HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
}
/**
@ -1340,7 +1342,8 @@ public class HFileBlock implements Cacheable {
* applicable headers, or -1 if unknown
* @return the newly read block
*/
HFileBlock readBlockData(long offset, long onDiskSize, boolean pread) throws IOException;
HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics)
throws IOException;
/**
* Creates a block iterator over the given portion of the {@link HFile}.
@ -1419,7 +1422,7 @@ public class HFileBlock implements Cacheable {
if (offset >= endOffset) {
return null;
}
HFileBlock b = readBlockData(offset, length, false);
HFileBlock b = readBlockData(offset, length, false, false);
offset += b.getOnDiskSizeWithHeader();
length = b.getNextBlockOnDiskSize();
return b.unpack(fileContext, owner);
@ -1468,7 +1471,7 @@ public class HFileBlock implements Cacheable {
// Seek + read. Better for scanning.
try {
HFileUtil.seekOnMultipleSources(istream, fileOffset);
// TODO: do we need seek time latencies?
long realOffset = istream.getPos();
if (realOffset != fileOffset) {
throw new IOException("Tried to seek to " + fileOffset + " to "
@ -1575,8 +1578,8 @@ public class HFileBlock implements Cacheable {
* @param pread whether to use a positional read
*/
@Override
public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread)
throws IOException {
public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
boolean updateMetrics) throws IOException {
// Get a copy of the current state of whether to validate
// hbase checksums or not for this read call. This is not
// thread-safe but the one constaint is that if we decide
@ -1587,7 +1590,7 @@ public class HFileBlock implements Cacheable {
HFileBlock blk = readBlockDataInternal(is, offset,
onDiskSizeWithHeaderL, pread,
doVerificationThruHBaseChecksum);
doVerificationThruHBaseChecksum, updateMetrics);
if (blk == null) {
HFile.LOG.warn("HBase checksum verification failed for file " +
pathName + " at offset " +
@ -1614,7 +1617,7 @@ public class HFileBlock implements Cacheable {
is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
doVerificationThruHBaseChecksum = false;
blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
doVerificationThruHBaseChecksum);
doVerificationThruHBaseChecksum, updateMetrics);
if (blk != null) {
HFile.LOG.warn("HDFS checksum verification succeeded for file " +
pathName + " at offset " +
@ -1714,7 +1717,7 @@ public class HFileBlock implements Cacheable {
* @return the HFileBlock or null if there is a HBase checksum mismatch
*/
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum)
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
throws IOException {
if (offset < 0) {
throw new IOException("Invalid offset=" + offset + " trying to read "
@ -1730,6 +1733,7 @@ public class HFileBlock implements Cacheable {
", pread=" + pread + ", verifyChecksum=" + verifyChecksum + ", cachedHeader=" +
headerBuf + ", onDiskSizeWithHeader=" + onDiskSizeWithHeader);
}
long startTime = System.currentTimeMillis();
if (onDiskSizeWithHeader <= 0) {
// We were not passed the block size. Need to get it from the header. If header was not in
// cache, need to seek to pull it in. This is costly and should happen very rarely.
@ -1778,6 +1782,10 @@ public class HFileBlock implements Cacheable {
!validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
return null;
}
long duration = System.currentTimeMillis() - startTime;
if (updateMetrics) {
HFile.updateReadLatency(duration, pread);
}
// The onDiskBlock will become the headerAndDataBuffer for this block.
// If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
// contains the header of next block, so no need to set next block's header in it.
@ -1789,7 +1797,7 @@ public class HFileBlock implements Cacheable {
hFileBlock.sanityCheckUncompressed();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Read " + hFileBlock);
LOG.trace("Read " + hFileBlock + " in " + duration + " ns");
}
// Cache next block header if we read it for the next time through here.
if (nextBlockOnDiskSize != -1) {

View File

@ -371,7 +371,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
}
HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
blockSize, true).unpack(hfileContext, fsBlockReader);
blockSize, true, false).unpack(hfileContext, fsBlockReader);
// Cache the block
if (cacheBlock) {
@ -452,7 +452,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
}
// Load block from filesystem.
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize,
pread);
pread, !isCompaction);
validateBlockType(hfileBlock, expectedBlockType);
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestMetricsIO {
public MetricsAssertHelper HELPER = CompatibilityFactory.getInstance(MetricsAssertHelper.class);
@Test
public void testMetrics() {
MetricsIO metrics = new MetricsIO(new MetricsIOWrapper() {
@Override
public long getChecksumFailures() { return 40; }
});
metrics.updateFsReadTime(100);
metrics.updateFsReadTime(200);
metrics.updateFsPreadTime(300);
metrics.updateFsWriteTime(400);
metrics.updateFsWriteTime(500);
metrics.updateFsWriteTime(600);
HELPER.assertCounter("fsChecksumFailureCount", 40, metrics.getMetricsSource());
HELPER.assertCounter("fsReadTime_numOps", 2, metrics.getMetricsSource());
HELPER.assertCounter("fsPReadTime_numOps", 1, metrics.getMetricsSource());
HELPER.assertCounter("fsWriteTime_numOps", 3, metrics.getMetricsSource());
}
}

View File

@ -93,7 +93,7 @@ public class TestChecksum {
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
is, totalSize, (HFileSystem) fs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, false);
HFileBlock b = hbr.readBlockData(0, -1, false, false);
assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
}
@ -126,7 +126,7 @@ public class TestChecksum {
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
is, totalSize, (HFileSystem) fs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, false);
HFileBlock b = hbr.readBlockData(0, -1, false, false);
ByteBuffer data = b.getBufferWithoutHeader();
for (int i = 0; i < 1000; i++) {
assertEquals(i, data.getInt());
@ -138,7 +138,7 @@ public class TestChecksum {
exception_thrown = true;
}
assertTrue(exception_thrown);
assertEquals(0, HFile.getChecksumFailuresCount());
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
}
}
@ -189,7 +189,7 @@ public class TestChecksum {
.withHBaseCheckSum(true)
.build();
HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(is, totalSize, fs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, pread);
HFileBlock b = hbr.readBlockData(0, -1, pread, false);
b.sanityCheck();
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936,
@ -202,7 +202,7 @@ public class TestChecksum {
// assert that we encountered hbase checksum verification failures
// but still used hdfs checksums and read data successfully.
assertEquals(1, HFile.getChecksumFailuresCount());
assertEquals(1, HFile.getAndResetChecksumFailuresCount());
validateData(in);
// A single instance of hbase checksum failure causes the reader to
@ -210,18 +210,18 @@ public class TestChecksum {
// requests. Verify that this is correct.
for (int i = 0; i <
HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
b = hbr.readBlockData(0, -1, pread);
assertEquals(0, HFile.getChecksumFailuresCount());
b = hbr.readBlockData(0, -1, pread, false);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
}
// The next read should have hbase checksum verification reanabled,
// we verify this by assertng that there was a hbase-checksum failure.
b = hbr.readBlockData(0, -1, pread);
assertEquals(1, HFile.getChecksumFailuresCount());
b = hbr.readBlockData(0, -1, pread, false);
assertEquals(1, HFile.getAndResetChecksumFailuresCount());
// Since the above encountered a checksum failure, we switch
// back to not checking hbase checksums.
b = hbr.readBlockData(0, -1, pread);
assertEquals(0, HFile.getChecksumFailuresCount());
b = hbr.readBlockData(0, -1, pread, false);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
is.close();
// Now, use a completely new reader. Switch off hbase checksums in
@ -231,7 +231,7 @@ public class TestChecksum {
assertEquals(false, newfs.useHBaseChecksum());
is = new FSDataInputStreamWrapper(newfs, path);
hbr = new CorruptedFSReaderImpl(is, totalSize, newfs, path, meta);
b = hbr.readBlockData(0, -1, pread);
b = hbr.readBlockData(0, -1, pread, false);
is.close();
b.sanityCheck();
b = b.unpack(meta, hbr);
@ -245,7 +245,7 @@ public class TestChecksum {
// assert that we did not encounter hbase checksum verification failures
// but still used hdfs checksums and read data successfully.
assertEquals(0, HFile.getChecksumFailuresCount());
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
validateData(in);
}
}
@ -315,7 +315,7 @@ public class TestChecksum {
.build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(
is, nochecksum), totalSize, hfs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, pread);
HFileBlock b = hbr.readBlockData(0, -1, pread, false);
is.close();
b.sanityCheck();
assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());
@ -325,7 +325,7 @@ public class TestChecksum {
expectedChunks * HFileBlock.CHECKSUM_SIZE);
// assert that we did not encounter hbase checksum verification failures
assertEquals(0, HFile.getChecksumFailuresCount());
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
}
}
}
@ -360,12 +360,13 @@ public class TestChecksum {
@Override
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum) throws IOException {
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
throws IOException {
if (verifyChecksum) {
corruptDataStream = true;
}
HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
verifyChecksum);
verifyChecksum, updateMetrics);
corruptDataStream = false;
return b;
}

View File

@ -314,9 +314,9 @@ public class TestHFileBlock {
.withIncludesTags(includesTag)
.withCompression(algo).build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
HFileBlock b = hbr.readBlockData(0, -1, pread);
HFileBlock b = hbr.readBlockData(0, -1, pread, false);
is.close();
assertEquals(0, HFile.getChecksumFailuresCount());
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
b.sanityCheck();
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
@ -328,12 +328,12 @@ public class TestHFileBlock {
is = fs.open(path);
hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
b.totalChecksumBytes(), pread);
b.totalChecksumBytes(), pread, false);
assertEquals(expected, b);
int wrongCompressedSize = 2172;
try {
b = hbr.readBlockData(0, wrongCompressedSize
+ HConstants.HFILEBLOCK_HEADER_SIZE, pread);
+ HConstants.HFILEBLOCK_HEADER_SIZE, pread, false);
fail("Exception expected");
} catch (IOException ex) {
String expectedPrefix = "Passed in onDiskSizeWithHeader=";
@ -416,8 +416,8 @@ public class TestHFileBlock {
HFileBlock blockFromHFile, blockUnpacked;
int pos = 0;
for (int blockId = 0; blockId < numBlocks; ++blockId) {
blockFromHFile = hbr.readBlockData(pos, -1, pread);
assertEquals(0, HFile.getChecksumFailuresCount());
blockFromHFile = hbr.readBlockData(pos, -1, pread, false);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
blockFromHFile.sanityCheck();
pos += blockFromHFile.getOnDiskSizeWithHeader();
assertEquals((int) encodedSizes.get(blockId),
@ -550,7 +550,7 @@ public class TestHFileBlock {
if (detailedLogging) {
LOG.info("Reading block #" + i + " at offset " + curOffset);
}
HFileBlock b = hbr.readBlockData(curOffset, -1, pread);
HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false);
if (detailedLogging) {
LOG.info("Block #" + i + ": " + b);
}
@ -564,7 +564,7 @@ public class TestHFileBlock {
// Now re-load this block knowing the on-disk size. This tests a
// different branch in the loader.
HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread);
HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false);
b2.sanityCheck();
assertEquals(b.getBlockType(), b2.getBlockType());
@ -579,7 +579,7 @@ public class TestHFileBlock {
assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
assertEquals(b.getOnDiskDataSizeWithHeader(),
b2.getOnDiskDataSizeWithHeader());
assertEquals(0, HFile.getChecksumFailuresCount());
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
curOffset += b.getOnDiskSizeWithHeader();
@ -673,7 +673,7 @@ public class TestHFileBlock {
HFileBlock b;
try {
long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
b = hbr.readBlockData(offset, onDiskSizeArg, pread);
b = hbr.readBlockData(offset, onDiskSizeArg, pread, false);
} catch (IOException ex) {
LOG.error("Error in client " + clientId + " trying to read block at "
+ offset + ", pread=" + pread + ", withOnDiskSize=" +

View File

@ -177,7 +177,7 @@ public class TestHFileBlockIndex {
}
missCount += 1;
prevBlock = realReader.readBlockData(offset, onDiskSize, pread);
prevBlock = realReader.readBlockData(offset, onDiskSize, pread, false);
prevOffset = offset;
prevOnDiskSize = onDiskSize;
prevPread = pread;

View File

@ -17,6 +17,12 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@ -98,8 +104,8 @@ public class TestHFileEncryption {
private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size)
throws IOException {
HFileBlock b = hbr.readBlockData(pos, -1, false);
assertEquals(0, HFile.getChecksumFailuresCount());
HFileBlock b = hbr.readBlockData(pos, -1, false, false);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
b.sanityCheck();
assertFalse(b.isUnpacked());
b = b.unpack(ctx, hbr);

View File

@ -190,7 +190,7 @@ public class TestHFileWriterV2 {
fsdis.seek(0);
long curBlockPos = 0;
while (curBlockPos <= trailer.getLastDataBlockOffset()) {
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false);
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false);
assertEquals(BlockType.DATA, block.getBlockType());
if (meta.isCompressedOrEncrypted()) {
assertFalse(block.isUnpacked());
@ -237,7 +237,7 @@ public class TestHFileWriterV2 {
while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
trailer.getLoadOnOpenDataOffset());
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false)
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
.unpack(meta, blockReader);
assertEquals(BlockType.META, block.getBlockType());
Text t = new Text();

View File

@ -219,7 +219,7 @@ public class TestHFileWriterV3 {
fsdis.seek(0);
long curBlockPos = 0;
while (curBlockPos <= trailer.getLastDataBlockOffset()) {
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false)
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
.unpack(context, blockReader);
assertEquals(BlockType.DATA, block.getBlockType());
ByteBuffer buf = block.getBufferWithoutHeader();
@ -278,7 +278,7 @@ public class TestHFileWriterV3 {
while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
trailer.getLoadOnOpenDataOffset());
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false)
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
.unpack(context, blockReader);
assertEquals(BlockType.META, block.getBlockType());
Text t = new Text();