HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
5e32e08782
commit
2e5a664233
|
@ -24,5 +24,6 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
public interface MetricsReplicationSourceFactory {
|
||||
public MetricsReplicationSinkSource getSink();
|
||||
public MetricsReplicationSourceSource getSource(String id);
|
||||
public MetricsReplicationTableSource getTableSource(String tableName);
|
||||
public MetricsReplicationSourceSource getGlobalSource();
|
||||
}
|
||||
|
|
|
@ -35,6 +35,10 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSo
|
|||
return new MetricsReplicationSourceSourceImpl(SourceHolder.INSTANCE.source, id);
|
||||
}
|
||||
|
||||
@Override public MetricsReplicationTableSource getTableSource(String tableName) {
|
||||
return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName);
|
||||
}
|
||||
|
||||
@Override public MetricsReplicationSourceSource getGlobalSource() {
|
||||
return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import org.apache.hadoop.hbase.metrics.BaseSource;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public interface MetricsReplicationTableSource extends BaseSource {
|
||||
|
||||
void setLastShippedAge(long age);
|
||||
void incrShippedBytes(long size);
|
||||
long getShippedBytes();
|
||||
void clear();
|
||||
long getLastShippedAge();
|
||||
}
|
|
@ -0,0 +1,134 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
|
||||
import org.apache.hadoop.metrics2.lib.MutableHistogram;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* This is the metric source for table level replication metrics.
|
||||
* We can easy monitor some useful table level replication metrics such as
|
||||
* ageOfLastShippedOp and shippedBytes
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MetricsReplicationTableSourceImpl implements MetricsReplicationTableSource {
|
||||
|
||||
private final MetricsReplicationSourceImpl rms;
|
||||
private final String tableName;
|
||||
private final String ageOfLastShippedOpKey;
|
||||
private String keyPrefix;
|
||||
|
||||
private final String shippedBytesKey;
|
||||
|
||||
private final MutableHistogram ageOfLastShippedOpHist;
|
||||
private final MutableFastCounter shippedBytesCounter;
|
||||
|
||||
public MetricsReplicationTableSourceImpl(MetricsReplicationSourceImpl rms, String tableName) {
|
||||
this.rms = rms;
|
||||
this.tableName = tableName;
|
||||
this.keyPrefix = "source." + this.tableName + ".";
|
||||
|
||||
ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
|
||||
ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(ageOfLastShippedOpKey);
|
||||
|
||||
shippedBytesKey = this.keyPrefix + "shippedBytes";
|
||||
shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLastShippedAge(long age) {
|
||||
ageOfLastShippedOpHist.add(age);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrShippedBytes(long size) {
|
||||
shippedBytesCounter.incr(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
rms.removeMetric(ageOfLastShippedOpKey);
|
||||
rms.removeMetric(shippedBytesKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastShippedAge() {
|
||||
return ageOfLastShippedOpHist.getMax();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getShippedBytes() {
|
||||
return shippedBytesCounter.value();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
rms.init();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setGauge(String gaugeName, long value) {
|
||||
rms.setGauge(this.keyPrefix + gaugeName, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incGauge(String gaugeName, long delta) {
|
||||
rms.incGauge(this.keyPrefix + gaugeName, delta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decGauge(String gaugeName, long delta) {
|
||||
rms.decGauge(this.keyPrefix + gaugeName, delta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeMetric(String key) {
|
||||
rms.removeMetric(this.keyPrefix + key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incCounters(String counterName, long delta) {
|
||||
rms.incCounters(this.keyPrefix + counterName, delta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateHistogram(String name, long value) {
|
||||
rms.updateHistogram(this.keyPrefix + name, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsContext() {
|
||||
return rms.getMetricsContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsDescription() {
|
||||
return rms.getMetricsDescription();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsJmxContext() {
|
||||
return rms.getMetricsJmxContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsName() {
|
||||
return rms.getMetricsName();
|
||||
}
|
||||
}
|
|
@ -19,8 +19,11 @@
|
|||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -49,7 +52,7 @@ public class MetricsSource implements BaseSource {
|
|||
|
||||
private final MetricsReplicationSourceSource singleSourceSource;
|
||||
private final MetricsReplicationSourceSource globalSourceSource;
|
||||
private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
|
||||
private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
|
||||
|
||||
/**
|
||||
* Constructor used to register the metrics
|
||||
|
@ -73,7 +76,7 @@ public class MetricsSource implements BaseSource {
|
|||
*/
|
||||
public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
|
||||
MetricsReplicationSourceSource globalSourceSource,
|
||||
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
|
||||
Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
|
||||
this.id = id;
|
||||
this.singleSourceSource = singleSourceSource;
|
||||
this.globalSourceSource = globalSourceSource;
|
||||
|
@ -93,6 +96,29 @@ public class MetricsSource implements BaseSource {
|
|||
this.lastShippedTimeStamps.put(walGroup, timestamp);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the table level replication metrics per table
|
||||
*
|
||||
* @param walEntries List of pairs of WAL entry and it's size
|
||||
*/
|
||||
public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
|
||||
for (Pair<Entry, Long> walEntryWithSize : walEntries) {
|
||||
Entry entry = walEntryWithSize.getFirst();
|
||||
long entrySize = walEntryWithSize.getSecond();
|
||||
String tableName = entry.getKey().getTableName().getNameAsString();
|
||||
long writeTime = entry.getKey().getWriteTime();
|
||||
long age = EnvironmentEdgeManager.currentTime() - writeTime;
|
||||
|
||||
// get the replication metrics source for table at the run time
|
||||
MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
|
||||
.computeIfAbsent(tableName,
|
||||
t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
|
||||
.getTableSource(t));
|
||||
tableSource.setLastShippedAge(age);
|
||||
tableSource.incrShippedBytes(entrySize);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the age of the last edit that was shipped group by table
|
||||
* @param timestamp write time of the edit
|
||||
|
@ -102,7 +128,7 @@ public class MetricsSource implements BaseSource {
|
|||
long age = EnvironmentEdgeManager.currentTime() - timestamp;
|
||||
this.getSingleSourceSourceByTable().computeIfAbsent(
|
||||
tableName, t -> CompatibilitySingletonFactory
|
||||
.getInstance(MetricsReplicationSourceFactory.class).getSource(t))
|
||||
.getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
|
||||
.setLastShippedAge(age);
|
||||
}
|
||||
|
||||
|
@ -111,7 +137,7 @@ public class MetricsSource implements BaseSource {
|
|||
* @param walGroup which group we are getting
|
||||
* @return age
|
||||
*/
|
||||
public long getAgeofLastShippedOp(String walGroup) {
|
||||
public long getAgeOfLastShippedOp(String walGroup) {
|
||||
return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
|
||||
}
|
||||
|
||||
|
@ -425,7 +451,7 @@ public class MetricsSource implements BaseSource {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
|
||||
public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
|
||||
return singleSourceSourceByTable;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -348,7 +348,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
|||
for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
|
||||
String walGroupId = walGroupShipper.getKey();
|
||||
ReplicationSourceShipper shipper = walGroupShipper.getValue();
|
||||
ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
|
||||
ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
|
||||
int queueSize = queues.get(walGroupId).size();
|
||||
replicationDelay = metrics.getReplicationDelay();
|
||||
Path currentPath = shipper.getCurrentPath();
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
|
@ -208,16 +207,13 @@ public class ReplicationSourceShipper extends Thread {
|
|||
for (Entry entry : entries) {
|
||||
cleanUpHFileRefs(entry.getEdit());
|
||||
LOG.trace("shipped entry {}: ", entry);
|
||||
TableName tableName = entry.getKey().getTableName();
|
||||
source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
|
||||
tableName.getNameAsString());
|
||||
}
|
||||
// Log and clean up WAL logs
|
||||
updateLogPosition(entryBatch);
|
||||
|
||||
//offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
|
||||
//this sizeExcludeBulkLoad has to use same calculation that when calling
|
||||
//acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain
|
||||
//acquireBufferQuota() in ReplicationSourceWALReader because they maintain
|
||||
//same variable: totalBufferUsed
|
||||
source.postShipEdits(entries, sizeExcludeBulkLoad);
|
||||
// FIXME check relationship between wal group and overall
|
||||
|
@ -225,6 +221,8 @@ public class ReplicationSourceShipper extends Thread {
|
|||
entryBatch.getNbHFiles());
|
||||
source.getSourceMetrics().setAgeOfLastShippedOp(
|
||||
entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
|
||||
source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWaEntriesWithSize());
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.debug("Replicated {} entries or {} operations in {} ms",
|
||||
entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
|
||||
|
|
|
@ -175,7 +175,7 @@ class ReplicationSourceWALReader extends Thread {
|
|||
entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
|
||||
long entrySize = getEntrySizeIncludeBulkLoad(entry);
|
||||
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
|
||||
batch.addEntry(entry);
|
||||
batch.addEntry(entry, entrySize);
|
||||
updateBatchStats(batch, entry, entrySize);
|
||||
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
|
||||
|
||||
|
@ -311,9 +311,7 @@ class ReplicationSourceWALReader extends Thread {
|
|||
|
||||
private long getEntrySizeIncludeBulkLoad(Entry entry) {
|
||||
WALEdit edit = entry.getEdit();
|
||||
WALKey key = entry.getKey();
|
||||
return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
|
||||
key.estimatedSerializedSizeOf();
|
||||
return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
|
||||
}
|
||||
|
||||
public static long getEntrySizeExcludeBulkLoad(Entry entry) {
|
||||
|
|
|
@ -21,7 +21,9 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
|
@ -34,7 +36,8 @@ class WALEntryBatch {
|
|||
// used by recovered replication queue to indicate that all the entries have been read.
|
||||
public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
|
||||
|
||||
private List<Entry> walEntries;
|
||||
private List<Pair<Entry, Long>> walEntriesWithSize;
|
||||
|
||||
// last WAL that was read
|
||||
private Path lastWalPath;
|
||||
// position in WAL of last entry in this batch
|
||||
|
@ -54,7 +57,7 @@ class WALEntryBatch {
|
|||
* @param lastWalPath Path of the WAL the last entry in this batch was read from
|
||||
*/
|
||||
WALEntryBatch(int maxNbEntries, Path lastWalPath) {
|
||||
this.walEntries = new ArrayList<>(maxNbEntries);
|
||||
this.walEntriesWithSize = new ArrayList<>(maxNbEntries);
|
||||
this.lastWalPath = lastWalPath;
|
||||
}
|
||||
|
||||
|
@ -66,15 +69,22 @@ class WALEntryBatch {
|
|||
return batch;
|
||||
}
|
||||
|
||||
public void addEntry(Entry entry) {
|
||||
walEntries.add(entry);
|
||||
public void addEntry(Entry entry, long entrySize) {
|
||||
walEntriesWithSize.add(new Pair<>(entry, entrySize));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the WAL Entries.
|
||||
*/
|
||||
public List<Entry> getWalEntries() {
|
||||
return walEntries;
|
||||
return walEntriesWithSize.stream().map(Pair::getFirst).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the WAL Entries.
|
||||
*/
|
||||
public List<Pair<Entry, Long>> getWaEntriesWithSize() {
|
||||
return walEntriesWithSize;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -96,7 +106,7 @@ class WALEntryBatch {
|
|||
}
|
||||
|
||||
public int getNbEntries() {
|
||||
return walEntries.size();
|
||||
return walEntriesWithSize.size();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -160,7 +170,7 @@ class WALEntryBatch {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath +
|
||||
return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath +
|
||||
", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" +
|
||||
nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" +
|
||||
endOfFile + "]";
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -35,6 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
|
@ -47,13 +50,17 @@ import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobal
|
|||
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -303,11 +310,12 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testMetricsSourceBaseSourcePassthrough() {
|
||||
public void testMetricsSourceBaseSourcePassThrough() {
|
||||
/*
|
||||
* The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
|
||||
* MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
|
||||
* those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
|
||||
* The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
|
||||
* MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource,
|
||||
* so that metrics get written to both namespaces. Both of those classes wrap a
|
||||
* MetricsReplicationSourceImpl that implements BaseSource, which allows
|
||||
* for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
|
||||
* MetricsSource actually calls down through the two layers of wrapping to the actual
|
||||
* BaseSource.
|
||||
|
@ -326,9 +334,10 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
|
||||
doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
|
||||
|
||||
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
|
||||
MetricsSource source =
|
||||
new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
|
||||
Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
|
||||
new HashMap<>();
|
||||
MetricsSource source = new MetricsSource(id, singleSourceSource,
|
||||
spyglobalSourceSource, singleSourceSourceByTable);
|
||||
|
||||
|
||||
String gaugeName = "gauge";
|
||||
|
@ -377,16 +386,37 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
|
||||
.containsKey("RandomNewTable");
|
||||
Assert.assertEquals(false, containsRandomNewTable);
|
||||
source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
|
||||
source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
|
||||
containsRandomNewTable = source.getSingleSourceSourceByTable()
|
||||
.containsKey("RandomNewTable");
|
||||
Assert.assertEquals(true, containsRandomNewTable);
|
||||
MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
|
||||
MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable()
|
||||
.get("RandomNewTable");
|
||||
// cannot put more concreate value here to verify because the age is arbitrary.
|
||||
// as long as it's greater than 0, we see it as correct answer.
|
||||
Assert.assertTrue(msr.getLastShippedAge() > 0);
|
||||
|
||||
// age should be greater than zero we created the entry with time in the past
|
||||
Assert.assertTrue(msr.getLastShippedAge() > 0);
|
||||
Assert.assertTrue(msr.getShippedBytes() > 0);
|
||||
|
||||
}
|
||||
|
||||
private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
|
||||
List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
|
||||
byte[] a = new byte[] { 'a' };
|
||||
Entry entry = createEntry(tableName, null, a);
|
||||
walEntriesWithSize.add(new Pair<>(entry, 10L));
|
||||
return walEntriesWithSize;
|
||||
}
|
||||
|
||||
private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
|
||||
WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
|
||||
System.currentTimeMillis() - 1L,
|
||||
scopes);
|
||||
WALEdit edit1 = new WALEdit();
|
||||
|
||||
for (byte[] kv : kvs) {
|
||||
edit1.add(new KeyValue(kv, kv, kv));
|
||||
}
|
||||
return new Entry(key1, edit1);
|
||||
}
|
||||
|
||||
private void doPut(byte[] row) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue