HBASE-26472 Adhere to semantic conventions regarding table data operations (addendum)

Ensure table data operations emit one and only one span.
This commit is contained in:
Nick Dimiduk 2022-01-26 17:04:44 -08:00 committed by Nick Dimiduk
parent 5e01534494
commit 94e126fecb
3 changed files with 57 additions and 67 deletions

View File

@ -433,33 +433,27 @@ public class HTable implements Table {
@Override @Override
public void batch(final List<? extends Row> actions, final Object[] results) public void batch(final List<? extends Row> actions, final Object[] results)
throws InterruptedException, IOException { throws InterruptedException, IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) int rpcTimeout = writeRpcTimeoutMs;
.setTableName(tableName) boolean hasRead = false;
.setOperation(HBaseSemanticAttributes.Operation.BATCH) boolean hasWrite = false;
.setContainerOperations(actions); for (Row action : actions) {
TraceUtil.traceWithIOException(() -> { if (action instanceof Mutation) {
int rpcTimeout = writeRpcTimeoutMs; hasWrite = true;
boolean hasRead = false; } else {
boolean hasWrite = false; hasRead = true;
for (Row action : actions) {
if (action instanceof Mutation) {
hasWrite = true;
} else {
hasRead = true;
}
if (hasRead && hasWrite) {
break;
}
} }
if (hasRead && !hasWrite) { if (hasRead && hasWrite) {
rpcTimeout = readRpcTimeoutMs; break;
} }
try { }
batch(actions, results, rpcTimeout); if (hasRead && !hasWrite) {
} catch (InterruptedException e) { rpcTimeout = readRpcTimeoutMs;
throw (InterruptedIOException) new InterruptedIOException().initCause(e); }
} try {
}, supplier); batch(actions, results, rpcTimeout);
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
} }
public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout) public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
@ -555,29 +549,23 @@ public class HTable implements Table {
@Override @Override
public void delete(final List<Delete> deletes) throws IOException { public void delete(final List<Delete> deletes) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) Object[] results = new Object[deletes.size()];
.setTableName(tableName) try {
.setOperation(HBaseSemanticAttributes.Operation.BATCH) batch(deletes, results, writeRpcTimeoutMs);
.setContainerOperations(deletes); } catch (InterruptedException e) {
TraceUtil.traceWithIOException(() -> { throw (InterruptedIOException) new InterruptedIOException().initCause(e);
Object[] results = new Object[deletes.size()]; } finally {
try { // TODO: to be consistent with batch put(), do not modify input list
batch(deletes, results, writeRpcTimeoutMs); // mutate list so that it is empty for complete success, or contains only failed records
} catch (InterruptedException e) { // results are returned in the same order as the requests in list walk the list backwards,
throw (InterruptedIOException) new InterruptedIOException().initCause(e); // so we can remove from list without impacting the indexes of earlier members
} finally { for (int i = results.length - 1; i >= 0; i--) {
// TODO: to be consistent with batch put(), do not modify input list // if result is not null, it succeeded
// mutate list so that it is empty for complete success, or contains only failed records if (results[i] instanceof Result) {
// results are returned in the same order as the requests in list walk the list backwards, deletes.remove(i);
// so we can remove from list without impacting the indexes of earlier members
for (int i = results.length - 1; i >= 0; i--) {
// if result is not null, it succeeded
if (results[i] instanceof Result) {
deletes.remove(i);
}
} }
} }
}, supplier); }
} }
@Override @Override
@ -605,21 +593,15 @@ public class HTable implements Table {
@Override @Override
public void put(final List<Put> puts) throws IOException { public void put(final List<Put> puts) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) for (Put put : puts) {
.setTableName(tableName) validatePut(put);
.setOperation(HBaseSemanticAttributes.Operation.BATCH) }
.setContainerOperations(puts); Object[] results = new Object[puts.size()];
TraceUtil.traceWithIOException(() -> { try {
for (Put put : puts) { batch(puts, results, writeRpcTimeoutMs);
validatePut(put); } catch (InterruptedException e) {
} throw (InterruptedIOException) new InterruptedIOException().initCause(e);
Object[] results = new Object[puts.size()]; }
try {
batch(puts, results, writeRpcTimeoutMs);
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
}, supplier);
} }
@Override @Override

View File

@ -29,6 +29,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyInt;
@ -41,9 +42,11 @@ import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory; import org.apache.hadoop.hbase.CellBuilderFactory;
@ -253,11 +256,12 @@ public class TestAsyncTableTracing {
Waiter.waitFor(CONF, 1000, new MatcherPredicate<>( Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
"waiting for span to emit", "waiting for span to emit",
() -> traceRule.getSpans(), hasItem(spanLocator))); () -> traceRule.getSpans(), hasItem(spanLocator)));
SpanData data = traceRule.getSpans() List<SpanData> candidateSpans = traceRule.getSpans()
.stream() .stream()
.filter(spanLocator::matches) .filter(spanLocator::matches)
.findFirst() .collect(Collectors.toList());
.orElseThrow(AssertionError::new); assertThat(candidateSpans, hasSize(1));
SpanData data = candidateSpans.iterator().next();
assertThat(data, allOf( assertThat(data, allOf(
hasName(expectedName), hasName(expectedName),
hasKind(SpanKind.CLIENT), hasKind(SpanKind.CLIENT),

View File

@ -29,6 +29,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyInt;
@ -42,8 +43,10 @@ import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory; import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellBuilderType;
@ -251,11 +254,12 @@ public class TestHTableTracing extends TestTracingBase {
Waiter.waitFor(conf, 1000, new MatcherPredicate<>( Waiter.waitFor(conf, 1000, new MatcherPredicate<>(
"waiting for span to emit", "waiting for span to emit",
() -> TRACE_RULE.getSpans(), hasItem(spanLocator))); () -> TRACE_RULE.getSpans(), hasItem(spanLocator)));
SpanData data = TRACE_RULE.getSpans() List<SpanData> candidateSpans = TRACE_RULE.getSpans()
.stream() .stream()
.filter(spanLocator::matches) .filter(spanLocator::matches)
.findFirst() .collect(Collectors.toList());
.orElseThrow(AssertionError::new); assertThat(candidateSpans, hasSize(1));
SpanData data = candidateSpans.iterator().next();
assertThat(data, allOf( assertThat(data, allOf(
hasName(expectedName), hasName(expectedName),
hasKind(SpanKind.CLIENT), hasKind(SpanKind.CLIENT),