From ea63f24c5de0b743f2fa9f35e858e57941c7a03e Mon Sep 17 00:00:00 2001 From: Jean-Daniel Cryans Date: Tue, 27 Jul 2010 04:58:31 +0000 Subject: [PATCH] HBASE-2838 Replication metrics git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@979532 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../regionserver/ReplicationSink.java | 18 ++-- .../regionserver/ReplicationSinkMetrics.java | 82 ++++++++++++++ .../regionserver/ReplicationSource.java | 37 ++++++- .../ReplicationSourceManager.java | 1 + .../ReplicationSourceMetrics.java | 101 ++++++++++++++++++ .../regionserver/ReplicationStatistics.java | 45 ++++++++ .../hbase/replication/TestReplication.java | 8 +- 8 files changed, 279 insertions(+), 14 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java create mode 100644 src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java create mode 100644 src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java diff --git a/CHANGES.txt b/CHANGES.txt index 49b0ee641a3..ff5aad7a4db 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -854,6 +854,7 @@ Release 0.21.0 - Unreleased HBASE-2223 Handle 10min+ network partitions between clusters HBASE-2862 Name DFSClient for Improved Debugging (Nicolas Spiegelberg via Stack) + HBASE-2838 Replication metrics OPTIMIZATIONS HBASE-410 [testing] Speed up the test suite diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 3eec1d70abd..3bed8bb13d6 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -22,8 +22,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableNotFoundException; @@ -35,16 +33,10 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; -import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; /** * This class is responsible for replicating the edits coming @@ -70,6 +62,7 @@ public class ReplicationSink { private final HTablePool pool; // boolean coming from HRS to know when the process stops private final AtomicBoolean stop; + private final ReplicationSinkMetrics metrics; /** * Create a sink for replication @@ -84,6 +77,7 @@ public class ReplicationSink { this.pool = new HTablePool(this.conf, conf.getInt("replication.sink.htablepool.capacity", 10)); this.stop = stopper; + this.metrics = new ReplicationSinkMetrics(); } /** @@ -95,6 +89,9 @@ public class ReplicationSink { */ public synchronized void replicateEntries(HLog.Entry[] entries) throws IOException { + if (entries.length == 0) { + return; + } // Very simple optimization where we batch sequences of rows going // to the same table. try { @@ -139,6 +136,9 @@ public class ReplicationSink { totalReplicated++; } put(lastTable, puts); + this.metrics.setAgeOfLastAppliedOp( + entries[entries.length-1].getKey().getWriteTime()); + this.metrics.appliedBatchesRate.inc(1); LOG.info("Total replicated: " + totalReplicated); } catch (IOException ex) { if (ex.getCause() instanceof TableNotFoundException) { @@ -173,6 +173,7 @@ public class ReplicationSink { try { table = this.pool.getTable(tableName); table.put(puts); + this.metrics.appliedOpsRate.inc(puts.size()); this.pool.putTable(table); puts.clear(); } finally { @@ -193,6 +194,7 @@ public class ReplicationSink { try { table = this.pool.getTable(tableName); table.delete(delete); + this.metrics.appliedOpsRate.inc(1); this.pool.putTable(table); } finally { if (table != null) { diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java new file mode 100644 index 00000000000..4c3b3256969 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java @@ -0,0 +1,82 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; +import org.apache.hadoop.hbase.metrics.MetricsRate; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.Updater; +import org.apache.hadoop.metrics.jvm.JvmMetrics; +import org.apache.hadoop.metrics.util.MetricsIntValue; +import org.apache.hadoop.metrics.util.MetricsLongValue; +import org.apache.hadoop.metrics.util.MetricsRegistry; + +/** + * This class is for maintaining the various replication statistics + * for a sink and publishing them through the metrics interfaces. + */ +public class ReplicationSinkMetrics implements Updater { + private final MetricsRecord metricsRecord; + private MetricsRegistry registry = new MetricsRegistry(); + private static ReplicationSinkMetrics instance; + + /** Rate of operations applied by the sink */ + public final MetricsRate appliedOpsRate = + new MetricsRate("appliedOpsRate", registry); + + /** Rate of batches (of operations) applied by the sink */ + public final MetricsRate appliedBatchesRate = + new MetricsRate("appliedBatchesRate", registry); + + /** Age of the last operation that was applied by the sink */ + private final MetricsLongValue ageOfLastAppliedOp = + new MetricsLongValue("ageOfLastAppliedOp", registry); + + /** + * Constructor used to register the metrics + */ + public ReplicationSinkMetrics() { + MetricsContext context = MetricsUtil.getContext("hbase"); + String name = Thread.currentThread().getName(); + metricsRecord = MetricsUtil.createRecord(context, "replication"); + metricsRecord.setTag("RegionServer", name); + context.registerUpdater(this); + // Add jvmmetrics. + JvmMetrics.init("RegionServer", name); + // export for JMX + new ReplicationStatistics(this.registry, "ReplicationSink"); + } + + /** + * Set the age of the last edit that was applied + * @param timestamp write time of the edit + */ + public void setAgeOfLastAppliedOp(long timestamp) { + ageOfLastAppliedOp.set(System.currentTimeMillis() - timestamp); + } + @Override + public void doUpdates(MetricsContext metricsContext) { + synchronized (this) { + this.appliedOpsRate.pushMetric(this.metricsRecord); + this.appliedBatchesRate.pushMetric(this.metricsRecord); + this.ageOfLastAppliedOp.pushMetric(this.metricsRecord); + } + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 855cd6f047e..32508de9279 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -53,7 +53,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; /** * Class that handles the source of a replication stream. @@ -119,8 +118,12 @@ public class ReplicationSource extends Thread private long maxRetriesMultiplier; // Current number of entries that we need to replicate private int currentNbEntries = 0; + // Current number of operations (Put/Delete) that we need to replicate + private int currentNbOperations = 0; // Indicates if this particular source is running private volatile boolean running = true; + // Metrics for this source + private ReplicationSourceMetrics metrics; /** * Instantiation method used by region servers @@ -167,6 +170,7 @@ public class ReplicationSource extends Thread this.conf.getLong("replication.source.sleepforretries", 1000); this.fs = fs; this.clusterId = Byte.valueOf(zkHelper.getClusterId()); + this.metrics = new ReplicationSourceMetrics(peerClusterZnode); // Finally look if this is a recovered queue this.checkIfQueueRecovered(peerClusterZnode); @@ -213,6 +217,7 @@ public class ReplicationSource extends Thread @Override public void enqueueLog(Path log) { this.queue.put(log); + this.metrics.sizeOfLogQueue.set(queue.size()); } @Override @@ -334,6 +339,7 @@ public class ReplicationSource extends Thread HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]); while (entry != null) { WALEdit edit = entry.getEdit(); + this.metrics.logEditsReadRate.inc(1); seenEntries++; // Remove all KVs that should not be replicated removeNonReplicableEdits(edit); @@ -344,7 +350,10 @@ public class ReplicationSource extends Thread Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) && edit.size() != 0 && replicating.get()) { logKey.setClusterId(this.clusterId); + currentNbOperations += countDistinctRowKeys(edit); currentNbEntries++; + } else { + this.metrics.logEditsFilteredRate.inc(1); } // Stop if too many entries or too big if ((this.reader.getPosition() - this.position) @@ -354,7 +363,7 @@ public class ReplicationSource extends Thread } entry = this.reader.next(entriesArray[currentNbEntries]); } - LOG.debug("currentNbEntries:" + currentNbEntries + + LOG.debug("currentNbOperations:" + currentNbOperations + " and seenEntries:" + seenEntries + " and size: " + (this.reader.getPosition() - this.position)); // If we didn't get anything and the queue has an object, it means we @@ -382,6 +391,7 @@ public class ReplicationSource extends Thread try { if (this.currentPath == null) { this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS); + this.metrics.sizeOfLogQueue.set(queue.size()); } } catch (InterruptedException e) { LOG.warn("Interrupted while reading edits", e); @@ -484,6 +494,24 @@ public class ReplicationSource extends Thread } } + /** + * Count the number of different row keys in the given edit because of + * mini-batching. We assume that there's at least one KV in the WALEdit. + * @param edit edit to count row keys from + * @return number of different row keys + */ + private int countDistinctRowKeys(WALEdit edit) { + List kvs = edit.getKeyValues(); + int distinctRowKeys = 1; + KeyValue lastKV = kvs.get(0); + for (int i = 0; i < edit.size(); i++) { + if (!kvs.get(i).matchingRow(lastKV)) { + distinctRowKeys++; + } + } + return distinctRowKeys; + } + /** * Do the shipping logic */ @@ -497,6 +525,11 @@ public class ReplicationSource extends Thread this.manager.logPositionAndCleanOldLogs(this.currentPath, this.peerClusterZnode, this.position, queueRecovered); this.totalReplicatedEdits += currentNbEntries; + this.metrics.shippedBatchesRate.inc(1); + this.metrics.shippedOpsRate.inc( + this.currentNbOperations); + this.metrics.setAgeOfLastShippedOp( + this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime()); LOG.debug("Replicated in total: " + this.totalReplicatedEdits); break; diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 05010481fb7..8046b7389ed 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -219,6 +219,7 @@ public class ReplicationSourceManager implements LogActionsListener { this.hlogs.add(newLog.getName()); } this.latestPath = newLog; + // This only update the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources) { source.enqueueLog(newLog); } diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java new file mode 100644 index 00000000000..dca63134bf9 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java @@ -0,0 +1,101 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; +import org.apache.hadoop.hbase.metrics.MetricsRate; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.Updater; +import org.apache.hadoop.metrics.jvm.JvmMetrics; +import org.apache.hadoop.metrics.util.MetricsIntValue; +import org.apache.hadoop.metrics.util.MetricsLongValue; +import org.apache.hadoop.metrics.util.MetricsRegistry; + +/** + * This class is for maintaining the various replication statistics + * for a source and publishing them through the metrics interfaces. + */ +public class ReplicationSourceMetrics implements Updater { + private final MetricsRecord metricsRecord; + private MetricsRegistry registry = new MetricsRegistry(); + + /** Rate of shipped operations by the source */ + public final MetricsRate shippedOpsRate = + new MetricsRate("shippedOpsRate", registry); + + /** Rate of shipped batches by the source */ + public final MetricsRate shippedBatchesRate = + new MetricsRate("shippedBatchesRate", registry); + + /** Rate of log entries (can be multiple Puts) read from the logs */ + public final MetricsRate logEditsReadRate = + new MetricsRate("logEditsReadRate", registry); + + /** Rate of log entries filtered by the source */ + public final MetricsRate logEditsFilteredRate = + new MetricsRate("logEditsFilteredRate", registry); + + /** Age of the last operation that was shipped by the source */ + private final MetricsLongValue ageOfLastShippedOp = + new MetricsLongValue("ageOfLastShippedOp", registry); + + /** + * Current size of the queue of logs to replicate, + * excluding the one being processed at the moment + */ + public final MetricsIntValue sizeOfLogQueue = + new MetricsIntValue("sizeOfLogQueue", registry); + + /** + * Constructor used to register the metrics + * @param id Name of the source this class is monitoring + */ + public ReplicationSourceMetrics(String id) { + MetricsContext context = MetricsUtil.getContext("hbase"); + String name = Thread.currentThread().getName(); + metricsRecord = MetricsUtil.createRecord(context, "replication"); + metricsRecord.setTag("RegionServer", name); + context.registerUpdater(this); + // Add jvmmetrics. + JvmMetrics.init("RegionServer", name); + // export for JMX + new ReplicationStatistics(this.registry, "ReplicationSource for " + id); + } + + /** + * Set the age of the last edit that was shipped + * @param timestamp write time of the edit + */ + public void setAgeOfLastShippedOp(long timestamp) { + ageOfLastShippedOp.set(System.currentTimeMillis() - timestamp); + } + + @Override + public void doUpdates(MetricsContext metricsContext) { + synchronized (this) { + this.shippedOpsRate.pushMetric(this.metricsRecord); + this.shippedBatchesRate.pushMetric(this.metricsRecord); + this.logEditsReadRate.pushMetric(this.metricsRecord); + this.logEditsFilteredRate.pushMetric(this.metricsRecord); + this.ageOfLastShippedOp.pushMetric(this.metricsRecord); + this.sizeOfLogQueue.pushMetric(this.metricsRecord); + } + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java new file mode 100644 index 00000000000..54ca3df75ff --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java @@ -0,0 +1,45 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.metrics.MetricsMBeanBase; +import org.apache.hadoop.metrics.util.MBeanUtil; +import org.apache.hadoop.metrics.util.MetricsRegistry; + +import javax.management.ObjectName; + +/** + * Exports metrics recorded by {@link ReplicationSourceMetrics} as an MBean + * for JMX monitoring. + */ +public class ReplicationStatistics extends MetricsMBeanBase { + + private final ObjectName mbeanName; + + /** + * Constructor to register the MBean + * @param registry which rehistry to use + * @param name name to get to this bean + */ + public ReplicationStatistics(MetricsRegistry registry, String name) { + super(registry, name); + mbeanName = MBeanUtil.registerMBean("Replication", name, this); + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java index 28a6e5163ff..31cc6805c8d 100644 --- a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java +++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java @@ -180,7 +180,7 @@ public class TestReplication { * Add a row, check it's replicated, delete it, check's gone * @throws Exception */ - //@Test + @Test public void testSimplePutDelete() throws Exception { LOG.info("testSimplePutDelete"); Put put = new Put(row); @@ -228,7 +228,7 @@ public class TestReplication { * Try a small batch upload using the write buffer, check it's replicated * @throws Exception */ - //@Test + @Test public void testSmallBatch() throws Exception { LOG.info("testSmallBatch"); Put put; @@ -272,7 +272,7 @@ public class TestReplication { * replicated, enable it, try replicating and it should work * @throws Exception */ - //@Test + @Test public void testStartStop() throws Exception { // Test stopping replication @@ -341,7 +341,7 @@ public class TestReplication { * hlog rolling and other non-trivial code paths * @throws Exception */ - //@Test + @Test public void loadTesting() throws Exception { htable1.setWriteBufferSize(1024); htable1.setAutoFlush(false);