HBASE-24034 [Flakey Tests] A couple of fixes and cleanups

hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java
 Remove spurious assert. Just before this it waits an arbitrary 10
 seconds. Compactions could have completed inside this time. The spirit
 of the test remains.

hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
 Get log cleaner to go down promptly; its sticking around. See if this
 helps with TestMasterShutdown

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
 We get a rare NPE trying to sync. Make local copy of SyncFuture and see
 if that helps.

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
 Compaction  may have completed when not expected; allow for it.

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
 Add wait before testing. Compaction may not have completed. Let
 compaction complete before progressing and then test for empty cache.

hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java
 Less resources.

hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
 Less resources.

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
 Wait till online before we try and do compaction (else request is
 ignored)

hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java
 Disable test that fails randomly w/ mockito complaint on some mac os
 x's.

TestMasterShutdown... fix NPE in RSRpcDispatcher... catch it and covert
to false and have master check for successful startup.
This commit is contained in:
stack 2020-03-23 11:45:46 -07:00
parent 12e5b07258
commit bcc7f7f06d
13 changed files with 90 additions and 44 deletions

View File

@ -643,7 +643,7 @@ public final class ConnectionUtils {
} }
LOG.trace("{} cache is null, try fetching from registry", type); LOG.trace("{} cache is null, try fetching from registry", type);
if (futureRef.compareAndSet(null, new CompletableFuture<>())) { if (futureRef.compareAndSet(null, new CompletableFuture<>())) {
LOG.debug("Start fetching{} from registry", type); LOG.debug("Start fetching {} from registry", type);
CompletableFuture<T> future = futureRef.get(); CompletableFuture<T> future = futureRef.get();
addListener(fetch.get(), (value, error) -> { addListener(fetch.get(), (value, error) -> {
if (error != null) { if (error != null) {

View File

@ -88,8 +88,6 @@ public class TestRSGroupMajorCompactionTTL extends TestMajorCompactorTTL {
for (TableName tableName : tableNames) { for (TableName tableName : tableNames) {
int numberOfRegions = admin.getRegions(tableName).size(); int numberOfRegions = admin.getRegions(tableName).size();
int numHFiles = utility.getNumHFiles(tableName, FAMILY); int numHFiles = utility.getNumHFiles(tableName, FAMILY);
// we should have a table with more store files than we would before we major compacted.
assertTrue(numberOfRegions < numHFiles);
modifyTTL(tableName); modifyTTL(tableName);
} }
@ -103,4 +101,4 @@ public class TestRSGroupMajorCompactionTTL extends TestMajorCompactorTTL {
assertEquals(numberOfRegions, numHFiles); assertEquals(numberOfRegions, numHFiles);
} }
} }
} }

View File

@ -1541,7 +1541,9 @@ public class HMaster extends HRegionServer implements MasterServices {
// startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more
// details. // details.
procedureExecutor.init(numThreads, abortOnCorruption); procedureExecutor.init(numThreads, abortOnCorruption);
procEnv.getRemoteDispatcher().start(); if (!procEnv.getRemoteDispatcher().start()) {
throw new HBaseIOException("Failed start of remote dispatcher");
}
} }
private void startProcedureExecutor() throws IOException { private void startProcedureExecutor() throws IOException {

View File

@ -101,7 +101,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
private long cleanerThreadTimeoutMsec; private long cleanerThreadTimeoutMsec;
private long cleanerThreadCheckIntervalMsec; private long cleanerThreadCheckIntervalMsec;
private List<Thread> threads = new ArrayList<Thread>(); private List<Thread> threads = new ArrayList<Thread>();
private boolean running; private volatile boolean running;
private AtomicLong deletedLargeFiles = new AtomicLong(); private AtomicLong deletedLargeFiles = new AtomicLong();
private AtomicLong deletedSmallFiles = new AtomicLong(); private AtomicLong deletedSmallFiles = new AtomicLong();

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener; import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
@ -93,11 +94,16 @@ public class RSProcedureDispatcher
if (!super.start()) { if (!super.start()) {
return false; return false;
} }
// Around startup, if failed, some of the below may be set back to null so NPE is possible.
master.getServerManager().registerListener(this); try {
procedureEnv = master.getMasterProcedureExecutor().getEnvironment(); master.getServerManager().registerListener(this);
for (ServerName serverName: master.getServerManager().getOnlineServersList()) { procedureEnv = master.getMasterProcedureExecutor().getEnvironment();
addNode(serverName); for (ServerName serverName : master.getServerManager().getOnlineServersList()) {
addNode(serverName);
}
} catch (Exception e) {
LOG.info("Failed start", e);
return false;
} }
return true; return true;
} }

View File

@ -552,12 +552,20 @@ public class FSHLog extends AbstractFSWAL<Writer> {
int syncCount = 0; int syncCount = 0;
try { try {
// Make a local copy of takeSyncFuture after we get it. We've been running into NPEs
// 2020-03-22 16:54:32,180 WARN [sync.1] wal.FSHLog$SyncRunner(589): UNEXPECTED
// java.lang.NullPointerException
// at org.apache.hadoop.hbase.regionserver.wal.FSHLog$SyncRunner.run(FSHLog.java:582)
// at java.lang.Thread.run(Thread.java:748)
SyncFuture sf;
while (true) { while (true) {
takeSyncFuture = null; takeSyncFuture = null;
// We have to process what we 'take' from the queue // We have to process what we 'take' from the queue
takeSyncFuture = this.syncFutures.take(); takeSyncFuture = this.syncFutures.take();
// Make local copy.
sf = takeSyncFuture;
currentSequence = this.sequence; currentSequence = this.sequence;
long syncFutureSequence = takeSyncFuture.getTxid(); long syncFutureSequence = sf.getTxid();
if (syncFutureSequence > currentSequence) { if (syncFutureSequence > currentSequence) {
throw new IllegalStateException("currentSequence=" + currentSequence throw new IllegalStateException("currentSequence=" + currentSequence
+ ", syncFutureSequence=" + syncFutureSequence); + ", syncFutureSequence=" + syncFutureSequence);
@ -565,7 +573,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// See if we can process any syncfutures BEFORE we go sync. // See if we can process any syncfutures BEFORE we go sync.
long currentHighestSyncedSequence = highestSyncedTxid.get(); long currentHighestSyncedSequence = highestSyncedTxid.get();
if (currentSequence < currentHighestSyncedSequence) { if (currentSequence < currentHighestSyncedSequence) {
syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); syncCount += releaseSyncFuture(sf, currentHighestSyncedSequence, null);
// Done with the 'take'. Go around again and do a new 'take'. // Done with the 'take'. Go around again and do a new 'take'.
continue; continue;
} }
@ -579,7 +587,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
Throwable lastException = null; Throwable lastException = null;
try { try {
TraceUtil.addTimelineAnnotation("syncing writer"); TraceUtil.addTimelineAnnotation("syncing writer");
writer.sync(takeSyncFuture.isForceSync()); writer.sync(sf.isForceSync());
TraceUtil.addTimelineAnnotation("writer synced"); TraceUtil.addTimelineAnnotation("writer synced");
currentSequence = updateHighestSyncedSequence(currentSequence); currentSequence = updateHighestSyncedSequence(currentSequence);
} catch (IOException e) { } catch (IOException e) {

View File

@ -434,11 +434,11 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
} else { } else {
int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily; int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
// assert only change was to single column family // assert only change was to single column family
assertTrue(singleFamDiff == (countBefore - countAfter)); assertEquals(singleFamDiff, (countBefore - countAfter));
if (expectedState == CompactionState.MAJOR) { if (expectedState == CompactionState.MAJOR) {
assertTrue(1 == countAfterSingleFamily); assertEquals(1, countAfterSingleFamily);
} else { } else {
assertTrue(1 < countAfterSingleFamily); assertTrue("" + countAfterSingleFamily, 1 <= countAfterSingleFamily);
} }
} }
} }

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@ -583,11 +584,17 @@ public class TestBlockEvictionFromClient {
region.flush(true); region.flush(true);
ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers()); ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size(); int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
LOG.info("About to SPLIT on {} {}", Bytes.toString(ROW1), region.getRegionInfo()); LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(),
regionCount);
TEST_UTIL.getAdmin().split(tableName, ROW1); TEST_UTIL.getAdmin().split(tableName, ROW1);
// Wait for splits // Wait for splits
TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount); TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
LOG.info("Split finished, is region closed {}", region.isClosed()); List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions();
for (HRegion r: regions) {
LOG.info("" + r.getCompactionState());
TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE));
}
LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache);
Iterator<CachedBlock> iterator = cache.iterator(); Iterator<CachedBlock> iterator = cache.iterator();
// Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners
// should be closed inorder to return those blocks // should be closed inorder to return those blocks
@ -1189,11 +1196,11 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next(); CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) { if (cache instanceof BucketCache) {
LOG.info("BucketCache {}", cacheKey);
refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
LOG.info("BucketCache {} {}", cacheKey, refCount);
} else if (cache instanceof CombinedBlockCache) { } else if (cache instanceof CombinedBlockCache) {
LOG.info("CombinedBlockCache {}", cacheKey);
refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
LOG.info("CombinedBlockCache {} {}", cacheKey, refCount);
} else { } else {
continue; continue;
} }

View File

@ -85,8 +85,8 @@ public class TestMasterShutdown {
htu = new HBaseTestingUtility(conf); htu = new HBaseTestingUtility(conf);
StartMiniClusterOption option = StartMiniClusterOption.builder() StartMiniClusterOption option = StartMiniClusterOption.builder()
.numMasters(3) .numMasters(3)
.numRegionServers(3) .numRegionServers(1)
.numDataNodes(3) .numDataNodes(1)
.build(); .build();
final MiniHBaseCluster cluster = htu.startMiniCluster(option); final MiniHBaseCluster cluster = htu.startMiniCluster(option);

View File

@ -129,17 +129,16 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
* *
* Invariant is that all servers should be hosting either floor(average) or * Invariant is that all servers should be hosting either floor(average) or
* ceiling(average) at both table level and cluster level * ceiling(average) at both table level and cluster level
*
* @throws Exception
*/ */
@Test @Test
public void testBalanceClusterOverall() throws Exception { public void testBalanceClusterOverall() throws Exception {
Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad = new TreeMap<>(); Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad = new TreeMap<>();
for (int[] mockCluster : clusterStateMocks) { for (int[] mockCluster : clusterStateMocks) {
Map<ServerName, List<RegionInfo>> clusterServers = mockClusterServers(mockCluster, 50); Map<ServerName, List<RegionInfo>> clusterServers = mockClusterServers(mockCluster, 30);
List<ServerAndLoad> clusterList = convertToList(clusterServers); List<ServerAndLoad> clusterList = convertToList(clusterServers);
clusterLoad.put(TableName.valueOf(name.getMethodName()), clusterServers); clusterLoad.put(TableName.valueOf(name.getMethodName()), clusterServers);
HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> result = mockClusterServersWithTables(clusterServers); HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> result =
mockClusterServersWithTables(clusterServers);
loadBalancer.setClusterLoad(clusterLoad); loadBalancer.setClusterLoad(clusterLoad);
List<RegionPlan> clusterplans = new ArrayList<>(); List<RegionPlan> clusterplans = new ArrayList<>();
List<Pair<TableName, Integer>> regionAmountList = new ArrayList<>(); List<Pair<TableName, Integer>> regionAmountList = new ArrayList<>();

View File

@ -181,7 +181,7 @@ public class TestClusterScopeQuotaThrottle {
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAMES); triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAMES);
} }
@org.junit.Ignore @Test // Spews the log w/ triggering of scheduler? @org.junit.Ignore @Test // Spews the log w/ triggering of scheduler? HBASE-24035
public void testUserNamespaceClusterScopeQuota() throws Exception { public void testUserNamespaceClusterScopeQuota() throws Exception {
final Admin admin = TEST_UTIL.getAdmin(); final Admin admin = TEST_UTIL.getAdmin();
final String userName = User.getCurrent().getShortName(); final String userName = User.getCurrent().getShortName();

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -21,7 +21,6 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
@ -69,7 +69,6 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@ -402,6 +401,17 @@ public class TestEndToEndSplitTransaction {
public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName) public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
throws IOException, InterruptedException { throws IOException, InterruptedException {
log("Compacting region: " + Bytes.toStringBinary(regionName)); log("Compacting region: " + Bytes.toStringBinary(regionName));
// Wait till its online before we do compact else it comes back with NoServerForRegionException
try {
TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return rs.getServerName().equals(MetaTableAccessor.
getRegionLocation(admin.getConnection(), regionName).getServerName());
}
});
} catch (Exception e) {
throw new IOException(e);
}
admin.majorCompactRegion(regionName); admin.majorCompactRegion(regionName);
log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName)); log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
Threads.sleepWithoutInterrupt(500); Threads.sleepWithoutInterrupt(500);

View File

@ -22,10 +22,10 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Matchers.anyLong; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.eq; import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -60,7 +60,7 @@ import org.junit.rules.TestName;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
@ -183,16 +183,30 @@ public class TestCanaryTool {
} }
} }
@Test // Ignore this test. It fails w/ the below on some mac os x.
// [ERROR] Failures:
// [ERROR] TestCanaryTool.testReadTableTimeouts:216
// Argument(s) are different! Wanted:
// mockAppender.doAppend(
// <custom argument matcher>
// );
// -> at org.apache.hadoop.hbase.tool.TestCanaryTool.testReadTableTimeouts(TestCanaryTool.java:216)
// Actual invocations have different arguments:
// mockAppender.doAppend(
// org.apache.log4j.spi.LoggingEvent@2055cfc1
// );
// )
// )
//
@org.junit.Ignore @Test
public void testReadTableTimeouts() throws Exception { public void testReadTableTimeouts() throws Exception {
final TableName [] tableNames = new TableName[2]; final TableName [] tableNames = new TableName[] {TableName.valueOf(name.getMethodName() + "1"),
tableNames[0] = TableName.valueOf(name.getMethodName() + "1"); TableName.valueOf(name.getMethodName() + "2")};
tableNames[1] = TableName.valueOf(name.getMethodName() + "2");
// Create 2 test tables. // Create 2 test tables.
for (int j = 0; j<2; j++) { for (int j = 0; j < 2; j++) {
Table table = testingUtility.createTable(tableNames[j], new byte[][] { FAMILY }); Table table = testingUtility.createTable(tableNames[j], new byte[][] { FAMILY });
// insert some test rows // insert some test rows
for (int i=0; i<1000; i++) { for (int i = 0; i < 10; i++) {
byte[] iBytes = Bytes.toBytes(i + j); byte[] iBytes = Bytes.toBytes(i + j);
Put p = new Put(iBytes); Put p = new Put(iBytes);
p.addColumn(FAMILY, COLUMN, iBytes); p.addColumn(FAMILY, COLUMN, iBytes);
@ -208,9 +222,11 @@ public class TestCanaryTool {
name.getMethodName() + "2"}; name.getMethodName() + "2"};
assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args)); assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args));
verify(sink, times(tableNames.length)).initializeAndGetReadLatencyForTable(isA(String.class)); verify(sink, times(tableNames.length)).initializeAndGetReadLatencyForTable(isA(String.class));
for (int i=0; i<2; i++) { for (int i = 0; i < 2; i++) {
assertNotEquals("verify non-null read latency", null, sink.getReadLatencyMap().get(tableNames[i].getNameAsString())); assertNotEquals("verify non-null read latency", null,
assertNotEquals("verify non-zero read latency", 0L, sink.getReadLatencyMap().get(tableNames[i].getNameAsString())); sink.getReadLatencyMap().get(tableNames[i].getNameAsString()));
assertNotEquals("verify non-zero read latency", 0L,
sink.getReadLatencyMap().get(tableNames[i].getNameAsString()));
} }
// One table's timeout is set for 0 ms and thus, should lead to an error. // One table's timeout is set for 0 ms and thus, should lead to an error.
verify(mockAppender, times(1)).doAppend(argThat(new ArgumentMatcher<LoggingEvent>() { verify(mockAppender, times(1)).doAppend(argThat(new ArgumentMatcher<LoggingEvent>() {