HBASE-27360 The trace related assertions are flaky for async client tests (#4767)
Signed-off-by: GeorryHuang <huangzhuoyue@apache.org> (cherry picked from commit f3f88ff6c17633d98facf9dccb57e63b87200e4a)
This commit is contained in:
parent
a1fc483ba8
commit
7ae66a1d2e
@ -236,7 +236,9 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
|
|||||||
try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
|
try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
|
||||||
span = scanner.getSpan();
|
span = scanner.getSpan();
|
||||||
try (Scope ignored = span.makeCurrent()) {
|
try (Scope ignored = span.makeCurrent()) {
|
||||||
consumer.onScanMetricsCreated(scanner.getScanMetrics());
|
if (scan.isScanMetricsEnabled()) {
|
||||||
|
consumer.onScanMetricsCreated(scanner.getScanMetrics());
|
||||||
|
}
|
||||||
for (Result result; (result = scanner.next()) != null;) {
|
for (Result result; (result = scanner.next()) != null;) {
|
||||||
if (!consumer.onNext(result)) {
|
if (!consumer.onNext(result)) {
|
||||||
break;
|
break;
|
||||||
|
@ -37,12 +37,14 @@ import java.io.IOException;
|
|||||||
import java.io.UncheckedIOException;
|
import java.io.UncheckedIOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ForkJoinPool;
|
import java.util.concurrent.ForkJoinPool;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
import java.util.stream.Stream;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.ConnectionRule;
|
import org.apache.hadoop.hbase.ConnectionRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
@ -70,18 +72,18 @@ import org.junit.rules.TestRule;
|
|||||||
|
|
||||||
public abstract class AbstractTestAsyncTableScan {
|
public abstract class AbstractTestAsyncTableScan {
|
||||||
|
|
||||||
protected static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create();
|
protected static final OpenTelemetryClassRule OTEL_CLASS_RULE = OpenTelemetryClassRule.create();
|
||||||
protected static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
|
protected static final MiniClusterRule MINI_CLUSTER_RULE = MiniClusterRule.newBuilder()
|
||||||
.setMiniClusterOption(StartMiniClusterOption.builder().numWorkers(3).build()).build();
|
.setMiniClusterOption(StartMiniClusterOption.builder().numWorkers(3).build()).build();
|
||||||
|
|
||||||
protected static final ConnectionRule connectionRule =
|
protected static final ConnectionRule CONN_RULE =
|
||||||
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
|
ConnectionRule.createAsyncConnectionRule(MINI_CLUSTER_RULE::createAsyncConnection);
|
||||||
|
|
||||||
private static final class Setup extends ExternalResource {
|
private static final class Setup extends ExternalResource {
|
||||||
@Override
|
@Override
|
||||||
protected void before() throws Throwable {
|
protected void before() throws Throwable {
|
||||||
final HBaseTestingUtility testingUtil = miniClusterRule.getTestingUtility();
|
final HBaseTestingUtility testingUtil = MINI_CLUSTER_RULE.getTestingUtility();
|
||||||
final AsyncConnection conn = connectionRule.getAsyncConnection();
|
final AsyncConnection conn = CONN_RULE.getAsyncConnection();
|
||||||
|
|
||||||
byte[][] splitKeys = new byte[8][];
|
byte[][] splitKeys = new byte[8][];
|
||||||
for (int i = 111; i < 999; i += 111) {
|
for (int i = 111; i < 999; i += 111) {
|
||||||
@ -99,11 +101,11 @@ public abstract class AbstractTestAsyncTableScan {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final TestRule classRule = RuleChain.outerRule(otelClassRule)
|
public static final TestRule classRule = RuleChain.outerRule(OTEL_CLASS_RULE)
|
||||||
.around(miniClusterRule).around(connectionRule).around(new Setup());
|
.around(MINI_CLUSTER_RULE).around(CONN_RULE).around(new Setup());
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule);
|
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(OTEL_CLASS_RULE);
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final TestName testName = new TestName();
|
public final TestName testName = new TestName();
|
||||||
@ -136,11 +138,11 @@ public abstract class AbstractTestAsyncTableScan {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static AsyncTable<?> getRawTable() {
|
private static AsyncTable<?> getRawTable() {
|
||||||
return connectionRule.getAsyncConnection().getTable(TABLE_NAME);
|
return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static AsyncTable<?> getTable() {
|
private static AsyncTable<?> getTable() {
|
||||||
return connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
|
private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
|
||||||
@ -198,16 +200,20 @@ public abstract class AbstractTestAsyncTableScan {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
|
protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
|
||||||
final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration();
|
final Configuration conf = MINI_CLUSTER_RULE.getTestingUtility().getConfiguration();
|
||||||
Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
|
Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
|
||||||
"Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher)));
|
"Span for test failed to complete.", OTEL_CLASS_RULE::getSpans, hasItem(parentSpanMatcher)));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static Stream<SpanData> spanStream() {
|
||||||
|
return OTEL_CLASS_RULE.getSpans().stream().filter(Objects::nonNull);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testScanAll() throws Exception {
|
public void testScanAll() throws Exception {
|
||||||
List<Result> results = doScan(createScan(), -1);
|
List<Result> results = doScan(createScan(), -1);
|
||||||
// make sure all scanners are closed at RS side
|
// make sure all scanners are closed at RS side
|
||||||
miniClusterRule.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
|
MINI_CLUSTER_RULE.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
|
||||||
.map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach(
|
.map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach(
|
||||||
rs -> assertEquals(
|
rs -> assertEquals(
|
||||||
"The scanner count of " + rs.getServerName() + " is "
|
"The scanner count of " + rs.getServerName() + " is "
|
||||||
|
@ -24,13 +24,11 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.has
|
|||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.allOf;
|
import static org.hamcrest.Matchers.allOf;
|
||||||
import static org.hamcrest.Matchers.hasItem;
|
|
||||||
import static org.hamcrest.Matchers.startsWith;
|
import static org.hamcrest.Matchers.startsWith;
|
||||||
|
|
||||||
import io.opentelemetry.api.trace.StatusCode;
|
import io.opentelemetry.api.trace.StatusCode;
|
||||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.ForkJoinPool;
|
import java.util.concurrent.ForkJoinPool;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@ -76,7 +74,7 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
|
|||||||
@Override
|
@Override
|
||||||
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
|
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
|
||||||
AsyncTable<ScanResultConsumer> table =
|
AsyncTable<ScanResultConsumer> table =
|
||||||
connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
||||||
List<Result> results;
|
List<Result> results;
|
||||||
if (closeAfter > 0) {
|
if (closeAfter > 0) {
|
||||||
// these tests batch settings with the sample data result in each result being
|
// these tests batch settings with the sample data result in each result being
|
||||||
@ -108,38 +106,31 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
|
|||||||
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
|
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||||
waitForSpan(parentSpanMatcher);
|
waitForSpan(parentSpanMatcher);
|
||||||
|
|
||||||
final List<SpanData> spans =
|
|
||||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
StringTraceRenderer stringTraceRenderer =
|
||||||
|
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||||
stringTraceRenderer.render(logger::debug);
|
stringTraceRenderer.render(logger::debug);
|
||||||
}
|
}
|
||||||
|
|
||||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
final String parentSpanId =
|
||||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||||
|
|
||||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
|
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
waitForSpan(scanOperationSpanMatcher);
|
||||||
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
|
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
|
||||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
.map(SpanData::getSpanId).findAny().get();
|
||||||
|
|
||||||
final Matcher<SpanData> onScanMetricsCreatedMatcher =
|
|
||||||
hasName("TracedScanResultConsumer#onScanMetricsCreated");
|
|
||||||
assertThat(spans, hasItem(onScanMetricsCreatedMatcher));
|
|
||||||
spans.stream().filter(onScanMetricsCreatedMatcher::matches).forEach(span -> assertThat(span,
|
|
||||||
allOf(onScanMetricsCreatedMatcher, hasParentSpanId(scanOperationSpanId), hasEnded())));
|
|
||||||
|
|
||||||
final Matcher<SpanData> onNextMatcher = hasName("TracedScanResultConsumer#onNext");
|
final Matcher<SpanData> onNextMatcher = hasName("TracedScanResultConsumer#onNext");
|
||||||
assertThat(spans, hasItem(onNextMatcher));
|
waitForSpan(onNextMatcher);
|
||||||
spans.stream().filter(onNextMatcher::matches)
|
spanStream().filter(onNextMatcher::matches)
|
||||||
.forEach(span -> assertThat(span, allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
|
.forEach(span -> assertThat(span, allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
|
||||||
hasStatusWithCode(StatusCode.OK), hasEnded())));
|
hasStatusWithCode(StatusCode.OK), hasEnded())));
|
||||||
|
|
||||||
final Matcher<SpanData> onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete");
|
final Matcher<SpanData> onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete");
|
||||||
assertThat(spans, hasItem(onCompleteMatcher));
|
waitForSpan(onCompleteMatcher);
|
||||||
spans.stream().filter(onCompleteMatcher::matches)
|
spanStream().filter(onCompleteMatcher::matches)
|
||||||
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
|
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
|
||||||
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
|
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
|
||||||
}
|
}
|
||||||
@ -151,27 +142,26 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
|
|||||||
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
|
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
|
||||||
waitForSpan(parentSpanMatcher);
|
waitForSpan(parentSpanMatcher);
|
||||||
|
|
||||||
final List<SpanData> spans =
|
|
||||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
StringTraceRenderer stringTraceRenderer =
|
||||||
|
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||||
stringTraceRenderer.render(logger::debug);
|
stringTraceRenderer.render(logger::debug);
|
||||||
}
|
}
|
||||||
|
|
||||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
final String parentSpanId =
|
||||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||||
|
|
||||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
|
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
|
||||||
hasException(exceptionMatcher), hasEnded());
|
hasException(exceptionMatcher), hasEnded());
|
||||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
waitForSpan(scanOperationSpanMatcher);
|
||||||
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
|
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
|
||||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
.map(SpanData::getSpanId).findAny().get();
|
||||||
|
|
||||||
final Matcher<SpanData> onErrorMatcher = hasName("TracedScanResultConsumer#onError");
|
final Matcher<SpanData> onErrorMatcher = hasName("TracedScanResultConsumer#onError");
|
||||||
assertThat(spans, hasItem(onErrorMatcher));
|
waitForSpan(onErrorMatcher);
|
||||||
spans.stream().filter(onErrorMatcher::matches)
|
spanStream().filter(onErrorMatcher::matches)
|
||||||
.forEach(span -> assertThat(span, allOf(onErrorMatcher, hasParentSpanId(scanOperationSpanId),
|
.forEach(span -> assertThat(span, allOf(onErrorMatcher, hasParentSpanId(scanOperationSpanId),
|
||||||
hasStatusWithCode(StatusCode.OK), hasEnded())));
|
hasStatusWithCode(StatusCode.OK), hasEnded())));
|
||||||
}
|
}
|
||||||
|
@ -22,15 +22,12 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.has
|
|||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
|
||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
|
||||||
import static org.hamcrest.Matchers.allOf;
|
import static org.hamcrest.Matchers.allOf;
|
||||||
import static org.hamcrest.Matchers.hasItem;
|
|
||||||
import static org.hamcrest.Matchers.startsWith;
|
import static org.hamcrest.Matchers.startsWith;
|
||||||
|
|
||||||
import io.opentelemetry.api.trace.StatusCode;
|
import io.opentelemetry.api.trace.StatusCode;
|
||||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
@ -99,20 +96,19 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
|
|||||||
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
|
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||||
waitForSpan(parentSpanMatcher);
|
waitForSpan(parentSpanMatcher);
|
||||||
|
|
||||||
final List<SpanData> spans =
|
|
||||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
StringTraceRenderer stringTraceRenderer =
|
||||||
|
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||||
stringTraceRenderer.render(logger::debug);
|
stringTraceRenderer.render(logger::debug);
|
||||||
}
|
}
|
||||||
|
|
||||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
final String parentSpanId =
|
||||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||||
|
|
||||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
|
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
waitForSpan(scanOperationSpanMatcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -122,20 +118,19 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
|
|||||||
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
|
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
|
||||||
waitForSpan(parentSpanMatcher);
|
waitForSpan(parentSpanMatcher);
|
||||||
|
|
||||||
final List<SpanData> spans =
|
|
||||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
StringTraceRenderer stringTraceRenderer =
|
||||||
|
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||||
stringTraceRenderer.render(logger::debug);
|
stringTraceRenderer.render(logger::debug);
|
||||||
}
|
}
|
||||||
|
|
||||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
final String parentSpanId =
|
||||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||||
|
|
||||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
|
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
|
||||||
hasException(exceptionMatcher), hasEnded());
|
hasException(exceptionMatcher), hasEnded());
|
||||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
waitForSpan(scanOperationSpanMatcher);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,16 +22,13 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.has
|
|||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
|
||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
|
||||||
import static org.hamcrest.Matchers.allOf;
|
import static org.hamcrest.Matchers.allOf;
|
||||||
import static org.hamcrest.Matchers.hasItem;
|
|
||||||
import static org.hamcrest.Matchers.startsWith;
|
import static org.hamcrest.Matchers.startsWith;
|
||||||
|
|
||||||
import io.opentelemetry.api.trace.StatusCode;
|
import io.opentelemetry.api.trace.StatusCode;
|
||||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.ForkJoinPool;
|
import java.util.concurrent.ForkJoinPool;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@ -83,7 +80,7 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
|
|||||||
@Override
|
@Override
|
||||||
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
|
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
|
||||||
AsyncTable<?> table =
|
AsyncTable<?> table =
|
||||||
connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
||||||
List<Result> results = new ArrayList<>();
|
List<Result> results = new ArrayList<>();
|
||||||
// these tests batch settings with the sample data result in each result being
|
// these tests batch settings with the sample data result in each result being
|
||||||
// split in two. so we must allow twice the expected results in order to reach
|
// split in two. so we must allow twice the expected results in order to reach
|
||||||
@ -112,19 +109,17 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
|
|||||||
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
|
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||||
waitForSpan(parentSpanMatcher);
|
waitForSpan(parentSpanMatcher);
|
||||||
|
|
||||||
final List<SpanData> spans =
|
|
||||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
StringTraceRenderer stringTraceRenderer =
|
||||||
|
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||||
stringTraceRenderer.render(logger::debug);
|
stringTraceRenderer.render(logger::debug);
|
||||||
}
|
}
|
||||||
|
|
||||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
final String parentSpanId =
|
||||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||||
|
|
||||||
assertThat(spans,
|
waitForSpan(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||||
hasItem(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()));
|
||||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -134,20 +129,19 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
|
|||||||
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
|
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
|
||||||
waitForSpan(parentSpanMatcher);
|
waitForSpan(parentSpanMatcher);
|
||||||
|
|
||||||
final List<SpanData> spans =
|
|
||||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
StringTraceRenderer stringTraceRenderer =
|
||||||
|
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||||
stringTraceRenderer.render(logger::debug);
|
stringTraceRenderer.render(logger::debug);
|
||||||
}
|
}
|
||||||
|
|
||||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
final String parentSpanId =
|
||||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||||
|
|
||||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
|
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
|
||||||
hasException(exceptionMatcher), hasEnded());
|
hasException(exceptionMatcher), hasEnded());
|
||||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
waitForSpan(scanOperationSpanMatcher);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,15 +24,12 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.has
|
|||||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.allOf;
|
import static org.hamcrest.Matchers.allOf;
|
||||||
import static org.hamcrest.Matchers.hasItem;
|
|
||||||
import static org.hamcrest.Matchers.not;
|
|
||||||
import static org.hamcrest.Matchers.startsWith;
|
import static org.hamcrest.Matchers.startsWith;
|
||||||
|
|
||||||
import io.opentelemetry.api.trace.StatusCode;
|
import io.opentelemetry.api.trace.StatusCode;
|
||||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
@ -77,7 +74,7 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
|
|||||||
@Override
|
@Override
|
||||||
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
|
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
|
||||||
TracedAdvancedScanResultConsumer scanConsumer = new TracedAdvancedScanResultConsumer();
|
TracedAdvancedScanResultConsumer scanConsumer = new TracedAdvancedScanResultConsumer();
|
||||||
connectionRule.getAsyncConnection().getTable(TABLE_NAME).scan(scan, scanConsumer);
|
CONN_RULE.getAsyncConnection().getTable(TABLE_NAME).scan(scan, scanConsumer);
|
||||||
List<Result> results = new ArrayList<>();
|
List<Result> results = new ArrayList<>();
|
||||||
// these tests batch settings with the sample data result in each result being
|
// these tests batch settings with the sample data result in each result being
|
||||||
// split in two. so we must allow twice the expected results in order to reach
|
// split in two. so we must allow twice the expected results in order to reach
|
||||||
@ -104,39 +101,33 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
|
|||||||
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
|
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||||
waitForSpan(parentSpanMatcher);
|
waitForSpan(parentSpanMatcher);
|
||||||
|
|
||||||
final List<SpanData> spans =
|
|
||||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
StringTraceRenderer stringTraceRenderer =
|
||||||
|
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||||
stringTraceRenderer.render(logger::debug);
|
stringTraceRenderer.render(logger::debug);
|
||||||
}
|
}
|
||||||
|
|
||||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
final String parentSpanId =
|
||||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||||
|
|
||||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
|
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
|
||||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
waitForSpan(scanOperationSpanMatcher);
|
||||||
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
|
|
||||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
|
||||||
|
|
||||||
// RawAsyncTableImpl never invokes the callback to `onScanMetricsCreated` -- bug?
|
|
||||||
final Matcher<SpanData> onScanMetricsCreatedMatcher =
|
|
||||||
hasName("TracedAdvancedScanResultConsumer#onScanMetricsCreated");
|
|
||||||
assertThat(spans, not(hasItem(onScanMetricsCreatedMatcher)));
|
|
||||||
|
|
||||||
|
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
|
||||||
|
.map(SpanData::getSpanId).findAny().get();
|
||||||
final Matcher<SpanData> onNextMatcher = hasName("TracedAdvancedScanResultConsumer#onNext");
|
final Matcher<SpanData> onNextMatcher = hasName("TracedAdvancedScanResultConsumer#onNext");
|
||||||
assertThat(spans, hasItem(onNextMatcher));
|
waitForSpan(onNextMatcher);
|
||||||
spans.stream().filter(onNextMatcher::matches)
|
spanStream().filter(onNextMatcher::matches)
|
||||||
.forEach(span -> assertThat(span, hasParentSpanId(scanOperationSpanId)));
|
.forEach(span -> assertThat(span, hasParentSpanId(scanOperationSpanId)));
|
||||||
assertThat(spans, hasItem(allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
|
waitForSpan(allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
|
||||||
hasStatusWithCode(StatusCode.OK), hasEnded())));
|
hasStatusWithCode(StatusCode.OK), hasEnded()));
|
||||||
|
|
||||||
final Matcher<SpanData> onCompleteMatcher =
|
final Matcher<SpanData> onCompleteMatcher =
|
||||||
hasName("TracedAdvancedScanResultConsumer#onComplete");
|
hasName("TracedAdvancedScanResultConsumer#onComplete");
|
||||||
assertThat(spans, hasItem(onCompleteMatcher));
|
waitForSpan(onCompleteMatcher);
|
||||||
spans.stream().filter(onCompleteMatcher::matches)
|
spanStream().filter(onCompleteMatcher::matches)
|
||||||
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
|
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
|
||||||
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
|
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
|
||||||
}
|
}
|
||||||
@ -148,27 +139,26 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
|
|||||||
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
|
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
|
||||||
waitForSpan(parentSpanMatcher);
|
waitForSpan(parentSpanMatcher);
|
||||||
|
|
||||||
final List<SpanData> spans =
|
|
||||||
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
|
StringTraceRenderer stringTraceRenderer =
|
||||||
|
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
|
||||||
stringTraceRenderer.render(logger::debug);
|
stringTraceRenderer.render(logger::debug);
|
||||||
}
|
}
|
||||||
|
|
||||||
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
|
final String parentSpanId =
|
||||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
|
||||||
|
|
||||||
final Matcher<SpanData> scanOperationSpanMatcher =
|
final Matcher<SpanData> scanOperationSpanMatcher =
|
||||||
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
|
||||||
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
|
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
|
||||||
hasException(exceptionMatcher), hasEnded());
|
hasException(exceptionMatcher), hasEnded());
|
||||||
assertThat(spans, hasItem(scanOperationSpanMatcher));
|
waitForSpan(scanOperationSpanMatcher);
|
||||||
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
|
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
|
||||||
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
|
.map(SpanData::getSpanId).findAny().get();
|
||||||
|
|
||||||
final Matcher<SpanData> onCompleteMatcher = hasName("TracedAdvancedScanResultConsumer#onError");
|
final Matcher<SpanData> onCompleteMatcher = hasName("TracedAdvancedScanResultConsumer#onError");
|
||||||
assertThat(spans, hasItem(onCompleteMatcher));
|
waitForSpan(onCompleteMatcher);
|
||||||
spans.stream().filter(onCompleteMatcher::matches)
|
spanStream().filter(onCompleteMatcher::matches)
|
||||||
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
|
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
|
||||||
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
|
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user