From 2e5a6642338f00a5496d1fc17a0d585166014717 Mon Sep 17 00:00:00 2001 From: Sandeep Pal <50725353+sandeepvinayak@users.noreply.github.com> Date: Thu, 14 May 2020 10:34:51 -0700 Subject: [PATCH] HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704) Signed-off-by: Wellington Chevreuil Signed-off-by: Andrew Purtell --- .../MetricsReplicationSourceFactory.java | 1 + .../MetricsReplicationSourceFactoryImpl.java | 4 + .../MetricsReplicationTableSource.java | 32 +++++ .../MetricsReplicationTableSourceImpl.java | 134 ++++++++++++++++++ .../regionserver/MetricsSource.java | 36 ++++- .../regionserver/ReplicationSource.java | 2 +- .../ReplicationSourceShipper.java | 8 +- .../ReplicationSourceWALReader.java | 6 +- .../regionserver/WALEntryBatch.java | 24 +++- .../replication/TestReplicationEndpoint.java | 54 +++++-- 10 files changed, 267 insertions(+), 34 deletions(-) create mode 100644 hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java create mode 100644 hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java index 6534b111205..2816f832edf 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java @@ -24,5 +24,6 @@ import org.apache.yetus.audience.InterfaceAudience; public interface MetricsReplicationSourceFactory { public MetricsReplicationSinkSource getSink(); public MetricsReplicationSourceSource getSource(String id); + public MetricsReplicationTableSource getTableSource(String tableName); public MetricsReplicationSourceSource getGlobalSource(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java index af310f05445..a3b34620041 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java @@ -35,6 +35,10 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSo return new MetricsReplicationSourceSourceImpl(SourceHolder.INSTANCE.source, id); } + @Override public MetricsReplicationTableSource getTableSource(String tableName) { + return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName); + } + @Override public MetricsReplicationSourceSource getGlobalSource() { return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java new file mode 100644 index 00000000000..faa944a6870 --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.metrics.BaseSource; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public interface MetricsReplicationTableSource extends BaseSource { + + void setLastShippedAge(long age); + void incrShippedBytes(long size); + long getShippedBytes(); + void clear(); + long getLastShippedAge(); +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java new file mode 100644 index 00000000000..7120a73d49a --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.hadoop.metrics2.lib.MutableHistogram; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * This is the metric source for table level replication metrics. + * We can easy monitor some useful table level replication metrics such as + * ageOfLastShippedOp and shippedBytes + */ +@InterfaceAudience.Private +public class MetricsReplicationTableSourceImpl implements MetricsReplicationTableSource { + + private final MetricsReplicationSourceImpl rms; + private final String tableName; + private final String ageOfLastShippedOpKey; + private String keyPrefix; + + private final String shippedBytesKey; + + private final MutableHistogram ageOfLastShippedOpHist; + private final MutableFastCounter shippedBytesCounter; + + public MetricsReplicationTableSourceImpl(MetricsReplicationSourceImpl rms, String tableName) { + this.rms = rms; + this.tableName = tableName; + this.keyPrefix = "source." + this.tableName + "."; + + ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp"; + ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(ageOfLastShippedOpKey); + + shippedBytesKey = this.keyPrefix + "shippedBytes"; + shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L); + } + + @Override + public void setLastShippedAge(long age) { + ageOfLastShippedOpHist.add(age); + } + + @Override + public void incrShippedBytes(long size) { + shippedBytesCounter.incr(size); + } + + @Override + public void clear() { + rms.removeMetric(ageOfLastShippedOpKey); + rms.removeMetric(shippedBytesKey); + } + + @Override + public long getLastShippedAge() { + return ageOfLastShippedOpHist.getMax(); + } + + @Override + public long getShippedBytes() { + return shippedBytesCounter.value(); + } + + @Override + public void init() { + rms.init(); + } + + @Override + public void setGauge(String gaugeName, long value) { + rms.setGauge(this.keyPrefix + gaugeName, value); + } + + @Override + public void incGauge(String gaugeName, long delta) { + rms.incGauge(this.keyPrefix + gaugeName, delta); + } + + @Override + public void decGauge(String gaugeName, long delta) { + rms.decGauge(this.keyPrefix + gaugeName, delta); + } + + @Override + public void removeMetric(String key) { + rms.removeMetric(this.keyPrefix + key); + } + + @Override + public void incCounters(String counterName, long delta) { + rms.incCounters(this.keyPrefix + counterName, delta); + } + + @Override + public void updateHistogram(String name, long value) { + rms.updateHistogram(this.keyPrefix + name, value); + } + + @Override + public String getMetricsContext() { + return rms.getMetricsContext(); + } + + @Override + public String getMetricsDescription() { + return rms.getMetricsDescription(); + } + + @Override + public String getMetricsJmxContext() { + return rms.getMetricsJmxContext(); + } + + @Override + public String getMetricsName() { + return rms.getMetricsName(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index aa8f241f03f..39fe7b429d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -19,8 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +52,7 @@ public class MetricsSource implements BaseSource { private final MetricsReplicationSourceSource singleSourceSource; private final MetricsReplicationSourceSource globalSourceSource; - private Map singleSourceSourceByTable; + private Map singleSourceSourceByTable; /** * Constructor used to register the metrics @@ -73,7 +76,7 @@ public class MetricsSource implements BaseSource { */ public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, MetricsReplicationSourceSource globalSourceSource, - Map singleSourceSourceByTable) { + Map singleSourceSourceByTable) { this.id = id; this.singleSourceSource = singleSourceSource; this.globalSourceSource = globalSourceSource; @@ -93,6 +96,29 @@ public class MetricsSource implements BaseSource { this.lastShippedTimeStamps.put(walGroup, timestamp); } + /** + * Update the table level replication metrics per table + * + * @param walEntries List of pairs of WAL entry and it's size + */ + public void updateTableLevelMetrics(List> walEntries) { + for (Pair walEntryWithSize : walEntries) { + Entry entry = walEntryWithSize.getFirst(); + long entrySize = walEntryWithSize.getSecond(); + String tableName = entry.getKey().getTableName().getNameAsString(); + long writeTime = entry.getKey().getWriteTime(); + long age = EnvironmentEdgeManager.currentTime() - writeTime; + + // get the replication metrics source for table at the run time + MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable() + .computeIfAbsent(tableName, + t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) + .getTableSource(t)); + tableSource.setLastShippedAge(age); + tableSource.incrShippedBytes(entrySize); + } + } + /** * Set the age of the last edit that was shipped group by table * @param timestamp write time of the edit @@ -102,7 +128,7 @@ public class MetricsSource implements BaseSource { long age = EnvironmentEdgeManager.currentTime() - timestamp; this.getSingleSourceSourceByTable().computeIfAbsent( tableName, t -> CompatibilitySingletonFactory - .getInstance(MetricsReplicationSourceFactory.class).getSource(t)) + .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t)) .setLastShippedAge(age); } @@ -111,7 +137,7 @@ public class MetricsSource implements BaseSource { * @param walGroup which group we are getting * @return age */ - public long getAgeofLastShippedOp(String walGroup) { + public long getAgeOfLastShippedOp(String walGroup) { return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup); } @@ -425,7 +451,7 @@ public class MetricsSource implements BaseSource { } @VisibleForTesting - public Map getSingleSourceSourceByTable() { + public Map getSingleSourceSourceByTable() { return singleSourceSourceByTable; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index b6f2ee1e731..c4936e68e26 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -348,7 +348,7 @@ public class ReplicationSource implements ReplicationSourceInterface { for (Map.Entry walGroupShipper : workerThreads.entrySet()) { String walGroupId = walGroupShipper.getKey(); ReplicationSourceShipper shipper = walGroupShipper.getValue(); - ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId); + ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId); int queueSize = queues.get(walGroupId).size(); replicationDelay = metrics.getReplicationDelay(); Path currentPath = shipper.getCurrentPath(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 732b6872c0b..ad76c5c4616 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -208,16 +207,13 @@ public class ReplicationSourceShipper extends Thread { for (Entry entry : entries) { cleanUpHFileRefs(entry.getEdit()); LOG.trace("shipped entry {}: ", entry); - TableName tableName = entry.getKey().getTableName(); - source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(), - tableName.getNameAsString()); } // Log and clean up WAL logs updateLogPosition(entryBatch); //offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size) //this sizeExcludeBulkLoad has to use same calculation that when calling - //acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain + //acquireBufferQuota() in ReplicationSourceWALReader because they maintain //same variable: totalBufferUsed source.postShipEdits(entries, sizeExcludeBulkLoad); // FIXME check relationship between wal group and overall @@ -225,6 +221,8 @@ public class ReplicationSourceShipper extends Thread { entryBatch.getNbHFiles()); source.getSourceMetrics().setAgeOfLastShippedOp( entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); + source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWaEntriesWithSize()); + if (LOG.isTraceEnabled()) { LOG.debug("Replicated {} entries or {} operations in {} ms", entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 386651316d8..7e0e550106e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -175,7 +175,7 @@ class ReplicationSourceWALReader extends Thread { entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); long entrySize = getEntrySizeIncludeBulkLoad(entry); long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); - batch.addEntry(entry); + batch.addEntry(entry, entrySize); updateBatchStats(batch, entry, entrySize); boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad); @@ -311,9 +311,7 @@ class ReplicationSourceWALReader extends Thread { private long getEntrySizeIncludeBulkLoad(Entry entry) { WALEdit edit = entry.getEdit(); - WALKey key = entry.getKey(); - return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) + - key.estimatedSerializedSizeOf(); + return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit); } public static long getEntrySizeExcludeBulkLoad(Entry entry) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java index 22b2de78173..bc600d08880 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -21,7 +21,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; @@ -34,7 +36,8 @@ class WALEntryBatch { // used by recovered replication queue to indicate that all the entries have been read. public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null); - private List walEntries; + private List> walEntriesWithSize; + // last WAL that was read private Path lastWalPath; // position in WAL of last entry in this batch @@ -54,7 +57,7 @@ class WALEntryBatch { * @param lastWalPath Path of the WAL the last entry in this batch was read from */ WALEntryBatch(int maxNbEntries, Path lastWalPath) { - this.walEntries = new ArrayList<>(maxNbEntries); + this.walEntriesWithSize = new ArrayList<>(maxNbEntries); this.lastWalPath = lastWalPath; } @@ -66,15 +69,22 @@ class WALEntryBatch { return batch; } - public void addEntry(Entry entry) { - walEntries.add(entry); + public void addEntry(Entry entry, long entrySize) { + walEntriesWithSize.add(new Pair<>(entry, entrySize)); } /** * @return the WAL Entries. */ public List getWalEntries() { - return walEntries; + return walEntriesWithSize.stream().map(Pair::getFirst).collect(Collectors.toList()); + } + + /** + * @return the WAL Entries. + */ + public List> getWaEntriesWithSize() { + return walEntriesWithSize; } /** @@ -96,7 +106,7 @@ class WALEntryBatch { } public int getNbEntries() { - return walEntries.size(); + return walEntriesWithSize.size(); } /** @@ -160,7 +170,7 @@ class WALEntryBatch { @Override public String toString() { - return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath + + return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath + ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" + nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" + endOfFile + "]"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 4588ace5900..1735f83564f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,6 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -47,13 +50,17 @@ import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobal import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; import org.junit.AfterClass; @@ -303,11 +310,12 @@ public class TestReplicationEndpoint extends TestReplicationBase { } @Test - public void testMetricsSourceBaseSourcePassthrough() { + public void testMetricsSourceBaseSourcePassThrough() { /* - * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a - * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of - * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows + * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl, + * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource, + * so that metrics get written to both namespaces. Both of those classes wrap a + * MetricsReplicationSourceImpl that implements BaseSource, which allows * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on * MetricsSource actually calls down through the two layers of wrapping to the actual * BaseSource. @@ -326,9 +334,10 @@ public class TestReplicationEndpoint extends TestReplicationBase { MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource); doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue(); - Map singleSourceSourceByTable = new HashMap<>(); - MetricsSource source = - new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable); + Map singleSourceSourceByTable = + new HashMap<>(); + MetricsSource source = new MetricsSource(id, singleSourceSource, + spyglobalSourceSource, singleSourceSourceByTable); String gaugeName = "gauge"; @@ -377,16 +386,37 @@ public class TestReplicationEndpoint extends TestReplicationBase { boolean containsRandomNewTable = source.getSingleSourceSourceByTable() .containsKey("RandomNewTable"); Assert.assertEquals(false, containsRandomNewTable); - source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable"); + source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable")); containsRandomNewTable = source.getSingleSourceSourceByTable() .containsKey("RandomNewTable"); Assert.assertEquals(true, containsRandomNewTable); - MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable() + MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable() .get("RandomNewTable"); - // cannot put more concreate value here to verify because the age is arbitrary. - // as long as it's greater than 0, we see it as correct answer. - Assert.assertTrue(msr.getLastShippedAge() > 0); + // age should be greater than zero we created the entry with time in the past + Assert.assertTrue(msr.getLastShippedAge() > 0); + Assert.assertTrue(msr.getShippedBytes() > 0); + + } + + private List> createWALEntriesWithSize(String tableName) { + List> walEntriesWithSize = new ArrayList<>(); + byte[] a = new byte[] { 'a' }; + Entry entry = createEntry(tableName, null, a); + walEntriesWithSize.add(new Pair<>(entry, 10L)); + return walEntriesWithSize; + } + + private Entry createEntry(String tableName, TreeMap scopes, byte[]... kvs) { + WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName), + System.currentTimeMillis() - 1L, + scopes); + WALEdit edit1 = new WALEdit(); + + for (byte[] kv : kvs) { + edit1.add(new KeyValue(kv, kv, kv)); + } + return new Entry(key1, edit1); } private void doPut(byte[] row) throws IOException {