HBASE-26581 Add metrics for failed replication edits (#4347)

Co-authored-by: Briana Augenreich <baugenreich@hubspot.com>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
This commit is contained in:
Bri Augenreich 2022-04-26 17:42:54 -04:00 committed by Bryan Beaudreault
parent 4fb47b4800
commit 3439c5bd90
10 changed files with 127 additions and 2 deletions

View File

@ -24,13 +24,16 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface MetricsReplicationSinkSource {
public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
public static final String SINK_FAILED_BATCHES = "sink.failedBatches";
public static final String SINK_APPLIED_OPS = "sink.appliedOps";
public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles";
void setLastAppliedOpAge(long age);
void incrAppliedBatches(long batches);
void incrAppliedOps(long batchsize);
void incrFailedBatches();
long getLastAppliedOpAge();
void incrAppliedHFiles(long hfileSize);
long getSinkAppliedOps();
long getFailedBatches();
}

View File

@ -27,6 +27,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
public static final String SOURCE_FAILED_BATCHES = "source.failedBatches";
@Deprecated
/** @deprecated Use SOURCE_SHIPPED_BYTES instead */
@ -60,6 +61,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void decrSizeOfLogQueue(int size);
void incrLogEditsFiltered(long size);
void incrBatchesShipped(int batches);
void incrFailedBatches();
void incrOpsShipped(long ops);
void incrShippedBytes(long size);
void incrLogReadInBytes(long size);

View File

@ -36,6 +36,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
private final MutableFastCounter logReadInEditsCounter;
private final MutableFastCounter walEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter;
private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedBytesCounter;
@ -68,6 +69,8 @@ public class MetricsReplicationGlobalSourceSourceImpl
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
failedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_BATCHES, 0L);
shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L);
@ -127,6 +130,10 @@ public class MetricsReplicationGlobalSourceSourceImpl
shippedBatchesCounter.incr(batches);
}
@Override public void incrFailedBatches() {
failedBatchesCounter.incr();
}
@Override public void incrOpsShipped(long ops) {
shippedOpsCounter.incr(ops);
}

View File

@ -27,12 +27,14 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
private final MutableHistogram ageHist;
private final MutableFastCounter batchesCounter;
private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter opsCounter;
private final MutableFastCounter hfilesCounter;
public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {
ageHist = rms.getMetricsRegistry().newTimeHistogram(SINK_AGE_OF_LAST_APPLIED_OP);
batchesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_BATCHES, 0L);
failedBatchesCounter = rms.getMetricsRegistry().getCounter(SINK_FAILED_BATCHES, 0L);
opsCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_OPS, 0L);
hfilesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_HFILES, 0L);
}
@ -49,6 +51,16 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
opsCounter.incr(batchsize);
}
@Override
public void incrFailedBatches(){
failedBatchesCounter.incr();
}
@Override
public long getFailedBatches() {
return failedBatchesCounter.value();
}
@Override
public long getLastAppliedOpAge() {
return ageHist.getMax();
@ -62,4 +74,5 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
@Override public long getSinkAppliedOps() {
return opsCounter.value();
}
}

View File

