From 3439c5bd901c41ef916f708d71fae52652f5ac8a Mon Sep 17 00:00:00 2001 From: Bri Augenreich Date: Tue, 26 Apr 2022 17:42:54 -0400 Subject: [PATCH] HBASE-26581 Add metrics for failed replication edits (#4347) Co-authored-by: Briana Augenreich Signed-off-by: Andrew Purtell Signed-off-by: Bryan Beaudreault --- .../MetricsReplicationSinkSource.java | 3 + .../MetricsReplicationSourceSource.java | 2 + ...ricsReplicationGlobalSourceSourceImpl.java | 7 ++ .../MetricsReplicationSinkSourceImpl.java | 13 ++++ .../MetricsReplicationSourceSourceImpl.java | 10 +++ .../replication/regionserver/MetricsSink.java | 15 +++++ .../regionserver/MetricsSource.java | 9 +++ .../regionserver/ReplicationSink.java | 3 + .../ReplicationSourceShipper.java | 1 + .../regionserver/TestReplicationSink.java | 66 ++++++++++++++++++- 10 files changed, 127 insertions(+), 2 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java index 2498e3426a5..fe11c1049ce 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java @@ -24,13 +24,16 @@ import org.apache.yetus.audience.InterfaceAudience; public interface MetricsReplicationSinkSource { public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp"; public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches"; + public static final String SINK_FAILED_BATCHES = "sink.failedBatches"; public static final String SINK_APPLIED_OPS = "sink.appliedOps"; public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles"; void setLastAppliedOpAge(long age); void incrAppliedBatches(long batches); void incrAppliedOps(long batchsize); + void incrFailedBatches(); long getLastAppliedOpAge(); void incrAppliedHFiles(long hfileSize); long getSinkAppliedOps(); + long getFailedBatches(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index a6cf79b710f..d37dc133e2c 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -27,6 +27,7 @@ public interface MetricsReplicationSourceSource extends BaseSource { public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue"; public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp"; public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches"; + public static final String SOURCE_FAILED_BATCHES = "source.failedBatches"; @Deprecated /** @deprecated Use SOURCE_SHIPPED_BYTES instead */ @@ -60,6 +61,7 @@ public interface MetricsReplicationSourceSource extends BaseSource { void decrSizeOfLogQueue(int size); void incrLogEditsFiltered(long size); void incrBatchesShipped(int batches); + void incrFailedBatches(); void incrOpsShipped(long ops); void incrShippedBytes(long size); void incrLogReadInBytes(long size); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java index 5eb5deb03f6..cc97d7491a9 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java @@ -36,6 +36,7 @@ public class MetricsReplicationGlobalSourceSourceImpl private final MutableFastCounter logReadInEditsCounter; private final MutableFastCounter walEditsFilteredCounter; private final MutableFastCounter shippedBatchesCounter; + private final MutableFastCounter failedBatchesCounter; private final MutableFastCounter shippedOpsCounter; private final MutableFastCounter shippedBytesCounter; @@ -68,6 +69,8 @@ public class MetricsReplicationGlobalSourceSourceImpl shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L); + failedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_BATCHES, 0L); + shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L); shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L); @@ -127,6 +130,10 @@ public class MetricsReplicationGlobalSourceSourceImpl shippedBatchesCounter.incr(batches); } + @Override public void incrFailedBatches() { + failedBatchesCounter.incr(); + } + @Override public void incrOpsShipped(long ops) { shippedOpsCounter.incr(ops); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java index ce45af5ccec..86bc60577a6 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java @@ -27,12 +27,14 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS private final MutableHistogram ageHist; private final MutableFastCounter batchesCounter; + private final MutableFastCounter failedBatchesCounter; private final MutableFastCounter opsCounter; private final MutableFastCounter hfilesCounter; public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) { ageHist = rms.getMetricsRegistry().newTimeHistogram(SINK_AGE_OF_LAST_APPLIED_OP); batchesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_BATCHES, 0L); + failedBatchesCounter = rms.getMetricsRegistry().getCounter(SINK_FAILED_BATCHES, 0L); opsCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_OPS, 0L); hfilesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_HFILES, 0L); } @@ -49,6 +51,16 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS opsCounter.incr(batchsize); } + @Override + public void incrFailedBatches(){ + failedBatchesCounter.incr(); + } + + @Override + public long getFailedBatches() { + return failedBatchesCounter.value(); + } + @Override public long getLastAppliedOpAge() { return ageHist.getMax(); @@ -62,4 +74,5 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS @Override public long getSinkAppliedOps() { return opsCounter.value(); } + } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 28499120119..bf1392ce9b7 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -34,6 +34,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final String logEditsFilteredKey; private final String shippedBatchesKey; private final String shippedOpsKey; + private final String failedBatchesKey; private String keyPrefix; /** @@ -53,6 +54,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final MutableFastCounter logReadInEditsCounter; private final MutableFastCounter walEditsFilteredCounter; private final MutableFastCounter shippedBatchesCounter; + private final MutableFastCounter failedBatchesCounter; private final MutableFastCounter shippedOpsCounter; private final MutableFastCounter shippedKBsCounter; private final MutableFastCounter shippedBytesCounter; @@ -91,6 +93,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou shippedBatchesKey = this.keyPrefix + "shippedBatches"; shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L); + failedBatchesKey = this.keyPrefix + "failedBatches"; + failedBatchesCounter = rms.getMetricsRegistry().getCounter(failedBatchesKey, 0L); + shippedOpsKey = this.keyPrefix + "shippedOps"; shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L); @@ -167,6 +172,10 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou shippedBatchesCounter.incr(batches); } + @Override public void incrFailedBatches() { + failedBatchesCounter.incr(); + } + @Override public void incrOpsShipped(long ops) { shippedOpsCounter.incr(ops); } @@ -187,6 +196,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou rms.removeMetric(sizeOfLogQueueKey); rms.removeMetric(shippedBatchesKey); + rms.removeMetric(failedBatchesKey); rms.removeMetric(shippedOpsKey); rms.removeMetric(shippedKBsKey); rms.removeMetric(shippedBytesKey); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java index ce785bbffd5..8f07c08c4eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java @@ -84,6 +84,21 @@ public class MetricsSink { mss.incrAppliedHFiles(hfileSize); } + /** + * Convenience method to update metrics when batch of operations has failed. + */ + public void incrementFailedBatches(){ + mss.incrFailedBatches(); + } + + /** + * Get the count of the failed bathes + * @return failedBatches + */ + protected long getFailedBatches() { + return mss.getFailedBatches(); + } + /** * Get the Age of Last Applied Op * @return ageOfLastAppliedOp diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 3b97e1ed3ab..3510254959b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -230,6 +230,15 @@ public class MetricsSource implements BaseSource { globalSourceSource.incrShippedBytes(sizeInBytes); } + /** + * Convenience method to update metrics when batch of operations has failed. + */ + public void incrementFailedBatches(){ + singleSourceSource.incrFailedBatches(); + globalSourceSource.incrFailedBatches(); + } + + /** * Gets the number of edits not eligible for replication this source queue logs so far. * @return logEditsFiltered non-replicable edits filtered from this queue logs. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index b947c806373..34fbb55b1b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -190,6 +190,7 @@ public class ReplicationSink { for (int i = 0; i < count; i++) { // Throw index out of bounds if our cell count is off if (!cells.advance()) { + this.metrics.incrementFailedBatches(); throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); } } @@ -202,6 +203,7 @@ public class ReplicationSink { for (int i = 0; i < count; i++) { // Throw index out of bounds if our cell count is off if (!cells.advance()) { + this.metrics.incrementFailedBatches(); throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); } Cell cell = cells.current(); @@ -276,6 +278,7 @@ public class ReplicationSink { this.totalReplicatedEdits.addAndGet(totalReplicated); } catch (IOException ex) { LOG.error("Unable to accept edit because:", ex); + this.metrics.incrementFailedBatches(); throw ex; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index c40cf7e00a4..95719273828 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -222,6 +222,7 @@ public class ReplicationSourceShipper extends Thread { } break; } catch (Exception ex) { + source.getSourceMetrics().incrementFailedBatches(); LOG.warn("{} threw unknown exception:", source.getReplicationEndpoint().getClass().getName(), ex); if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index 02eb3d92d05..a7625e6029c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; + +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -69,15 +71,14 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey; + @Category({ReplicationTests.class, LargeTests.class}) public class TestReplicationSink { @@ -427,6 +428,67 @@ public class TestReplicationSink { // Clean up the created hfiles or it will mess up subsequent tests } + /** + * Test failure metrics produced for failed replication edits + */ + @Test + public void testFailedReplicationSinkMetrics() throws IOException { + long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches(); + long errorCount = 0L; + List entries = new ArrayList<>(BATCH_SIZE); + List cells = new ArrayList<>(); + for(int i = 0; i < BATCH_SIZE; i++) { + entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + } + cells.clear(); // cause IndexOutOfBoundsException + try { + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); + Assert.fail("Should re-throw ArrayIndexOutOfBoundsException."); + } catch (ArrayIndexOutOfBoundsException e) { + errorCount++; + assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches()); + } + + entries.clear(); + cells.clear(); + TableName notExistTable = TableName.valueOf("notExistTable"); // cause TableNotFoundException + for (int i = 0; i < BATCH_SIZE; i++) { + entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells)); + } + try { + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); + Assert.fail("Should re-throw TableNotFoundException."); + } catch (TableNotFoundException e) { + errorCount++; + assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches()); + } + + entries.clear(); + cells.clear(); + for(int i = 0; i < BATCH_SIZE; i++) { + entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + } + // cause IOException in batch() + try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { + try (Admin admin = conn.getAdmin()) { + admin.disableTable(TABLE_NAME1); + try { + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); + Assert.fail("Should re-throw IOException."); + } catch (IOException e) { + errorCount++; + assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches()); + } finally { + admin.enableTable(TABLE_NAME1); + } + } + } + } + + private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List cells) { byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2; byte[] rowBytes = Bytes.toBytes(row);