HBASE-2838 Replication metrics

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@979532 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2010-07-27 04:58:31 +00:00
parent 90908553e9
commit ea63f24c5d
8 changed files with 279 additions and 14 deletions

View File

@ -854,6 +854,7 @@ Release 0.21.0 - Unreleased
HBASE-2223 Handle 10min+ network partitions between clusters HBASE-2223 Handle 10min+ network partitions between clusters
HBASE-2862 Name DFSClient for Improved Debugging HBASE-2862 Name DFSClient for Improved Debugging
(Nicolas Spiegelberg via Stack) (Nicolas Spiegelberg via Stack)
HBASE-2838 Replication metrics
OPTIMIZATIONS OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite HBASE-410 [testing] Speed up the test suite

View File

@ -22,8 +22,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
@ -35,16 +33,10 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* This class is responsible for replicating the edits coming * This class is responsible for replicating the edits coming
@ -70,6 +62,7 @@ public class ReplicationSink {
private final HTablePool pool; private final HTablePool pool;
// boolean coming from HRS to know when the process stops // boolean coming from HRS to know when the process stops
private final AtomicBoolean stop; private final AtomicBoolean stop;
private final ReplicationSinkMetrics metrics;
/** /**
* Create a sink for replication * Create a sink for replication
@ -84,6 +77,7 @@ public class ReplicationSink {
this.pool = new HTablePool(this.conf, this.pool = new HTablePool(this.conf,
conf.getInt("replication.sink.htablepool.capacity", 10)); conf.getInt("replication.sink.htablepool.capacity", 10));
this.stop = stopper; this.stop = stopper;
this.metrics = new ReplicationSinkMetrics();
} }
/** /**
@ -95,6 +89,9 @@ public class ReplicationSink {
*/ */
public synchronized void replicateEntries(HLog.Entry[] entries) public synchronized void replicateEntries(HLog.Entry[] entries)
throws IOException { throws IOException {
if (entries.length == 0) {
return;
}
// Very simple optimization where we batch sequences of rows going // Very simple optimization where we batch sequences of rows going
// to the same table. // to the same table.
try { try {
@ -139,6 +136,9 @@ public class ReplicationSink {
totalReplicated++; totalReplicated++;
} }
put(lastTable, puts); put(lastTable, puts);
this.metrics.setAgeOfLastAppliedOp(
entries[entries.length-1].getKey().getWriteTime());
this.metrics.appliedBatchesRate.inc(1);
LOG.info("Total replicated: " + totalReplicated); LOG.info("Total replicated: " + totalReplicated);
} catch (IOException ex) { } catch (IOException ex) {
if (ex.getCause() instanceof TableNotFoundException) { if (ex.getCause() instanceof TableNotFoundException) {
@ -173,6 +173,7 @@ public class ReplicationSink {
try { try {
table = this.pool.getTable(tableName); table = this.pool.getTable(tableName);
table.put(puts); table.put(puts);
this.metrics.appliedOpsRate.inc(puts.size());
this.pool.putTable(table); this.pool.putTable(table);
puts.clear(); puts.clear();
} finally { } finally {
@ -193,6 +194,7 @@ public class ReplicationSink {
try { try {
table = this.pool.getTable(tableName); table = this.pool.getTable(tableName);
table.delete(delete); table.delete(delete);
this.metrics.appliedOpsRate.inc(1);
this.pool.putTable(table); this.pool.putTable(table);
} finally { } finally {
if (table != null) { if (table != null) {

View File

@ -0,0 +1,82 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.metrics.MetricsRate;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
import org.apache.hadoop.metrics.util.MetricsIntValue;
import org.apache.hadoop.metrics.util.MetricsLongValue;
import org.apache.hadoop.metrics.util.MetricsRegistry;
/**
 * Maintains the replication statistics of a sink and publishes them
 * through the metrics interfaces.
 */
public class ReplicationSinkMetrics implements Updater {
  private final MetricsRecord metricsRecord;
  // Declared before the metrics below because each of them is constructed
  // with this registry; field initializers run in declaration order.
  private final MetricsRegistry registry = new MetricsRegistry();

  /** Rate of operations applied by the sink */
  public final MetricsRate appliedOpsRate =
      new MetricsRate("appliedOpsRate", registry);

  /** Rate of batches (of operations) applied by the sink */
  public final MetricsRate appliedBatchesRate =
      new MetricsRate("appliedBatchesRate", registry);

  /** Age of the last operation that was applied by the sink */
  private final MetricsLongValue ageOfLastAppliedOp =
      new MetricsLongValue("ageOfLastAppliedOp", registry);

  /**
   * Constructor used to register the metrics. It creates a "replication"
   * record in the "hbase" metrics context, tags it with the name of the
   * calling thread, registers this object as an updater of that context
   * and exports the registry for JMX under the name "ReplicationSink".
   */
  public ReplicationSinkMetrics() {
    MetricsContext context = MetricsUtil.getContext("hbase");
    // The name of the thread building the sink is used as the tag value
    String name = Thread.currentThread().getName();
    metricsRecord = MetricsUtil.createRecord(context, "replication");
    metricsRecord.setTag("RegionServer", name);
    context.registerUpdater(this);
    // Add jvmmetrics.
    JvmMetrics.init("RegionServer", name);
    // export for JMX
    new ReplicationStatistics(this.registry, "ReplicationSink");
  }

  /**
   * Set the age of the last edit that was applied. The stored value is
   * the difference between the current time in milliseconds and the
   * given timestamp.
   * @param timestamp write time of the edit
   */
  public void setAgeOfLastAppliedOp(long timestamp) {
    ageOfLastAppliedOp.set(System.currentTimeMillis() - timestamp);
  }

  /**
   * Pushes every metric of this sink to the metrics record.
   * @param metricsContext context requesting the update, not used here
   */
  @Override
  public void doUpdates(MetricsContext metricsContext) {
    synchronized (this) {
      this.appliedOpsRate.pushMetric(this.metricsRecord);
      this.appliedBatchesRate.pushMetric(this.metricsRecord);
      this.ageOfLastAppliedOp.pushMetric(this.metricsRecord);
    }
  }
}

View File

@ -53,7 +53,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* Class that handles the source of a replication stream. * Class that handles the source of a replication stream.
@ -119,8 +118,12 @@ public class ReplicationSource extends Thread
private long maxRetriesMultiplier; private long maxRetriesMultiplier;
// Current number of entries that we need to replicate // Current number of entries that we need to replicate
private int currentNbEntries = 0; private int currentNbEntries = 0;
// Current number of operations (Put/Delete) that we need to replicate
private int currentNbOperations = 0;
// Indicates if this particular source is running // Indicates if this particular source is running
private volatile boolean running = true; private volatile boolean running = true;
// Metrics for this source
private ReplicationSourceMetrics metrics;
/** /**
* Instantiation method used by region servers * Instantiation method used by region servers
@ -167,6 +170,7 @@ public class ReplicationSource extends Thread
this.conf.getLong("replication.source.sleepforretries", 1000); this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs; this.fs = fs;
this.clusterId = Byte.valueOf(zkHelper.getClusterId()); this.clusterId = Byte.valueOf(zkHelper.getClusterId());
this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
// Finally look if this is a recovered queue // Finally look if this is a recovered queue
this.checkIfQueueRecovered(peerClusterZnode); this.checkIfQueueRecovered(peerClusterZnode);
@ -213,6 +217,7 @@ public class ReplicationSource extends Thread
@Override @Override
public void enqueueLog(Path log) { public void enqueueLog(Path log) {
this.queue.put(log); this.queue.put(log);
this.metrics.sizeOfLogQueue.set(queue.size());
} }
@Override @Override
@ -334,6 +339,7 @@ public class ReplicationSource extends Thread
HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]); HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
while (entry != null) { while (entry != null) {
WALEdit edit = entry.getEdit(); WALEdit edit = entry.getEdit();
this.metrics.logEditsReadRate.inc(1);
seenEntries++; seenEntries++;
// Remove all KVs that should not be replicated // Remove all KVs that should not be replicated
removeNonReplicableEdits(edit); removeNonReplicableEdits(edit);
@ -344,7 +350,10 @@ public class ReplicationSource extends Thread
Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) && Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
edit.size() != 0 && replicating.get()) { edit.size() != 0 && replicating.get()) {
logKey.setClusterId(this.clusterId); logKey.setClusterId(this.clusterId);
currentNbOperations += countDistinctRowKeys(edit);
currentNbEntries++; currentNbEntries++;
} else {
this.metrics.logEditsFilteredRate.inc(1);
} }
// Stop if too many entries or too big // Stop if too many entries or too big
if ((this.reader.getPosition() - this.position) if ((this.reader.getPosition() - this.position)
@ -354,7 +363,7 @@ public class ReplicationSource extends Thread
} }
entry = this.reader.next(entriesArray[currentNbEntries]); entry = this.reader.next(entriesArray[currentNbEntries]);
} }
LOG.debug("currentNbEntries:" + currentNbEntries + LOG.debug("currentNbOperations:" + currentNbOperations +
" and seenEntries:" + seenEntries + " and seenEntries:" + seenEntries +
" and size: " + (this.reader.getPosition() - this.position)); " and size: " + (this.reader.getPosition() - this.position));
// If we didn't get anything and the queue has an object, it means we // If we didn't get anything and the queue has an object, it means we
@ -382,6 +391,7 @@ public class ReplicationSource extends Thread
try { try {
if (this.currentPath == null) { if (this.currentPath == null) {
this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS); this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
this.metrics.sizeOfLogQueue.set(queue.size());
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("Interrupted while reading edits", e); LOG.warn("Interrupted while reading edits", e);
@ -484,6 +494,24 @@ public class ReplicationSource extends Thread
} }
} }
/**
 * Count the number of different row keys in the given edit because of
 * mini-batching. Each KV is compared with the one right before it, so a
 * run of consecutive KVs on the same row counts once.
 * NOTE(review): this presumes KVs of a same row are adjacent in the edit;
 * a row that reappears after another one would be counted again.
 * @param edit edit to count row keys from
 * @return number of different row keys, 0 if the edit holds no KV
 */
private int countDistinctRowKeys(WALEdit edit) {
  List<KeyValue> kvs = edit.getKeyValues();
  if (kvs.isEmpty()) {
    // Nothing to count, and kvs.get(0) below would throw
    return 0;
  }
  int distinctRowKeys = 1;
  KeyValue lastKV = kvs.get(0);
  // Start at 1, the first KV is already accounted for
  for (int i = 1; i < kvs.size(); i++) {
    KeyValue currentKV = kvs.get(i);
    if (!currentKV.matchingRow(lastKV)) {
      distinctRowKeys++;
    }
    // Move the reference forward, else every KV is compared to the first
    // one and rows A,B,B would be reported as 3 rows instead of 2
    lastKV = currentKV;
  }
  return distinctRowKeys;
}
/** /**
* Do the shipping logic * Do the shipping logic
*/ */
@ -497,6 +525,11 @@ public class ReplicationSource extends Thread
this.manager.logPositionAndCleanOldLogs(this.currentPath, this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.position, queueRecovered); this.peerClusterZnode, this.position, queueRecovered);
this.totalReplicatedEdits += currentNbEntries; this.totalReplicatedEdits += currentNbEntries;
this.metrics.shippedBatchesRate.inc(1);
this.metrics.shippedOpsRate.inc(
this.currentNbOperations);
this.metrics.setAgeOfLastShippedOp(
this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime());
LOG.debug("Replicated in total: " + this.totalReplicatedEdits); LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
break; break;

View File

@ -219,6 +219,7 @@ public class ReplicationSourceManager implements LogActionsListener {
this.hlogs.add(newLog.getName()); this.hlogs.add(newLog.getName());
} }
this.latestPath = newLog; this.latestPath = newLog;
// This only updates the sources we own, not the recovered ones
for (ReplicationSourceInterface source : this.sources) { for (ReplicationSourceInterface source : this.sources) {
source.enqueueLog(newLog); source.enqueueLog(newLog);
} }

View File

@ -0,0 +1,101 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.metrics.MetricsRate;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
import org.apache.hadoop.metrics.util.MetricsIntValue;
import org.apache.hadoop.metrics.util.MetricsLongValue;
import org.apache.hadoop.metrics.util.MetricsRegistry;
/**
 * Keeps the replication statistics of one source and publishes them
 * through the metrics interfaces.
 */
public class ReplicationSourceMetrics implements Updater {

  private final MetricsRecord metricsRecord;
  // Has to stay above the metrics, they are all constructed with it
  private MetricsRegistry registry = new MetricsRegistry();

  /** Rate of shipped operations by the source */
  public final MetricsRate shippedOpsRate =
      new MetricsRate("shippedOpsRate", this.registry);

  /** Rate of shipped batches by the source */
  public final MetricsRate shippedBatchesRate =
      new MetricsRate("shippedBatchesRate", this.registry);

  /** Rate of log entries (can be multiple Puts) read from the logs */
  public final MetricsRate logEditsReadRate =
      new MetricsRate("logEditsReadRate", this.registry);

  /** Rate of log entries filtered by the source */
  public final MetricsRate logEditsFilteredRate =
      new MetricsRate("logEditsFilteredRate", this.registry);

  /** Age of the last operation that was shipped by the source */
  private final MetricsLongValue ageOfLastShippedOp =
      new MetricsLongValue("ageOfLastShippedOp", this.registry);

  /**
   * Current size of the queue of logs to replicate,
   * excluding the one being processed at the moment
   */
  public final MetricsIntValue sizeOfLogQueue =
      new MetricsIntValue("sizeOfLogQueue", this.registry);

  /**
   * Registers the metrics of a source: a "replication" record is created
   * in the "hbase" context and tagged with the calling thread's name, this
   * object is registered as an updater and the registry is exported for JMX.
   * @param id Name of the source this class is monitoring
   */
  public ReplicationSourceMetrics(String id) {
    final MetricsContext hbaseContext = MetricsUtil.getContext("hbase");
    final String threadName = Thread.currentThread().getName();
    this.metricsRecord = MetricsUtil.createRecord(hbaseContext, "replication");
    this.metricsRecord.setTag("RegionServer", threadName);
    hbaseContext.registerUpdater(this);
    // Add jvmmetrics.
    JvmMetrics.init("RegionServer", threadName);
    // export for JMX
    new ReplicationStatistics(this.registry, "ReplicationSource for " + id);
  }

  /**
   * Records how old the last shipped edit is, computed as the current
   * time in milliseconds minus the given timestamp.
   * @param timestamp write time of the edit
   */
  public void setAgeOfLastShippedOp(long timestamp) {
    final long age = System.currentTimeMillis() - timestamp;
    this.ageOfLastShippedOp.set(age);
  }

  /**
   * Pushes every metric of this source to the metrics record while
   * holding this object's monitor.
   * @param metricsContext context requesting the update, not used here
   */
  @Override
  public synchronized void doUpdates(MetricsContext metricsContext) {
    final MetricsRecord record = this.metricsRecord;
    this.shippedOpsRate.pushMetric(record);
    this.shippedBatchesRate.pushMetric(record);
    this.logEditsReadRate.pushMetric(record);
    this.logEditsFilteredRate.pushMetric(record);
    this.ageOfLastShippedOp.pushMetric(record);
    this.sizeOfLogQueue.pushMetric(record);
  }
}

View File

@ -0,0 +1,45 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.metrics.MetricsMBeanBase;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.metrics.util.MetricsRegistry;
import javax.management.ObjectName;
/**
 * Exports the metrics held in a {@link MetricsRegistry}, such as the one
 * of {@link ReplicationSourceMetrics} or {@link ReplicationSinkMetrics},
 * as an MBean for JMX monitoring.
 */
public class ReplicationStatistics extends MetricsMBeanBase {

  // Name returned by the MBean registration
  private final ObjectName mbeanName;

  /**
   * Constructor to register the MBean
   * @param registry which registry to use
   * @param name name to get to this bean
   */
  public ReplicationStatistics(MetricsRegistry registry, String name) {
    super(registry, name);
    this.mbeanName = MBeanUtil.registerMBean("Replication", name, this);
  }
}

View File

@ -180,7 +180,7 @@ public class TestReplication {
* Add a row, check it's replicated, delete it, check's gone * Add a row, check it's replicated, delete it, check's gone
* @throws Exception * @throws Exception
*/ */
//@Test @Test
public void testSimplePutDelete() throws Exception { public void testSimplePutDelete() throws Exception {
LOG.info("testSimplePutDelete"); LOG.info("testSimplePutDelete");
Put put = new Put(row); Put put = new Put(row);
@ -228,7 +228,7 @@ public class TestReplication {
* Try a small batch upload using the write buffer, check it's replicated * Try a small batch upload using the write buffer, check it's replicated
* @throws Exception * @throws Exception
*/ */
//@Test @Test
public void testSmallBatch() throws Exception { public void testSmallBatch() throws Exception {
LOG.info("testSmallBatch"); LOG.info("testSmallBatch");
Put put; Put put;
@ -272,7 +272,7 @@ public class TestReplication {
* replicated, enable it, try replicating and it should work * replicated, enable it, try replicating and it should work
* @throws Exception * @throws Exception
*/ */
//@Test @Test
public void testStartStop() throws Exception { public void testStartStop() throws Exception {
// Test stopping replication // Test stopping replication
@ -341,7 +341,7 @@ public class TestReplication {
* hlog rolling and other non-trivial code paths * hlog rolling and other non-trivial code paths
* @throws Exception * @throws Exception
*/ */
//@Test @Test
public void loadTesting() throws Exception { public void loadTesting() throws Exception {
htable1.setWriteBufferSize(1024); htable1.setWriteBufferSize(1024);
htable1.setAutoFlush(false); htable1.setAutoFlush(false);