HBASE-15160 Put back HFile's HDFS op latency sampling code and add metrics for monitoring (Yu Li and Enis Soztutar)

This commit is contained in:
Enis Soztutar 2017-06-02 17:41:46 -07:00
parent ef46debde8
commit 118429cbac
17 changed files with 443 additions and 49 deletions

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import org.apache.hadoop.hbase.metrics.BaseSource;
public interface MetricsIOSource extends BaseSource {
/**
* The name of the metrics
*/
String METRICS_NAME = "IO";
/**
* The name of the metrics context that metrics will be under.
*/
String METRICS_CONTEXT = "regionserver";
/**
* Description
*/
String METRICS_DESCRIPTION = "Metrics about FileSystem IO";
/**
* The name of the metrics context that metrics will be under in jmx
*/
String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
String FS_READ_TIME_HISTO_KEY = "fsReadTime";
String FS_PREAD_TIME_HISTO_KEY = "fsPReadTime";
String FS_WRITE_HISTO_KEY = "fsWriteTime";
String CHECKSUM_FAILURES_KEY = "fsChecksumFailureCount";
String FS_READ_TIME_HISTO_DESC
= "Latency of HFile's sequential reads on this region server in milliseconds";
String FS_PREAD_TIME_HISTO_DESC
= "Latency of HFile's positional reads on this region server in milliseconds";
String FS_WRITE_TIME_HISTO_DESC
= "Latency of HFile's writes on this region server in milliseconds";
String CHECKSUM_FAILURES_DESC = "Number of checksum failures for the HBase HFile checksums at the"
+ " HBase level (separate from HDFS checksums)";
/**
* Update the fs sequential read time histogram
* @param t time it took, in milliseconds
*/
void updateFsReadTime(long t);
/**
* Update the fs positional read time histogram
* @param t time it took, in milliseconds
*/
void updateFsPReadTime(long t);
/**
* Update the fs write time histogram
* @param t time it took, in milliseconds
*/
void updateFsWriteTime(long t);
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
public interface MetricsIOWrapper {
long getChecksumFailures();
}

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.io.MetricsIOSource;
import org.apache.hadoop.hbase.io.MetricsIOWrapper;
/**
* Interface of a factory to create Metrics Sources used inside of regionservers.
*/
@ -60,4 +63,11 @@ public interface MetricsRegionServerSourceFactory {
* @return A metrics heap memory manager source
*/
MetricsHeapMemoryManagerSource getHeapMemoryManager();
/**
* Create a MetricsIOSource from a MetricsIOWrapper.
*
* @return A metrics IO source
*/
MetricsIOSource createIO(MetricsIOWrapper wrapper);
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.Interns;
public class MetricsIOSourceImpl extends BaseSourceImpl implements MetricsIOSource {
private final MetricsIOWrapper wrapper;
private final MetricHistogram fsReadTimeHisto;
private final MetricHistogram fsPReadTimeHisto;
private final MetricHistogram fsWriteTimeHisto;
public MetricsIOSourceImpl(MetricsIOWrapper wrapper) {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, wrapper);
}
public MetricsIOSourceImpl(String metricsName,
String metricsDescription,
String metricsContext,
String metricsJmxContext,
MetricsIOWrapper wrapper) {
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
this.wrapper = wrapper;
fsReadTimeHisto = getMetricsRegistry()
.newTimeHistogram(FS_READ_TIME_HISTO_KEY, FS_READ_TIME_HISTO_DESC);
fsPReadTimeHisto = getMetricsRegistry()
.newTimeHistogram(FS_PREAD_TIME_HISTO_KEY, FS_PREAD_TIME_HISTO_DESC);
fsWriteTimeHisto = getMetricsRegistry()
.newTimeHistogram(FS_WRITE_HISTO_KEY, FS_WRITE_TIME_HISTO_DESC);
}
@Override
public void updateFsReadTime(long t) {
fsReadTimeHisto.add(t);
};
@Override
public void updateFsPReadTime(long t) {
fsPReadTimeHisto.add(t);
};
@Override
public void updateFsWriteTime(long t) {
fsWriteTimeHisto.add(t);
}
@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName);
// wrapper can be null because this function is called inside of init.
if (wrapper != null) {
mrb.addCounter(Interns.info(CHECKSUM_FAILURES_KEY, CHECKSUM_FAILURES_DESC),
wrapper.getChecksumFailures());
}
metricsRegistry.snapshot(mrb, all);
}
}

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.MetricsIOSource;
import org.apache.hadoop.hbase.io.MetricsIOSourceImpl;
import org.apache.hadoop.hbase.io.MetricsIOWrapper;
/**
* Factory to create MetricsRegionServerSource when given a MetricsRegionServerWrapper
@ -75,4 +78,8 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
public MetricsTableSource createTable(String table, MetricsTableWrapperAggregate wrapper) {
return new MetricsTableSourceImpl(table, getTableAggregate(), wrapper);
}
public MetricsIOSource createIO(MetricsIOWrapper wrapper) {
return new MetricsIOSourceImpl(wrapper);
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceFactory;
import com.google.common.annotations.VisibleForTesting;
public class MetricsIO {
private final MetricsIOSource source;
private final MetricsIOWrapper wrapper;
public MetricsIO(MetricsIOWrapper wrapper) {
this(CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
.createIO(wrapper), wrapper);
}
MetricsIO(MetricsIOSource source, MetricsIOWrapper wrapper) {
this.source = source;
this.wrapper = wrapper;
}
@VisibleForTesting
public MetricsIOSource getMetricsSource() {
return source;
}
@VisibleForTesting
public MetricsIOWrapper getWrapper() {
return wrapper;
}
public void updateFsReadTime(long t) {
source.updateFsReadTime(t);
}
public void updateFsPreadTime(long t) {
source.updateFsPReadTime(t);
}
public void updateFsWriteTime(long t) {
source.updateFsWriteTime(t);
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import org.apache.hadoop.hbase.io.hfile.HFile;
public class MetricsIOWrapperImpl implements MetricsIOWrapper {
@Override
public long getChecksumFailures() {
return HFile.getAndResetChecksumFailuresCount();
}
}

View File

@ -52,6 +52,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.MetricsIO;
import org.apache.hadoop.hbase.io.MetricsIOWrapperImpl;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
@ -188,12 +190,35 @@ public class HFile {
// For tests. Gets incremented when we read a block whether from HDFS or from Cache.
public static final LongAdder DATABLOCK_READ_COUNT = new LongAdder();
/** Static instance for the metrics so that HFileReaders access the same instance */
static final MetricsIO metrics = new MetricsIO(new MetricsIOWrapperImpl());
/**
* Number of checksum verification failures. It also
* clears the counter.
*/
public static final long getAndResetChecksumFailuresCount() {
return CHECKSUM_FAILURES.sumThenReset();
}
/**
* Number of checksum verification failures. It also
* clears the counter.
*/
public static final long getChecksumFailuresCount() {
return CHECKSUM_FAILURES.sumThenReset();
return CHECKSUM_FAILURES.sum();
}
public static final void updateReadLatency(long latencyMillis, boolean pread) {
if (pread) {
metrics.updateFsPreadTime(latencyMillis);
} else {
metrics.updateFsReadTime(latencyMillis);
}
}
public static final void updateWriteLatency(long latencyMillis) {
metrics.updateFsWriteTime(latencyMillis);
}
/** API required to write an {@link HFile} */

