diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java index aec96897179..f71defb329e 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client.trace.hamcrest; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.is; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; @@ -54,6 +55,10 @@ public final class AttributesMatchers { return containsEntry(AttributeKey.stringKey(key), value); } + public static Matcher<Attributes> containsEntry(String key, long value) { + return containsEntry(AttributeKey.longKey(key), value); + } + public static Matcher<Attributes> containsEntryWithStringValuesOf(String key, String... values) { return containsEntry(AttributeKey.stringArrayKey(key), Arrays.asList(values)); } @@ -63,6 +68,10 @@ public final class AttributesMatchers { return new IsAttributesContaining<>(equalTo(AttributeKey.stringArrayKey(key)), matcher); } + public static Matcher<Attributes> isEmpty() { + return hasProperty("empty", is(true)); + } + private static final class IsAttributesContaining<T> extends TypeSafeMatcher<Attributes> { private final Matcher<AttributeKey<? super T>> keyMatcher; private final Matcher<? super T> valueMatcher; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/trace/HFileContextAttributesBuilderConsumer.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/trace/HFileContextAttributesBuilderConsumer.java new file mode 100644 index 00000000000..663a745cc45 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/trace/HFileContextAttributesBuilderConsumer.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.io.hfile.trace; + +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CHECKSUM_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.COMPRESSION_ALGORITHM_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DATA_BLOCK_ENCODING_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ENCRYPTION_CIPHER_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.HFILE_NAME_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.READ_TYPE_KEY; + +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.ContextKey; +import java.util.Objects; +import java.util.function.Consumer; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType; +import org.apache.hadoop.hbase.util.ChecksumType; +import org.apache.yetus.audience.InterfaceAudience; + +/** + *
<p>
+ * Populate fields on an {@link AttributesBuilder} based on an {@link HFileContext}. Passed around + * inside an active {@link Context}, indexed under {@link #CONTEXT_KEY}. The class is designed such + * that calls to the {@link #accept(AttributesBuilder)} method are idempotent with regards to the + * instance of this class. + *
</p>
+ *
<p>
+ * The true and truly ridiculous class name should be something more like + * {@code HFileContext_ContextAttributes_AttributesBuilder_Consumer}. + *
</p>
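+ * <p>
+ * A rough usage sketch, assembled from the call sites added elsewhere in this patch (the local
+ * names {@code fileContext} and {@code skipChecksum} are illustrative placeholders):
+ * </p>
+ * <pre>{@code
+ * Consumer<AttributesBuilder> consumer =
+ *   new HFileContextAttributesBuilderConsumer(fileContext)
+ *     .setSkipChecksum(skipChecksum)
+ *     .setReadType(ReadType.POSITIONAL_READ);
+ * try (Scope ignored = Context.current().with(CONTEXT_KEY, consumer).makeCurrent()) {
+ *   // IO helpers such as BlockIOUtils look up the consumer via CONTEXT_KEY, apply it to an
+ *   // AttributesBuilder, and attach the resulting attributes to events on the current span.
+ * }
+ * }</pre>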
+ */ +@InterfaceAudience.Private +public class HFileContextAttributesBuilderConsumer implements Consumer<AttributesBuilder> { + + /** + * Used to extract attributes pertaining to the {@link HFileContext} that scopes the active + * {@link Context}. + */ + public static final ContextKey<Consumer<AttributesBuilder>> CONTEXT_KEY = + ContextKey.named("db.hbase.io.hfile.context_attributes"); + + private final HFileContext hFileContext; + + private boolean skipChecksum = false; + private ReadType readType = null; + + public HFileContextAttributesBuilderConsumer(final HFileContext hFileContext) { + this.hFileContext = Objects.requireNonNull(hFileContext); + } + + /** + * Specify that the {@link ChecksumType} should not be included in the attributes. + */ + public HFileContextAttributesBuilderConsumer setSkipChecksum(final boolean skipChecksum) { + this.skipChecksum = skipChecksum; + return this; + } + + /** + * Specify the {@link ReadType} involved in this IO operation. + */ + public HFileContextAttributesBuilderConsumer setReadType(final ReadType readType) { + // TODO: this is not a part of the HFileBlock; it is part of the operation's context. Should + // track this detail elsewhere. + this.readType = readType; + return this; + } + + @Override + public void accept(AttributesBuilder builder) { + if (hFileContext.getHFileName() != null) { + builder.put(HFILE_NAME_KEY, hFileContext.getHFileName()); + } + if (hFileContext.getCompression() != null) { + builder.put(COMPRESSION_ALGORITHM_KEY, hFileContext.getCompression().getName()); + } + if (hFileContext.getDataBlockEncoding() != null) { + builder.put(DATA_BLOCK_ENCODING_KEY, hFileContext.getDataBlockEncoding().name()); + } + if ( + hFileContext.getEncryptionContext() != null + && hFileContext.getEncryptionContext().getCipher() != null + ) { + builder.put(ENCRYPTION_CIPHER_KEY, hFileContext.getEncryptionContext().getCipher().getName()); + } + if (!skipChecksum && hFileContext.getChecksumType() != null) { + builder.put(CHECKSUM_KEY, hFileContext.getChecksumType().getName()); + } + if (readType != null) { + builder.put(READ_TYPE_KEY, readType.name()); + } + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java index 1720cae2300..86c6317556b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java @@ -17,13 +17,22 @@ */ package org.apache.hadoop.hbase.io.util; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DIRECT_BYTES_READ_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.HEAP_BYTES_READ_KEY; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.util.Optional; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.io.IOUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -76,33 +85,48 @@ public final class BlockIOUtils { * @throws IOException exception to throw if any error happen */ public static void
readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException { + final Span span = Span.current(); + final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); if (!isByteBufferReadable(dis)) { // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to // the destination ByteBuff. byte[] heapBuf = new byte[length]; IOUtils.readFully(dis, heapBuf, 0, length); + annotateHeapBytesRead(attributesBuilder, length); + span.addEvent("BlockIOUtils.readFully", attributesBuilder.build()); copyToByteBuff(heapBuf, 0, length, buf); return; } + int directBytesRead = 0, heapBytesRead = 0; ByteBuffer[] buffers = buf.nioByteBuffers(); int remain = length; int idx = 0; ByteBuffer cur = buffers[idx]; - while (remain > 0) { - while (!cur.hasRemaining()) { - if (++idx >= buffers.length) { - throw new IOException( - "Not enough ByteBuffers to read the reminding " + remain + " " + "bytes"); + try { + while (remain > 0) { + while (!cur.hasRemaining()) { + if (++idx >= buffers.length) { + throw new IOException( + "Not enough ByteBuffers to read the reminding " + remain + " " + "bytes"); + } + cur = buffers[idx]; + } + cur.limit(cur.position() + Math.min(remain, cur.remaining())); + int bytesRead = dis.read(cur); + if (bytesRead < 0) { + throw new IOException( + "Premature EOF from inputStream, but still need " + remain + " " + "bytes"); + } + remain -= bytesRead; + if (cur.isDirect()) { + directBytesRead += bytesRead; + } else { + heapBytesRead += bytesRead; } - cur = buffers[idx]; } - cur.limit(cur.position() + Math.min(remain, cur.remaining())); - int bytesRead = dis.read(cur); - if (bytesRead < 0) { - throw new IOException( - "Premature EOF from inputStream, but still need " + remain + " " + "bytes"); - } - remain -= bytesRead; + } finally { + annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead); + span.addEvent("BlockIOUtils.readFully", attributesBuilder.build()); } } @@ -116,19 +140,28 @@ public final class BlockIOUtils { */ public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length) throws IOException { - byte[] buffer = new byte[1024]; if (length < 0) { throw new IllegalArgumentException("Length must not be negative: " + length); } + int heapBytesRead = 0; int remain = length, count; - while (remain > 0) { - count = in.read(buffer, 0, Math.min(remain, buffer.length)); - if (count < 0) { - throw new IOException( - "Premature EOF from inputStream, but still need " + remain + " bytes"); + byte[] buffer = new byte[1024]; + try { + while (remain > 0) { + count = in.read(buffer, 0, Math.min(remain, buffer.length)); + if (count < 0) { + throw new IOException( + "Premature EOF from inputStream, but still need " + remain + " bytes"); + } + out.put(buffer, 0, count); + remain -= count; + heapBytesRead += count; } - out.put(buffer, 0, count); - remain -= count; + } finally { + final Span span = Span.current(); + final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); + annotateHeapBytesRead(attributesBuilder, heapBytesRead); + span.addEvent("BlockIOUtils.readFullyWithHeapBuffer", attributesBuilder.build()); } } @@ -147,20 +180,29 @@ public final class BlockIOUtils { */ private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset, int necessaryLen, int extraLen) throws IOException { + int heapBytesRead = 0; int bytesRemaining = necessaryLen + extraLen; - while (bytesRemaining > 0) { - int ret = in.read(buf, bufOffset, bytesRemaining); - if (ret < 0) { - if 
(bytesRemaining <= extraLen) { - // We could not read the "extra data", but that is OK. - break; + try { + while (bytesRemaining > 0) { + int ret = in.read(buf, bufOffset, bytesRemaining); + if (ret < 0) { + if (bytesRemaining <= extraLen) { + // We could not read the "extra data", but that is OK. + break; + } + throw new IOException("Premature EOF from inputStream (read " + "returned " + ret + + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen + + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining)); } - throw new IOException("Premature EOF from inputStream (read " + "returned " + ret - + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen - + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining)); + bufOffset += ret; + bytesRemaining -= ret; + heapBytesRead += ret; } - bufOffset += ret; - bytesRemaining -= ret; + } finally { + final Span span = Span.current(); + final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); + annotateHeapBytesRead(attributesBuilder, heapBytesRead); + span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build()); } return bytesRemaining <= 0; } @@ -186,27 +228,41 @@ public final class BlockIOUtils { copyToByteBuff(heapBuf, 0, heapBuf.length, buf); return ret; } + int directBytesRead = 0, heapBytesRead = 0; ByteBuffer[] buffers = buf.nioByteBuffers(); int bytesRead = 0; int remain = necessaryLen + extraLen; int idx = 0; ByteBuffer cur = buffers[idx]; - while (bytesRead < necessaryLen) { - while (!cur.hasRemaining()) { - if (++idx >= buffers.length) { - throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes"); + try { + while (bytesRead < necessaryLen) { + while (!cur.hasRemaining()) { + if (++idx >= buffers.length) { + throw new IOException( + "Not enough ByteBuffers to read the reminding " + remain + "bytes"); + } + cur = buffers[idx]; + } + cur.limit(cur.position() + Math.min(remain, cur.remaining())); + int ret = dis.read(cur); + if (ret < 0) { + throw new IOException("Premature EOF from inputStream (read returned " + ret + + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen + + " extra bytes, successfully read " + bytesRead); + } + bytesRead += ret; + remain -= ret; + if (cur.isDirect()) { + directBytesRead += ret; + } else { + heapBytesRead += ret; } - cur = buffers[idx]; } - cur.limit(cur.position() + Math.min(remain, cur.remaining())); - int ret = dis.read(cur); - if (ret < 0) { - throw new IOException("Premature EOF from inputStream (read returned " + ret - + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen - + " extra bytes, successfully read " + bytesRead); - } - bytesRead += ret; - remain -= ret; + } finally { + final Span span = Span.current(); + final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); + annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead); + span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build()); } return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); } @@ -264,15 +320,22 @@ public final class BlockIOUtils { byte[] buf = new byte[remain]; int bytesRead = 0; int lengthMustRead = readAllBytes ? 
remain : necessaryLen; - while (bytesRead < lengthMustRead) { - int ret = dis.read(position + bytesRead, buf, bytesRead, remain); - if (ret < 0) { - throw new IOException("Premature EOF from inputStream (positional read returned " + ret - + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen - + " extra bytes, successfully read " + bytesRead); + try { + while (bytesRead < lengthMustRead) { + int ret = dis.read(position + bytesRead, buf, bytesRead, remain); + if (ret < 0) { + throw new IOException("Premature EOF from inputStream (positional read returned " + ret + + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen + + " extra bytes, successfully read " + bytesRead); + } + bytesRead += ret; + remain -= ret; } - bytesRead += ret; - remain -= ret; + } finally { + final Span span = Span.current(); + final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); + annotateHeapBytesRead(attributesBuilder, bytesRead); + span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build()); } copyToByteBuff(buf, 0, bytesRead, buff); return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); @@ -280,39 +343,53 @@ public final class BlockIOUtils { private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position, int necessaryLen, int extraLen, boolean readAllBytes) throws IOException { + int directBytesRead = 0, heapBytesRead = 0; int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0; ByteBuffer[] buffers = buff.nioByteBuffers(); ByteBuffer cur = buffers[idx]; int lengthMustRead = readAllBytes ? remain : necessaryLen; - while (bytesRead < lengthMustRead) { - int ret; - while (!cur.hasRemaining()) { - if (++idx >= buffers.length) { - throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes"); + try { + while (bytesRead < lengthMustRead) { + int ret; + while (!cur.hasRemaining()) { + if (++idx >= buffers.length) { + throw new IOException( + "Not enough ByteBuffers to read the reminding " + remain + "bytes"); + } + cur = buffers[idx]; + } + cur.limit(cur.position() + Math.min(remain, cur.remaining())); + try { + ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur); + } catch (IllegalAccessException e) { + throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read " + + bytesRead + " bytes from position " + position, e); + } catch (InvocationTargetException e) { + throw new IOException("Encountered an exception when invoking ByteBuffer positioned read" + + " when trying to read " + bytesRead + " bytes from position " + position, e); + } catch (NullPointerException e) { + throw new IOException("something is null"); + } catch (Exception e) { + throw e; + } + if (ret < 0) { + throw new IOException("Premature EOF from inputStream (positional read returned " + ret + + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen + + " extra bytes, successfully read " + bytesRead); + } + bytesRead += ret; + remain -= ret; + if (cur.isDirect()) { + directBytesRead += bytesRead; + } else { + heapBytesRead += bytesRead; } - cur = buffers[idx]; } - cur.limit(cur.position() + Math.min(remain, cur.remaining())); - try { - ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur); - } catch (IllegalAccessException e) { - throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read " - + bytesRead + " bytes from position " + position, e); - } 
catch (InvocationTargetException e) { - throw new IOException("Encountered an exception when invoking ByteBuffer positioned read" - + " when trying to read " + bytesRead + " bytes from position " + position, e); - } catch (NullPointerException e) { - throw new IOException("something is null"); - } catch (Exception e) { - throw e; - } - if (ret < 0) { - throw new IOException("Premature EOF from inputStream (positional read returned " + ret - + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen - + " extra bytes, successfully read " + bytesRead); - } - bytesRead += ret; - remain -= ret; + } finally { + final Span span = Span.current(); + final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); + annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead); + span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build()); } return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); @@ -340,4 +417,38 @@ public final class BlockIOUtils { } return len; } + + /** + * Construct a fresh {@link AttributesBuilder} from the provided {@link Context}, populated with + * relevant attributes populated by {@link HFileContextAttributesBuilderConsumer#CONTEXT_KEY}. + */ + private static AttributesBuilder builderFromContext(Context context) { + final AttributesBuilder attributesBuilder = Attributes.builder(); + Optional.ofNullable(context) + .map(val -> val.get(HFileContextAttributesBuilderConsumer.CONTEXT_KEY)) + .ifPresent(c -> c.accept(attributesBuilder)); + return attributesBuilder; + } + + /** + * Conditionally annotate {@code span} with the appropriate attribute when value is non-zero. + */ + private static void annotateHeapBytesRead(AttributesBuilder attributesBuilder, + int heapBytesRead) { + annotateBytesRead(attributesBuilder, 0, heapBytesRead); + } + + /** + * Conditionally annotate {@code attributesBuilder} with appropriate attributes when values are + * non-zero. + */ + private static void annotateBytesRead(AttributesBuilder attributesBuilder, long directBytesRead, + long heapBytesRead) { + if (directBytesRead > 0) { + attributesBuilder.put(DIRECT_BYTES_READ_KEY, directBytesRead); + } + if (heapBytesRead > 0) { + attributesBuilder.put(HEAP_BYTES_READ_KEY, heapBytesRead); + } + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java index 40dfc1dce4f..a629761b87b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.trace; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.ByteBuffer; import java.util.List; import org.apache.yetus.audience.InterfaceAudience; @@ -56,6 +57,66 @@ public final class HBaseSemanticAttributes { AttributeKey.booleanKey("db.hbase.rowlock.readlock"); public static final AttributeKey WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl"); + /** + * Indicates the amount of data was read into a {@link ByteBuffer} of type + * {@link ByteBuffer#isDirect() direct}. + */ + public static final AttributeKey DIRECT_BYTES_READ_KEY = + AttributeKey.longKey("db.hbase.io.direct_bytes_read"); + /** + * Indicates the amount of data was read into a {@link ByteBuffer} not of type + * {@link ByteBuffer#isDirect() direct}. 
*/ + public static final AttributeKey<Long> HEAP_BYTES_READ_KEY = + AttributeKey.longKey("db.hbase.io.heap_bytes_read"); + /** + * Indicates the {@link org.apache.hadoop.hbase.io.compress.Compression.Algorithm} used to encode + * an HFile. + */ + public static final AttributeKey<String> COMPRESSION_ALGORITHM_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.compression_algorithm"); + /** + * Indicates the {@link org.apache.hadoop.hbase.io.encoding.DataBlockEncoding} algorithm used to + * encode this HFile. + */ + public static final AttributeKey<String> DATA_BLOCK_ENCODING_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.data_block_encoding"); + /** + * Indicates the {@link org.apache.hadoop.hbase.io.crypto.Cipher} used to encrypt this HFile. + */ + public static final AttributeKey<String> ENCRYPTION_CIPHER_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.encryption_cipher"); + /** + * Indicates the {@link org.apache.hadoop.hbase.util.ChecksumType} used to checksum this HFile. + */ + public static final AttributeKey<String> CHECKSUM_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.checksum_type"); + /** + * Indicates the name of the HFile accessed. + */ + public static final AttributeKey<String> HFILE_NAME_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.file_name"); + /** + * Indicates the type of read. + */ + public static final AttributeKey<String> READ_TYPE_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.read_type"); + /** + * Identifies an entry in the Block Cache. + */ + public static final AttributeKey<String> BLOCK_CACHE_KEY_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.block_cache_key"); + + /** + * These values represent the different IO read strategies HBase may employ for accessing + * filesystem data. + */ + public enum ReadType { + // TODO: promote this to the FSReader#readBlockData API. Or somehow instead use Scan.ReadType. + POSITIONAL_READ, + SEEK_PLUS_READ, + } + /** * These are values used with {@link #DB_OPERATION}.
They correspond with the implementations of * {@code org.apache.hadoop.hbase.client.Operation}, as well as diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 7bc2d689f87..b44a10cbfbb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -18,7 +18,13 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; +import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; @@ -26,6 +32,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -46,11 +53,13 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer; import org.apache.hadoop.hbase.io.util.BlockIOUtils; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.MultiByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.regionserver.ShipperListener; +import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; @@ -620,7 +629,9 @@ public class HFileBlock implements Cacheable { HFileBlock unpacked = shallowClone(this, newBuf); boolean succ = false; - try { + final Context context = + Context.current().with(CONTEXT_KEY, new HFileContextAttributesBuilderConsumer(fileContext)); + try (Scope ignored = context.makeCurrent()) { HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); @@ -1479,59 +1490,65 @@ public class HFileBlock implements Cacheable { boolean updateMetrics, boolean intoHeap) throws IOException { // Get a copy of the current state of whether to validate // hbase checksums or not for this read call. This is not - // thread-safe but the one constaint is that if we decide + // thread-safe but the one constraint is that if we decide // to skip hbase checksum verification then we are // guaranteed to use hdfs checksum verification. 
boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum(); FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum); - - HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, - doVerificationThruHBaseChecksum, updateMetrics, intoHeap); - if (blk == null) { - HFile.LOG.warn("HBase checksum verification failed for file " + pathName + " at offset " - + offset + " filesize " + fileSize + ". Retrying read with HDFS checksums turned on..."); - - if (!doVerificationThruHBaseChecksum) { - String msg = "HBase checksum verification failed for file " + pathName + " at offset " - + offset + " filesize " + fileSize + " but this cannot happen because doVerify is " - + doVerificationThruHBaseChecksum; - HFile.LOG.warn(msg); - throw new IOException(msg); // cannot happen case here - } - HFile.CHECKSUM_FAILURES.increment(); // update metrics - - // If we have a checksum failure, we fall back into a mode where - // the next few reads use HDFS level checksums. We aim to make the - // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid - // hbase checksum verification, but since this value is set without - // holding any locks, it can so happen that we might actually do - // a few more than precisely this number. - is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD); - doVerificationThruHBaseChecksum = false; - blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, + final Context context = Context.current().with(CONTEXT_KEY, + new HFileContextAttributesBuilderConsumer(fileContext) + .setSkipChecksum(doVerificationThruHBaseChecksum) + .setReadType(pread ? ReadType.POSITIONAL_READ : ReadType.SEEK_PLUS_READ)); + try (Scope ignored = context.makeCurrent()) { + HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, doVerificationThruHBaseChecksum, updateMetrics, intoHeap); - if (blk != null) { - HFile.LOG.warn("HDFS checksum verification succeeded for file " + pathName + " at offset " - + offset + " filesize " + fileSize); - } - } - if (blk == null && !doVerificationThruHBaseChecksum) { - String msg = - "readBlockData failed, possibly due to " + "checksum verification failed for file " - + pathName + " at offset " + offset + " filesize " + fileSize; - HFile.LOG.warn(msg); - throw new IOException(msg); - } + if (blk == null) { + HFile.LOG.warn("HBase checksum verification failed for file {} at offset {} filesize {}." + + " Retrying read with HDFS checksums turned on...", pathName, offset, fileSize); - // If there is a checksum mismatch earlier, then retry with - // HBase checksums switched off and use HDFS checksum verification. - // This triggers HDFS to detect and fix corrupt replicas. The - // next checksumOffCount read requests will use HDFS checksums. - // The decrementing of this.checksumOffCount is not thread-safe, - // but it is harmless because eventually checksumOffCount will be - // a negative number. - streamWrapper.checksumOk(); - return blk; + if (!doVerificationThruHBaseChecksum) { + String msg = "HBase checksum verification failed for file " + pathName + " at offset " + + offset + " filesize " + fileSize + " but this cannot happen because doVerify is " + + doVerificationThruHBaseChecksum; + HFile.LOG.warn(msg); + throw new IOException(msg); // cannot happen case here + } + HFile.CHECKSUM_FAILURES.increment(); // update metrics + + // If we have a checksum failure, we fall back into a mode where + // the next few reads use HDFS level checksums. 
We aim to make the + // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid + // hbase checksum verification, but since this value is set without + // holding any locks, it can so happen that we might actually do + // a few more than precisely this number. + is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD); + doVerificationThruHBaseChecksum = false; + blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, + doVerificationThruHBaseChecksum, updateMetrics, intoHeap); + if (blk != null) { + HFile.LOG.warn( + "HDFS checksum verification succeeded for file {} at offset {} filesize" + " {}", + pathName, offset, fileSize); + } + } + if (blk == null && !doVerificationThruHBaseChecksum) { + String msg = + "readBlockData failed, possibly due to " + "checksum verification failed for file " + + pathName + " at offset " + offset + " filesize " + fileSize; + HFile.LOG.warn(msg); + throw new IOException(msg); + } + + // If there is a checksum mismatch earlier, then retry with + // HBase checksums switched off and use HDFS checksum verification. + // This triggers HDFS to detect and fix corrupt replicas. The + // next checksumOffCount read requests will use HDFS checksums. + // The decrementing of this.checksumOffCount is not thread-safe, + // but it is harmless because eventually checksumOffCount will be + // a negative number. + streamWrapper.checksumOk(); + return blk; + } } /** @@ -1629,6 +1646,11 @@ public class HFileBlock implements Cacheable { throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")"); } + + final Span span = Span.current(); + final AttributesBuilder attributesBuilder = Attributes.builder(); + Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY)) + .ifPresent(c -> c.accept(attributesBuilder)); int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize); // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1 // and will save us having to seek the stream backwards to reread the header we @@ -1653,8 +1675,9 @@ public class HFileBlock implements Cacheable { // in a LOG every time we seek. See HBASE-17072 for more detail. if (headerBuf == null) { if (LOG.isTraceEnabled()) { - LOG.trace("Extra see to get block size!", new RuntimeException()); + LOG.trace("Extra seek to get block size!", new RuntimeException()); } + span.addEvent("Extra seek to get block size!", attributesBuilder.build()); headerBuf = HEAP.allocate(hdrSize); readAtOffset(is, headerBuf, hdrSize, false, offset, pread); headerBuf.rewind(); @@ -1707,6 +1730,7 @@ public class HFileBlock implements Cacheable { hFileBlock.sanityCheckUncompressed(); } LOG.trace("Read {} in {} ms", hFileBlock, duration); + span.addEvent("Read block", attributesBuilder.build()); // Cache next block header if we read it for the next time through here. 
if (nextBlockOnDiskSize != -1) { cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index fc2ffaac876..852306bd251 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hbase.io.hfile; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.BLOCK_CACHE_KEY_KEY; + +import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Scope; import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; @@ -44,7 +46,6 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; -import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.IdLock; @@ -1252,11 +1253,12 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable { BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset, this.isPrimaryReplicaReader(), expectedBlockType); + Attributes attributes = Attributes.of(BLOCK_CACHE_KEY_KEY, cacheKey.toString()); boolean useLock = false; IdLock.Entry lockEntry = null; - Span span = TraceUtil.getGlobalTracer().spanBuilder("HFileReaderImpl.readBlock").startSpan(); - try (Scope traceScope = span.makeCurrent()) { + final Span span = Span.current(); + try { while (true) { // Check cache for block. If found return. if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) { @@ -1269,9 +1271,9 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable { updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding); if (cachedBlock != null) { if (LOG.isTraceEnabled()) { - LOG.trace("From Cache " + cachedBlock); + LOG.trace("From Cache {}", cachedBlock); } - span.addEvent("blockCacheHit"); + span.addEvent("block cache hit", attributes); assert cachedBlock.isUnpacked() : "Packed block leak."; if (cachedBlock.getBlockType().isData()) { if (updateCacheMetrics) { @@ -1301,7 +1303,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable { // Carry on, please load. } - span.addEvent("blockCacheMiss"); + span.addEvent("block cache miss", attributes); // Load block from filesystem. 
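// Note: readBlockData executes under the current tracing context, so the read events added by BlockIOUtils (heap/direct bytes read) attach to this same span.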
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread, !isCompaction, shouldUseHeap(expectedBlockType)); @@ -1331,7 +1333,6 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable { if (lockEntry != null) { offsetLock.releaseLockEntry(lockEntry); } - span.end(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java index 8561fc1c893..8bf63909fcb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -88,23 +89,22 @@ public final class PrefetchExecutor { delay = 0; } try { - if (LOG.isDebugEnabled()) { - LOG.debug("Prefetch requested for " + path + ", delay=" + delay + " ms"); - } - prefetchFutures.put(path, - prefetchExecutorPool.schedule(runnable, delay, TimeUnit.MILLISECONDS)); + LOG.debug("Prefetch requested for {}, delay={} ms", path, delay); + final Runnable tracedRunnable = + TraceUtil.tracedRunnable(runnable, "PrefetchExecutor.request"); + final Future future = + prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS); + prefetchFutures.put(path, future); } catch (RejectedExecutionException e) { prefetchFutures.remove(path); - LOG.warn("Prefetch request rejected for " + path); + LOG.warn("Prefetch request rejected for {}", path); } } } public static void complete(Path path) { prefetchFutures.remove(path); - if (LOG.isDebugEnabled()) { - LOG.debug("Prefetch completed for " + path); - } + LOG.debug("Prefetch completed for {}", path); } public static void cancel(Path path) { @@ -113,9 +113,7 @@ public final class PrefetchExecutor { // ok to race with other cancellation attempts future.cancel(true); prefetchFutures.remove(path); - if (LOG.isDebugEnabled()) { - LOG.debug("Prefetch cancelled for " + path); - } + LOG.debug("Prefetch cancelled for {}", path); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java index efc66111a9f..cf61d574e38 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java @@ -17,6 +17,15 @@ */ package org.apache.hadoop.hbase.io.hfile; +import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; +import static org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers.hasAttributes; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEvents; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; import static 
org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -29,11 +38,17 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.SpanData; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Random; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -42,6 +57,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MatcherPredicate; +import org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers; +import org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; @@ -52,6 +70,7 @@ import org.apache.hadoop.hbase.nio.MultiByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.junit.ClassRule; @@ -74,6 +93,9 @@ public class TestBlockIOUtils { @Rule public ExpectedException exception = ExpectedException.none(); + @Rule + public OpenTelemetryRule otelRule = OpenTelemetryRule.create(); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final int NUM_TEST_BLOCKS = 2; @@ -93,20 +115,29 @@ public class TestBlockIOUtils { @Test public void testReadFully() throws IOException { - FileSystem fs = TEST_UTIL.getTestFileSystem(); - Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully"); - String s = "hello world"; - try (FSDataOutputStream out = fs.create(p)) { - out.writeBytes(s); - } - ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11)); - try (FSDataInputStream in = fs.open(p)) { - BlockIOUtils.readFully(buf, in, 11); - } - buf.rewind(); - byte[] heapBuf = new byte[s.length()]; - buf.get(heapBuf, 0, heapBuf.length); - assertArrayEquals(Bytes.toBytes(s), heapBuf); + TraceUtil.trace(() -> { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully"); + String s = "hello world"; + try (FSDataOutputStream out = fs.create(p)) { + out.writeBytes(s); + } + ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11)); + try (FSDataInputStream in = fs.open(p)) { + BlockIOUtils.readFully(buf, in, 11); + } + buf.rewind(); + byte[] heapBuf = new byte[s.length()]; + buf.get(heapBuf, 0, heapBuf.length); + assertArrayEquals(Bytes.toBytes(s), heapBuf); + }, testName.getMethodName()); + + TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate>( + otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); + assertThat(otelRule.getSpans(), + 
hasItems(allOf(hasName(testName.getMethodName()), + hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readFully"), + hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11)))))))); } @Test @@ -214,33 +245,69 @@ public class TestBlockIOUtils { try (FSDataOutputStream out = fs.create(p)) { out.writeBytes(s); } - ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8)); - try (FSDataInputStream in = fs.open(p)) { - assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2)); - } - buf.rewind(); - byte[] heapBuf = new byte[buf.capacity()]; - buf.get(heapBuf, 0, heapBuf.length); - assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf); - buf = new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4)); - try (FSDataInputStream in = fs.open(p)) { - assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3)); + Span span = TraceUtil.createSpan(testName.getMethodName()); + try (Scope ignored = span.makeCurrent()) { + ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8)); + try (FSDataInputStream in = fs.open(p)) { + assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2)); + } + buf.rewind(); + byte[] heapBuf = new byte[buf.capacity()]; + buf.get(heapBuf, 0, heapBuf.length); + assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf); + } finally { + span.end(); } - buf.rewind(); - heapBuf = new byte[11]; - buf.get(heapBuf, 0, heapBuf.length); - assertArrayEquals(Bytes.toBytes("hello world"), heapBuf); + TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate>( + otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); + assertThat(otelRule.getSpans(), + hasItems(allOf(hasName(testName.getMethodName()), + hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"), + hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 8L)))))))); - buf.position(0).limit(12); - try (FSDataInputStream in = fs.open(p)) { - try { + otelRule.clearSpans(); + span = TraceUtil.createSpan(testName.getMethodName()); + try (Scope ignored = span.makeCurrent()) { + ByteBuff buf = + new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4)); + try (FSDataInputStream in = fs.open(p)) { + assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3)); + } + buf.rewind(); + byte[] heapBuf = new byte[11]; + buf.get(heapBuf, 0, heapBuf.length); + assertArrayEquals(Bytes.toBytes("hello world"), heapBuf); + } finally { + span.end(); + } + TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate>( + otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); + assertThat(otelRule.getSpans(), + hasItems(allOf(hasName(testName.getMethodName()), + hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"), + hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L)))))))); + + otelRule.clearSpans(); + span = TraceUtil.createSpan(testName.getMethodName()); + try (Scope ignored = span.makeCurrent()) { + ByteBuff buf = + new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4)); + buf.position(0).limit(12); + exception.expect(IOException.class); + try (FSDataInputStream in = fs.open(p)) { BlockIOUtils.readWithExtra(buf, in, 12, 0); fail("Should only read 11 bytes"); - } catch (IOException e) { - } + } finally { + span.end(); } + TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate>( + otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); + assertThat(otelRule.getSpans(), + 
hasItems(allOf(hasName(testName.getMethodName()), + hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"), + hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L)))))))); } @Test @@ -255,11 +322,20 @@ public class TestBlockIOUtils { FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen); when(in.hasCapability(anyString())).thenReturn(false); - boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); + boolean ret = + TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), + testName.getMethodName()); assertFalse("Expect false return when no extra bytes requested", ret); verify(in).read(position, buf, bufOffset, totalLen); verify(in).hasCapability(anyString()); verifyNoMoreInteractions(in); + + TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate>( + otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); + assertThat(otelRule.getSpans(), + hasItems(allOf(hasName(testName.getMethodName()), + hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), + hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen)))))))); } @Test @@ -275,12 +351,21 @@ public class TestBlockIOUtils { when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5); when(in.read(5, buf, 5, 5)).thenReturn(5); when(in.hasCapability(anyString())).thenReturn(false); - boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); + boolean ret = + TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), + testName.getMethodName()); assertFalse("Expect false return when no extra bytes requested", ret); verify(in).read(position, buf, bufOffset, totalLen); verify(in).read(5, buf, 5, 5); verify(in).hasCapability(anyString()); verifyNoMoreInteractions(in); + + TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate>( + otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); + assertThat(otelRule.getSpans(), + hasItems(allOf(hasName(testName.getMethodName()), + hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), + hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen)))))))); } @Test @@ -295,11 +380,20 @@ public class TestBlockIOUtils { FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen); when(in.hasCapability(anyString())).thenReturn(false); - boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); + boolean ret = + TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), + testName.getMethodName()); assertTrue("Expect true return when reading extra bytes succeeds", ret); verify(in).read(position, buf, bufOffset, totalLen); verify(in).hasCapability(anyString()); verifyNoMoreInteractions(in); + + TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate>( + otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); + assertThat(otelRule.getSpans(), + hasItems(allOf(hasName(testName.getMethodName()), + hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), + hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen)))))))); } @Test @@ -314,11 +408,20 @@ public class TestBlockIOUtils { FSDataInputStream in = mock(FSDataInputStream.class); 
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen); when(in.hasCapability(anyString())).thenReturn(false); - boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); + boolean ret = + TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), + testName.getMethodName()); assertFalse("Expect false return when reading extra bytes fails", ret); verify(in).read(position, buf, bufOffset, totalLen); verify(in).hasCapability(anyString()); verifyNoMoreInteractions(in); + + TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate>( + otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); + assertThat(otelRule.getSpans(), + hasItems(allOf(hasName(testName.getMethodName()), + hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), + hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", necessaryLen)))))))); } @Test @@ -334,12 +437,21 @@ public class TestBlockIOUtils { when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5); when(in.read(5, buf, 5, 10)).thenReturn(10); when(in.hasCapability(anyString())).thenReturn(false); - boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); + boolean ret = + TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), + testName.getMethodName()); assertTrue("Expect true return when reading extra bytes succeeds", ret); verify(in).read(position, buf, bufOffset, totalLen); verify(in).read(5, buf, 5, 10); verify(in).hasCapability(anyString()); verifyNoMoreInteractions(in); + + TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate>( + otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); + assertThat(otelRule.getSpans(), + hasItems(allOf(hasName(testName.getMethodName()), + hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), + hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen)))))))); } @Test @@ -357,7 +469,23 @@ public class TestBlockIOUtils { when(in.hasCapability(anyString())).thenReturn(false); exception.expect(IOException.class); exception.expectMessage("EOF"); - BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); + Span span = TraceUtil.createSpan(testName.getMethodName()); + try (Scope ignored = span.makeCurrent()) { + BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); + span.setStatus(StatusCode.OK); + } catch (IOException e) { + TraceUtil.setError(span, e); + throw e; + } finally { + span.end(); + + TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate>( + otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); + assertThat(otelRule.getSpans(), + hasItems(allOf(hasName(testName.getMethodName()), + hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), + hasAttributes(AttributesMatchers.isEmpty()))))))); + } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java index 0657799a5f9..1e4f675b238 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java @@ -17,12 +17,23 @@ */ package org.apache.hadoop.hbase.io.hfile; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static 
org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; +import java.util.List; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -30,21 +41,28 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MatcherPredicate; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Category({ IOTests.class, MediumTests.class }) public class TestPrefetch { + private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -61,6 +79,9 @@ public class TestPrefetch { private FileSystem fs; private BlockCache blockCache; + @Rule + public OpenTelemetryRule otelRule = OpenTelemetryRule.create(); + @Before public void setUp() throws IOException { conf = TEST_UTIL.getConfiguration(); @@ -82,8 +103,23 @@ public class TestPrefetch { @Test public void testPrefetch() throws Exception { - Path storeFile = writeStoreFile("TestPrefetch"); - readStoreFile(storeFile); + TraceUtil.trace(() -> { + Path storeFile = writeStoreFile("TestPrefetch"); + readStoreFile(storeFile); + }, "testPrefetch"); + + TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(otelRule::getSpans, + hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request")))); + final List spans = otelRule.getSpans(); + if (LOG.isDebugEnabled()) { + StringTraceRenderer renderer = new StringTraceRenderer(spans); + renderer.render(LOG::debug); + } + + final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst() + .orElseThrow(AssertionError::new); + assertThat("prefetch spans happen on their own threads, detached from file open.", spans, + hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan))))); } @Test