@ -34,6 +34,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logEditsFilteredKey;
private final String shippedBatchesKey;
private final String shippedOpsKey;
private final String failedBatchesKey;
private String keyPrefix;
/**
@ -53,6 +54,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter logReadInEditsCounter;
private final MutableFastCounter walEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter;
private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedKBsCounter;
private final MutableFastCounter shippedBytesCounter;
@ -91,6 +93,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
shippedBatchesKey = this.keyPrefix + "shippedBatches";
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
failedBatchesKey = this.keyPrefix + "failedBatches";
failedBatchesCounter = rms.getMetricsRegistry().getCounter(failedBatchesKey, 0L);
shippedOpsKey = this.keyPrefix + "shippedOps";
shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
@ -167,6 +172,10 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
shippedBatchesCounter.incr(batches);
}
@Override public void incrFailedBatches() {
failedBatchesCounter.incr();
}
@Override public void incrOpsShipped(long ops) {
shippedOpsCounter.incr(ops);
}
@ -187,6 +196,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
rms.removeMetric(sizeOfLogQueueKey);
rms.removeMetric(shippedBatchesKey);
rms.removeMetric(failedBatchesKey);
rms.removeMetric(shippedOpsKey);
rms.removeMetric(shippedKBsKey);
rms.removeMetric(shippedBytesKey);

View File

@ -84,6 +84,21 @@ public class MetricsSink {
mss.incrAppliedHFiles(hfileSize);
}
/**
* Convenience method to update metrics when batch of operations has failed.
*/
public void incrementFailedBatches(){
mss.incrFailedBatches();
}
/**
* Get the count of the failed bathes
* @return failedBatches
*/
protected long getFailedBatches() {
return mss.getFailedBatches();
}
/**
* Get the Age of Last Applied Op
* @return ageOfLastAppliedOp

View File

@ -230,6 +230,15 @@ public class MetricsSource implements BaseSource {
globalSourceSource.incrShippedBytes(sizeInBytes);
}
/**
* Convenience method to update metrics when batch of operations has failed.
*/
public void incrementFailedBatches(){
singleSourceSource.incrFailedBatches();
globalSourceSource.incrFailedBatches();
}
/**
* Gets the number of edits not eligible for replication this source queue logs so far.
* @return logEditsFiltered non-replicable edits filtered from this queue logs.

View File

@ -190,6 +190,7 @@ public class ReplicationSink {
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
if (!cells.advance()) {
this.metrics.incrementFailedBatches();
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
}
@ -202,6 +203,7 @@ public class ReplicationSink {
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
if (!cells.advance()) {
this.metrics.incrementFailedBatches();
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
Cell cell = cells.current();
@ -276,6 +278,7 @@ public class ReplicationSink {
this.totalReplicatedEdits.addAndGet(totalReplicated);
} catch (IOException ex) {
LOG.error("Unable to accept edit because:", ex);
this.metrics.incrementFailedBatches();
throw ex;
}
}

View File

@ -222,6 +222,7 @@ public class ReplicationSourceShipper extends Thread {
}
break;
} catch (Exception ex) {
source.getSourceMetrics().incrementFailedBatches();
LOG.warn("{} threw unknown exception:",
source.getReplicationEndpoint().getClass().getName(), ex);
if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -69,15 +71,14 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSink {
@ -427,6 +428,67 @@ public class TestReplicationSink {
// Clean up the created hfiles or it will mess up subsequent tests
}
/**
* Test failure metrics produced for failed replication edits
*/
@Test
public void testFailedReplicationSinkMetrics() throws IOException {
long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches();
long errorCount = 0L;
List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
List<Cell> cells = new ArrayList<>();
for(int i = 0; i < BATCH_SIZE; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
cells.clear(); // cause IndexOutOfBoundsException
try {
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Assert.fail("Should re-throw ArrayIndexOutOfBoundsException.");
} catch (ArrayIndexOutOfBoundsException e) {
errorCount++;
assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
}
entries.clear();
cells.clear();
TableName notExistTable = TableName.valueOf("notExistTable"); // cause TableNotFoundException
for (int i = 0; i < BATCH_SIZE; i++) {
entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
}
try {
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Assert.fail("Should re-throw TableNotFoundException.");
} catch (TableNotFoundException e) {
errorCount++;
assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
}
entries.clear();
cells.clear();
for(int i = 0; i < BATCH_SIZE; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
// cause IOException in batch()
try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
try (Admin admin = conn.getAdmin()) {
admin.disableTable(TABLE_NAME1);
try {
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Assert.fail("Should re-throw IOException.");
} catch (IOException e) {
errorCount++;
assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
} finally {
admin.enableTable(TABLE_NAME1);
}
}
}
}
private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
byte[] rowBytes = Bytes.toBytes(row);