From bcc7f7f06dde3470f3e218da48c8b0bde59815ad Mon Sep 17 00:00:00 2001 From: stack Date: Mon, 23 Mar 2020 11:45:46 -0700 Subject: [PATCH] HBASE-24034 [Flakey Tests] A couple of fixes and cleanups hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java Remove spurious assert. Just before this it waits an arbitrary 10 seconds. Compactions could have completed inside this time. The spirit of the test remains. hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java Get log cleaner to go down promptly; its sticking around. See if this helps with TestMasterShutdown hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java We get a rare NPE trying to sync. Make local copy of SyncFuture and see if that helps. hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java Compaction may have completed when not expected; allow for it. hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java Add wait before testing. Compaction may not have completed. Let compaction complete before progressing and then test for empty cache. hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java Less resources. hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java Less resources. hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java Wait till online before we try and do compaction (else request is ignored) hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java Disable test that fails randomly w/ mockito complaint on some mac os x's. TestMasterShutdown... fix NPE in RSRpcDispatcher... catch it and covert to false and have master check for successful startup. --- .../hadoop/hbase/client/ConnectionUtils.java | 2 +- .../TestRSGroupMajorCompactionTTL.java | 4 +- .../apache/hadoop/hbase/master/HMaster.java | 4 +- .../hbase/master/cleaner/HFileCleaner.java | 2 +- .../procedure/RSProcedureDispatcher.java | 16 ++++--- .../hadoop/hbase/regionserver/wal/FSHLog.java | 14 +++++-- .../hbase/client/TestAsyncRegionAdminApi.java | 6 +-- .../client/TestBlockEvictionFromClient.java | 15 +++++-- .../hbase/master/TestMasterShutdown.java | 4 +- .../balancer/TestDefaultLoadBalancer.java | 7 ++-- .../quotas/TestClusterScopeQuotaThrottle.java | 2 +- .../TestEndToEndSplitTransaction.java | 16 +++++-- .../hadoop/hbase/tool/TestCanaryTool.java | 42 +++++++++++++------ 13 files changed, 90 insertions(+), 44 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index cad0c65516d..8f9dc277f4d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -643,7 +643,7 @@ public final class ConnectionUtils { } LOG.trace("{} cache is null, try fetching from registry", type); if (futureRef.compareAndSet(null, new CompletableFuture<>())) { - LOG.debug("Start fetching{} from registry", type); + LOG.debug("Start fetching {} from registry", type); CompletableFuture future = futureRef.get(); addListener(fetch.get(), (value, error) -> { if (error != null) { diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java index 9b3dcbbc39e..f685780a14f 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java @@ -88,8 +88,6 @@ public class TestRSGroupMajorCompactionTTL extends TestMajorCompactorTTL { for (TableName tableName : tableNames) { int numberOfRegions = admin.getRegions(tableName).size(); int numHFiles = utility.getNumHFiles(tableName, FAMILY); - // we should have a table with more store files than we would before we major compacted. - assertTrue(numberOfRegions < numHFiles); modifyTTL(tableName); } @@ -103,4 +101,4 @@ public class TestRSGroupMajorCompactionTTL extends TestMajorCompactorTTL { assertEquals(numberOfRegions, numHFiles); } } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 2f4bea8799f..08043ef4e24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1541,7 +1541,9 @@ public class HMaster extends HRegionServer implements MasterServices { // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more // details. procedureExecutor.init(numThreads, abortOnCorruption); - procEnv.getRemoteDispatcher().start(); + if (!procEnv.getRemoteDispatcher().start()) { + throw new HBaseIOException("Failed start of remote dispatcher"); + } } private void startProcedureExecutor() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java index 1747da164b6..0ddc8825ce4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java @@ -101,7 +101,7 @@ public class HFileCleaner extends CleanerChore private long cleanerThreadTimeoutMsec; private long cleanerThreadCheckIntervalMsec; private List threads = new ArrayList(); - private boolean running; + private volatile boolean running; private AtomicLong deletedLargeFiles = new AtomicLong(); private AtomicLong deletedSmallFiles = new AtomicLong(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java index b469cb86e63..10b823ca673 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.ServerListener; +import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; @@ -93,11 +94,16 @@ public class RSProcedureDispatcher if (!super.start()) { return false; } - - master.getServerManager().registerListener(this); - procedureEnv = master.getMasterProcedureExecutor().getEnvironment(); - for (ServerName serverName: master.getServerManager().getOnlineServersList()) { - addNode(serverName); + // Around startup, if failed, some of the below may be set back to null so NPE is possible. + try { + master.getServerManager().registerListener(this); + procedureEnv = master.getMasterProcedureExecutor().getEnvironment(); + for (ServerName serverName : master.getServerManager().getOnlineServersList()) { + addNode(serverName); + } + } catch (Exception e) { + LOG.info("Failed start", e); + return false; } return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 02be831f86c..12155511d4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -552,12 +552,20 @@ public class FSHLog extends AbstractFSWAL { int syncCount = 0; try { + // Make a local copy of takeSyncFuture after we get it. We've been running into NPEs + // 2020-03-22 16:54:32,180 WARN [sync.1] wal.FSHLog$SyncRunner(589): UNEXPECTED + // java.lang.NullPointerException + // at org.apache.hadoop.hbase.regionserver.wal.FSHLog$SyncRunner.run(FSHLog.java:582) + // at java.lang.Thread.run(Thread.java:748) + SyncFuture sf; while (true) { takeSyncFuture = null; // We have to process what we 'take' from the queue takeSyncFuture = this.syncFutures.take(); + // Make local copy. + sf = takeSyncFuture; currentSequence = this.sequence; - long syncFutureSequence = takeSyncFuture.getTxid(); + long syncFutureSequence = sf.getTxid(); if (syncFutureSequence > currentSequence) { throw new IllegalStateException("currentSequence=" + currentSequence + ", syncFutureSequence=" + syncFutureSequence); @@ -565,7 +573,7 @@ public class FSHLog extends AbstractFSWAL { // See if we can process any syncfutures BEFORE we go sync. long currentHighestSyncedSequence = highestSyncedTxid.get(); if (currentSequence < currentHighestSyncedSequence) { - syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); + syncCount += releaseSyncFuture(sf, currentHighestSyncedSequence, null); // Done with the 'take'. Go around again and do a new 'take'. continue; } @@ -579,7 +587,7 @@ public class FSHLog extends AbstractFSWAL { Throwable lastException = null; try { TraceUtil.addTimelineAnnotation("syncing writer"); - writer.sync(takeSyncFuture.isForceSync()); + writer.sync(sf.isForceSync()); TraceUtil.addTimelineAnnotation("writer synced"); currentSequence = updateHighestSyncedSequence(currentSequence); } catch (IOException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java index aeff96e8964..33778a7f0d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java @@ -434,11 +434,11 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { } else { int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily; // assert only change was to single column family - assertTrue(singleFamDiff == (countBefore - countAfter)); + assertEquals(singleFamDiff, (countBefore - countAfter)); if (expectedState == CompactionState.MAJOR) { - assertTrue(1 == countAfterSingleFamily); + assertEquals(1, countAfterSingleFamily); } else { - assertTrue(1 < countAfterSingleFamily); + assertTrue("" + countAfterSingleFamily, 1 <= countAfterSingleFamily); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java index c243c4ac897..24596b5e9e2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; @@ -583,11 +584,17 @@ public class TestBlockEvictionFromClient { region.flush(true); ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers()); int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size(); - LOG.info("About to SPLIT on {} {}", Bytes.toString(ROW1), region.getRegionInfo()); + LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(), + regionCount); TEST_UTIL.getAdmin().split(tableName, ROW1); // Wait for splits TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount); - LOG.info("Split finished, is region closed {}", region.isClosed()); + List regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions(); + for (HRegion r: regions) { + LOG.info("" + r.getCompactionState()); + TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE)); + } + LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache); Iterator iterator = cache.iterator(); // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners // should be closed inorder to return those blocks @@ -1189,11 +1196,11 @@ public class TestBlockEvictionFromClient { CachedBlock next = iterator.next(); BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); if (cache instanceof BucketCache) { - LOG.info("BucketCache {}", cacheKey); refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); + LOG.info("BucketCache {} {}", cacheKey, refCount); } else if (cache instanceof CombinedBlockCache) { - LOG.info("CombinedBlockCache {}", cacheKey); refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); + LOG.info("CombinedBlockCache {} {}", cacheKey, refCount); } else { continue; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java index bcee4148ac6..a5e596f79d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java @@ -85,8 +85,8 @@ public class TestMasterShutdown { htu = new HBaseTestingUtility(conf); StartMiniClusterOption option = StartMiniClusterOption.builder() .numMasters(3) - .numRegionServers(3) - .numDataNodes(3) + .numRegionServers(1) + .numDataNodes(1) .build(); final MiniHBaseCluster cluster = htu.startMiniCluster(option); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java index ffbb6c0c436..111dae9bffa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java @@ -129,17 +129,16 @@ public class TestDefaultLoadBalancer extends BalancerTestBase { * * Invariant is that all servers should be hosting either floor(average) or * ceiling(average) at both table level and cluster level - * - * @throws Exception */ @Test public void testBalanceClusterOverall() throws Exception { Map>> clusterLoad = new TreeMap<>(); for (int[] mockCluster : clusterStateMocks) { - Map> clusterServers = mockClusterServers(mockCluster, 50); + Map> clusterServers = mockClusterServers(mockCluster, 30); List clusterList = convertToList(clusterServers); clusterLoad.put(TableName.valueOf(name.getMethodName()), clusterServers); - HashMap>> result = mockClusterServersWithTables(clusterServers); + HashMap>> result = + mockClusterServersWithTables(clusterServers); loadBalancer.setClusterLoad(clusterLoad); List clusterplans = new ArrayList<>(); List> regionAmountList = new ArrayList<>(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestClusterScopeQuotaThrottle.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestClusterScopeQuotaThrottle.java index fd791ca06be..cdce1222d51 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestClusterScopeQuotaThrottle.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestClusterScopeQuotaThrottle.java @@ -181,7 +181,7 @@ public class TestClusterScopeQuotaThrottle { triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAMES); } - @org.junit.Ignore @Test // Spews the log w/ triggering of scheduler? + @org.junit.Ignore @Test // Spews the log w/ triggering of scheduler? HBASE-24035 public void testUserNamespaceClusterScopeQuota() throws Exception { final Admin admin = TEST_UTIL.getAdmin(); final String userName = User.getCurrent().getShortName(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java index c9884a701d0..0d7dd9cb19b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,7 +21,6 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; @@ -69,7 +69,6 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.io.Closeables; @@ -402,6 +401,17 @@ public class TestEndToEndSplitTransaction { public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName) throws IOException, InterruptedException { log("Compacting region: " + Bytes.toStringBinary(regionName)); + // Wait till its online before we do compact else it comes back with NoServerForRegionException + try { + TEST_UTIL.waitFor(10000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + return rs.getServerName().equals(MetaTableAccessor. + getRegionLocation(admin.getConnection(), regionName).getServerName()); + } + }); + } catch (Exception e) { + throw new IOException(e); + } admin.majorCompactRegion(regionName); log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName)); Threads.sleepWithoutInterrupt(500); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java index e7b88e76987..c56ab34ed7c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java @@ -22,10 +22,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isA; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -60,7 +60,7 @@ import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.mockito.ArgumentMatcher; import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.junit.MockitoJUnitRunner; import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; @RunWith(MockitoJUnitRunner.class) @@ -183,16 +183,30 @@ public class TestCanaryTool { } } - @Test + // Ignore this test. It fails w/ the below on some mac os x. + // [ERROR] Failures: + // [ERROR] TestCanaryTool.testReadTableTimeouts:216 + // Argument(s) are different! Wanted: + // mockAppender.doAppend( + // + // ); + // -> at org.apache.hadoop.hbase.tool.TestCanaryTool.testReadTableTimeouts(TestCanaryTool.java:216) + // Actual invocations have different arguments: + // mockAppender.doAppend( + // org.apache.log4j.spi.LoggingEvent@2055cfc1 + // ); + // ) + // ) + // + @org.junit.Ignore @Test public void testReadTableTimeouts() throws Exception { - final TableName [] tableNames = new TableName[2]; - tableNames[0] = TableName.valueOf(name.getMethodName() + "1"); - tableNames[1] = TableName.valueOf(name.getMethodName() + "2"); + final TableName [] tableNames = new TableName[] {TableName.valueOf(name.getMethodName() + "1"), + TableName.valueOf(name.getMethodName() + "2")}; // Create 2 test tables. - for (int j = 0; j<2; j++) { + for (int j = 0; j < 2; j++) { Table table = testingUtility.createTable(tableNames[j], new byte[][] { FAMILY }); // insert some test rows - for (int i=0; i<1000; i++) { + for (int i = 0; i < 10; i++) { byte[] iBytes = Bytes.toBytes(i + j); Put p = new Put(iBytes); p.addColumn(FAMILY, COLUMN, iBytes); @@ -208,9 +222,11 @@ public class TestCanaryTool { name.getMethodName() + "2"}; assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args)); verify(sink, times(tableNames.length)).initializeAndGetReadLatencyForTable(isA(String.class)); - for (int i=0; i<2; i++) { - assertNotEquals("verify non-null read latency", null, sink.getReadLatencyMap().get(tableNames[i].getNameAsString())); - assertNotEquals("verify non-zero read latency", 0L, sink.getReadLatencyMap().get(tableNames[i].getNameAsString())); + for (int i = 0; i < 2; i++) { + assertNotEquals("verify non-null read latency", null, + sink.getReadLatencyMap().get(tableNames[i].getNameAsString())); + assertNotEquals("verify non-zero read latency", 0L, + sink.getReadLatencyMap().get(tableNames[i].getNameAsString())); } // One table's timeout is set for 0 ms and thus, should lead to an error. verify(mockAppender, times(1)).doAppend(argThat(new ArgumentMatcher() {