HBASE-26581 Add metrics for failed replication edits (#4347)

Co-authored-by: Briana Augenreich <baugenreich@hubspot.com>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
This commit is contained in:
Bri Augenreich 2022-04-26 17:42:54 -04:00 committed by Bryan Beaudreault
parent 828025bb37
commit 282c1c2737
10 changed files with 127 additions and 2 deletions

View File

@ -24,13 +24,16 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface MetricsReplicationSinkSource { public interface MetricsReplicationSinkSource {
public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp"; public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches"; public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
public static final String SINK_FAILED_BATCHES = "sink.failedBatches";
public static final String SINK_APPLIED_OPS = "sink.appliedOps"; public static final String SINK_APPLIED_OPS = "sink.appliedOps";
public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles"; public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles";
void setLastAppliedOpAge(long age); void setLastAppliedOpAge(long age);
void incrAppliedBatches(long batches); void incrAppliedBatches(long batches);
void incrAppliedOps(long batchsize); void incrAppliedOps(long batchsize);
void incrFailedBatches();
long getLastAppliedOpAge(); long getLastAppliedOpAge();
void incrAppliedHFiles(long hfileSize); void incrAppliedHFiles(long hfileSize);
long getSinkAppliedOps(); long getSinkAppliedOps();
long getFailedBatches();
} }

View File

@ -27,6 +27,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue"; public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp"; public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches"; public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
public static final String SOURCE_FAILED_BATCHES = "source.failedBatches";
@Deprecated @Deprecated
/** @deprecated Use SOURCE_SHIPPED_BYTES instead */ /** @deprecated Use SOURCE_SHIPPED_BYTES instead */
@ -60,6 +61,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void decrSizeOfLogQueue(int size); void decrSizeOfLogQueue(int size);
void incrLogEditsFiltered(long size); void incrLogEditsFiltered(long size);
void incrBatchesShipped(int batches); void incrBatchesShipped(int batches);
void incrFailedBatches();
void incrOpsShipped(long ops); void incrOpsShipped(long ops);
void incrShippedBytes(long size); void incrShippedBytes(long size);
void incrLogReadInBytes(long size); void incrLogReadInBytes(long size);

View File

@ -36,6 +36,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
private final MutableFastCounter logReadInEditsCounter; private final MutableFastCounter logReadInEditsCounter;
private final MutableFastCounter walEditsFilteredCounter; private final MutableFastCounter walEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter; private final MutableFastCounter shippedBatchesCounter;
private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter shippedOpsCounter; private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedBytesCounter; private final MutableFastCounter shippedBytesCounter;
@ -68,6 +69,8 @@ public class MetricsReplicationGlobalSourceSourceImpl
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L); shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
failedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_BATCHES, 0L);
shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L); shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L); shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L);
@ -127,6 +130,10 @@ public class MetricsReplicationGlobalSourceSourceImpl
shippedBatchesCounter.incr(batches); shippedBatchesCounter.incr(batches);
} }
@Override public void incrFailedBatches() {
failedBatchesCounter.incr();
}
@Override public void incrOpsShipped(long ops) { @Override public void incrOpsShipped(long ops) {
shippedOpsCounter.incr(ops); shippedOpsCounter.incr(ops);
} }

View File

