HBASE-27360 The trace related assertions are flaky for async client tests (#4767)
Signed-off-by: GeorryHuang <huangzhuoyue@apache.org>
This commit is contained in:
parent
793f020e13
commit
f3f88ff6c1
|
@ -237,7 +237,9 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
|
|||
try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
|
||||
span = scanner.getSpan();
|
||||
try (Scope ignored = span.makeCurrent()) {
|
||||
consumer.onScanMetricsCreated(scanner.getScanMetrics());
|
||||
if (scan.isScanMetricsEnabled()) {
|
||||
consumer.onScanMetricsCreated(scanner.getScanMetrics());
|
||||
}
|
||||
for (Result result; (result = scanner.next()) != null;) {
|
||||
if (!consumer.onNext(result)) {
|
||||
break;
|
||||
|
|
|
@ -37,12 +37,14 @@ import java.io.IOException;
|
|||
import java.io.UncheckedIOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ConnectionRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||
|
@ -70,18 +72,18 @@ import org.junit.rules.TestRule;
|
|||
|
||||
public abstract class AbstractTestAsyncTableScan {
|
||||
|
||||
protected static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create();
|
||||
protected static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
|
||||
protected static final OpenTelemetryClassRule OTEL_CLASS_RULE = OpenTelemetryClassRule.create();
|
||||
protected static final MiniClusterRule MINI_CLUSTER_RULE = MiniClusterRule.newBuilder()
|
||||
.setMiniClusterOption(StartTestingClusterOption.builder().numWorkers(3).build()).build();
|
||||
|
||||
protected static final ConnectionRule connectionRule =
|
||||
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
|
||||
protected static final ConnectionRule CONN_RULE =
|
||||
ConnectionRule.createAsyncConnectionRule(MINI_CLUSTER_RULE::createAsyncConnection);
|
||||
|
||||
private static final class Setup extends ExternalResource {
|
||||
@Override
|
||||
protected void before() throws Throwable {
|
||||
final HBaseTestingUtil testingUtil = miniClusterRule.getTestingUtility();
|
||||
final AsyncConnection conn = connectionRule.getAsyncConnection();
|
||||
final HBaseTestingUtil testingUtil = MINI_CLUSTER_RULE.getTestingUtility();
|
||||
final AsyncConnection conn = CONN_RULE.getAsyncConnection();
|
||||
|
||||
byte[][] splitKeys = new byte[8][];
|
||||
for (int i = 111; i < 999; i += 111) {
|
||||
|
@ -99,11 +101,11 @@ public abstract class AbstractTestAsyncTableScan {
|
|||
}
|
||||
|
||||
@ClassRule
|
||||
public static final TestRule classRule = RuleChain.outerRule(otelClassRule)
|
||||
.around(miniClusterRule).around(connectionRule).around(new Setup());
|
||||
public static final TestRule classRule = RuleChain.outerRule(OTEL_CLASS_RULE)
|
||||
.around(MINI_CLUSTER_RULE).around(CONN_RULE).around(new Setup());
|
||||
|
||||
@Rule
|
||||
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule);
|
||||
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(OTEL_CLASS_RULE);
|
||||
|
||||
@Rule
|
||||
public final TestName testName = new TestName();
|
||||
|
@ -136,11 +138,11 @@ public abstract class AbstractTestAsyncTableScan {
|
|||
}
|
||||
|
||||
private static AsyncTable<?> getRawTable() {
|
||||
return connectionRule.getAsyncConnection().getTable(TABLE_NAME);
|
||||
return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME);
|
||||
}
|
||||
|
||||
private static AsyncTable<?> getTable() {
|
||||
return connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
||||
return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
||||
}
|
||||
|
||||
private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
|
||||
|
@ -198,16 +200,20 @@ public abstract class AbstractTestAsyncTableScan {
|
|||
}
|
||||
|
||||
protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
|
||||
final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration();
|
||||
final Configuration conf = MINI_CLUSTER_RULE.getTestingUtility().getConfiguration();
|
||||
Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
|
||||
"Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher)));
|
||||
"Span for test failed to complete.", OTEL_CLASS_RULE::getSpans, hasItem(parentSpanMatcher)));
|
||||
}
|
||||
|
||||
protected static Stream<SpanData> spanStream() {
|
||||
return OTEL_CLASS_RULE.getSpans().stream().filter(Objects::nonNull);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScanAll() throws Exception {
|
||||
List<Result> results = doScan(createScan(), -1);
|
||||
// make sure all scanners are closed at RS side
|
||||
miniClusterRule.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
|
||||
MINI_CLUSTER_RULE.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
|
||||
.map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach(
|
||||
rs -> assertEquals(
|
||||
"The scanner count of " + rs.getServerName() + " is "
|
||||
|
|
|
@ -24,13 +24,11 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.has
|
|||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -76,7 +74,7 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
|
|||
@Override
|
||||
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
|
||||
AsyncTable<ScanResultConsumer> table =
|
||||
connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
||||
CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
||||
List<Result> results;
|
||||
if (closeAfter > 0) {
|
||||
// these tests batch settings with the sample data result in each result being
|
||||
|
@ -108,38 +106,31 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
|
|||
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||
waitForSpan(parentSpanMatcher);
|
||||
|
||||
final List<SpanData> spans =
|
||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
||||
if (logger.isDebugEnabled()) {
|
||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
||||
StringTraceRenderer stringTraceRenderer =
|
||||
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||
stringTraceRenderer.render(logger::debug);
|
||||
}
|
||||
|
||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
||||
final String parentSpanId =
|
||||
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||
|
||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
||||
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
||||
|
||||
final Matcher<SpanData> onScanMetricsCreatedMatcher =
|
||||
hasName("TracedScanResultConsumer#onScanMetricsCreated");
|
||||
assertThat(spans, hasItem(onScanMetricsCreatedMatcher));
|
||||
spans.stream().filter(onScanMetricsCreatedMatcher::matches).forEach(span -> assertThat(span,
|
||||
allOf(onScanMetricsCreatedMatcher, hasParentSpanId(scanOperationSpanId), hasEnded())));
|
||||
waitForSpan(scanOperationSpanMatcher);
|
||||
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().get();
|
||||
|
||||
final Matcher<SpanData> onNextMatcher = hasName("TracedScanResultConsumer#onNext");
|
||||
assertThat(spans, hasItem(onNextMatcher));
|
||||
spans.stream().filter(onNextMatcher::matches)
|
||||
waitForSpan(onNextMatcher);
|
||||
spanStream().filter(onNextMatcher::matches)
|
||||
.forEach(span -> assertThat(span, allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
|
||||
hasStatusWithCode(StatusCode.OK), hasEnded())));
|
||||
|
||||
final Matcher<SpanData> onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete");
|
||||
assertThat(spans, hasItem(onCompleteMatcher));
|
||||
spans.stream().filter(onCompleteMatcher::matches)
|
||||
waitForSpan(onCompleteMatcher);
|
||||
spanStream().filter(onCompleteMatcher::matches)
|
||||
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
|
||||
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
|
||||
}
|
||||
|
@ -151,27 +142,26 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
|
|||
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
|
||||
waitForSpan(parentSpanMatcher);
|
||||
|
||||
final List<SpanData> spans =
|
||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
||||
if (logger.isDebugEnabled()) {
|
||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
||||
StringTraceRenderer stringTraceRenderer =
|
||||
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||
stringTraceRenderer.render(logger::debug);
|
||||
}
|
||||
|
||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
||||
final String parentSpanId =
|
||||
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||
|
||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
|
||||
hasException(exceptionMatcher), hasEnded());
|
||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
||||
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
||||
waitForSpan(scanOperationSpanMatcher);
|
||||
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().get();
|
||||
|
||||
final Matcher<SpanData> onErrorMatcher = hasName("TracedScanResultConsumer#onError");
|
||||
assertThat(spans, hasItem(onErrorMatcher));
|
||||
spans.stream().filter(onErrorMatcher::matches)
|
||||
waitForSpan(onErrorMatcher);
|
||||
spanStream().filter(onErrorMatcher::matches)
|
||||
.forEach(span -> assertThat(span, allOf(onErrorMatcher, hasParentSpanId(scanOperationSpanId),
|
||||
hasStatusWithCode(StatusCode.OK), hasEnded())));
|
||||
}
|
||||
|
|
|
@ -22,15 +22,12 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.has
|
|||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
|
@ -99,20 +96,19 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
|
|||
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||
waitForSpan(parentSpanMatcher);
|
||||
|
||||
final List<SpanData> spans =
|
||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
||||
if (logger.isDebugEnabled()) {
|
||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
||||
StringTraceRenderer stringTraceRenderer =
|
||||
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||
stringTraceRenderer.render(logger::debug);
|
||||
}
|
||||
|
||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
||||
final String parentSpanId =
|
||||
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||
|
||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
||||
waitForSpan(scanOperationSpanMatcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -122,20 +118,19 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
|
|||
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
|
||||
waitForSpan(parentSpanMatcher);
|
||||
|
||||
final List<SpanData> spans =
|
||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
||||
if (logger.isDebugEnabled()) {
|
||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
||||
StringTraceRenderer stringTraceRenderer =
|
||||
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||
stringTraceRenderer.render(logger::debug);
|
||||
}
|
||||
|
||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
||||
final String parentSpanId =
|
||||
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||
|
||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
|
||||
hasException(exceptionMatcher), hasEnded());
|
||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
||||
waitForSpan(scanOperationSpanMatcher);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,16 +22,13 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.has
|
|||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -83,7 +80,7 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
|
|||
@Override
|
||||
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
|
||||
AsyncTable<?> table =
|
||||
connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
||||
CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
||||
List<Result> results = new ArrayList<>();
|
||||
// these tests batch settings with the sample data result in each result being
|
||||
// split in two. so we must allow twice the expected results in order to reach
|
||||
|
@ -112,19 +109,17 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
|
|||
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||
waitForSpan(parentSpanMatcher);
|
||||
|
||||
final List<SpanData> spans =
|
||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
||||
if (logger.isDebugEnabled()) {
|
||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
||||
StringTraceRenderer stringTraceRenderer =
|
||||
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||
stringTraceRenderer.render(logger::debug);
|
||||
}
|
||||
|
||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
||||
final String parentSpanId =
|
||||
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||
|
||||
assertThat(spans,
|
||||
hasItem(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
|
||||
waitForSpan(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -134,20 +129,19 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
|
|||
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
|
||||
waitForSpan(parentSpanMatcher);
|
||||
|
||||
final List<SpanData> spans =
|
||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
||||
if (logger.isDebugEnabled()) {
|
||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
||||
StringTraceRenderer stringTraceRenderer =
|
||||
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||
stringTraceRenderer.render(logger::debug);
|
||||
}
|
||||
|
||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
||||
final String parentSpanId =
|
||||
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||
|
||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
|
||||
hasException(exceptionMatcher), hasEnded());
|
||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
||||
waitForSpan(scanOperationSpanMatcher);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,15 +24,12 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.has
|
|||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
|
@ -77,7 +74,7 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
|
|||
@Override
|
||||
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
|
||||
TracedAdvancedScanResultConsumer scanConsumer = new TracedAdvancedScanResultConsumer();
|
||||
connectionRule.getAsyncConnection().getTable(TABLE_NAME).scan(scan, scanConsumer);
|
||||
CONN_RULE.getAsyncConnection().getTable(TABLE_NAME).scan(scan, scanConsumer);
|
||||
List<Result> results = new ArrayList<>();
|
||||
// these tests batch settings with the sample data result in each result being
|
||||
// split in two. so we must allow twice the expected results in order to reach
|
||||
|
@ -104,39 +101,33 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
|
|||
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||
waitForSpan(parentSpanMatcher);
|
||||
|
||||
final List<SpanData> spans =
|
||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
||||
if (logger.isDebugEnabled()) {
|
||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
||||
StringTraceRenderer stringTraceRenderer =
|
||||
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||
stringTraceRenderer.render(logger::debug);
|
||||
}
|
||||
|
||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
||||
final String parentSpanId =
|
||||
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||
|
||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
||||
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
||||
|
||||
// RawAsyncTableImpl never invokes the callback to `onScanMetricsCreated` -- bug?
|
||||
final Matcher<SpanData> onScanMetricsCreatedMatcher =
|
||||
hasName("TracedAdvancedScanResultConsumer#onScanMetricsCreated");
|
||||
assertThat(spans, not(hasItem(onScanMetricsCreatedMatcher)));
|
||||
waitForSpan(scanOperationSpanMatcher);
|
||||
|
||||
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().get();
|
||||
final Matcher<SpanData> onNextMatcher = hasName("TracedAdvancedScanResultConsumer#onNext");
|
||||
assertThat(spans, hasItem(onNextMatcher));
|
||||
spans.stream().filter(onNextMatcher::matches)
|
||||
waitForSpan(onNextMatcher);
|
||||
spanStream().filter(onNextMatcher::matches)
|
||||
.forEach(span -> assertThat(span, hasParentSpanId(scanOperationSpanId)));
|
||||
assertThat(spans, hasItem(allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
|
||||
hasStatusWithCode(StatusCode.OK), hasEnded())));
|
||||
waitForSpan(allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
|
||||
hasStatusWithCode(StatusCode.OK), hasEnded()));
|
||||
|
||||
final Matcher<SpanData> onCompleteMatcher =
|
||||
hasName("TracedAdvancedScanResultConsumer#onComplete");
|
||||
assertThat(spans, hasItem(onCompleteMatcher));
|
||||
spans.stream().filter(onCompleteMatcher::matches)
|
||||
waitForSpan(onCompleteMatcher);
|
||||
spanStream().filter(onCompleteMatcher::matches)
|
||||
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
|
||||
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
|
||||
}
|
||||
|
@ -148,27 +139,26 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
|
|||
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
|
||||
waitForSpan(parentSpanMatcher);
|
||||
|
||||
final List<SpanData> spans =
|
||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
||||
if (logger.isDebugEnabled()) {
|
||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
||||
StringTraceRenderer stringTraceRenderer =
|
||||
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||
stringTraceRenderer.render(logger::debug);
|
||||
}
|
||||
|
||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
||||
final String parentSpanId =
|
||||
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||
|
||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
|
||||
hasException(exceptionMatcher), hasEnded());
|
||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
||||
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
||||
waitForSpan(scanOperationSpanMatcher);
|
||||
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
|
||||
.map(SpanData::getSpanId).findAny().get();
|
||||
|
||||
final Matcher<SpanData> onCompleteMatcher = hasName("TracedAdvancedScanResultConsumer#onError");
|
||||
assertThat(spans, hasItem(onCompleteMatcher));
|
||||
spans.stream().filter(onCompleteMatcher::matches)
|
||||
waitForSpan(onCompleteMatcher);
|
||||
spanStream().filter(onCompleteMatcher::matches)
|
||||
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
|
||||
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue