HBASE-27153 Improvements to read-path tracing

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Nick Dimiduk 2022-06-23 14:20:39 +02:00 committed by Nick Dimiduk
parent e8c14ee308
commit d734acc00e
9 changed files with 670 additions and 192 deletions

File: org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client.trace.hamcrest;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.is;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
@@ -54,6 +55,10 @@ public final class AttributesMatchers {
return containsEntry(AttributeKey.stringKey(key), value);
}
public static Matcher<Attributes> containsEntry(String key, long value) {
return containsEntry(AttributeKey.longKey(key), value);
}
public static Matcher<Attributes> containsEntryWithStringValuesOf(String key, String... values) {
return containsEntry(AttributeKey.stringArrayKey(key), Arrays.asList(values));
}
@@ -63,6 +68,10 @@ public final class AttributesMatchers {
return new IsAttributesContaining<>(equalTo(AttributeKey.stringArrayKey(key)), matcher);
}
public static Matcher<Attributes> isEmpty() {
return hasProperty("empty", is(true));
}
private static final class IsAttributesContaining<T> extends TypeSafeMatcher<Attributes> {
private final Matcher<AttributeKey<? super T>> keyMatcher;
private final Matcher<? super T> valueMatcher;

File: org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer (new file)

@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.trace;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CHECKSUM_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.COMPRESSION_ALGORITHM_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DATA_BLOCK_ENCODING_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ENCRYPTION_CIPHER_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.HFILE_NAME_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.READ_TYPE_KEY;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.yetus.audience.InterfaceAudience;
/**
* <p>
* Populate fields on an {@link AttributesBuilder} based on an {@link HFileContext}. Passed around
* inside an active {@link Context}, indexed under {@link #CONTEXT_KEY}. The class is designed such
* that calls to the {@link #accept(AttributesBuilder)} method are idempotent with regards to the
* instance of this class.
* </p>
* <p>
* The true and truly ridiculous class name should be something more like
* {@code HFileContext_ContextAttributes_AttributesBuilder_Consumer}.
* </p>
*/
@InterfaceAudience.Private
public class HFileContextAttributesBuilderConsumer implements Consumer<AttributesBuilder> {
/**
* Used to register a consumer that extracts attributes pertaining to the {@link HFileContext}
* that scopes the active {@link Context}.
*/
public static final ContextKey<Consumer<AttributesBuilder>> CONTEXT_KEY =
ContextKey.named("db.hbase.io.hfile.context_attributes");
private final HFileContext hFileContext;
private boolean skipChecksum = false;
private ReadType readType = null;
public HFileContextAttributesBuilderConsumer(final HFileContext hFileContext) {
this.hFileContext = Objects.requireNonNull(hFileContext);
}
/**
* Specify that the {@link ChecksumType} should not be included in the attributes.
*/
public HFileContextAttributesBuilderConsumer setSkipChecksum(final boolean skipChecksum) {
this.skipChecksum = skipChecksum;
return this;
}
/**
* Specify the {@link ReadType} involved in this IO operation.
*/
public HFileContextAttributesBuilderConsumer setReadType(final ReadType readType) {
// TODO: this is not a part of the HFileBlock; it is context of the operation. Should track this
// detail elsewhere.
this.readType = readType;
return this;
}
@Override
public void accept(AttributesBuilder builder) {
if (hFileContext.getHFileName() != null) {
builder.put(HFILE_NAME_KEY, hFileContext.getHFileName());
}
if (hFileContext.getCompression() != null) {
builder.put(COMPRESSION_ALGORITHM_KEY, hFileContext.getCompression().getName());
}
if (hFileContext.getDataBlockEncoding() != null) {
builder.put(DATA_BLOCK_ENCODING_KEY, hFileContext.getDataBlockEncoding().name());
}
if (
hFileContext.getEncryptionContext() != null
&& hFileContext.getEncryptionContext().getCipher() != null
) {
builder.put(ENCRYPTION_CIPHER_KEY, hFileContext.getEncryptionContext().getCipher().getName());
}
if (!skipChecksum && hFileContext.getChecksumType() != null) {
builder.put(CHECKSUM_KEY, hFileContext.getChecksumType().getName());
}
if (readType != null) {
builder.put(READ_TYPE_KEY, readType.name());
}
}
}
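As a reading aid, not part of the commit, here is a minimal sketch of the intended wiring: a caller registers the consumer on the current OpenTelemetry Context before performing IO, and the IO layer later drains it into an AttributesBuilder when recording a span event. Only CONTEXT_KEY, the constructor, setReadType and ReadType come from this patch; HFileContextBuilder#withHFileName, the class name and the event name below are assumptions made for illustration.

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.function.Consumer;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType;

public final class ContextAttributesSketch {

  // Hypothetical caller: register the consumer, do the read, let the IO layer emit the event.
  static void readWithContextAttributes(HFileContext fileContext) {
    final Context context = Context.current().with(
      HFileContextAttributesBuilderConsumer.CONTEXT_KEY,
      new HFileContextAttributesBuilderConsumer(fileContext).setReadType(ReadType.POSITIONAL_READ));
    try (Scope ignored = context.makeCurrent()) {
      // ... the actual block read happens here; BlockIOUtils looks up CONTEXT_KEY on its own ...

      // What the IO layer does with the registered consumer, shown inline for illustration:
      final AttributesBuilder builder = Attributes.builder();
      final Consumer<AttributesBuilder> consumer =
        Context.current().get(HFileContextAttributesBuilderConsumer.CONTEXT_KEY);
      if (consumer != null) {
        consumer.accept(builder);
      }
      Span.current().addEvent("example.read", builder.build());
    }
  }

  public static void main(String[] args) {
    // withHFileName is assumed to exist on HFileContextBuilder.
    readWithContextAttributes(new HFileContextBuilder().withHFileName("example-hfile").build());
  }
}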

File: org.apache.hadoop.hbase.io.util.BlockIOUtils

@@ -17,13 +17,22 @@
*/
package org.apache.hadoop.hbase.io.util;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DIRECT_BYTES_READ_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.HEAP_BYTES_READ_KEY;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Optional;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -76,18 +85,24 @@ public final class BlockIOUtils {
* @throws IOException exception to throw if any error happen
*/
public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
final Span span = Span.current();
final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
if (!isByteBufferReadable(dis)) {
// If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
// the destination ByteBuff.
byte[] heapBuf = new byte[length];
IOUtils.readFully(dis, heapBuf, 0, length);
annotateHeapBytesRead(attributesBuilder, length);
span.addEvent("BlockIOUtils.readFully", attributesBuilder.build());
copyToByteBuff(heapBuf, 0, length, buf);
return;
}
int directBytesRead = 0, heapBytesRead = 0;
ByteBuffer[] buffers = buf.nioByteBuffers();
int remain = length;
int idx = 0;
ByteBuffer cur = buffers[idx];
try {
while (remain > 0) {
while (!cur.hasRemaining()) {
if (++idx >= buffers.length) {
@@ -103,6 +118,15 @@ public final class BlockIOUtils {
"Premature EOF from inputStream, but still need " + remain + " " + "bytes");
}
remain -= bytesRead;
if (cur.isDirect()) {
directBytesRead += bytesRead;
} else {
heapBytesRead += bytesRead;
}
}
} finally {
annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead);
span.addEvent("BlockIOUtils.readFully", attributesBuilder.build());
}
}
@@ -116,11 +140,13 @@
*/
public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length)
throws IOException {
if (length < 0) {
throw new IllegalArgumentException("Length must not be negative: " + length);
}
int heapBytesRead = 0;
int remain = length, count;
byte[] buffer = new byte[1024];
try {
while (remain > 0) {
count = in.read(buffer, 0, Math.min(remain, buffer.length));
if (count < 0) {
@@ -129,6 +155,13 @@ public final class BlockIOUtils {
}
out.put(buffer, 0, count);
remain -= count;
heapBytesRead += count;
}
} finally {
final Span span = Span.current();
final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
annotateHeapBytesRead(attributesBuilder, heapBytesRead);
span.addEvent("BlockIOUtils.readFullyWithHeapBuffer", attributesBuilder.build());
}
}
@@ -147,7 +180,9 @@
*/
private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset,
int necessaryLen, int extraLen) throws IOException {
int heapBytesRead = 0;
int bytesRemaining = necessaryLen + extraLen;
try {
while (bytesRemaining > 0) {
int ret = in.read(buf, bufOffset, bytesRemaining);
if (ret < 0) {
@@ -161,6 +196,13 @@
}
bufOffset += ret;
bytesRemaining -= ret;
heapBytesRead += ret;
}
} finally {
final Span span = Span.current();
final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
annotateHeapBytesRead(attributesBuilder, heapBytesRead);
span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build());
}
return bytesRemaining <= 0;
}
@@ -186,15 +228,18 @@
copyToByteBuff(heapBuf, 0, heapBuf.length, buf);
return ret;
}
int directBytesRead = 0, heapBytesRead = 0;
ByteBuffer[] buffers = buf.nioByteBuffers();
int bytesRead = 0;
int remain = necessaryLen + extraLen;
int idx = 0;
ByteBuffer cur = buffers[idx];
try {
while (bytesRead < necessaryLen) {
while (!cur.hasRemaining()) {
if (++idx >= buffers.length) {
throw new IOException(
"Not enough ByteBuffers to read the reminding " + remain + "bytes");
}
cur = buffers[idx];
}
@@ -207,6 +252,17 @@
}
bytesRead += ret;
remain -= ret;
if (cur.isDirect()) {
directBytesRead += ret;
} else {
heapBytesRead += ret;
}
}
} finally {
final Span span = Span.current();
final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead);
span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build());
}
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
}
@@ -264,6 +320,7 @@
byte[] buf = new byte[remain];
int bytesRead = 0;
int lengthMustRead = readAllBytes ? remain : necessaryLen;
try {
while (bytesRead < lengthMustRead) {
int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
if (ret < 0) {
@@ -274,21 +331,30 @@
bytesRead += ret;
remain -= ret;
}
} finally {
final Span span = Span.current();
final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
annotateHeapBytesRead(attributesBuilder, bytesRead);
span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build());
}
copyToByteBuff(buf, 0, bytesRead, buff);
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
}
private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
int directBytesRead = 0, heapBytesRead = 0;
int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
ByteBuffer[] buffers = buff.nioByteBuffers();
ByteBuffer cur = buffers[idx];
int lengthMustRead = readAllBytes ? remain : necessaryLen;
try {
while (bytesRead < lengthMustRead) {
int ret;
while (!cur.hasRemaining()) {
if (++idx >= buffers.length) {
throw new IOException(
"Not enough ByteBuffers to read the reminding " + remain + "bytes");
}
cur = buffers[idx];
}
@@ -313,6 +379,17 @@
}
bytesRead += ret;
remain -= ret;
if (cur.isDirect()) {
directBytesRead += ret;
} else {
heapBytesRead += ret;
}
}
} finally {
final Span span = Span.current();
final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead);
span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build());
}
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
@@ -340,4 +417,38 @@
}
return len;
}
/**
* Construct a fresh {@link AttributesBuilder} from the provided {@link Context}, populated with
* relevant attributes by the consumer registered under
* {@link HFileContextAttributesBuilderConsumer#CONTEXT_KEY}.
*/
private static AttributesBuilder builderFromContext(Context context) {
final AttributesBuilder attributesBuilder = Attributes.builder();
Optional.ofNullable(context)
.map(val -> val.get(HFileContextAttributesBuilderConsumer.CONTEXT_KEY))
.ifPresent(c -> c.accept(attributesBuilder));
return attributesBuilder;
}
/**
* Conditionally annotate {@code attributesBuilder} with the appropriate attribute when the value is non-zero.
*/
private static void annotateHeapBytesRead(AttributesBuilder attributesBuilder,
int heapBytesRead) {
annotateBytesRead(attributesBuilder, 0, heapBytesRead);
}
/**
* Conditionally annotate {@code attributesBuilder} with appropriate attributes when values are
* non-zero.
*/
private static void annotateBytesRead(AttributesBuilder attributesBuilder, long directBytesRead,
long heapBytesRead) {
if (directBytesRead > 0) {
attributesBuilder.put(DIRECT_BYTES_READ_KEY, directBytesRead);
}
if (heapBytesRead > 0) {
attributesBuilder.put(HEAP_BYTES_READ_KEY, heapBytesRead);
}
}
}
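Usage note, a sketch rather than part of the diff: the events added above are recorded on whatever span is current when BlockIOUtils runs, so a caller has to open a span first. The helper below mirrors what the tests later in this commit do with TraceUtil; the class and span names are illustrative.

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.trace.TraceUtil;

public final class TracedReadSketch {

  // Hypothetical helper: wrap a readFully call in a span so the
  // "BlockIOUtils.readFully" event (with its heap/direct byte counts) has somewhere to land.
  static ByteBuff tracedReadFully(FSDataInputStream in, int length) throws IOException {
    final Span span = TraceUtil.createSpan("example-block-read");
    try (Scope ignored = span.makeCurrent()) {
      ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(length));
      BlockIOUtils.readFully(buf, in, length);
      return buf;
    } finally {
      span.end();
    }
  }
}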

File: org.apache.hadoop.hbase.trace.HBaseSemanticAttributes

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.trace;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.yetus.audience.InterfaceAudience;
@@ -56,6 +57,66 @@ public final class HBaseSemanticAttributes {
AttributeKey.booleanKey("db.hbase.rowlock.readlock");
public static final AttributeKey<String> WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl");
/**
* Indicates the amount of data that was read into a {@link ByteBuffer} of type
* {@link ByteBuffer#isDirect() direct}.
*/
public static final AttributeKey<Long> DIRECT_BYTES_READ_KEY =
AttributeKey.longKey("db.hbase.io.direct_bytes_read");
/**
* Indicates the amount of data that was read into a {@link ByteBuffer} not of type
* {@link ByteBuffer#isDirect() direct}.
*/
public static final AttributeKey<Long> HEAP_BYTES_READ_KEY =
AttributeKey.longKey("db.hbase.io.heap_bytes_read");
/**
* Indicates the {@link org.apache.hadoop.hbase.io.compress.Compression.Algorithm} used to encode
* an HFile.
*/
public static final AttributeKey<String> COMPRESSION_ALGORITHM_KEY =
AttributeKey.stringKey("db.hbase.io.hfile.data_block_encoding");
/**
* Indicates the {@link org.apache.hadoop.hbase.io.encoding.DataBlockEncoding} algorithm used to
* encode this HFile.
*/
public static final AttributeKey<String> DATA_BLOCK_ENCODING_KEY =
AttributeKey.stringKey("db.hbase.io.hfile.data_block_encoding");
/**
* Indicates the {@link org.apache.hadoop.hbase.io.crypto.Cipher} used to encrypt this HFile.
*/
public static final AttributeKey<String> ENCRYPTION_CIPHER_KEY =
AttributeKey.stringKey("db.hbase.io.hfile.encryption_cipher");
/**
* Indicates the {@link org.apache.hadoop.hbase.util.ChecksumType} used to encode this HFile.
*/
public static final AttributeKey<String> CHECKSUM_KEY =
AttributeKey.stringKey("db.hbase.io.hfile.checksum_type");
/**
* Indicates the name of the HFile accessed.
*/
public static final AttributeKey<String> HFILE_NAME_KEY =
AttributeKey.stringKey("db.hbase.io.hfile.file_name");
/**
* Indicates the type of read.
*/
public static final AttributeKey<String> READ_TYPE_KEY =
AttributeKey.stringKey("db.hbase.io.hfile.read_type");
/**
* Identifies an entry in the Block Cache.
*/
public static final AttributeKey<String> BLOCK_CACHE_KEY_KEY =
AttributeKey.stringKey("db.hbase.io.hfile.block_cache_key");
/**
* These values represent the different IO read strategies HBase may employ for accessing
* filesystem data.
*/
public enum ReadType {
// TODO: promote this to the FSReader#readBlockData API. Or somehow instead use Scan.ReadType.
POSITIONAL_READ,
SEEK_PLUS_READ,
}
/**
* These are values used with {@link #DB_OPERATION}. They correspond with the implementations of
* {@code org.apache.hadoop.hbase.client.Operation}, as well as
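For illustration only (not in the patch): keys such as the ones above are attached to span events through the standard OpenTelemetry Attributes API. The class and event names below are made up.

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;

public final class SemanticAttributesSketch {
  // Record a heap-bytes-read figure on the current span under the new attribute key.
  static void recordHeapBytesRead(long heapBytes) {
    Span.current().addEvent("example.io",
      Attributes.of(HBaseSemanticAttributes.HEAP_BYTES_READ_KEY, heapBytes));
  }
}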

File: org.apache.hadoop.hbase.io.hfile.HFileBlock

@@ -18,7 +18,13 @@
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
@@ -26,6 +32,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -46,11 +53,13 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -620,7 +629,9 @@ public class HFileBlock implements Cacheable {
HFileBlock unpacked = shallowClone(this, newBuf);
boolean succ = false;
final Context context =
Context.current().with(CONTEXT_KEY, new HFileContextAttributesBuilderConsumer(fileContext));
try (Scope ignored = context.makeCurrent()) {
HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
? reader.getBlockDecodingContext()
: reader.getDefaultBlockDecodingContext();
@@ -1479,17 +1490,21 @@
boolean updateMetrics, boolean intoHeap) throws IOException {
// Get a copy of the current state of whether to validate
// hbase checksums or not for this read call. This is not
// thread-safe but the one constraint is that if we decide
// to skip hbase checksum verification then we are
// guaranteed to use hdfs checksum verification.
boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
final Context context = Context.current().with(CONTEXT_KEY,
new HFileContextAttributesBuilderConsumer(fileContext)
.setSkipChecksum(doVerificationThruHBaseChecksum)
.setReadType(pread ? ReadType.POSITIONAL_READ : ReadType.SEEK_PLUS_READ));
try (Scope ignored = context.makeCurrent()) {
HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
if (blk == null) {
HFile.LOG.warn("HBase checksum verification failed for file {} at offset {} filesize {}."
+ " Retrying read with HDFS checksums turned on...", pathName, offset, fileSize);
if (!doVerificationThruHBaseChecksum) {
String msg = "HBase checksum verification failed for file " + pathName + " at offset "
@@ -1511,8 +1526,9 @@
blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
if (blk != null) {
HFile.LOG.warn(
"HDFS checksum verification succeeded for file {} at offset {} filesize" + " {}",
pathName, offset, fileSize);
}
}
if (blk == null && !doVerificationThruHBaseChecksum) {
@@ -1533,6 +1549,7 @@
streamWrapper.checksumOk();
return blk;
}
}
/**
* Returns Check <code>onDiskSizeWithHeaderL</code> size is healthy and then return it as an int
@@ -1629,6 +1646,11 @@
throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize="
+ onDiskSizeWithHeaderL + ")");
}
final Span span = Span.current();
final AttributesBuilder attributesBuilder = Attributes.builder();
Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY))
.ifPresent(c -> c.accept(attributesBuilder));
int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
// Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
// and will save us having to seek the stream backwards to reread the header we
@@ -1653,8 +1675,9 @@
// in a LOG every time we seek. See HBASE-17072 for more detail.
if (headerBuf == null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Extra see to get block size!", new RuntimeException()); LOG.trace("Extra seek to get block size!", new RuntimeException());
} }
span.addEvent("Extra seek to get block size!", attributesBuilder.build());
headerBuf = HEAP.allocate(hdrSize); headerBuf = HEAP.allocate(hdrSize);
readAtOffset(is, headerBuf, hdrSize, false, offset, pread); readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
headerBuf.rewind(); headerBuf.rewind();
@@ -1707,6 +1730,7 @@
hFileBlock.sanityCheckUncompressed();
}
LOG.trace("Read {} in {} ms", hFileBlock, duration);
span.addEvent("Read block", attributesBuilder.build());
// Cache next block header if we read it for the next time through here.
if (nextBlockOnDiskSize != -1) {
cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock,

File: org.apache.hadoop.hbase.io.hfile.HFileReaderImpl

@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.BLOCK_CACHE_KEY_KEY;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -44,7 +46,6 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
@@ -1252,11 +1253,12 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
BlockCacheKey cacheKey =
new BlockCacheKey(name, dataBlockOffset, this.isPrimaryReplicaReader(), expectedBlockType);
Attributes attributes = Attributes.of(BLOCK_CACHE_KEY_KEY, cacheKey.toString());
boolean useLock = false;
IdLock.Entry lockEntry = null;
final Span span = Span.current();
try {
while (true) {
// Check cache for block. If found return.
if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
@@ -1269,9 +1271,9 @@
updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
if (cachedBlock != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("From Cache {}", cachedBlock);
}
span.addEvent("block cache hit", attributes);
assert cachedBlock.isUnpacked() : "Packed block leak.";
if (cachedBlock.getBlockType().isData()) {
if (updateCacheMetrics) {
@@ -1301,7 +1303,7 @@
// Carry on, please load.
}
span.addEvent("block cache miss", attributes);
// Load block from filesystem.
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
!isCompaction, shouldUseHeap(expectedBlockType));
@@ -1331,7 +1333,6 @@
if (lockEntry != null) {
offsetLock.releaseLockEntry(lockEntry);
}
}
}

File: org.apache.hadoop.hbase.io.hfile.PrefetchExecutor

@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -88,23 +89,22 @@ public final class PrefetchExecutor {
delay = 0;
}
try {
LOG.debug("Prefetch requested for {}, delay={} ms", path, delay);
final Runnable tracedRunnable =
TraceUtil.tracedRunnable(runnable, "PrefetchExecutor.request");
final Future<?> future =
prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS);
prefetchFutures.put(path, future);
} catch (RejectedExecutionException e) {
prefetchFutures.remove(path);
LOG.warn("Prefetch request rejected for {}", path);
}
}
}
public static void complete(Path path) {
prefetchFutures.remove(path);
LOG.debug("Prefetch completed for {}", path);
}
public static void cancel(Path path) {
@@ -113,9 +113,7 @@
// ok to race with other cancellation attempts
future.cancel(true);
prefetchFutures.remove(path);
LOG.debug("Prefetch cancelled for {}", path);
}
}

File: org.apache.hadoop.hbase.io.hfile.TestBlockIOUtils

@@ -17,6 +17,15 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
import static org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers.hasAttributes;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEvents;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -29,11 +38,17 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,6 +57,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers;
import org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@@ -52,6 +70,7 @@ import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.ClassRule;
@@ -74,6 +93,9 @@ public class TestBlockIOUtils {
@Rule
public ExpectedException exception = ExpectedException.none();
@Rule
public OpenTelemetryRule otelRule = OpenTelemetryRule.create();
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final int NUM_TEST_BLOCKS = 2;
@@ -93,6 +115,7 @@ public class TestBlockIOUtils {
@Test
public void testReadFully() throws IOException {
TraceUtil.trace(() -> {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully");
String s = "hello world";
@@ -107,6 +130,14 @@
byte[] heapBuf = new byte[s.length()];
buf.get(heapBuf, 0, heapBuf.length);
assertArrayEquals(Bytes.toBytes(s), heapBuf);
}, testName.getMethodName());
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
assertThat(otelRule.getSpans(),
hasItems(allOf(hasName(testName.getMethodName()),
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readFully"),
hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11))))))));
}
@Test
@@ -214,6 +245,9 @@
try (FSDataOutputStream out = fs.create(p)) {
out.writeBytes(s);
}
Span span = TraceUtil.createSpan(testName.getMethodName());
try (Scope ignored = span.makeCurrent()) {
ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8));
try (FSDataInputStream in = fs.open(p)) {
assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2));
@@ -222,25 +256,58 @@
byte[] heapBuf = new byte[buf.capacity()];
buf.get(heapBuf, 0, heapBuf.length);
assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf);
} finally {
span.end();
}
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
assertThat(otelRule.getSpans(),
hasItems(allOf(hasName(testName.getMethodName()),
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 8L))))))));
otelRule.clearSpans();
span = TraceUtil.createSpan(testName.getMethodName());
try (Scope ignored = span.makeCurrent()) {
ByteBuff buf =
new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4));
try (FSDataInputStream in = fs.open(p)) {
assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3));
}
buf.rewind();
byte[] heapBuf = new byte[11];
buf.get(heapBuf, 0, heapBuf.length);
assertArrayEquals(Bytes.toBytes("hello world"), heapBuf);
} finally {
span.end();
}
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
assertThat(otelRule.getSpans(),
hasItems(allOf(hasName(testName.getMethodName()),
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L))))))));
otelRule.clearSpans();
span = TraceUtil.createSpan(testName.getMethodName());
try (Scope ignored = span.makeCurrent()) {
ByteBuff buf =
new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4));
buf.position(0).limit(12);
try (FSDataInputStream in = fs.open(p)) {
try {
BlockIOUtils.readWithExtra(buf, in, 12, 0);
fail("Should only read 11 bytes");
} catch (IOException e) {
}
} finally {
span.end();
} }
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
assertThat(otelRule.getSpans(),
hasItems(allOf(hasName(testName.getMethodName()),
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L))))))));
}
@Test
@@ -255,11 +322,20 @@
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
when(in.hasCapability(anyString())).thenReturn(false);
boolean ret =
TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
testName.getMethodName());
assertFalse("Expect false return when no extra bytes requested", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
assertThat(otelRule.getSpans(),
hasItems(allOf(hasName(testName.getMethodName()),
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
}
@Test
@@ -275,12 +351,21 @@
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
when(in.read(5, buf, 5, 5)).thenReturn(5);
when(in.hasCapability(anyString())).thenReturn(false);
boolean ret =
TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
testName.getMethodName());
assertFalse("Expect false return when no extra bytes requested", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).read(5, buf, 5, 5);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
assertThat(otelRule.getSpans(),
hasItems(allOf(hasName(testName.getMethodName()),
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
}
@Test
@@ -295,11 +380,20 @@
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
when(in.hasCapability(anyString())).thenReturn(false);
boolean ret =
TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
testName.getMethodName());
assertTrue("Expect true return when reading extra bytes succeeds", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
assertThat(otelRule.getSpans(),
hasItems(allOf(hasName(testName.getMethodName()),
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
}
@Test
@@ -314,11 +408,20 @@
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen);
when(in.hasCapability(anyString())).thenReturn(false);
boolean ret =
TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
testName.getMethodName());
assertFalse("Expect false return when reading extra bytes fails", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
assertThat(otelRule.getSpans(),
hasItems(allOf(hasName(testName.getMethodName()),
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", necessaryLen))))))));
}
@Test
@@ -334,12 +437,21 @@
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
when(in.read(5, buf, 5, 10)).thenReturn(10);
when(in.hasCapability(anyString())).thenReturn(false);
boolean ret =
TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
testName.getMethodName());
assertTrue("Expect true return when reading extra bytes succeeds", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).read(5, buf, 5, 10);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
assertThat(otelRule.getSpans(),
hasItems(allOf(hasName(testName.getMethodName()),
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
}
@Test
@@ -357,7 +469,23 @@
when(in.hasCapability(anyString())).thenReturn(false);
exception.expect(IOException.class);
exception.expectMessage("EOF");
Span span = TraceUtil.createSpan(testName.getMethodName());
try (Scope ignored = span.makeCurrent()) {
BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
span.setStatus(StatusCode.OK);
} catch (IOException e) {
TraceUtil.setError(span, e);
throw e;
} finally {
span.end();
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
assertThat(otelRule.getSpans(),
hasItems(allOf(hasName(testName.getMethodName()),
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
hasAttributes(AttributesMatchers.isEmpty())))))));
}
}
/**

File: org.apache.hadoop.hbase.io.hfile.TestPrefetch

@@ -17,12 +17,23 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -30,21 +41,28 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ IOTests.class, MediumTests.class })
public class TestPrefetch {
private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@@ -61,6 +79,9 @@ public class TestPrefetch {
private FileSystem fs;
private BlockCache blockCache;
@Rule
public OpenTelemetryRule otelRule = OpenTelemetryRule.create();
@Before
public void setUp() throws IOException {
conf = TEST_UTIL.getConfiguration();
@@ -82,8 +103,23 @@ public class TestPrefetch {
@Test
public void testPrefetch() throws Exception {
TraceUtil.trace(() -> {
Path storeFile = writeStoreFile("TestPrefetch"); Path storeFile = writeStoreFile("TestPrefetch");
readStoreFile(storeFile); readStoreFile(storeFile);
}, "testPrefetch");
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(otelRule::getSpans,
hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request"))));
final List<SpanData> spans = otelRule.getSpans();
if (LOG.isDebugEnabled()) {
StringTraceRenderer renderer = new StringTraceRenderer(spans);
renderer.render(LOG::debug);
}
final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst()
.orElseThrow(AssertionError::new);
assertThat("prefetch spans happen on their own threads, detached from file open.", spans,
hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan)))));
}
@Test