@ -27,12 +27,14 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
private final MutableHistogram ageHist; private final MutableHistogram ageHist;
private final MutableFastCounter batchesCounter; private final MutableFastCounter batchesCounter;
private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter opsCounter; private final MutableFastCounter opsCounter;
private final MutableFastCounter hfilesCounter; private final MutableFastCounter hfilesCounter;
public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) { public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {
ageHist = rms.getMetricsRegistry().newTimeHistogram(SINK_AGE_OF_LAST_APPLIED_OP); ageHist = rms.getMetricsRegistry().newTimeHistogram(SINK_AGE_OF_LAST_APPLIED_OP);
batchesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_BATCHES, 0L); batchesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_BATCHES, 0L);
failedBatchesCounter = rms.getMetricsRegistry().getCounter(SINK_FAILED_BATCHES, 0L);
opsCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_OPS, 0L); opsCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_OPS, 0L);
hfilesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_HFILES, 0L); hfilesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_HFILES, 0L);
} }
@ -49,6 +51,16 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
opsCounter.incr(batchsize); opsCounter.incr(batchsize);
} }
@Override
public void incrFailedBatches(){
failedBatchesCounter.incr();
}
@Override
public long getFailedBatches() {
return failedBatchesCounter.value();
}
@Override @Override
public long getLastAppliedOpAge() { public long getLastAppliedOpAge() {
return ageHist.getMax(); return ageHist.getMax();
@ -62,4 +74,5 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
@Override public long getSinkAppliedOps() { @Override public long getSinkAppliedOps() {
return opsCounter.value(); return opsCounter.value();
} }
} }

View File

