HBASE-3836 Add facility to track currently progressing actions and workflows.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1098933 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-05-03 06:11:06 +00:00
parent 8341e4a3be
commit ca952ac00b
16 changed files with 724 additions and 78 deletions

View File

@ -233,6 +233,8 @@ Release 0.91.0 - Unreleased
(Subbu M. Iyer via Stack)
HBASE-1364 [performance] Distributed splitting of regionserver commit logs
(Prakash Khemani)
HBASE-3836 Add facility to track currently progressing actions and
workflows. (todd)
Release 0.90.3 - Unreleased

View File

@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
@ -119,17 +120,20 @@ class ActiveMasterManager extends ZooKeeperListener {
*
* This also makes sure that we are watching the master znode so will be
* notified if another master dies.
* @param startupStatus
* @return True if no issue becoming active master else false if another
* master was running or if some other problem (zookeeper, stop flag has been
* set on this Master)
*/
boolean blockUntilBecomingActiveMaster() {
boolean blockUntilBecomingActiveMaster(MonitoredTask startupStatus) {
startupStatus.setStatus("Trying to register in ZK as active master");
boolean cleanSetOfActiveMaster = true;
// Try to become the active master, watch if there is another master
try {
if (ZKUtil.createEphemeralNodeAndWatch(this.watcher,
this.watcher.masterAddressZNode, Bytes.toBytes(this.sn.toString()))) {
// We are the master, return
startupStatus.setStatus("Successfully registered as active master.");
this.clusterHasActiveMaster.set(true);
LOG.info("Master=" + this.sn);
return cleanSetOfActiveMaster;
@ -143,13 +147,17 @@ class ActiveMasterManager extends ZooKeeperListener {
ZKUtil.getDataAndWatch(this.watcher, this.watcher.masterAddressZNode);
ServerName currentMaster = new ServerName(Bytes.toString(bytes));
if (ServerName.isSameHostnameAndPort(currentMaster, this.sn)) {
LOG.info("Current master has this master's address, " + currentMaster +
String msg = ("Current master has this master's address, " + currentMaster +
"; master was restarted? Waiting on znode to expire...");
LOG.info(msg);
startupStatus.setStatus(msg);
// Hurry along the expiration of the znode.
ZKUtil.deleteNode(this.watcher, this.watcher.masterAddressZNode);
} else {
LOG.info("Another master is the active master, " + currentMaster +
"; waiting to become the next active master");
String msg = "Another master is the active master, " + currentMaster +
"; waiting to become the next active master";
LOG.info(msg);
startupStatus.setStatus(msg);
}
} catch (KeeperException ke) {
master.abort("Received an unexpected KeeperException, aborting", ke);
@ -168,7 +176,7 @@ class ActiveMasterManager extends ZooKeeperListener {
return cleanSetOfActiveMaster;
}
// Try to become active master again now that there is no active master
blockUntilBecomingActiveMaster();
blockUntilBecomingActiveMaster(startupStatus);
}
return cleanSetOfActiveMaster;
}

View File

@ -71,6 +71,8 @@ import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.User;
@ -271,6 +273,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
*/
@Override
public void run() {
MonitoredTask startupStatus =
TaskMonitor.get().createStatus("Master startup");
startupStatus.setDescription("Master startup");
try {
/*
* Block on becoming the active master.
@ -282,16 +287,18 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
* now wait until it dies to try and become the next active master. If we
* do not succeed on our first attempt, this is no longer a cluster startup.
*/
becomeActiveMaster();
becomeActiveMaster(startupStatus);
// We are either the active master or we were asked to shutdown
if (!this.stopped) {
finishInitialization();
finishInitialization(startupStatus);
loop();
}
} catch (Throwable t) {
abort("Unhandled exception. Starting shutdown.", t);
} finally {
startupStatus.cleanup();
stopChores();
// Wait for all the remaining region servers to report in IFF we were
// running a cluster shutdown AND we were NOT aborting.
@ -313,17 +320,19 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
/**
* Try becoming active master.
* @param startupStatus
* @return True if we could successfully become the active master.
* @throws InterruptedException
*/
private boolean becomeActiveMaster() throws InterruptedException {
private boolean becomeActiveMaster(MonitoredTask startupStatus)
throws InterruptedException {
// TODO: This is wrong!!!! Should have new servername if we restart ourselves,
// if we come back to life.
this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName,
this);
this.zooKeeper.registerListener(activeMasterManager);
stallIfBackupMaster(this.conf, this.activeMasterManager);
return this.activeMasterManager.blockUntilBecomingActiveMaster();
return this.activeMasterManager.blockUntilBecomingActiveMaster(startupStatus);
}
/**
@ -386,7 +395,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
* @throws InterruptedException
* @throws KeeperException
*/
private void finishInitialization()
private void finishInitialization(MonitoredTask status)
throws IOException, InterruptedException, KeeperException {
isActiveMaster = true;
@ -397,9 +406,12 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
* below after we determine if cluster startup or failover.
*/
status.setStatus("Initializing Master file system");
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
this.fileSystemManager = new MasterFileSystem(this, metrics);
// publish cluster ID
status.setStatus("Publishing Cluster ID in ZooKeeper");
ClusterId.setClusterId(this.zooKeeper,
fileSystemManager.getClusterId());
@ -407,16 +419,19 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.serverManager = new ServerManager(this, this);
status.setStatus("Initializing ZK system trackers");
initializeZKBasedSystemTrackers();
// initialize master side coprocessors before we start handling requests
status.setStatus("Initializing master coprocessors");
this.cpHost = new MasterCoprocessorHost(this, this.conf);
// start up all service threads.
status.setStatus("Initializing master service threads");
startServiceThreads();
// Wait for region servers to report in.
this.serverManager.waitForRegionServers();
this.serverManager.waitForRegionServers(status);
// Check zk for regionservers that are up but didn't register
for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
if (!this.serverManager.isServerOnline(sn)) {
@ -427,20 +442,25 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
}
// TODO: Should do this in background rather than block master startup
status.setStatus("Splitting logs after master startup");
this.fileSystemManager.
splitLogAfterStartup(this.serverManager.getOnlineServers().keySet());
// Make sure root and meta assigned before proceeding.
assignRootAndMeta();
assignRootAndMeta(status);
// Fixup assignment manager status
status.setStatus("Starting assignment manager");
this.assignmentManager.joinCluster();
// Start balancer and meta catalog janitor after meta and regions have
// been assigned.
status.setStatus("Starting balancer and catalog janitor");
this.balancerChore = getAndStartBalancerChore(this);
this.catalogJanitorChore =
Threads.setDaemonThreadRunning(new CatalogJanitor(this, this));
status.markComplete("Initialization successful");
LOG.info("Master has completed initialization");
initialized = true;
}
@ -453,12 +473,13 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
* @throws KeeperException
* @return Count of regions we assigned.
*/
int assignRootAndMeta()
int assignRootAndMeta(MonitoredTask status)
throws InterruptedException, IOException, KeeperException {
int assigned = 0;
long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
// Work on ROOT region. Is it in zk in transition?
status.setStatus("Assigning ROOT region");
boolean rit = this.assignmentManager.
processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.ROOT_REGIONINFO);
if (!catalogTracker.verifyRootRegionLocation(timeout)) {
@ -474,6 +495,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
", location=" + catalogTracker.getRootLocation());
// Work on meta region
status.setStatus("Assigning META region");
rit = this.assignmentManager.
processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
if (!this.catalogTracker.verifyMetaRegionLocation(timeout)) {
@ -490,6 +512,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
}
LOG.info(".META. assigned=" + assigned + ", rit=" + rit +
", location=" + catalogTracker.getMetaLocation());
status.setStatus("META and ROOT assigned.");
return assigned;
}
@ -1101,15 +1124,21 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":"
+ this.serverName.getPort(), this);
if (!becomeActiveMaster()) {
return false;
MonitoredTask status =
TaskMonitor.get().createStatus("Recovering expired ZK session");
try {
if (!becomeActiveMaster(status)) {
return false;
}
initializeZKBasedSystemTrackers();
// Update in-memory structures to reflect our earlier Root/Meta assignment.
assignRootAndMeta(status);
// process RIT if any
this.assignmentManager.processRegionsInTransition();
return true;
} finally {
status.cleanup();
}
initializeZKBasedSystemTrackers();
// Update in-memory structures to reflect our earlier Root/Meta assignment.
assignRootAndMeta();
// process RIT if any
this.assignmentManager.processRegionsInTransition();
return true;
}
/**

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
/**
* The ServerManager class manages info about region servers.
@ -466,7 +467,7 @@ public class ServerManager {
* Waits for the regionservers to report in.
* @throws InterruptedException
*/
public void waitForRegionServers()
public void waitForRegionServers(MonitoredTask status)
throws InterruptedException {
long interval = this.master.getConfiguration().
getLong("hbase.master.wait.on.regionservers.interval", 3000);
@ -477,11 +478,15 @@ public class ServerManager {
Thread.sleep(interval);
count = countOfRegionServers();
if (count == oldcount && count > 0) break;
String msg;
if (count == 0) {
LOG.info("Waiting on regionserver(s) to checkin");
msg = "Waiting on regionserver(s) to checkin";
} else {
LOG.info("Waiting on regionserver(s) count to settle; currently=" + count);
msg = "Waiting on regionserver(s) count to settle; currently=" + count;
}
LOG.info(msg);
status.setStatus(msg);
oldcount = count;
}
}

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
@ -183,15 +185,22 @@ public class SplitLogManager extends ZooKeeperListener {
LOG.warn(logDir + " doesn't exist. Nothing to do!");
return 0;
}
MonitoredTask status = TaskMonitor.get().createStatus(
"Doing distributed log split in " + logDir);
status.setStatus("Checking directory contents...");
FileStatus[] logfiles = fs.listStatus(logDir); // TODO filter filenames?
if (logfiles == null || logfiles.length == 0) {
LOG.info(logDir + " is empty dir, no logs to split");
return 0;
}
status.setStatus("Scheduling batch of logs to split");
tot_mgr_log_split_batch_start.incrementAndGet();
LOG.info("started splitting logs in " + logDir);
long t = EnvironmentEdgeManager.currentTimeMillis();
long totalSize = 0;
long totalSize = 0;
TaskBatch batch = new TaskBatch();
for (FileStatus lf : logfiles) {
// TODO If the log file is still being written to - which is most likely
@ -205,7 +214,7 @@ public class SplitLogManager extends ZooKeeperListener {
+ lf.getPath());
}
}
waitTasks(batch);
waitTasks(batch, status);
if (batch.done != batch.installed) {
stopTrackingTasks(batch);
tot_mgr_log_split_batch_err.incrementAndGet();
@ -214,6 +223,8 @@ public class SplitLogManager extends ZooKeeperListener {
throw new IOException("error or interrupt while splitting logs in "
+ logDir + " Task = " + batch);
}
status.setStatus("Checking for orphaned logs in log directory...");
if (anyNewLogFiles(logDir, logfiles)) {
tot_mgr_new_unexpected_hlogs.incrementAndGet();
LOG.warn("new hlogs were produced while logs in " + logDir +
@ -221,12 +232,18 @@ public class SplitLogManager extends ZooKeeperListener {
throw new OrphanHLogAfterSplitException();
}
tot_mgr_log_split_batch_success.incrementAndGet();
status.setStatus("Cleaning up log directory...");
if (!fs.delete(logDir, true)) {
throw new IOException("Unable to delete src dir: " + logDir);
}
LOG.info("finished splitting (more than or equal to) " + totalSize +
String msg = "finished splitting (more than or equal to) " + totalSize +
" bytes in " + batch.installed + " log files in " + logDir + " in " +
(EnvironmentEdgeManager.currentTimeMillis() - t) + "ms");
(EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
status.markComplete(msg);
LOG.info(msg);
return totalSize;
}
@ -244,10 +261,14 @@ public class SplitLogManager extends ZooKeeperListener {
return false;
}
private void waitTasks(TaskBatch batch) {
private void waitTasks(TaskBatch batch, MonitoredTask status) {
synchronized (batch) {
while ((batch.done + batch.error) != batch.installed) {
try {
status.setStatus("Waiting for distributed tasks to finish. "
+ " scheduled=" + batch.installed
+ " done=" + batch.done
+ " error=" + batch.error);
batch.wait(100);
if (stopper.isStopped()) {
LOG.warn("Stopped while waiting for log splits to be completed");

View File

@ -0,0 +1,53 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.monitoring;
public interface MonitoredTask {
enum State {
RUNNING,
COMPLETE,
ABORTED;
}
public abstract long getStartTime();
public abstract String getDescription();
public abstract String getStatus();
public abstract State getState();
public abstract long getCompletionTimestamp();
public abstract void markComplete(String msg);
public abstract void abort(String msg);
public abstract void setStatus(String status);
public abstract void setDescription(String description);
/**
* Explicitly mark this status as able to be cleaned up,
* even though it might not be complete.
*/
public abstract void cleanup();
}

View File

@ -0,0 +1,102 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.monitoring;
import com.google.common.annotations.VisibleForTesting;
class MonitoredTaskImpl implements MonitoredTask {
private long startTime;
private long completionTimestamp = -1;
private String status;
private String description;
private State state = State.RUNNING;
public MonitoredTaskImpl() {
startTime = System.currentTimeMillis();
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public String getDescription() {
return description;
}
@Override
public String getStatus() {
return status;
}
@Override
public State getState() {
return state;
}
@Override
public long getCompletionTimestamp() {
return completionTimestamp;
}
@Override
public void markComplete(String status) {
state = State.COMPLETE;
setStatus(status);
completionTimestamp = System.currentTimeMillis();
}
@Override
public void abort(String msg) {
setStatus(msg);
state = State.ABORTED;
completionTimestamp = System.currentTimeMillis();
}
@Override
public void setStatus(String status) {
this.status = status;
}
@Override
public void setDescription(String description) {
this.description = description;
}
@Override
public void cleanup() {
if (state == State.RUNNING) {
state = State.ABORTED;
completionTimestamp = System.currentTimeMillis();
}
}
/**
* Force the completion timestamp backwards so that
* it expires now.
*/
@VisibleForTesting
void expireNow() {
completionTimestamp -= 180 * 1000;
}
}

View File

@ -0,0 +1,176 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.monitoring;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
/**
* Singleton which keeps track of tasks going on in this VM.
* A Task here is anything which takes more than a few seconds
* and the user might want to inquire about the status
*/
public class TaskMonitor {
private static final Log LOG = LogFactory.getLog(TaskMonitor.class);
// Don't keep around any tasks that have completed more than
// 60 seconds ago
private static final long EXPIRATION_TIME = 60*1000;
@VisibleForTesting
static final int MAX_TASKS = 1000;
private static TaskMonitor instance;
private List<TaskAndWeakRefPair> tasks =
Lists.newArrayList();
/**
* Get singleton instance.
* TODO this would be better off scoped to a single daemon
*/
public static synchronized TaskMonitor get() {
if (instance == null) {
instance = new TaskMonitor();
}
return instance;
}
public MonitoredTask createStatus(String description) {
MonitoredTask stat = new MonitoredTaskImpl();
stat.setDescription(description);
MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(
stat.getClass().getClassLoader(),
new Class<?>[] { MonitoredTask.class },
new PassthroughInvocationHandler<MonitoredTask>(stat));
TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
tasks.add(pair);
return proxy;
}
private synchronized void purgeExpiredTasks() {
int size = 0;
for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
it.hasNext();) {
TaskAndWeakRefPair pair = it.next();
MonitoredTask stat = pair.get();
if (pair.isDead()) {
// The class who constructed this leaked it. So we can
// assume it's done.
if (stat.getState() == MonitoredTaskImpl.State.RUNNING) {
LOG.warn("Status " + stat + " appears to have been leaked");
stat.cleanup();
}
}
if (canPurge(stat)) {
it.remove();
} else {
size++;
}
}
if (size > MAX_TASKS) {
LOG.warn("Too many actions in action monitor! Purging some.");
tasks = tasks.subList(size - MAX_TASKS, size);
}
}
public synchronized List<MonitoredTask> getTasks() {
purgeExpiredTasks();
ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
for (TaskAndWeakRefPair pair : tasks) {
ret.add(pair.get());
}
return ret;
}
private boolean canPurge(MonitoredTask stat) {
long cts = stat.getCompletionTimestamp();
return (cts > 0 && System.currentTimeMillis() - cts > EXPIRATION_TIME);
}
/**
* This class encapsulates an object as well as a weak reference to a proxy
* that passes through calls to that object. In art form:
* <code>
* Proxy <------------------
* | \
* v \
* PassthroughInvocationHandler | weak reference
* | /
* MonitoredTaskImpl /
* | /
* StatAndWeakRefProxy ------/
*
* Since we only return the Proxy to the creator of the MonitorableStatus,
* this means that they can leak that object, and we'll detect it
* since our weak reference will go null. But, we still have the actual
* object, so we can log it and display it as a leaked (incomplete) action.
*/
private static class TaskAndWeakRefPair {
private MonitoredTask impl;
private WeakReference<MonitoredTask> weakProxy;
public TaskAndWeakRefPair(MonitoredTask stat,
MonitoredTask proxy) {
this.impl = stat;
this.weakProxy = new WeakReference<MonitoredTask>(proxy);
}
public MonitoredTask get() {
return impl;
}
public boolean isDead() {
return weakProxy.get() == null;
}
}
/**
* An InvocationHandler that simply passes through calls to the original object.
*/
private static class PassthroughInvocationHandler<T> implements InvocationHandler {
private T delegatee;
public PassthroughInvocationHandler(T delegatee) {
this.delegatee = delegatee;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
return method.invoke(delegatee, args);
}
}
}

View File

@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -349,7 +351,12 @@ public class HRegion implements HeapSize { // , Writable{
*/
public long initialize(final CancelableProgressable reporter)
throws IOException {
MonitoredTask status = TaskMonitor.get().createStatus(
"Initializing region " + this);
if (coprocessorHost != null) {
status.setStatus("Running coprocessor pre-open hook");
coprocessorHost.preOpen();
}
// A region can be reopened if failed a split; reset flags
@ -357,14 +364,17 @@ public class HRegion implements HeapSize { // , Writable{
this.closed.set(false);
// Write HRI to a file in case we need to recover .META.
status.setStatus("Writing region info on filesystem");
checkRegioninfoOnFilesystem();
// Remove temporary data left over from old regions
status.setStatus("Cleaning up temporary data from old regions");
cleanupTmpDir();
// Load in all the HStores. Get maximum seqid.
long maxSeqId = -1;
for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
status.setStatus("Instantiating store for column family " + c);
Store store = instantiateHStore(this.tableDir, c);
this.stores.put(c.getName(), store);
long storeSeqId = store.getMaxSequenceId();
@ -373,8 +383,10 @@ public class HRegion implements HeapSize { // , Writable{
}
}
// Recover any edits if available.
maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter);
maxSeqId = replayRecoveredEditsIfAny(
this.regiondir, maxSeqId, reporter, status);
status.setStatus("Cleaning up detritus from prior splits");
// Get rid of any splits or merges that were lost in-progress. Clean out
// these directories here on open. We may be opening a region that was
// being split but we crashed in the middle of it all.
@ -390,9 +402,12 @@ public class HRegion implements HeapSize { // , Writable{
long nextSeqid = maxSeqId + 1;
LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-open hooks");
coprocessorHost.postOpen();
}
status.markComplete("Region opened successfully");
return nextSeqid;
}
@ -556,12 +571,22 @@ public class HRegion implements HeapSize { // , Writable{
public List<StoreFile> close(final boolean abort) throws IOException {
// Only allow one thread to close at a time. Serialize them so dual
// threads attempting to close will run up against each other.
synchronized (closeLock) {
return doClose(abort);
MonitoredTask status = TaskMonitor.get().createStatus(
"Closing region " + this +
(abort ? " due to abort" : ""));
status.setStatus("Waiting for close lock");
try {
synchronized (closeLock) {
return doClose(abort, status);
}
} finally {
status.cleanup();
}
}
private List<StoreFile> doClose(final boolean abort)
private List<StoreFile> doClose(
final boolean abort, MonitoredTask status)
throws IOException {
if (isClosed()) {
LOG.warn("Region " + this + " already closed");
@ -569,9 +594,11 @@ public class HRegion implements HeapSize { // , Writable{
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor pre-close hooks");
this.coprocessorHost.preClose(abort);
}
status.setStatus("Disabling compacts and flushes for region");
boolean wasFlushing = false;
synchronized (writestate) {
// Disable compacting and flushing by background threads for this
@ -596,20 +623,24 @@ public class HRegion implements HeapSize { // , Writable{
// that will clear out of the bulk of the memstore before we put up
// the close flag?
if (!abort && !wasFlushing && worthPreFlushing()) {
status.setStatus("Pre-flushing region before close");
LOG.info("Running close preflush of " + this.getRegionNameAsString());
internalFlushcache();
internalFlushcache(status);
}
this.closing.set(true);
status.setStatus("Disabling writes for close");
lock.writeLock().lock();
try {
if (this.isClosed()) {
status.abort("Already got closed by another process");
// SplitTransaction handles the null
return null;
}
LOG.debug("Updates disabled for region " + this);
// Don't flush the cache if we are aborting
if (!abort) {
internalFlushcache();
internalFlushcache(status);
}
List<StoreFile> result = new ArrayList<StoreFile>();
@ -619,8 +650,10 @@ public class HRegion implements HeapSize { // , Writable{
this.closed.set(true);
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-close hooks");
this.coprocessorHost.postClose(abort);
}
status.markComplete("Closed");
LOG.info("Closed " + this);
return result;
} finally {
@ -824,6 +857,8 @@ public class HRegion implements HeapSize { // , Writable{
lock.readLock().lock();
this.lastCompactInfo = null;
byte [] splitRow = null;
MonitoredTask status = TaskMonitor.get().createStatus(
"Compacting stores in " + this);
try {
if (this.closed.get()) {
LOG.debug("Skipping compaction on " + this + " because closed");
@ -833,6 +868,7 @@ public class HRegion implements HeapSize { // , Writable{
return splitRow;
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor preCompact hooks");
coprocessorHost.preCompact(false);
}
try {
@ -840,10 +876,12 @@ public class HRegion implements HeapSize { // , Writable{
if (!writestate.compacting && writestate.writesEnabled) {
writestate.compacting = true;
} else {
LOG.info("NOT compacting region " + this +
String msg = "NOT compacting region " + this +
": compacting=" + writestate.compacting + ", writesEnabled=" +
writestate.writesEnabled);
return splitRow;
writestate.writesEnabled;
LOG.info(msg);
status.abort(msg);
return splitRow;
}
}
LOG.info("Starting compaction on region " + this);
@ -852,6 +890,7 @@ public class HRegion implements HeapSize { // , Writable{
long lastCompactSize = 0;
boolean completed = false;
try {
status.setStatus("Compacting store " + store);
final Store.StoreSize ss = store.compact();
lastCompactSize += store.getLastCompactSize();
if (ss != null) {
@ -868,6 +907,9 @@ public class HRegion implements HeapSize { // , Writable{
if (completed) {
this.lastCompactInfo =
new Pair<Long,Long>((now - startTime) / 1000, lastCompactSize);
status.setStatus("Compaction complete: " +
StringUtils.humanReadableInt(lastCompactSize) + " in " +
(now - startTime) + "ms");
}
}
} finally {
@ -877,9 +919,13 @@ public class HRegion implements HeapSize { // , Writable{
}
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-compact hooks");
coprocessorHost.postCompact(splitRow != null);
}
status.markComplete("Compaction complete");
} finally {
status.cleanup();
lock.readLock().unlock();
}
if (splitRow != null) {
@ -915,13 +961,17 @@ public class HRegion implements HeapSize { // , Writable{
LOG.debug("Skipping flush on " + this + " because closing");
return false;
}
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
status.setStatus("Acquiring readlock on region");
lock.readLock().lock();
try {
if (this.closed.get()) {
LOG.debug("Skipping flush on " + this + " because closed");
status.abort("Skipped: closed");
return false;
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor pre-flush hooks");
coprocessorHost.preFlush();
}
try {
@ -935,13 +985,19 @@ public class HRegion implements HeapSize { // , Writable{
writestate.flushing + ", writesEnabled=" +
writestate.writesEnabled);
}
status.abort("Not flushing since " +
(writestate.flushing ? "already flushing" : "writes not enabled"));
return false;
}
}
boolean result = internalFlushcache();
boolean result = internalFlushcache(status);
if (coprocessorHost != null) {
status.setStatus("Running post-flush coprocessor hooks");
coprocessorHost.postFlush();
}
status.markComplete("Flush successful");
return result;
} finally {
synchronized (writestate) {
@ -952,6 +1008,7 @@ public class HRegion implements HeapSize { // , Writable{
}
} finally {
lock.readLock().unlock();
status.cleanup();
}
}
@ -982,6 +1039,7 @@ public class HRegion implements HeapSize { // , Writable{
* routes.
*
* <p> This method may block for some time.
* @param status
*
* @return true if the region needs compacting
*
@ -989,19 +1047,21 @@ public class HRegion implements HeapSize { // , Writable{
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
protected boolean internalFlushcache() throws IOException {
return internalFlushcache(this.log, -1);
protected boolean internalFlushcache(MonitoredTask status) throws IOException {
return internalFlushcache(this.log, -1, status);
}
/**
* @param wal Null if we're NOT to go via hlog/wal.
* @param myseqid The seqid to use if <code>wal</code> is null writing out
* flush file.
* @param status
* @return true if the region needs compacting
* @throws IOException
* @see #internalFlushcache()
*/
protected boolean internalFlushcache(final HLog wal, final long myseqid)
protected boolean internalFlushcache(
final HLog wal, final long myseqid, MonitoredTask status)
throws IOException {
final long startTime = EnvironmentEdgeManager.currentTimeMillis();
// Clear flush flag.
@ -1031,7 +1091,9 @@ public class HRegion implements HeapSize { // , Writable{
// We have to take a write lock during snapshot, or else a write could
// end up in both snapshot and memstore (makes it difficult to do atomic
// rows then)
status.setStatus("Obtaining lock to block concurrent updates");
this.updatesLock.writeLock().lock();
status.setStatus("Preparing to flush by snapshotting stores");
final long currentMemStoreSize = this.memstoreSize.get();
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
try {
@ -1049,6 +1111,7 @@ public class HRegion implements HeapSize { // , Writable{
} finally {
this.updatesLock.writeLock().unlock();
}
status.setStatus("Flushing stores");
LOG.debug("Finished snapshotting, commencing flushing stores");
@ -1063,7 +1126,7 @@ public class HRegion implements HeapSize { // , Writable{
// just-made new flush store file.
for (StoreFlusher flusher : storeFlushers) {
flusher.flushCache();
flusher.flushCache(status);
}
// Switch snapshot (in memstore) -> new hfile (thus causing
// all the store scanners to reset/reseek).
@ -1088,6 +1151,7 @@ public class HRegion implements HeapSize { // , Writable{
DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
Bytes.toStringBinary(getRegionName()));
dse.initCause(t);
status.abort("Flush failed: " + StringUtils.stringifyException(t));
throw dse;
}
@ -1111,13 +1175,13 @@ public class HRegion implements HeapSize { // , Writable{
}
long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
if (LOG.isDebugEnabled()) {
LOG.info("Finished memstore flush of ~" +
String msg = "Finished memstore flush of ~" +
StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
this + " in " + time + "ms, sequenceid=" + sequenceId +
", compaction requested=" + compactionRequested +
((wal == null)? "; wal=null": ""));
}
((wal == null)? "; wal=null": "");
LOG.info(msg);
status.setStatus(msg);
this.recentFlushes.add(new Pair<Long,Long>(time/1000,currentMemStoreSize));
return compactionRequested;
@ -2020,7 +2084,8 @@ public class HRegion implements HeapSize { // , Writable{
* @throws IOException
*/
protected long replayRecoveredEditsIfAny(final Path regiondir,
final long minSeqId, final CancelableProgressable reporter)
final long minSeqId, final CancelableProgressable reporter,
final MonitoredTask status)
throws UnsupportedEncodingException, IOException {
long seqid = minSeqId;
NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
@ -2046,7 +2111,7 @@ public class HRegion implements HeapSize { // , Writable{
}
if (seqid > minSeqId) {
// Then we added some edits to memory. Flush and cleanup split edit files.
internalFlushcache(null, seqid);
internalFlushcache(null, seqid, status);
}
// Now delete the content of recovered edits. We're done w/ them.
for (Path file: files) {
@ -2071,7 +2136,11 @@ public class HRegion implements HeapSize { // , Writable{
private long replayRecoveredEdits(final Path edits,
final long minSeqId, final CancelableProgressable reporter)
throws IOException {
LOG.info("Replaying edits from " + edits + "; minSequenceid=" + minSeqId);
String msg = "Replaying edits from " + edits + "; minSequenceid=" + minSeqId;
LOG.info(msg);
MonitoredTask status = TaskMonitor.get().createStatus(msg);
status.setStatus("Opening logs");
HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
try {
long currentEditSeqId = minSeqId;
@ -2103,10 +2172,14 @@ public class HRegion implements HeapSize { // , Writable{
intervalEdits = 0;
long cur = EnvironmentEdgeManager.currentTimeMillis();
if (lastReport + period <= cur) {
status.setStatus("Replaying edits..." +
" skipped=" + skippedEdits +
" edits=" + editsCount);
// Timeout reached
if(!reporter.progress()) {
String msg = "Progressable reporter failed, stopping replay";
msg = "Progressable reporter failed, stopping replay";
LOG.warn(msg);
status.abort(msg);
throw new IOException(msg);
}
lastReport = cur;
@ -2117,6 +2190,7 @@ public class HRegion implements HeapSize { // , Writable{
// Start coprocessor replay here. The coprocessor is for each WALEdit
// instead of a KeyValue.
if (coprocessorHost != null) {
status.setStatus("Running pre-WAL-restore hook in coprocessors");
if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
// if bypass this log entry, ignore it ...
continue;
@ -2158,7 +2232,7 @@ public class HRegion implements HeapSize { // , Writable{
flush = restoreEdit(store, kv);
editsCount++;
}
if (flush) internalFlushcache(null, currentEditSeqId);
if (flush) internalFlushcache(null, currentEditSeqId, status);
if (coprocessorHost != null) {
coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
@ -2166,30 +2240,39 @@ public class HRegion implements HeapSize { // , Writable{
}
} catch (EOFException eof) {
Path p = HLog.moveAsideBadEditsFile(fs, edits);
LOG.warn("Encountered EOF. Most likely due to Master failure during " +
msg = "Encountered EOF. Most likely due to Master failure during " +
"log spliting, so we have this data in another edit. " +
"Continuing, but renaming " + edits + " as " + p, eof);
"Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, eof);
status.abort(msg);
} catch (IOException ioe) {
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (ioe.getCause() instanceof ParseException) {
Path p = HLog.moveAsideBadEditsFile(fs, edits);
LOG.warn("File corruption encountered! " +
"Continuing, but renaming " + edits + " as " + p, ioe);
msg = "File corruption encountered! " +
"Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, ioe);
status.setStatus(msg);
} else {
status.abort(StringUtils.stringifyException(ioe));
// other IO errors may be transient (bad network connection,
// checksum exception on one datanode, etc). throw & retry
throw ioe;
}
}
msg = "Applied " + editsCount + ", skipped " + skippedEdits +
", firstSequenceidInLog=" + firstSeqIdInLog +
", maxSequenceidInLog=" + currentEditSeqId;
status.markComplete(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
", firstSequenceidInLog=" + firstSeqIdInLog +
", maxSequenceidInLog=" + currentEditSeqId);
LOG.debug(msg);
}
return currentEditSeqId;
} finally {
reader.close();
status.cleanup();
}
}

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -448,11 +449,13 @@ public class Store implements HeapSize {
*/
private StoreFile flushCache(final long logCacheFlushId,
SortedSet<KeyValue> snapshot,
TimeRangeTracker snapshotTimeRangeTracker) throws IOException {
TimeRangeTracker snapshotTimeRangeTracker,
MonitoredTask status) throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
return internalFlushCache(snapshot, logCacheFlushId, snapshotTimeRangeTracker);
return internalFlushCache(
snapshot, logCacheFlushId, snapshotTimeRangeTracker, status);
}
/*
@ -463,7 +466,8 @@ public class Store implements HeapSize {
*/
private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
final long logCacheFlushId,
TimeRangeTracker snapshotTimeRangeTracker)
TimeRangeTracker snapshotTimeRangeTracker,
MonitoredTask status)
throws IOException {
StoreFile.Writer writer = null;
long flushed = 0;
@ -476,6 +480,7 @@ public class Store implements HeapSize {
// flush to list of store files. Add cleanup of anything put on filesystem
// if we fail.
synchronized (flushLock) {
status.setStatus("Flushing " + this + ": creating writer");
// A. Write the map out to the disk
writer = createWriterInTmp(set.size());
writer.setTimeRangeTracker(snapshotTimeRangeTracker);
@ -491,18 +496,23 @@ public class Store implements HeapSize {
} finally {
// Write out the log sequence number that corresponds to this output
// hfile. The hfile is current up to and including logCacheFlushId.
status.setStatus("Flushing " + this + ": appending metadata");
writer.appendMetadata(logCacheFlushId, false);
status.setStatus("Flushing " + this + ": closing flushed file");
writer.close();
}
}
// Write-out finished successfully, move into the right spot
Path dstPath = StoreFile.getUniqueFile(fs, homedir);
LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath);
String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
LOG.info(msg);
status.setStatus("Flushing " + this + ": " + msg);
if (!fs.rename(writer.getPath(), dstPath)) {
LOG.warn("Unable to rename " + writer.getPath() + " to " + dstPath);
}
status.setStatus("Flushing " + this + ": reopening flushed file");
StoreFile sf = new StoreFile(this.fs, dstPath, blockcache,
this.conf, this.family.getBloomFilterType(), this.inMemory);
StoreFile.Reader r = sf.createReader();
@ -1593,8 +1603,9 @@ public class Store implements HeapSize {
}
@Override
public void flushCache() throws IOException {
storeFile = Store.this.flushCache(cacheFlushId, snapshot, snapshotTimeRangeTracker);
public void flushCache(MonitoredTask status) throws IOException {
storeFile = Store.this.flushCache(
cacheFlushId, snapshot, snapshotTimeRangeTracker, status);
}
@Override

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
/**
* A package protected interface for a store flushing.
* A store flusher carries the state required to prepare/flush/commit the
@ -45,7 +47,7 @@ interface StoreFlusher {
*
* @throws IOException in case the flush fails
*/
void flushCache() throws IOException;
void flushCache(MonitoredTask status) throws IOException;
/**
* Commit the flush - add the store file to the store and clear the

View File

@ -46,6 +46,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
@ -70,7 +72,6 @@ import com.google.common.collect.Lists;
* region to replay on startup. Delete the old log files when finished.
*/
public class HLogSplitter {
private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
/**
@ -106,6 +107,8 @@ public class HLogSplitter {
// Wait/notify for when data has been produced by the reader thread,
// consumed by the reader thread, or an exception occurred
Object dataAvailable = new Object();
private MonitoredTask status;
/**
@ -179,10 +182,16 @@ public class HLogSplitter {
"An HLogSplitter instance may only be used once");
hasSplit = true;
status = TaskMonitor.get().createStatus(
"Splitting logs in " + srcDir);
long startTime = EnvironmentEdgeManager.currentTimeMillis();
status.setStatus("Determining files to split...");
List<Path> splits = null;
if (!fs.exists(srcDir)) {
// Nothing to do
status.markComplete("No log directory existed to split.");
return splits;
}
FileStatus[] logfiles = fs.listStatus(srcDir);
@ -190,15 +199,20 @@ public class HLogSplitter {
// Nothing to do
return splits;
}
LOG.info("Splitting " + logfiles.length + " hlog(s) in "
+ srcDir.toString());
logAndReport("Splitting " + logfiles.length + " hlog(s) in "
+ srcDir.toString());
splits = splitLog(logfiles);
splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
LOG.info("hlog file splitting completed in " + splitTime +
logAndReport("hlog file splitting completed in " + splitTime +
" ms for " + srcDir.toString());
return splits;
}
private void logAndReport(String msg) {
status.setStatus(msg);
LOG.info(msg);
}
/**
* @return time that this split took
@ -252,6 +266,7 @@ public class HLogSplitter {
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
long totalBytesToSplit = countTotalBytes(logfiles);
splitSize = 0;
outputSink.startWriterThreads(entryBuffers);
@ -262,7 +277,7 @@ public class HLogSplitter {
Path logPath = log.getPath();
long logLength = log.getLen();
splitSize += logLength;
LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
logAndReport("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
+ ": " + logPath + ", length=" + logLength);
Reader in;
try {
@ -284,19 +299,35 @@ public class HLogSplitter {
continue;
}
}
status.setStatus("Log splits complete. Checking for orphaned logs.");
if (fs.listStatus(srcDir).length > processedLogs.size()
+ corruptedLogs.size()) {
throw new OrphanHLogAfterSplitException(
"Discovered orphan hlog after split. Maybe the "
+ "HRegionServer was not dead when we started");
}
status.setStatus("Archiving logs after completed split");
archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
} finally {
status.setStatus("Finishing writing output logs and closing down.");
splits = outputSink.finishWritingAndClose();
}
return splits;
}
/**
* @return the total size of the passed list of files.
*/
private static long countTotalBytes(FileStatus[] logfiles) {
long ret = 0;
for (FileStatus stat : logfiles) {
ret += stat.getLen();
}
return ret;
}
/**
* Splits a HLog file into a temporary staging area. tmpname is used to build
* the name of the staging area where the recovered-edits will be separated
@ -328,6 +359,11 @@ public class HLogSplitter {
final Map<byte[], Object> logWriters = Collections.
synchronizedMap(new TreeMap<byte[], Object>(Bytes.BYTES_COMPARATOR));
boolean isCorrupted = false;
Preconditions.checkState(status == null);
status = TaskMonitor.get().createStatus(
"Splitting log file " + logfile.getPath() +
"into a temporary staging area.");
Object BAD_WRITER = new Object();
@ -342,6 +378,7 @@ public class HLogSplitter {
Path logPath = logfile.getPath();
long logLength = logfile.getLen();
LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
status.setStatus("Opening log file");
Reader in = null;
try {
in = getReader(fs, logfile, conf, skipErrors);
@ -351,12 +388,14 @@ public class HLogSplitter {
isCorrupted = true;
}
if (in == null) {
status.markComplete("Was nothing to split in log file");
LOG.warn("Nothing to split in log file " + logPath);
return true;
}
long t = EnvironmentEdgeManager.currentTimeMillis();
long last_report_at = t;
if (reporter != null && reporter.progress() == false) {
status.markComplete("Failed: reporter.progress asked us to terminate");
return false;
}
int editsCount = 0;
@ -380,10 +419,12 @@ public class HLogSplitter {
wap.w.append(entry);
editsCount++;
if (editsCount % interval == 0) {
status.setStatus("Split " + editsCount + " edits");
long t1 = EnvironmentEdgeManager.currentTimeMillis();
if ((t1 - last_report_at) > period) {
last_report_at = t;
if (reporter != null && reporter.progress() == false) {
status.markComplete("Failed: reporter.progress asked us to terminate");
progress_failed = true;
return false;
}
@ -416,10 +457,12 @@ public class HLogSplitter {
wap.w.close();
LOG.debug("Closed " + wap.p);
}
LOG.info("processed " + editsCount + " edits across " + n + " regions" +
String msg = ("processed " + editsCount + " edits across " + n + " regions" +
" threw away edits for " + (logWriters.size() - n) + " regions" +
" log file = " + logPath +
" is corrupted = " + isCorrupted);
LOG.info(msg);
status.markComplete(msg);
}
return true;
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
@ -41,6 +42,7 @@ import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Test the {@link ActiveMasterManager}.
@ -77,7 +79,8 @@ public class TestActiveMasterManager {
assertFalse(activeMasterManager.clusterHasActiveMaster.get());
// First test becoming the active master uninterrupted
activeMasterManager.blockUntilBecomingActiveMaster();
MonitoredTask status = Mockito.mock(MonitoredTask.class);
activeMasterManager.blockUntilBecomingActiveMaster(status);
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
assertMaster(zk, master);
@ -87,7 +90,7 @@ public class TestActiveMasterManager {
master, secondDummyMaster);
zk.registerListener(secondActiveMasterManager);
assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get());
activeMasterManager.blockUntilBecomingActiveMaster();
activeMasterManager.blockUntilBecomingActiveMaster(status);
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
assertMaster(zk, master);
}
@ -120,7 +123,8 @@ public class TestActiveMasterManager {
assertFalse(activeMasterManager.clusterHasActiveMaster.get());
// First test becoming the active master uninterrupted
activeMasterManager.blockUntilBecomingActiveMaster();
activeMasterManager.blockUntilBecomingActiveMaster(
Mockito.mock(MonitoredTask.class));
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
assertMaster(zk, firstMasterAddress);
@ -201,7 +205,8 @@ public class TestActiveMasterManager {
@Override
public void run() {
manager.blockUntilBecomingActiveMaster();
manager.blockUntilBecomingActiveMaster(
Mockito.mock(MonitoredTask.class));
LOG.info("Second master has become the active master!");
isActiveMaster = true;
}

View File

@ -0,0 +1,101 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.monitoring;
import static org.junit.Assert.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
public class TestTaskMonitor {
@Test
public void testTaskMonitorBasics() {
TaskMonitor tm = new TaskMonitor();
assertTrue("Task monitor should start empty",
tm.getTasks().isEmpty());
// Make a task and fetch it back out
MonitoredTask task = tm.createStatus("Test task");
MonitoredTask taskFromTm = tm.getTasks().get(0);
// Make sure the state is reasonable.
assertEquals(task.getDescription(), taskFromTm.getDescription());
assertEquals(-1, taskFromTm.getCompletionTimestamp());
assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
// Mark it as finished
task.markComplete("Finished!");
assertEquals(MonitoredTask.State.COMPLETE, taskFromTm.getState());
// It should still show up in the TaskMonitor list
assertEquals(1, tm.getTasks().size());
// If we mark its completion time back a few minutes, it should get gced
((MonitoredTaskImpl)taskFromTm).expireNow();
assertEquals(0, tm.getTasks().size());
}
@Test
public void testTasksGetAbortedOnLeak() throws InterruptedException {
final TaskMonitor tm = new TaskMonitor();
assertTrue("Task monitor should start empty",
tm.getTasks().isEmpty());
final AtomicBoolean threadSuccess = new AtomicBoolean(false);
// Make a task in some other thread and leak it
Thread t = new Thread() {
@Override
public void run() {
MonitoredTask task = tm.createStatus("Test task");
assertEquals(MonitoredTask.State.RUNNING, task.getState());
threadSuccess.set(true);
}
};
t.start();
t.join();
// Make sure the thread saw the correct state
assertTrue(threadSuccess.get());
// Make sure the leaked reference gets cleared
System.gc();
System.gc();
System.gc();
// Now it should be aborted
MonitoredTask taskFromTm = tm.getTasks().get(0);
assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState());
}
@Test
public void testTaskLimit() throws Exception {
TaskMonitor tm = new TaskMonitor();
for (int i = 0; i < TaskMonitor.MAX_TASKS + 10; i++) {
tm.createStatus("task " + i);
}
// Make sure it was limited correctly
assertEquals(TaskMonitor.MAX_TASKS, tm.getTasks().size());
// Make sure we culled the earlier tasks, not later
// (i.e. tasks 0 through 9 should have been deleted)
assertEquals("task 10", tm.getTasks().get(0).getDescription());
}
}

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.mockito.Mockito;
import com.google.common.base.Joiner;
@ -600,7 +602,7 @@ public class TestStore extends TestCase {
private static void flushStore(Store store, long id) throws IOException {
StoreFlusher storeFlusher = store.getStoreFlusher(id);
storeFlusher.prepare();
storeFlusher.flushCache();
storeFlusher.flushCache(Mockito.mock(MonitoredTask.class));
storeFlusher.commit();
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
@ -55,6 +56,7 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Test replay of edits out of a WAL split.
@ -394,7 +396,8 @@ public class TestWALReplay {
null) {
protected boolean internalFlushcache(HLog wal, long myseqid)
throws IOException {
boolean b = super.internalFlushcache(wal, myseqid);
boolean b = super.internalFlushcache(wal, myseqid,
Mockito.mock(MonitoredTask.class));
flushcount.incrementAndGet();
return b;
};