HBASE-3102 Enhance HBase rMetrics for Long-running Stats
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1028615 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d2cdee5141
commit
0944bd9fb0
|
@ -1052,6 +1052,8 @@ Release 0.21.0 - Unreleased
|
||||||
HBASE-3167 HBase Export: Add ability to export specific Column Family;
|
HBASE-3167 HBase Export: Add ability to export specific Column Family;
|
||||||
Turn Block Cache off during export; improve usage doc
|
Turn Block Cache off during export; improve usage doc
|
||||||
(Kannan Muthukkaruppan via Stack)
|
(Kannan Muthukkaruppan via Stack)
|
||||||
|
HBASE-3102 Enhance HBase rMetrics for Long-running Stats
|
||||||
|
(Nicolas Spiegelberg via Stack)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
HBASE-1961 HBase EC2 scripts
|
HBASE-1961 HBase EC2 scripts
|
||||||
|
|
|
@ -16,6 +16,10 @@ hbase.class=org.apache.hadoop.metrics.spi.NullContext
|
||||||
# hbase.period=10
|
# hbase.period=10
|
||||||
# hbase.fileName=/tmp/metrics_hbase.log
|
# hbase.fileName=/tmp/metrics_hbase.log
|
||||||
|
|
||||||
|
# HBase-specific configuration to reset long-running stats (e.g. compactions)
|
||||||
|
# If this variable is left out, then the default is no expiration.
|
||||||
|
hbase.extendedperiod = 3600
|
||||||
|
|
||||||
# Configuration of the "hbase" context for ganglia
|
# Configuration of the "hbase" context for ganglia
|
||||||
# Pick one: Ganglia 3.0 (former) or Ganglia 3.1 (latter)
|
# Pick one: Ganglia 3.0 (former) or Ganglia 3.1 (latter)
|
||||||
# hbase.class=org.apache.hadoop.metrics.ganglia.GangliaContext
|
# hbase.class=org.apache.hadoop.metrics.ganglia.GangliaContext
|
||||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.master.handler.ModifyTableHandler;
|
||||||
import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
|
import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
|
||||||
import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
|
import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
|
||||||
import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
|
import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
|
||||||
|
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.InfoServer;
|
import org.apache.hadoop.hbase.util.InfoServer;
|
||||||
|
@ -128,6 +129,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
||||||
private final HBaseServer rpcServer;
|
private final HBaseServer rpcServer;
|
||||||
// Address of the HMaster
|
// Address of the HMaster
|
||||||
private final HServerAddress address;
|
private final HServerAddress address;
|
||||||
|
// Metrics for the HMaster
|
||||||
|
private final MasterMetrics metrics;
|
||||||
// file system manager for the master FS operations
|
// file system manager for the master FS operations
|
||||||
private MasterFileSystem fileSystemManager;
|
private MasterFileSystem fileSystemManager;
|
||||||
|
|
||||||
|
@ -206,6 +209,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
||||||
|
|
||||||
this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" +
|
this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" +
|
||||||
address.getPort(), this);
|
address.getPort(), this);
|
||||||
|
|
||||||
|
this.metrics = new MasterMetrics(getServerName());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -325,11 +330,11 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
|
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
|
||||||
this.fileSystemManager = new MasterFileSystem(this);
|
this.fileSystemManager = new MasterFileSystem(this, metrics);
|
||||||
this.connection = HConnectionManager.getConnection(conf);
|
this.connection = HConnectionManager.getConnection(conf);
|
||||||
this.executorService = new ExecutorService(getServerName());
|
this.executorService = new ExecutorService(getServerName());
|
||||||
|
|
||||||
this.serverManager = new ServerManager(this, this);
|
this.serverManager = new ServerManager(this, this, metrics);
|
||||||
|
|
||||||
this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
|
this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
|
||||||
this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE));
|
this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE));
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HServerInfo;
|
import org.apache.hadoop.hbase.HServerInfo;
|
||||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
|
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.Store;
|
import org.apache.hadoop.hbase.regionserver.Store;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
|
@ -54,6 +55,8 @@ public class MasterFileSystem {
|
||||||
Configuration conf;
|
Configuration conf;
|
||||||
// master status
|
// master status
|
||||||
Server master;
|
Server master;
|
||||||
|
// metrics for master
|
||||||
|
MasterMetrics metrics;
|
||||||
// Keep around for convenience.
|
// Keep around for convenience.
|
||||||
private final FileSystem fs;
|
private final FileSystem fs;
|
||||||
// Is the fileystem ok?
|
// Is the fileystem ok?
|
||||||
|
@ -65,9 +68,11 @@ public class MasterFileSystem {
|
||||||
// create the split log lock
|
// create the split log lock
|
||||||
final Lock splitLogLock = new ReentrantLock();
|
final Lock splitLogLock = new ReentrantLock();
|
||||||
|
|
||||||
public MasterFileSystem(Server master) throws IOException {
|
public MasterFileSystem(Server master, MasterMetrics metrics)
|
||||||
|
throws IOException {
|
||||||
this.conf = master.getConfiguration();
|
this.conf = master.getConfiguration();
|
||||||
this.master = master;
|
this.master = master;
|
||||||
|
this.metrics = metrics;
|
||||||
// Set filesystem to be that of this.rootdir else we get complaints about
|
// Set filesystem to be that of this.rootdir else we get complaints about
|
||||||
// mismatched filesystems if hbase.rootdir is hdfs and fs.defaultFS is
|
// mismatched filesystems if hbase.rootdir is hdfs and fs.defaultFS is
|
||||||
// default localfs. Presumption is that rootdir is fully-qualified before
|
// default localfs. Presumption is that rootdir is fully-qualified before
|
||||||
|
@ -181,15 +186,21 @@ public class MasterFileSystem {
|
||||||
|
|
||||||
public void splitLog(final String serverName) {
|
public void splitLog(final String serverName) {
|
||||||
this.splitLogLock.lock();
|
this.splitLogLock.lock();
|
||||||
|
long splitTime = 0, splitLogSize = 0;
|
||||||
Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
|
Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
|
||||||
try {
|
try {
|
||||||
HLogSplitter splitter = HLogSplitter.createLogSplitter(conf);
|
HLogSplitter splitter = HLogSplitter.createLogSplitter(conf);
|
||||||
splitter.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
|
splitter.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
|
||||||
|
splitTime = splitter.getTime();
|
||||||
|
splitLogSize = splitter.getSize();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Failed splitting " + logDir.toString(), e);
|
LOG.error("Failed splitting " + logDir.toString(), e);
|
||||||
} finally {
|
} finally {
|
||||||
this.splitLogLock.unlock();
|
this.splitLogLock.unlock();
|
||||||
}
|
}
|
||||||
|
if (this.metrics != null) {
|
||||||
|
this.metrics.addSplit(splitTime, splitLogSize);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -123,15 +123,17 @@ public class ServerManager {
|
||||||
* Constructor.
|
* Constructor.
|
||||||
* @param master
|
* @param master
|
||||||
* @param services
|
* @param services
|
||||||
|
* @param metrics
|
||||||
* @param freshClusterStartup True if we are original master on a fresh
|
* @param freshClusterStartup True if we are original master on a fresh
|
||||||
* cluster startup else if false, we are joining an already running cluster.
|
* cluster startup else if false, we are joining an already running cluster.
|
||||||
*/
|
*/
|
||||||
public ServerManager(final Server master, final MasterServices services) {
|
public ServerManager(final Server master, final MasterServices services,
|
||||||
|
MasterMetrics metrics) {
|
||||||
this.master = master;
|
this.master = master;
|
||||||
this.services = services;
|
this.services = services;
|
||||||
|
this.metrics = metrics;
|
||||||
Configuration c = master.getConfiguration();
|
Configuration c = master.getConfiguration();
|
||||||
int monitorInterval = c.getInt("hbase.master.monitor.interval", 60 * 1000);
|
int monitorInterval = c.getInt("hbase.master.monitor.interval", 60 * 1000);
|
||||||
this.metrics = new MasterMetrics(master.getServerName());
|
|
||||||
this.serverMonitorThread = new ServerMonitor(monitorInterval, master);
|
this.serverMonitorThread = new ServerMonitor(monitorInterval, master);
|
||||||
String n = Thread.currentThread().getName();
|
String n = Thread.currentThread().getName();
|
||||||
Threads.setDaemonThreadRunning(this.serverMonitorThread, n + ".serverMonitor");
|
Threads.setDaemonThreadRunning(this.serverMonitorThread, n + ".serverMonitor");
|
||||||
|
|
|
@ -17,14 +17,19 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master.metrics;
|
package org.apache.hadoop.hbase.master.metrics;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.metrics.MetricsRate;
|
import org.apache.hadoop.hbase.metrics.MetricsRate;
|
||||||
|
import org.apache.hadoop.hbase.metrics.PersistentMetricsTimeVaryingRate;
|
||||||
|
import org.apache.hadoop.metrics.ContextFactory;
|
||||||
import org.apache.hadoop.metrics.MetricsContext;
|
import org.apache.hadoop.metrics.MetricsContext;
|
||||||
import org.apache.hadoop.metrics.MetricsRecord;
|
import org.apache.hadoop.metrics.MetricsRecord;
|
||||||
import org.apache.hadoop.metrics.MetricsUtil;
|
import org.apache.hadoop.metrics.MetricsUtil;
|
||||||
import org.apache.hadoop.metrics.Updater;
|
import org.apache.hadoop.metrics.Updater;
|
||||||
import org.apache.hadoop.metrics.jvm.JvmMetrics;
|
import org.apache.hadoop.metrics.jvm.JvmMetrics;
|
||||||
|
import org.apache.hadoop.metrics.util.MetricsLongValue;
|
||||||
import org.apache.hadoop.metrics.util.MetricsRegistry;
|
import org.apache.hadoop.metrics.util.MetricsRegistry;
|
||||||
|
|
||||||
|
|
||||||
|
@ -40,12 +45,24 @@ public class MasterMetrics implements Updater {
|
||||||
private final MetricsRecord metricsRecord;
|
private final MetricsRecord metricsRecord;
|
||||||
private final MetricsRegistry registry = new MetricsRegistry();
|
private final MetricsRegistry registry = new MetricsRegistry();
|
||||||
private final MasterStatistics masterStatistics;
|
private final MasterStatistics masterStatistics;
|
||||||
/*
|
|
||||||
|
private long lastUpdate = System.currentTimeMillis();
|
||||||
|
private long lastExtUpdate = System.currentTimeMillis();
|
||||||
|
private long extendedPeriod = 0;
|
||||||
|
/*
|
||||||
* Count of requests to the cluster since last call to metrics update
|
* Count of requests to the cluster since last call to metrics update
|
||||||
*/
|
*/
|
||||||
private final MetricsRate cluster_requests =
|
private final MetricsRate cluster_requests =
|
||||||
new MetricsRate("cluster_requests", registry);
|
new MetricsRate("cluster_requests", registry);
|
||||||
|
|
||||||
|
/** Time it takes to finish HLog.splitLog() */
|
||||||
|
final PersistentMetricsTimeVaryingRate splitTime =
|
||||||
|
new PersistentMetricsTimeVaryingRate("splitTime", registry);
|
||||||
|
|
||||||
|
/** Size of HLog files being split */
|
||||||
|
final PersistentMetricsTimeVaryingRate splitSize =
|
||||||
|
new PersistentMetricsTimeVaryingRate("splitSize", registry);
|
||||||
|
|
||||||
public MasterMetrics(final String name) {
|
public MasterMetrics(final String name) {
|
||||||
MetricsContext context = MetricsUtil.getContext("hbase");
|
MetricsContext context = MetricsUtil.getContext("hbase");
|
||||||
metricsRecord = MetricsUtil.createRecord(context, "master");
|
metricsRecord = MetricsUtil.createRecord(context, "master");
|
||||||
|
@ -56,6 +73,17 @@ public class MasterMetrics implements Updater {
|
||||||
// expose the MBean for metrics
|
// expose the MBean for metrics
|
||||||
masterStatistics = new MasterStatistics(this.registry);
|
masterStatistics = new MasterStatistics(this.registry);
|
||||||
|
|
||||||
|
// get custom attributes
|
||||||
|
try {
|
||||||
|
Object m =
|
||||||
|
ContextFactory.getFactory().getAttribute("hbase.extendedperiod");
|
||||||
|
if (m instanceof String) {
|
||||||
|
this.extendedPeriod = Long.parseLong((String) m)*1000;
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.info("Couldn't load ContextFactory for Metrics config info");
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("Initialized");
|
LOG.info("Initialized");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,7 +99,20 @@ public class MasterMetrics implements Updater {
|
||||||
*/
|
*/
|
||||||
public void doUpdates(MetricsContext unused) {
|
public void doUpdates(MetricsContext unused) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
this.lastUpdate = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// has the extended period for long-living stats elapsed?
|
||||||
|
if (this.extendedPeriod > 0 &&
|
||||||
|
this.lastUpdate - this.lastExtUpdate >= this.extendedPeriod) {
|
||||||
|
this.lastExtUpdate = this.lastUpdate;
|
||||||
|
this.splitTime.resetMinMaxAvg();
|
||||||
|
this.splitSize.resetMinMaxAvg();
|
||||||
|
this.resetAllMinMax();
|
||||||
|
}
|
||||||
|
|
||||||
this.cluster_requests.pushMetric(metricsRecord);
|
this.cluster_requests.pushMetric(metricsRecord);
|
||||||
|
this.splitTime.pushMetric(metricsRecord);
|
||||||
|
this.splitSize.pushMetric(metricsRecord);
|
||||||
}
|
}
|
||||||
this.metricsRecord.update();
|
this.metricsRecord.update();
|
||||||
}
|
}
|
||||||
|
@ -80,6 +121,16 @@ public class MasterMetrics implements Updater {
|
||||||
// Nothing to do
|
// Nothing to do
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record a single instance of a split
|
||||||
|
* @param time time that the split took
|
||||||
|
* @param size length of original HLogs that were split
|
||||||
|
*/
|
||||||
|
public synchronized void addSplit(long time, long size) {
|
||||||
|
splitTime.inc(time);
|
||||||
|
splitSize.inc(size);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Count of requests.
|
* @return Count of requests.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -0,0 +1,137 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.metrics;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.metrics.MetricsRecord;
|
||||||
|
import org.apache.hadoop.metrics.util.MetricsRegistry;
|
||||||
|
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class extends MetricsTimeVaryingRate to let the metrics
|
||||||
|
* persist past a pushMetric() call
|
||||||
|
*/
|
||||||
|
public class PersistentMetricsTimeVaryingRate extends MetricsTimeVaryingRate {
|
||||||
|
protected static final Log LOG =
|
||||||
|
LogFactory.getLog("org.apache.hadoop.hbase.metrics");
|
||||||
|
|
||||||
|
protected boolean reset = false;
|
||||||
|
protected long lastOper = 0;
|
||||||
|
protected long totalOps = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor - create a new metric
|
||||||
|
* @param nam the name of the metrics to be used to publish the metric
|
||||||
|
* @param registry - where the metrics object will be registered
|
||||||
|
* @param description metrics description
|
||||||
|
*/
|
||||||
|
public PersistentMetricsTimeVaryingRate(final String nam,
|
||||||
|
final MetricsRegistry registry,
|
||||||
|
final String description) {
|
||||||
|
super(nam, registry, description);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor - create a new metric
|
||||||
|
* @param nam the name of the metrics to be used to publish the metric
|
||||||
|
* @param registry - where the metrics object will be registered
|
||||||
|
*/
|
||||||
|
public PersistentMetricsTimeVaryingRate(final String nam,
|
||||||
|
MetricsRegistry registry) {
|
||||||
|
this(nam, registry, NO_DESCRIPTION);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Push updated metrics to the mr.
|
||||||
|
*
|
||||||
|
* Note this does NOT push to JMX
|
||||||
|
* (JMX gets the info via {@link #getPreviousIntervalAverageTime()} and
|
||||||
|
* {@link #getPreviousIntervalNumOps()}
|
||||||
|
*
|
||||||
|
* @param mr owner of this metric
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void pushMetric(final MetricsRecord mr) {
|
||||||
|
// this will reset the currentInterval & num_ops += prevInterval()
|
||||||
|
super.pushMetric(mr);
|
||||||
|
// since we're retaining prevInterval(), we don't want to do the incr
|
||||||
|
// instead, we want to set that value because we have absolute ops
|
||||||
|
try {
|
||||||
|
mr.setMetric(getName() + "_num_ops", totalOps);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("pushMetric failed for " + getName() + "\n" +
|
||||||
|
StringUtils.stringifyException(e));
|
||||||
|
}
|
||||||
|
if (reset) {
|
||||||
|
// use the previous avg as our starting min/max/avg
|
||||||
|
super.inc(getPreviousIntervalAverageTime());
|
||||||
|
} else {
|
||||||
|
// maintain the stats that pushMetric() cleared
|
||||||
|
maintainStats();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increment the metrics for numOps operations
|
||||||
|
* @param numOps - number of operations
|
||||||
|
* @param time - time for numOps operations
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void inc(final int numOps, final long time) {
|
||||||
|
super.inc(numOps, time);
|
||||||
|
totalOps += numOps;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increment the metrics for numOps operations
|
||||||
|
* @param time - time for numOps operations
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void inc(final long time) {
|
||||||
|
super.inc(time);
|
||||||
|
++totalOps;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Rollover to a new interval
|
||||||
|
* NOTE: does not reset numOps. this is an absolute value
|
||||||
|
*/
|
||||||
|
public synchronized void resetMinMaxAvg() {
|
||||||
|
reset = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* MetricsTimeVaryingRate will reset every time pushMetric() is called
|
||||||
|
* This is annoying for long-running stats that might not get a single
|
||||||
|
* operation in the polling period. This function ensures that values
|
||||||
|
* for those stat entries don't get reset.
|
||||||
|
*/
|
||||||
|
protected void maintainStats() {
|
||||||
|
int curOps = this.getPreviousIntervalNumOps();
|
||||||
|
if (curOps > 0) {
|
||||||
|
long curTime = this.getPreviousIntervalAverageTime();
|
||||||
|
long totalTime = curTime * curOps;
|
||||||
|
if (totalTime / curTime == curOps) {
|
||||||
|
super.inc(curOps, totalTime);
|
||||||
|
} else {
|
||||||
|
LOG.info("Stats for " + this.getName() + " overflowed! resetting");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -102,6 +102,9 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
|
||||||
if(!this.server.isStopped()) {
|
if(!this.server.isStopped()) {
|
||||||
// Don't interrupt us while we are working
|
// Don't interrupt us while we are working
|
||||||
byte [] midKey = r.compactStores();
|
byte [] midKey = r.compactStores();
|
||||||
|
if (r.getLastCompactInfo() != null) { // compaction aborted?
|
||||||
|
this.server.getMetrics().addCompaction(r.getLastCompactInfo());
|
||||||
|
}
|
||||||
if (shouldSplitRegion() && midKey != null &&
|
if (shouldSplitRegion() && midKey != null &&
|
||||||
!this.server.isStopped()) {
|
!this.server.isStopped()) {
|
||||||
split(r, midKey);
|
split(r, midKey);
|
||||||
|
|
|
@ -177,6 +177,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
* major compaction. Cleared each time through compaction code.
|
* major compaction. Cleared each time through compaction code.
|
||||||
*/
|
*/
|
||||||
private volatile boolean forceMajorCompaction = false;
|
private volatile boolean forceMajorCompaction = false;
|
||||||
|
private Pair<Long,Long> lastCompactInfo = null;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Data structure of write state flags used coordinating flushes,
|
* Data structure of write state flags used coordinating flushes,
|
||||||
|
@ -217,6 +218,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
|
|
||||||
final long memstoreFlushSize;
|
final long memstoreFlushSize;
|
||||||
private volatile long lastFlushTime;
|
private volatile long lastFlushTime;
|
||||||
|
private List<Pair<Long,Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
|
||||||
final FlushRequester flushRequester;
|
final FlushRequester flushRequester;
|
||||||
private final long blockingMemStoreSize;
|
private final long blockingMemStoreSize;
|
||||||
final long threadWakeFrequency;
|
final long threadWakeFrequency;
|
||||||
|
@ -608,11 +610,25 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
return this.fs;
|
return this.fs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @return info about the last compaction <time, size> */
|
||||||
|
public Pair<Long,Long> getLastCompactInfo() {
|
||||||
|
return this.lastCompactInfo;
|
||||||
|
}
|
||||||
|
|
||||||
/** @return the last time the region was flushed */
|
/** @return the last time the region was flushed */
|
||||||
public long getLastFlushTime() {
|
public long getLastFlushTime() {
|
||||||
return this.lastFlushTime;
|
return this.lastFlushTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @return info about the last flushes <time, size> */
|
||||||
|
public List<Pair<Long,Long>> getRecentFlushInfo() {
|
||||||
|
this.lock.readLock().lock();
|
||||||
|
List<Pair<Long,Long>> ret = this.recentFlushes;
|
||||||
|
this.recentFlushes = new ArrayList<Pair<Long,Long>>();
|
||||||
|
this.lock.readLock().unlock();
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
// HRegion maintenance.
|
// HRegion maintenance.
|
||||||
//
|
//
|
||||||
|
@ -704,6 +720,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
lock.readLock().lock();
|
lock.readLock().lock();
|
||||||
|
this.lastCompactInfo = null;
|
||||||
try {
|
try {
|
||||||
if (this.closed.get()) {
|
if (this.closed.get()) {
|
||||||
LOG.debug("Skipping compaction on " + this + " because closed");
|
LOG.debug("Skipping compaction on " + this + " because closed");
|
||||||
|
@ -728,11 +745,13 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
"compaction on region " + this);
|
"compaction on region " + this);
|
||||||
long startTime = EnvironmentEdgeManager.currentTimeMillis();
|
long startTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
doRegionCompactionPrep();
|
doRegionCompactionPrep();
|
||||||
|
long lastCompactSize = 0;
|
||||||
long maxSize = -1;
|
long maxSize = -1;
|
||||||
boolean completed = false;
|
boolean completed = false;
|
||||||
try {
|
try {
|
||||||
for (Store store: stores.values()) {
|
for (Store store: stores.values()) {
|
||||||
final Store.StoreSize ss = store.compact(majorCompaction);
|
final Store.StoreSize ss = store.compact(majorCompaction);
|
||||||
|
lastCompactSize += store.getLastCompactSize();
|
||||||
if (ss != null && ss.getSize() > maxSize) {
|
if (ss != null && ss.getSize() > maxSize) {
|
||||||
maxSize = ss.getSize();
|
maxSize = ss.getSize();
|
||||||
splitRow = ss.getSplitRow();
|
splitRow = ss.getSplitRow();
|
||||||
|
@ -746,6 +765,10 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
LOG.info(((completed) ? "completed" : "aborted")
|
LOG.info(((completed) ? "completed" : "aborted")
|
||||||
+ " compaction on region " + this
|
+ " compaction on region " + this
|
||||||
+ " after " + StringUtils.formatTimeDiff(now, startTime));
|
+ " after " + StringUtils.formatTimeDiff(now, startTime));
|
||||||
|
if (completed) {
|
||||||
|
this.lastCompactInfo =
|
||||||
|
new Pair<Long,Long>((now - startTime) / 1000, lastCompactSize);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
synchronized (writestate) {
|
synchronized (writestate) {
|
||||||
|
@ -973,14 +996,16 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
notifyAll(); // FindBugs NN_NAKED_NOTIFY
|
notifyAll(); // FindBugs NN_NAKED_NOTIFY
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
|
||||||
LOG.info("Finished memstore flush of ~" +
|
LOG.info("Finished memstore flush of ~" +
|
||||||
StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
|
StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
|
||||||
this + " in " + (now - startTime) + "ms, sequenceid=" + sequenceId +
|
this + " in " + time + "ms, sequenceid=" + sequenceId +
|
||||||
", compaction requested=" + compactionRequested +
|
", compaction requested=" + compactionRequested +
|
||||||
((wal == null)? "; wal=null": ""));
|
((wal == null)? "; wal=null": ""));
|
||||||
}
|
}
|
||||||
|
this.recentFlushes.add(new Pair<Long,Long>(time/1000,currentMemStoreSize));
|
||||||
|
|
||||||
return compactionRequested;
|
return compactionRequested;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3144,7 +3169,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
|
|
||||||
public static final long FIXED_OVERHEAD = ClassSize.align(
|
public static final long FIXED_OVERHEAD = ClassSize.align(
|
||||||
(4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
|
(4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
|
||||||
(18 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
|
(20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
|
||||||
|
|
||||||
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
|
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
|
||||||
ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +
|
ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +
|
||||||
|
|
|
@ -250,6 +250,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
|
||||||
if (region.flushcache()) {
|
if (region.flushcache()) {
|
||||||
server.compactSplitThread.requestCompaction(region, getName());
|
server.compactSplitThread.requestCompaction(region, getName());
|
||||||
}
|
}
|
||||||
|
server.getMetrics().addFlush(region.getRecentFlushInfo());
|
||||||
} catch (DroppedSnapshotException ex) {
|
} catch (DroppedSnapshotException ex) {
|
||||||
// Cache flush can fail in a few places. If it fails in a critical
|
// Cache flush can fail in a few places. If it fails in a critical
|
||||||
// section, we get a DroppedSnapshotException and a replay of hlog
|
// section, we get a DroppedSnapshotException and a replay of hlog
|
||||||
|
|
|
@ -91,6 +91,7 @@ public class Store implements HeapSize {
|
||||||
protected long ttl;
|
protected long ttl;
|
||||||
private long majorCompactionTime;
|
private long majorCompactionTime;
|
||||||
private int maxFilesToCompact;
|
private int maxFilesToCompact;
|
||||||
|
private long lastCompactSize = 0;
|
||||||
/* how many bytes to write between status checks */
|
/* how many bytes to write between status checks */
|
||||||
static int closeCheckInterval = 0;
|
static int closeCheckInterval = 0;
|
||||||
private final long desiredMaxFileSize;
|
private final long desiredMaxFileSize;
|
||||||
|
@ -605,6 +606,8 @@ public class Store implements HeapSize {
|
||||||
boolean forceSplit = this.region.shouldSplit(false);
|
boolean forceSplit = this.region.shouldSplit(false);
|
||||||
boolean majorcompaction = mc;
|
boolean majorcompaction = mc;
|
||||||
synchronized (compactLock) {
|
synchronized (compactLock) {
|
||||||
|
this.lastCompactSize = 0;
|
||||||
|
|
||||||
// filesToCompact are sorted oldest to newest.
|
// filesToCompact are sorted oldest to newest.
|
||||||
List<StoreFile> filesToCompact = this.storefiles;
|
List<StoreFile> filesToCompact = this.storefiles;
|
||||||
if (filesToCompact.isEmpty()) {
|
if (filesToCompact.isEmpty()) {
|
||||||
|
@ -688,6 +691,7 @@ public class Store implements HeapSize {
|
||||||
" file(s), size: " + skipped);
|
" file(s), size: " + skipped);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
this.lastCompactSize = totalSize - skipped;
|
||||||
|
|
||||||
// Ready to go. Have list of files to compact.
|
// Ready to go. Have list of files to compact.
|
||||||
LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
|
LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
|
||||||
|
@ -1256,6 +1260,11 @@ public class Store implements HeapSize {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @return aggregate size of all HStores used in the last compaction */
|
||||||
|
public long getLastCompactSize() {
|
||||||
|
return this.lastCompactSize;
|
||||||
|
}
|
||||||
|
|
||||||
/** @return aggregate size of HStore */
|
/** @return aggregate size of HStore */
|
||||||
public long getSize() {
|
public long getSize() {
|
||||||
return storeSize;
|
return storeSize;
|
||||||
|
@ -1459,7 +1468,7 @@ public class Store implements HeapSize {
|
||||||
|
|
||||||
public static final long FIXED_OVERHEAD = ClassSize.align(
|
public static final long FIXED_OVERHEAD = ClassSize.align(
|
||||||
ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
|
ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
|
||||||
(4 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2));
|
(5 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2));
|
||||||
|
|
||||||
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
|
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
|
||||||
ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
|
ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
|
||||||
|
|
|
@ -23,8 +23,11 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
import org.apache.hadoop.hbase.metrics.MetricsRate;
|
import org.apache.hadoop.hbase.metrics.MetricsRate;
|
||||||
|
import org.apache.hadoop.hbase.metrics.PersistentMetricsTimeVaryingRate;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.Strings;
|
import org.apache.hadoop.hbase.util.Strings;
|
||||||
|
import org.apache.hadoop.metrics.ContextFactory;
|
||||||
import org.apache.hadoop.metrics.MetricsContext;
|
import org.apache.hadoop.metrics.MetricsContext;
|
||||||
import org.apache.hadoop.metrics.MetricsRecord;
|
import org.apache.hadoop.metrics.MetricsRecord;
|
||||||
import org.apache.hadoop.metrics.MetricsUtil;
|
import org.apache.hadoop.metrics.MetricsUtil;
|
||||||
|
@ -35,8 +38,10 @@ import org.apache.hadoop.metrics.util.MetricsLongValue;
|
||||||
import org.apache.hadoop.metrics.util.MetricsRegistry;
|
import org.apache.hadoop.metrics.util.MetricsRegistry;
|
||||||
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
|
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.lang.management.MemoryUsage;
|
import java.lang.management.MemoryUsage;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is for maintaining the various regionserver statistics
|
* This class is for maintaining the various regionserver statistics
|
||||||
|
@ -50,6 +55,8 @@ public class RegionServerMetrics implements Updater {
|
||||||
private final Log LOG = LogFactory.getLog(this.getClass());
|
private final Log LOG = LogFactory.getLog(this.getClass());
|
||||||
private final MetricsRecord metricsRecord;
|
private final MetricsRecord metricsRecord;
|
||||||
private long lastUpdate = System.currentTimeMillis();
|
private long lastUpdate = System.currentTimeMillis();
|
||||||
|
private long lastExtUpdate = System.currentTimeMillis();
|
||||||
|
private long extendedPeriod = 0;
|
||||||
private static final int MB = 1024*1024;
|
private static final int MB = 1024*1024;
|
||||||
private MetricsRegistry registry = new MetricsRegistry();
|
private MetricsRegistry registry = new MetricsRegistry();
|
||||||
private final RegionServerStatistics statistics;
|
private final RegionServerStatistics statistics;
|
||||||
|
@ -134,6 +141,24 @@ public class RegionServerMetrics implements Updater {
|
||||||
public final MetricsTimeVaryingRate fsSyncLatency =
|
public final MetricsTimeVaryingRate fsSyncLatency =
|
||||||
new MetricsTimeVaryingRate("fsSyncLatency", registry);
|
new MetricsTimeVaryingRate("fsSyncLatency", registry);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* time each scheduled compaction takes
|
||||||
|
*/
|
||||||
|
protected final PersistentMetricsTimeVaryingRate compactionTime =
|
||||||
|
new PersistentMetricsTimeVaryingRate("compactionTime", registry);
|
||||||
|
|
||||||
|
protected final PersistentMetricsTimeVaryingRate compactionSize =
|
||||||
|
new PersistentMetricsTimeVaryingRate("compactionSize", registry);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* time each scheduled flush takes
|
||||||
|
*/
|
||||||
|
protected final PersistentMetricsTimeVaryingRate flushTime =
|
||||||
|
new PersistentMetricsTimeVaryingRate("flushTime", registry);
|
||||||
|
|
||||||
|
protected final PersistentMetricsTimeVaryingRate flushSize =
|
||||||
|
new PersistentMetricsTimeVaryingRate("flushSize", registry);
|
||||||
|
|
||||||
public RegionServerMetrics() {
|
public RegionServerMetrics() {
|
||||||
MetricsContext context = MetricsUtil.getContext("hbase");
|
MetricsContext context = MetricsUtil.getContext("hbase");
|
||||||
metricsRecord = MetricsUtil.createRecord(context, "regionserver");
|
metricsRecord = MetricsUtil.createRecord(context, "regionserver");
|
||||||
|
@ -146,6 +171,16 @@ public class RegionServerMetrics implements Updater {
|
||||||
// export for JMX
|
// export for JMX
|
||||||
statistics = new RegionServerStatistics(this.registry, name);
|
statistics = new RegionServerStatistics(this.registry, name);
|
||||||
|
|
||||||
|
// get custom attributes
|
||||||
|
try {
|
||||||
|
Object m = ContextFactory.getFactory().getAttribute("hbase.extendedperiod");
|
||||||
|
if (m instanceof String) {
|
||||||
|
this.extendedPeriod = Long.parseLong((String) m)*1000;
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.info("Couldn't load ContextFactory for Metrics config info");
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("Initialized");
|
LOG.info("Initialized");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,10 +192,23 @@ public class RegionServerMetrics implements Updater {
|
||||||
/**
|
/**
|
||||||
* Since this object is a registered updater, this method will be called
|
* Since this object is a registered updater, this method will be called
|
||||||
* periodically, e.g. every 5 seconds.
|
* periodically, e.g. every 5 seconds.
|
||||||
* @param unused unused argument
|
* @param caller the metrics context that this responsible for calling us
|
||||||
*/
|
*/
|
||||||
public void doUpdates(MetricsContext unused) {
|
public void doUpdates(MetricsContext caller) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
this.lastUpdate = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// has the extended period for long-living stats elapsed?
|
||||||
|
if (this.extendedPeriod > 0 &&
|
||||||
|
this.lastUpdate - this.lastExtUpdate >= this.extendedPeriod) {
|
||||||
|
this.lastExtUpdate = this.lastUpdate;
|
||||||
|
this.compactionTime.resetMinMaxAvg();
|
||||||
|
this.compactionSize.resetMinMaxAvg();
|
||||||
|
this.flushTime.resetMinMaxAvg();
|
||||||
|
this.flushSize.resetMinMaxAvg();
|
||||||
|
this.resetAllMinMax();
|
||||||
|
}
|
||||||
|
|
||||||
this.stores.pushMetric(this.metricsRecord);
|
this.stores.pushMetric(this.metricsRecord);
|
||||||
this.storefiles.pushMetric(this.metricsRecord);
|
this.storefiles.pushMetric(this.metricsRecord);
|
||||||
this.storefileIndexSizeMB.pushMetric(this.metricsRecord);
|
this.storefileIndexSizeMB.pushMetric(this.metricsRecord);
|
||||||
|
@ -196,15 +244,19 @@ public class RegionServerMetrics implements Updater {
|
||||||
this.fsReadLatency.pushMetric(this.metricsRecord);
|
this.fsReadLatency.pushMetric(this.metricsRecord);
|
||||||
this.fsWriteLatency.pushMetric(this.metricsRecord);
|
this.fsWriteLatency.pushMetric(this.metricsRecord);
|
||||||
this.fsSyncLatency.pushMetric(this.metricsRecord);
|
this.fsSyncLatency.pushMetric(this.metricsRecord);
|
||||||
|
this.compactionTime.pushMetric(this.metricsRecord);
|
||||||
|
this.compactionSize.pushMetric(this.metricsRecord);
|
||||||
|
this.flushTime.pushMetric(this.metricsRecord);
|
||||||
|
this.flushSize.pushMetric(this.metricsRecord);
|
||||||
}
|
}
|
||||||
this.metricsRecord.update();
|
this.metricsRecord.update();
|
||||||
this.lastUpdate = System.currentTimeMillis();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void resetAllMinMax() {
|
public void resetAllMinMax() {
|
||||||
this.atomicIncrementTime.resetMinMax();
|
this.atomicIncrementTime.resetMinMax();
|
||||||
this.fsReadLatency.resetMinMax();
|
this.fsReadLatency.resetMinMax();
|
||||||
this.fsWriteLatency.resetMinMax();
|
this.fsWriteLatency.resetMinMax();
|
||||||
|
this.fsSyncLatency.resetMinMax();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -214,6 +266,24 @@ public class RegionServerMetrics implements Updater {
|
||||||
return this.requests.getPreviousIntervalValue();
|
return this.requests.getPreviousIntervalValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param compact history in <time, size>
|
||||||
|
*/
|
||||||
|
public synchronized void addCompaction(final Pair<Long,Long> compact) {
|
||||||
|
this.compactionTime.inc(compact.getFirst());
|
||||||
|
this.compactionSize.inc(compact.getSecond());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param flushes history in <time, size>
|
||||||
|
*/
|
||||||
|
public synchronized void addFlush(final List<Pair<Long,Long>> flushes) {
|
||||||
|
for (Pair<Long,Long> f : flushes) {
|
||||||
|
this.flushTime.inc(f.getFirst());
|
||||||
|
this.flushSize.inc(f.getSecond());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param inc How much to add to requests.
|
* @param inc How much to add to requests.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -68,6 +68,9 @@ public class HLogSplitter {
|
||||||
|
|
||||||
static final Log LOG = LogFactory.getLog(HLogSplitter.class);
|
static final Log LOG = LogFactory.getLog(HLogSplitter.class);
|
||||||
|
|
||||||
|
private long splitTime = 0;
|
||||||
|
private long splitSize = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Name of file that holds recovered edits written by the wal log splitting
|
* Name of file that holds recovered edits written by the wal log splitting
|
||||||
* code, one per region
|
* code, one per region
|
||||||
|
@ -132,7 +135,7 @@ public class HLogSplitter {
|
||||||
Path oldLogDir, final FileSystem fs, final Configuration conf)
|
Path oldLogDir, final FileSystem fs, final Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
long millis = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
List<Path> splits = null;
|
List<Path> splits = null;
|
||||||
if (!fs.exists(srcDir)) {
|
if (!fs.exists(srcDir)) {
|
||||||
// Nothing to do
|
// Nothing to do
|
||||||
|
@ -168,12 +171,26 @@ public class HLogSplitter {
|
||||||
io.initCause(e);
|
io.initCause(e);
|
||||||
throw io;
|
throw io;
|
||||||
}
|
}
|
||||||
long endMillis = System.currentTimeMillis();
|
splitTime = System.currentTimeMillis() - startTime;
|
||||||
LOG.info("hlog file splitting completed in " + (endMillis - millis)
|
LOG.info("hlog file splitting completed in " + splitTime +
|
||||||
+ " millis for " + srcDir.toString());
|
" ms for " + srcDir.toString());
|
||||||
return splits;
|
return splits;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return time that this split took
|
||||||
|
*/
|
||||||
|
public long getTime() {
|
||||||
|
return this.splitTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return aggregate size of hlogs that were split
|
||||||
|
*/
|
||||||
|
public long getSize() {
|
||||||
|
return this.splitSize;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sorts the HLog edits in the given list of logfiles (that are a mix of edits
|
* Sorts the HLog edits in the given list of logfiles (that are a mix of edits
|
||||||
* on multiple regions) by region and then splits them per region directories,
|
* on multiple regions) by region and then splits them per region directories,
|
||||||
|
@ -223,6 +240,8 @@ public class HLogSplitter {
|
||||||
int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
|
int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
|
||||||
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
|
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
|
||||||
|
|
||||||
|
splitSize = 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
int i = -1;
|
int i = -1;
|
||||||
while (i < logfiles.length) {
|
while (i < logfiles.length) {
|
||||||
|
@ -236,6 +255,7 @@ public class HLogSplitter {
|
||||||
FileStatus log = logfiles[i];
|
FileStatus log = logfiles[i];
|
||||||
Path logPath = log.getPath();
|
Path logPath = log.getPath();
|
||||||
long logLength = log.getLen();
|
long logLength = log.getLen();
|
||||||
|
splitSize += logLength;
|
||||||
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length
|
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length
|
||||||
+ ": " + logPath + ", length=" + logLength);
|
+ ": " + logPath + ", length=" + logLength);
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -120,7 +120,7 @@ public class TestCatalogJanitor {
|
||||||
private final MasterFileSystem mfs;
|
private final MasterFileSystem mfs;
|
||||||
|
|
||||||
MockMasterServices(final Server server) throws IOException {
|
MockMasterServices(final Server server) throws IOException {
|
||||||
this.mfs = new MasterFileSystem(server);
|
this.mfs = new MasterFileSystem(server, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue