HBASE-26581 Add metrics for failed replication edits (#4347)
Co-authored-by: Briana Augenreich <baugenreich@hubspot.com> Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
This commit is contained in:
parent
ba713ac379
commit
ee0c921258
|
@ -36,6 +36,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
|
|||
private final MutableFastCounter logReadInEditsCounter;
|
||||
private final MutableFastCounter walEditsFilteredCounter;
|
||||
private final MutableFastCounter shippedBatchesCounter;
|
||||
private final MutableFastCounter failedBatchesCounter;
|
||||
private final MutableFastCounter shippedOpsCounter;
|
||||
private final MutableFastCounter shippedBytesCounter;
|
||||
private final MutableFastCounter logReadInBytesCounter;
|
||||
|
@ -62,6 +63,8 @@ public class MetricsReplicationGlobalSourceSourceImpl
|
|||
|
||||
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
|
||||
|
||||
failedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_BATCHES, 0L);
|
||||
|
||||
shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
|
||||
|
||||
shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
|
||||
|
@ -119,6 +122,10 @@ public class MetricsReplicationGlobalSourceSourceImpl
|
|||
shippedBatchesCounter.incr(batches);
|
||||
}
|
||||
|
||||
@Override public void incrFailedBatches() {
|
||||
failedBatchesCounter.incr();
|
||||
}
|
||||
|
||||
@Override public void incrOpsShipped(long ops) {
|
||||
shippedOpsCounter.incr(ops);
|
||||
}
|
||||
|
|
|
@ -24,13 +24,16 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
public interface MetricsReplicationSinkSource {
|
||||
public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
|
||||
public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
|
||||
public static final String SINK_FAILED_BATCHES = "sink.failedBatches";
|
||||
public static final String SINK_APPLIED_OPS = "sink.appliedOps";
|
||||
public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles";
|
||||
|
||||
void setLastAppliedOpAge(long age);
|
||||
void incrAppliedBatches(long batches);
|
||||
void incrAppliedOps(long batchsize);
|
||||
void incrFailedBatches();
|
||||
long getLastAppliedOpAge();
|
||||
void incrAppliedHFiles(long hfileSize);
|
||||
long getSinkAppliedOps();
|
||||
long getFailedBatches();
|
||||
}
|
||||
|
|
|
@ -27,12 +27,14 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
|
|||
|
||||
private final MutableHistogram ageHist;
|
||||
private final MutableFastCounter batchesCounter;
|
||||
private final MutableFastCounter failedBatchesCounter;
|
||||
private final MutableFastCounter opsCounter;
|
||||
private final MutableFastCounter hfilesCounter;
|
||||
|
||||
public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {
|
||||
ageHist = rms.getMetricsRegistry().newTimeHistogram(SINK_AGE_OF_LAST_APPLIED_OP);
|
||||
batchesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_BATCHES, 0L);
|
||||
failedBatchesCounter = rms.getMetricsRegistry().getCounter(SINK_FAILED_BATCHES, 0L);
|
||||
opsCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_OPS, 0L);
|
||||
hfilesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_HFILES, 0L);
|
||||
}
|
||||
|
@ -49,6 +51,16 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
|
|||
opsCounter.incr(batchsize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrFailedBatches(){
|
||||
failedBatchesCounter.incr();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFailedBatches() {
|
||||
return failedBatchesCounter.value();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastAppliedOpAge() {
|
||||
return ageHist.getMax();
|
||||
|
@ -62,4 +74,5 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
|
|||
@Override public long getSinkAppliedOps() {
|
||||
return opsCounter.value();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
|
|||
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
|
||||
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
|
||||
public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
|
||||
public static final String SOURCE_FAILED_BATCHES = "source.failedBatches";
|
||||
|
||||
public static final String SOURCE_SHIPPED_BYTES = "source.shippedBytes";
|
||||
public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
|
||||
|
@ -57,6 +58,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
|
|||
void decrSizeOfLogQueue(int size);
|
||||
void incrLogEditsFiltered(long size);
|
||||
void incrBatchesShipped(int batches);
|
||||
void incrFailedBatches();
|
||||
void incrOpsShipped(long ops);
|
||||
void incrShippedBytes(long size);
|
||||
void incrLogReadInBytes(long size);
|
||||
|
|
|
@ -34,6 +34,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
private final String logEditsFilteredKey;
|
||||
private final String shippedBatchesKey;
|
||||
private final String shippedOpsKey;
|
||||
private final String failedBatchesKey;
|
||||
private String keyPrefix;
|
||||
|
||||
private final String shippedBytesKey;
|
||||
|
@ -48,6 +49,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
private final MutableFastCounter logReadInEditsCounter;
|
||||
private final MutableFastCounter walEditsFilteredCounter;
|
||||
private final MutableFastCounter shippedBatchesCounter;
|
||||
private final MutableFastCounter failedBatchesCounter;
|
||||
private final MutableFastCounter shippedOpsCounter;
|
||||
private final MutableFastCounter shippedBytesCounter;
|
||||
private final MutableFastCounter logReadInBytesCounter;
|
||||
|
@ -85,6 +87,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
shippedBatchesKey = this.keyPrefix + "shippedBatches";
|
||||
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
|
||||
|
||||
failedBatchesKey = this.keyPrefix + "failedBatches";
|
||||
failedBatchesCounter = rms.getMetricsRegistry().getCounter(failedBatchesKey, 0L);
|
||||
|
||||
shippedOpsKey = this.keyPrefix + "shippedOps";
|
||||
shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
|
||||
|
||||
|
@ -158,6 +163,10 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
shippedBatchesCounter.incr(batches);
|
||||
}
|
||||
|
||||
@Override public void incrFailedBatches() {
|
||||
failedBatchesCounter.incr();
|
||||
}
|
||||
|
||||
@Override public void incrOpsShipped(long ops) {
|
||||
shippedOpsCounter.incr(ops);
|
||||
}
|
||||
|
@ -176,6 +185,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
rms.removeMetric(sizeOfLogQueueKey);
|
||||
|
||||
rms.removeMetric(shippedBatchesKey);
|
||||
rms.removeMetric(failedBatchesKey);
|
||||
rms.removeMetric(shippedOpsKey);
|
||||
rms.removeMetric(shippedBytesKey);
|
||||
|
||||
|
|
|
@ -84,6 +84,21 @@ public class MetricsSink {
|
|||
mss.incrAppliedHFiles(hfileSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method to update metrics when batch of operations has failed.
|
||||
*/
|
||||
public void incrementFailedBatches(){
|
||||
mss.incrFailedBatches();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the count of the failed bathes
|
||||
* @return failedBatches
|
||||
*/
|
||||
protected long getFailedBatches() {
|
||||
return mss.getFailedBatches();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the Age of Last Applied Op
|
||||
* @return ageOfLastAppliedOp
|
||||
|
|
|
@ -230,6 +230,15 @@ public class MetricsSource implements BaseSource {
|
|||
globalSourceSource.incrShippedBytes(sizeInBytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method to update metrics when batch of operations has failed.
|
||||
*/
|
||||
public void incrementFailedBatches(){
|
||||
singleSourceSource.incrFailedBatches();
|
||||
globalSourceSource.incrFailedBatches();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Gets the number of edits not eligible for replication this source queue logs so far.
|
||||
* @return logEditsFiltered non-replicable edits filtered from this queue logs.
|
||||
|
|
|
@ -193,6 +193,7 @@ public class ReplicationSink {
|
|||
for (int i = 0; i < count; i++) {
|
||||
// Throw index out of bounds if our cell count is off
|
||||
if (!cells.advance()) {
|
||||
this.metrics.incrementFailedBatches();
|
||||
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
|
||||
}
|
||||
}
|
||||
|
@ -205,6 +206,7 @@ public class ReplicationSink {
|
|||
for (int i = 0; i < count; i++) {
|
||||
// Throw index out of bounds if our cell count is off
|
||||
if (!cells.advance()) {
|
||||
this.metrics.incrementFailedBatches();
|
||||
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
|
||||
}
|
||||
Cell cell = cells.current();
|
||||
|
@ -281,6 +283,7 @@ public class ReplicationSink {
|
|||
this.totalReplicatedEdits.addAndGet(totalReplicated);
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Unable to accept edit because:", ex);
|
||||
this.metrics.incrementFailedBatches();
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -230,6 +230,7 @@ public class ReplicationSourceShipper extends Thread {
|
|||
}
|
||||
break;
|
||||
} catch (Exception ex) {
|
||||
source.getSourceMetrics().incrementFailedBatches();
|
||||
LOG.warn("{} threw unknown exception:",
|
||||
source.getReplicationEndpoint().getClass().getName(), ex);
|
||||
if (sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
|
||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -69,15 +71,14 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
|
||||
|
||||
|
||||
@Category({ReplicationTests.class, LargeTests.class})
|
||||
public class TestReplicationSink {
|
||||
|
||||
|
@ -427,6 +428,67 @@ public class TestReplicationSink {
|
|||
// Clean up the created hfiles or it will mess up subsequent tests
|
||||
}
|
||||
|
||||
/**
|
||||
* Test failure metrics produced for failed replication edits
|
||||
*/
|
||||
@Test
|
||||
public void testFailedReplicationSinkMetrics() throws IOException {
|
||||
long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches();
|
||||
long errorCount = 0L;
|
||||
List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
|
||||
List<Cell> cells = new ArrayList<>();
|
||||
for(int i = 0; i < BATCH_SIZE; i++) {
|
||||
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
|
||||
}
|
||||
cells.clear(); // cause IndexOutOfBoundsException
|
||||
try {
|
||||
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
|
||||
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
|
||||
Assert.fail("Should re-throw ArrayIndexOutOfBoundsException.");
|
||||
} catch (ArrayIndexOutOfBoundsException e) {
|
||||
errorCount++;
|
||||
assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
|
||||
}
|
||||
|
||||
entries.clear();
|
||||
cells.clear();
|
||||
TableName notExistTable = TableName.valueOf("notExistTable"); // cause TableNotFoundException
|
||||
for (int i = 0; i < BATCH_SIZE; i++) {
|
||||
entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
|
||||
}
|
||||
try {
|
||||
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
|
||||
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
|
||||
Assert.fail("Should re-throw TableNotFoundException.");
|
||||
} catch (TableNotFoundException e) {
|
||||
errorCount++;
|
||||
assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
|
||||
}
|
||||
|
||||
entries.clear();
|
||||
cells.clear();
|
||||
for(int i = 0; i < BATCH_SIZE; i++) {
|
||||
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
|
||||
}
|
||||
// cause IOException in batch()
|
||||
try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
|
||||
try (Admin admin = conn.getAdmin()) {
|
||||
admin.disableTable(TABLE_NAME1);
|
||||
try {
|
||||
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
|
||||
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
|
||||
Assert.fail("Should re-throw IOException.");
|
||||
} catch (IOException e) {
|
||||
errorCount++;
|
||||
assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
|
||||
} finally {
|
||||
admin.enableTable(TABLE_NAME1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
|
||||
byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
|
||||
byte[] rowBytes = Bytes.toBytes(row);
|
||||
|
|
Loading…
Reference in New Issue