diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 7b83fad4f31..f57ca091a35 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -433,33 +433,27 @@ public class HTable implements Table {
   @Override
   public void batch(final List<? extends Row> actions, final Object[] results)
     throws InterruptedException, IOException {
-    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
-      .setTableName(tableName)
-      .setOperation(HBaseSemanticAttributes.Operation.BATCH)
-      .setContainerOperations(actions);
-    TraceUtil.traceWithIOException(() -> {
-      int rpcTimeout = writeRpcTimeoutMs;
-      boolean hasRead = false;
-      boolean hasWrite = false;
-      for (Row action : actions) {
-        if (action instanceof Mutation) {
-          hasWrite = true;
-        } else {
-          hasRead = true;
-        }
-        if (hasRead && hasWrite) {
-          break;
-        }
+    int rpcTimeout = writeRpcTimeoutMs;
+    boolean hasRead = false;
+    boolean hasWrite = false;
+    for (Row action : actions) {
+      if (action instanceof Mutation) {
+        hasWrite = true;
+      } else {
+        hasRead = true;
       }
-      if (hasRead && !hasWrite) {
-        rpcTimeout = readRpcTimeoutMs;
+      if (hasRead && hasWrite) {
+        break;
       }
-      try {
-        batch(actions, results, rpcTimeout);
-      } catch (InterruptedException e) {
-        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
-      }
-    }, supplier);
+    }
+    if (hasRead && !hasWrite) {
+      rpcTimeout = readRpcTimeoutMs;
+    }
+    try {
+      batch(actions, results, rpcTimeout);
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+    }
   }
 
   public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
@@ -555,29 +549,23 @@ public class HTable implements Table {
 
   @Override
   public void delete(final List<Delete> deletes) throws IOException {
-    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
-      .setTableName(tableName)
-      .setOperation(HBaseSemanticAttributes.Operation.BATCH)
-      .setContainerOperations(deletes);
-    TraceUtil.traceWithIOException(() -> {
-      Object[] results = new Object[deletes.size()];
-      try {
-        batch(deletes, results, writeRpcTimeoutMs);
-      } catch (InterruptedException e) {
-        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
-      } finally {
-        // TODO: to be consistent with batch put(), do not modify input list
-        // mutate list so that it is empty for complete success, or contains only failed records
-        // results are returned in the same order as the requests in list walk the list backwards,
-        // so we can remove from list without impacting the indexes of earlier members
-        for (int i = results.length - 1; i >= 0; i--) {
-          // if result is not null, it succeeded
-          if (results[i] instanceof Result) {
-            deletes.remove(i);
-          }
+    Object[] results = new Object[deletes.size()];
+    try {
+      batch(deletes, results, writeRpcTimeoutMs);
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+    } finally {
+      // TODO: to be consistent with batch put(), do not modify input list
+      // mutate list so that it is empty for complete success, or contains only failed records
+      // results are returned in the same order as the requests in list walk the list backwards,
+      // so we can remove from list without impacting the indexes of earlier members
+      for (int i = results.length - 1; i >= 0; i--) {
+        // if result is not null, it succeeded
+        if (results[i] instanceof Result) {
+          deletes.remove(i);
         }
       }
-    }, supplier);
+    }
   }
 
   @Override
@@ -605,21 +593,15 @@ public class HTable implements Table {
 
   @Override
   public void put(final List<Put> puts) throws IOException {
-    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
-      .setTableName(tableName)
-      .setOperation(HBaseSemanticAttributes.Operation.BATCH)
-      .setContainerOperations(puts);
-    TraceUtil.traceWithIOException(() -> {
-      for (Put put : puts) {
-        validatePut(put);
-      }
-      Object[] results = new Object[puts.size()];
-      try {
-        batch(puts, results, writeRpcTimeoutMs);
-      } catch (InterruptedException e) {
-        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
-      }
-    }, supplier);
+    for (Put put : puts) {
+      validatePut(put);
+    }
+    Object[] results = new Object[puts.size()];
+    try {
+      batch(puts, results, writeRpcTimeoutMs);
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+    }
   }
 
   @Override
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
index a2e4f5ca85d..04df972d7a1 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
@@ -29,6 +29,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -41,9 +42,11 @@ import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -253,11 +256,12 @@ public class TestAsyncTableTracing {
     Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
       "waiting for span to emit",
       () -> traceRule.getSpans(), hasItem(spanLocator)));
-    SpanData data = traceRule.getSpans()
+    List<SpanData> candidateSpans = traceRule.getSpans()
       .stream()
       .filter(spanLocator::matches)
-      .findFirst()
-      .orElseThrow(AssertionError::new);
+      .collect(Collectors.toList());
+    assertThat(candidateSpans, hasSize(1));
+    SpanData data = candidateSpans.iterator().next();
     assertThat(data, allOf(
       hasName(expectedName),
       hasKind(SpanKind.CLIENT),
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java
index 4b94ad9f36d..a4adfe5988a 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java
@@ -29,6 +29,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -42,8 +43,10 @@ import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
@@ -251,11 +254,12 @@ public class TestHTableTracing extends TestTracingBase {
     Waiter.waitFor(conf, 1000, new MatcherPredicate<>(
       "waiting for span to emit",
       () -> TRACE_RULE.getSpans(), hasItem(spanLocator)));
-    SpanData data = TRACE_RULE.getSpans()
+    List<SpanData> candidateSpans = TRACE_RULE.getSpans()
       .stream()
       .filter(spanLocator::matches)
-      .findFirst()
-      .orElseThrow(AssertionError::new);
+      .collect(Collectors.toList());
+    assertThat(candidateSpans, hasSize(1));
+    SpanData data = candidateSpans.iterator().next();
     assertThat(data, allOf(
       hasName(expectedName),
       hasKind(SpanKind.CLIENT),