View File

@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
@ -54,6 +51,9 @@ import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.IOUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* Reads {@link HFile} version 2 blocks to HFiles and via {@link Cacheable} Interface to caches.
* Version 2 was introduced in hbase-0.92.0. No longer has support for version 1 blocks since
@ -435,6 +435,7 @@ public class HFileBlock implements Cacheable {
return nextBlockOnDiskSize;
}
@Override
public BlockType getBlockType() {
return blockType;
}
@ -1113,8 +1114,10 @@ public class HFileBlock implements Cacheable {
protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
throws IOException {
ensureBlockReady();
long startTime = System.currentTimeMillis();
out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
out.write(onDiskChecksum);
HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
}
/**
@ -1363,7 +1366,8 @@ public class HFileBlock implements Cacheable {
* applicable headers, or -1 if unknown
* @return the newly read block
*/
HFileBlock readBlockData(long offset, long onDiskSize, boolean pread) throws IOException;
HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics)
throws IOException;
/**
* Creates a block iterator over the given portion of the {@link HFile}.
@ -1483,7 +1487,7 @@ public class HFileBlock implements Cacheable {
if (offset >= endOffset) {
return null;
}
HFileBlock b = readBlockData(offset, length, false);
HFileBlock b = readBlockData(offset, length, false, false);
offset += b.getOnDiskSizeWithHeader();
length = b.getNextBlockOnDiskSize();
return b.unpack(fileContext, owner);
@ -1520,7 +1524,8 @@ public class HFileBlock implements Cacheable {
*/
@VisibleForTesting
protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
boolean peekIntoNextBlock, long fileOffset, boolean pread)
throws IOException {
if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
// We are asked to read the next block's header as well, but there is
// not enough room in the array.
@ -1531,6 +1536,7 @@ public class HFileBlock implements Cacheable {
if (!pread) {
// Seek + read. Better for scanning.
HFileUtil.seekOnMultipleSources(istream, fileOffset);
// TODO: do we need seek time latencies?
long realOffset = istream.getPos();
if (realOffset != fileOffset) {
throw new IOException("Tried to seek to " + fileOffset + " to " + "read " + size +
@ -1568,8 +1574,8 @@ public class HFileBlock implements Cacheable {
* @param pread whether to use a positional read
*/
@Override
public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread)
throws IOException {
public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
boolean updateMetrics) throws IOException {
// Get a copy of the current state of whether to validate
// hbase checksums or not for this read call. This is not
// thread-safe but the one constaint is that if we decide
@ -1580,7 +1586,7 @@ public class HFileBlock implements Cacheable {
HFileBlock blk = readBlockDataInternal(is, offset,
onDiskSizeWithHeaderL, pread,
doVerificationThruHBaseChecksum);
doVerificationThruHBaseChecksum, updateMetrics);
if (blk == null) {
HFile.LOG.warn("HBase checksum verification failed for file " +
pathName + " at offset " +
@ -1607,7 +1613,7 @@ public class HFileBlock implements Cacheable {
is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
doVerificationThruHBaseChecksum = false;
blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
doVerificationThruHBaseChecksum);
doVerificationThruHBaseChecksum, updateMetrics);
if (blk != null) {
HFile.LOG.warn("HDFS checksum verification succeeded for file " +
pathName + " at offset " +
@ -1708,7 +1714,7 @@ public class HFileBlock implements Cacheable {
*/
@VisibleForTesting
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum)
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
throws IOException {
if (offset < 0) {
throw new IOException("Invalid offset=" + offset + " trying to read "
@ -1724,6 +1730,7 @@ public class HFileBlock implements Cacheable {
", pread=" + pread + ", verifyChecksum=" + verifyChecksum + ", cachedHeader=" +
headerBuf + ", onDiskSizeWithHeader=" + onDiskSizeWithHeader);
}
long startTime = System.currentTimeMillis();
if (onDiskSizeWithHeader <= 0) {
// We were not passed the block size. Need to get it from the header. If header was not in
// cache, need to seek to pull it in. This is costly and should happen very rarely.
@ -1771,6 +1778,10 @@ public class HFileBlock implements Cacheable {
!validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
return null;
}
long duration = System.currentTimeMillis() - startTime;
if (updateMetrics) {
HFile.updateReadLatency(duration, pread);
}
// The onDiskBlock will become the headerAndDataBuffer for this block.
// If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
// contains the header of next block, so no need to set next block's header in it.
@ -1783,7 +1794,7 @@ public class HFileBlock implements Cacheable {
hFileBlock.sanityCheckUncompressed();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Read " + hFileBlock);
LOG.trace("Read " + hFileBlock + " in " + duration + " ns");
}
// Cache next block header if we read it for the next time through here.
if (nextBlockOnDiskSize != -1) {

View File

@ -1395,7 +1395,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// Cache Miss, please load.
}
HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, blockSize, true).
HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, blockSize, true, false).
unpack(hfileContext, fsBlockReader);
// Cache the block
@ -1483,7 +1483,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
// Load block from filesystem.
HFileBlock hfileBlock =
fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread);
fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread, !isCompaction);
validateBlockType(hfileBlock, expectedBlockType);
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();

View File

@ -219,7 +219,9 @@ public class HFileWriterImpl implements HFile.Writer {
throws IOException {
trailer.setFileInfoOffset(outputStream.getPos());
finishFileInfo();
long startTime = System.currentTimeMillis();
fileInfo.write(out);
HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
}
/**
@ -827,7 +829,9 @@ public class HFileWriterImpl implements HFile.Writer {
trailer.setEntryCount(entryCount);
trailer.setCompressionCodec(hFileContext.getCompression());
long startTime = System.currentTimeMillis();
trailer.serialize(outputStream);
HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
if (closeOutputStream) {
outputStream.close();

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestMetricsIO {
public MetricsAssertHelper HELPER = CompatibilityFactory.getInstance(MetricsAssertHelper.class);
@Test
public void testMetrics() {
MetricsIO metrics = new MetricsIO(new MetricsIOWrapper() {
@Override
public long getChecksumFailures() { return 40; }
});
metrics.updateFsReadTime(100);
metrics.updateFsReadTime(200);
metrics.updateFsPreadTime(300);
metrics.updateFsWriteTime(400);
metrics.updateFsWriteTime(500);
metrics.updateFsWriteTime(600);
HELPER.assertCounter("fsChecksumFailureCount", 40, metrics.getMetricsSource());
HELPER.assertCounter("fsReadTime_numOps", 2, metrics.getMetricsSource());
HELPER.assertCounter("fsPReadTime_numOps", 1, metrics.getMetricsSource());
HELPER.assertCounter("fsWriteTime_numOps", 3, metrics.getMetricsSource());
}
}

View File

@ -94,7 +94,7 @@ public class TestChecksum {
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
is, totalSize, (HFileSystem) fs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, false);
HFileBlock b = hbr.readBlockData(0, -1, false, false);
assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
}
@ -127,7 +127,7 @@ public class TestChecksum {
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
is, totalSize, (HFileSystem) fs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, false);
HFileBlock b = hbr.readBlockData(0, -1, false, false);
ByteBuff data = b.getBufferWithoutHeader();
for (int i = 0; i < 1000; i++) {
assertEquals(i, data.getInt());
@ -139,7 +139,7 @@ public class TestChecksum {
exception_thrown = true;
}
assertTrue(exception_thrown);
assertEquals(0, HFile.getChecksumFailuresCount());
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
}
}
@ -190,7 +190,7 @@ public class TestChecksum {
.withHBaseCheckSum(true)
.build();
HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(is, totalSize, fs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, pread);
HFileBlock b = hbr.readBlockData(0, -1, pread, false);
b.sanityCheck();
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936,
@ -203,7 +203,7 @@ public class TestChecksum {
// assert that we encountered hbase checksum verification failures
// but still used hdfs checksums and read data successfully.
assertEquals(1, HFile.getChecksumFailuresCount());
assertEquals(1, HFile.getAndResetChecksumFailuresCount());
validateData(in);
// A single instance of hbase checksum failure causes the reader to
@ -211,18 +211,18 @@ public class TestChecksum {
// requests. Verify that this is correct.
for (int i = 0; i <
HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
b = hbr.readBlockData(0, -1, pread);
assertEquals(0, HFile.getChecksumFailuresCount());
b = hbr.readBlockData(0, -1, pread, false);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
}
// The next read should have hbase checksum verification reanabled,
// we verify this by assertng that there was a hbase-checksum failure.
b = hbr.readBlockData(0, -1, pread);
assertEquals(1, HFile.getChecksumFailuresCount());
b = hbr.readBlockData(0, -1, pread, false);
assertEquals(1, HFile.getAndResetChecksumFailuresCount());
// Since the above encountered a checksum failure, we switch
// back to not checking hbase checksums.
b = hbr.readBlockData(0, -1, pread);
assertEquals(0, HFile.getChecksumFailuresCount());
b = hbr.readBlockData(0, -1, pread, false);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
is.close();
// Now, use a completely new reader. Switch off hbase checksums in
@ -232,7 +232,7 @@ public class TestChecksum {
assertEquals(false, newfs.useHBaseChecksum());
is = new FSDataInputStreamWrapper(newfs, path);
hbr = new CorruptedFSReaderImpl(is, totalSize, newfs, path, meta);
b = hbr.readBlockData(0, -1, pread);
b = hbr.readBlockData(0, -1, pread, false);
is.close();
b.sanityCheck();
b = b.unpack(meta, hbr);
@ -246,7 +246,7 @@ public class TestChecksum {
// assert that we did not encounter hbase checksum verification failures
// but still used hdfs checksums and read data successfully.
assertEquals(0, HFile.getChecksumFailuresCount());
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
validateData(in);
}
}
@ -316,7 +316,7 @@ public class TestChecksum {
.build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(
is, nochecksum), totalSize, hfs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, pread);
HFileBlock b = hbr.readBlockData(0, -1, pread, false);
is.close();
b.sanityCheck();
assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());
@ -326,7 +326,7 @@ public class TestChecksum {
expectedChunks * HFileBlock.CHECKSUM_SIZE);
// assert that we did not encounter hbase checksum verification failures
assertEquals(0, HFile.getChecksumFailuresCount());
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
}
}
}
@ -361,12 +361,13 @@ public class TestChecksum {
@Override
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum) throws IOException {
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
throws IOException {
if (verifyChecksum) {
corruptDataStream = true;
}
HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
verifyChecksum);
verifyChecksum, updateMetrics);
corruptDataStream = false;
return b;
}

View File

@ -320,9 +320,9 @@ public class TestHFileBlock {
.withIncludesTags(includesTag)
.withCompression(algo).build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
HFileBlock b = hbr.readBlockData(0, -1, pread);
HFileBlock b = hbr.readBlockData(0, -1, pread, false);
is.close();
assertEquals(0, HFile.getChecksumFailuresCount());
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
b.sanityCheck();
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
@ -334,12 +334,12 @@ public class TestHFileBlock {
is = fs.open(path);
hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
b.totalChecksumBytes(), pread);
b.totalChecksumBytes(), pread, false);
assertEquals(expected, b);
int wrongCompressedSize = 2172;
try {
b = hbr.readBlockData(0, wrongCompressedSize
+ HConstants.HFILEBLOCK_HEADER_SIZE, pread);
+ HConstants.HFILEBLOCK_HEADER_SIZE, pread, false);
fail("Exception expected");
} catch (IOException ex) {
String expectedPrefix = "Passed in onDiskSizeWithHeader=";
@ -422,8 +422,8 @@ public class TestHFileBlock {
HFileBlock blockFromHFile, blockUnpacked;
int pos = 0;
for (int blockId = 0; blockId < numBlocks; ++blockId) {
blockFromHFile = hbr.readBlockData(pos, -1, pread);
assertEquals(0, HFile.getChecksumFailuresCount());
blockFromHFile = hbr.readBlockData(pos, -1, pread, false);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
blockFromHFile.sanityCheck();
pos += blockFromHFile.getOnDiskSizeWithHeader();
assertEquals((int) encodedSizes.get(blockId),
@ -557,7 +557,7 @@ public class TestHFileBlock {
if (detailedLogging) {
LOG.info("Reading block #" + i + " at offset " + curOffset);
}
HFileBlock b = hbr.readBlockData(curOffset, -1, pread);
HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false);
if (detailedLogging) {
LOG.info("Block #" + i + ": " + b);
}
@ -571,7 +571,7 @@ public class TestHFileBlock {
// Now re-load this block knowing the on-disk size. This tests a
// different branch in the loader.
HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread);
HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false);
b2.sanityCheck();
assertEquals(b.getBlockType(), b2.getBlockType());
@ -586,7 +586,7 @@ public class TestHFileBlock {
assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
assertEquals(b.getOnDiskDataSizeWithHeader(),
b2.getOnDiskDataSizeWithHeader());
assertEquals(0, HFile.getChecksumFailuresCount());
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
curOffset += b.getOnDiskSizeWithHeader();
@ -681,7 +681,7 @@ public class TestHFileBlock {
HFileBlock b;
try {
long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
b = hbr.readBlockData(offset, onDiskSizeArg, pread);
b = hbr.readBlockData(offset, onDiskSizeArg, pread, false);
} catch (IOException ex) {
LOG.error("Error in client " + clientId + " trying to read block at "
+ offset + ", pread=" + pread + ", withOnDiskSize=" +

View File

@ -186,7 +186,7 @@ public class TestHFileBlockIndex {
}
missCount += 1;
prevBlock = realReader.readBlockData(offset, onDiskSize, pread);
prevBlock = realReader.readBlockData(offset, onDiskSize, pread, false);
prevOffset = offset;
prevOnDiskSize = onDiskSize;
prevPread = pread;

View File

@ -17,6 +17,12 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@ -49,8 +55,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
@Category({IOTests.class, SmallTests.class})
public class TestHFileEncryption {
private static final Log LOG = LogFactory.getLog(TestHFileEncryption.class);
@ -99,8 +103,8 @@ public class TestHFileEncryption {
private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size)
throws IOException {
HFileBlock b = hbr.readBlockData(pos, -1, false);
assertEquals(0, HFile.getChecksumFailuresCount());
HFileBlock b = hbr.readBlockData(pos, -1, false, false);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
b.sanityCheck();
assertFalse(b.isUnpacked());
b = b.unpack(ctx, hbr);

View File

@ -218,7 +218,7 @@ public class TestHFileWriterV3 {
fsdis.seek(0);
long curBlockPos = 0;
while (curBlockPos <= trailer.getLastDataBlockOffset()) {
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false)
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
.unpack(context, blockReader);
assertEquals(BlockType.DATA, block.getBlockType());
ByteBuff buf = block.getBufferWithoutHeader();
@ -279,7 +279,7 @@ public class TestHFileWriterV3 {
while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
trailer.getLoadOnOpenDataOffset());
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false)
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
.unpack(context, blockReader);
assertEquals(BlockType.META, block.getBlockType());
Text t = new Text();