@ -34,6 +34,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logEditsFilteredKey; private final String logEditsFilteredKey;
private final String shippedBatchesKey; private final String shippedBatchesKey;
private final String shippedOpsKey; private final String shippedOpsKey;
private final String failedBatchesKey;
private String keyPrefix; private String keyPrefix;
/** /**
@ -53,6 +54,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter logReadInEditsCounter; private final MutableFastCounter logReadInEditsCounter;
private final MutableFastCounter walEditsFilteredCounter; private final MutableFastCounter walEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter; private final MutableFastCounter shippedBatchesCounter;
private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter shippedOpsCounter; private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedKBsCounter; private final MutableFastCounter shippedKBsCounter;
private final MutableFastCounter shippedBytesCounter; private final MutableFastCounter shippedBytesCounter;
@ -91,6 +93,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
shippedBatchesKey = this.keyPrefix + "shippedBatches"; shippedBatchesKey = this.keyPrefix + "shippedBatches";
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L); shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
failedBatchesKey = this.keyPrefix + "failedBatches";
failedBatchesCounter = rms.getMetricsRegistry().getCounter(failedBatchesKey, 0L);
shippedOpsKey = this.keyPrefix + "shippedOps"; shippedOpsKey = this.keyPrefix + "shippedOps";
shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L); shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
@ -167,6 +172,10 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
shippedBatchesCounter.incr(batches); shippedBatchesCounter.incr(batches);
} }
@Override public void incrFailedBatches() {
failedBatchesCounter.incr();
}
@Override public void incrOpsShipped(long ops) { @Override public void incrOpsShipped(long ops) {
shippedOpsCounter.incr(ops); shippedOpsCounter.incr(ops);
} }
@ -187,6 +196,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
rms.removeMetric(sizeOfLogQueueKey); rms.removeMetric(sizeOfLogQueueKey);
rms.removeMetric(shippedBatchesKey); rms.removeMetric(shippedBatchesKey);
rms.removeMetric(failedBatchesKey);
rms.removeMetric(shippedOpsKey); rms.removeMetric(shippedOpsKey);
rms.removeMetric(shippedKBsKey); rms.removeMetric(shippedKBsKey);
rms.removeMetric(shippedBytesKey); rms.removeMetric(shippedBytesKey);

View File

@ -83,6 +83,21 @@ public class MetricsSink {
mss.incrAppliedHFiles(hfileSize); mss.incrAppliedHFiles(hfileSize);
} }
/**
* Convenience method to update metrics when batch of operations has failed.
*/
public void incrementFailedBatches(){
mss.incrFailedBatches();
}
/**
* Get the count of the failed bathes
* @return failedBatches
*/
protected long getFailedBatches() {
return mss.getFailedBatches();
}
/** /**
* Get the Age of Last Applied Op * Get the Age of Last Applied Op
* @return ageOfLastAppliedOp * @return ageOfLastAppliedOp

View File

@ -230,6 +230,15 @@ public class MetricsSource implements BaseSource {
globalSourceSource.incrShippedBytes(sizeInBytes); globalSourceSource.incrShippedBytes(sizeInBytes);
} }
/**
* Convenience method to update metrics when batch of operations has failed.
*/
public void incrementFailedBatches(){
singleSourceSource.incrFailedBatches();
globalSourceSource.incrFailedBatches();
}
/** /**
* Gets the number of edits not eligible for replication this source queue logs so far. * Gets the number of edits not eligible for replication this source queue logs so far.
* @return logEditsFiltered non-replicable edits filtered from this queue logs. * @return logEditsFiltered non-replicable edits filtered from this queue logs.

View File

@ -190,6 +190,7 @@ public class ReplicationSink {
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off // Throw index out of bounds if our cell count is off
if (!cells.advance()) { if (!cells.advance()) {
this.metrics.incrementFailedBatches();
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
} }
} }
@ -202,6 +203,7 @@ public class ReplicationSink {
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off // Throw index out of bounds if our cell count is off
if (!cells.advance()) { if (!cells.advance()) {
this.metrics.incrementFailedBatches();
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
} }
Cell cell = cells.current(); Cell cell = cells.current();
@ -276,6 +278,7 @@ public class ReplicationSink {
this.totalReplicatedEdits.addAndGet(totalReplicated); this.totalReplicatedEdits.addAndGet(totalReplicated);
} catch (IOException ex) { } catch (IOException ex) {
LOG.error("Unable to accept edit because:", ex); LOG.error("Unable to accept edit because:", ex);
this.metrics.incrementFailedBatches();
throw ex; throw ex;
} }
} }

View File

@ -221,6 +221,7 @@ public class ReplicationSourceShipper extends Thread {
} }
break; break;
} catch (Exception ex) { } catch (Exception ex) {
source.getSourceMetrics().incrementFailedBatches();
LOG.warn("{} threw unknown exception:", LOG.warn("{} threw unknown exception:",
source.getReplicationEndpoint().getClass().getName(), ex); source.getReplicationEndpoint().getClass().getName(), ex);
if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -68,15 +70,14 @@ import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
@Category({ReplicationTests.class, LargeTests.class}) @Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSink { public class TestReplicationSink {
@ -426,6 +427,67 @@ public class TestReplicationSink {
// Clean up the created hfiles or it will mess up subsequent tests // Clean up the created hfiles or it will mess up subsequent tests
} }
/**
* Test failure metrics produced for failed replication edits
*/
@Test
public void testFailedReplicationSinkMetrics() throws IOException {
long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches();
long errorCount = 0L;
List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
List<Cell> cells = new ArrayList<>();
for(int i = 0; i < BATCH_SIZE; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
cells.clear(); // cause IndexOutOfBoundsException
try {
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Assert.fail("Should re-throw ArrayIndexOutOfBoundsException.");
} catch (ArrayIndexOutOfBoundsException e) {
errorCount++;
assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
}
entries.clear();
cells.clear();
TableName notExistTable = TableName.valueOf("notExistTable"); // cause TableNotFoundException
for (int i = 0; i < BATCH_SIZE; i++) {
entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
}
try {
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Assert.fail("Should re-throw TableNotFoundException.");
} catch (TableNotFoundException e) {
errorCount++;
assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
}
entries.clear();
cells.clear();
for(int i = 0; i < BATCH_SIZE; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
// cause IOException in batch()
try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
try (Admin admin = conn.getAdmin()) {
admin.disableTable(TABLE_NAME1);
try {
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Assert.fail("Should re-throw IOException.");
} catch (IOException e) {
errorCount++;
assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
} finally {
admin.enableTable(TABLE_NAME1);
}
}
}
}
private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) { private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2; byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
byte[] rowBytes = Bytes.toBytes(row); byte[] rowBytes = Bytes.toBytes(row);