HBASE-6778 Deprecate Chore; its a thread per task when we should have one thread to do all tasks (Jonathan Lawlor)

This commit is contained in:
stack 2015-01-29 19:26:26 -08:00
parent c61c17b6f8
commit 538388c2b5
45 changed files with 381 additions and 476 deletions

View File

@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@ -55,7 +54,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
@ -579,8 +577,6 @@ final class ConnectionManager {
private final Object masterAndZKLock = new Object();
private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
private final DelayedClosing delayedClosing =
DelayedClosing.createAndStart(this);
// thread executor shared by all HTableInterface instances created
// by this connection
@ -1387,7 +1383,6 @@ final class ConnectionManager {
HConnection connection;
MasterService.BlockingInterface stub;
int userCount;
long keepAliveUntil = Long.MAX_VALUE;
MasterServiceState(final HConnection connection) {
super();
@ -1633,71 +1628,6 @@ final class ConnectionManager {
}
}
/**
* Creates a Chore thread to check the connections to master & zookeeper
* and close them when they reach their closing time (
* {@link MasterServiceState#keepAliveUntil} and
* {@link #keepZooKeeperWatcherAliveUntil}). Keep alive time is
* managed by the release functions and the variable {@link #keepAlive}
*/
private static final class DelayedClosing extends Chore implements Stoppable {
private HConnectionImplementation hci;
Stoppable stoppable;
private DelayedClosing(
HConnectionImplementation hci, Stoppable stoppable){
super(
"ZooKeeperWatcher and Master delayed closing for connection "+hci,
60*1000, // We check every minutes
stoppable);
this.hci = hci;
this.stoppable = stoppable;
}
static DelayedClosing createAndStart(HConnectionImplementation hci){
Stoppable stoppable = new Stoppable() {
private volatile boolean isStopped = false;
@Override public void stop(String why) { isStopped = true;}
@Override public boolean isStopped() {return isStopped;}
};
return new DelayedClosing(hci, stoppable);
}
protected void closeMasterProtocol(MasterServiceState protocolState) {
if (System.currentTimeMillis() > protocolState.keepAliveUntil) {
hci.closeMasterService(protocolState);
protocolState.keepAliveUntil = Long.MAX_VALUE;
}
}
@Override
protected void chore() {
synchronized (hci.masterAndZKLock) {
if (hci.canCloseZKW) {
if (System.currentTimeMillis() >
hci.keepZooKeeperWatcherAliveUntil) {
hci.closeZooKeeperWatcher();
hci.keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
}
}
closeMasterProtocol(hci.masterServiceState);
closeMasterProtocol(hci.masterServiceState);
}
}
@Override
public void stop(String why) {
stoppable.stop(why);
}
@Override
public boolean isStopped() {
return stoppable.isStopped();
}
}
private void closeZooKeeperWatcher() {
synchronized (masterAndZKLock) {
if (keepAliveZookeeper != null) {
@ -1720,7 +1650,6 @@ final class ConnectionManager {
private void resetMasterServiceState(final MasterServiceState mss) {
mss.userCount++;
mss.keepAliveUntil = Long.MAX_VALUE;
}
@Override
@ -2085,9 +2014,6 @@ final class ConnectionManager {
if (mss.getStub() == null) return;
synchronized (masterAndZKLock) {
--mss.userCount;
if (mss.userCount <= 0) {
mss.keepAliveUntil = System.currentTimeMillis() + keepAlive;
}
}
}
@ -2408,7 +2334,6 @@ final class ConnectionManager {
if (this.closed) {
return;
}
delayedClosing.stop("Closing connection");
closeMaster();
shutdownBatchPool();
this.closed = true;

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.security.UserGroupInformation;
@ -47,12 +46,12 @@ public class AuthUtil {
/**
* Checks if security is enabled and if so, launches chore for refreshing kerberos ticket.
*/
public static void launchAuthChore(Configuration conf) throws IOException {
public static ScheduledChore getAuthChore(Configuration conf) throws IOException {
UserProvider userProvider = UserProvider.instantiate(conf);
// login the principal (if using secure Hadoop)
boolean securityEnabled =
userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled();
if (!securityEnabled) return;
if (!securityEnabled) return null;
String host = null;
try {
host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
@ -87,7 +86,8 @@ public class AuthUtil {
// e.g. 5min tgt * 0.8 = 4min refresh so interval is better be way less than 1min
final int CHECK_TGT_INTERVAL = 30 * 1000; // 30sec
Chore refreshCredentials = new Chore("RefreshCredentials", CHECK_TGT_INTERVAL, stoppable) {
ScheduledChore refreshCredentials =
new ScheduledChore("RefreshCredentials", stoppable, CHECK_TGT_INTERVAL) {
@Override
protected void chore() {
try {
@ -97,7 +97,7 @@ public class AuthUtil {
}
}
};
// Start the chore for refreshing credentials
Threads.setDaemonThreadRunning(refreshCredentials.getThread());
return refreshCredentials;
}
}

View File

@ -1,157 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Sleeper;
/**
* Chore is a task performed on a period in hbase. The chore is run in its own
* thread. This base abstract class provides while loop and sleeping facility.
* If an unhandled exception, the threads exit is logged.
* Implementers just need to add checking if there is work to be done and if
* so, do it. Its the base of most of the chore threads in hbase.
*
* <p>Don't subclass Chore if the task relies on being woken up for something to
* do, such as an entry being added to a queue, etc.
*/
@InterfaceAudience.Private
public abstract class Chore extends HasThread {
private final Log LOG = LogFactory.getLog(this.getClass());
private final Sleeper sleeper;
private final Stoppable stopper;
/**
* @param p Period at which we should run. Will be adjusted appropriately
* should we find work and it takes time to complete.
* @param stopper When {@link Stoppable#isStopped()} is true, this thread will
* cleanup and exit cleanly.
*/
public Chore(String name, final int p, final Stoppable stopper) {
super(name);
if (stopper == null){
throw new NullPointerException("stopper cannot be null");
}
this.sleeper = new Sleeper(p, stopper);
this.stopper = stopper;
}
/**
* This constructor is for test only. It allows to create an object and to call chore() on
* it. There is no sleeper nor stoppable.
*/
protected Chore(){
sleeper = null;
stopper = null;
}
/**
* @return the sleep period in milliseconds
*/
public final int getPeriod() {
return sleeper.getPeriod();
}
/**
* @see java.lang.Thread#run()
*/
@Override
public void run() {
try {
boolean initialChoreComplete = false;
while (!this.stopper.isStopped()) {
long startTime = System.currentTimeMillis();
try {
if (!initialChoreComplete) {
initialChoreComplete = initialChore();
} else {
chore();
}
} catch (Exception e) {
LOG.error("Caught exception", e);
if (this.stopper.isStopped()) {
continue;
}
}
this.sleeper.sleep(startTime);
}
} catch (Throwable t) {
LOG.fatal(getName() + "error", t);
} finally {
LOG.info(getName() + " exiting");
cleanup();
}
}
/**
* If the thread is currently sleeping, trigger the core to happen immediately.
* If it's in the middle of its operation, will begin another operation
* immediately after finishing this one.
*/
public void triggerNow() {
this.sleeper.skipSleepCycle();
}
/*
* Exposed for TESTING!
* calls directly the chore method, from the current thread.
*/
public void choreForTesting() {
chore();
}
/**
* Override to run a task before we start looping.
* @return true if initial chore was successful
*/
protected boolean initialChore() {
// Default does nothing.
return true;
}
/**
* Look for chores. If any found, do them else just return.
*/
protected abstract void chore();
/**
* Sleep for period.
*/
protected void sleep() {
this.sleeper.sleep();
}
/**
* Called when the chore has completed, allowing subclasses to cleanup any
* extra overhead
*/
protected void cleanup() {
}
protected Stoppable getStopper() {
return stopper;
}
protected Sleeper getSleeper() {
return sleeper;
}
}

View File

@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.rest;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.ParseFilter;
@ -74,7 +74,10 @@ public class RESTServlet implements Constants {
}
public synchronized static void stop() {
if (INSTANCE != null) INSTANCE = null;
if (INSTANCE != null) {
INSTANCE.shutdown();
INSTANCE = null;
}
}
/**
@ -130,6 +133,13 @@ public class RESTServlet implements Constants {
connectionCache.setEffectiveUser(effectiveUser);
}
/**
* Shutdown any services that need to stop
*/
void shutdown() {
if (connectionCache != null) connectionCache.shutdown();
}
boolean supportsProxyuser() {
return conf.getBoolean(HBASE_REST_SUPPORT_PROXYUSER, false);
}

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.util.StringUtils;
/**
* The Class HealthCheckChore for running health checker regularly.
*/
public class HealthCheckChore extends Chore {
public class HealthCheckChore extends ScheduledChore {
private static Log LOG = LogFactory.getLog(HealthCheckChore.class);
private HealthChecker healthChecker;
private Configuration config;
@ -38,7 +38,7 @@ import org.apache.hadoop.util.StringUtils;
private long startWindow;
public HealthCheckChore(int sleepTime, Stoppable stopper, Configuration conf) {
super("HealthChecker", sleepTime, stopper);
super("HealthChecker", stopper, sleepTime);
LOG.info("Health Check Chore runs every " + StringUtils.formatTime(sleepTime));
this.config = conf;
String healthCheckScript = this.config.get(HConstants.HEALTH_SCRIPT_LOC);

View File

@ -65,4 +65,9 @@ public interface Server extends Abortable, Stoppable {
* Get CoordinatedStateManager instance for this server.
*/
CoordinatedStateManager getCoordinatedStateManager();
/**
* @return The {@link ChoreService} instance for this server
*/
ChoreService getChoreService();
}

View File

@ -31,12 +31,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.util.Triple;
* table on a period looking for unused regions to garbage collect.
*/
@InterfaceAudience.Private
public class CatalogJanitor extends Chore {
public class CatalogJanitor extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(CatalogJanitor.class.getName());
private final Server server;
private final MasterServices services;
@ -66,9 +66,8 @@ public class CatalogJanitor extends Chore {
private final Connection connection;
CatalogJanitor(final Server server, final MasterServices services) {
super("CatalogJanitor-" + server.getServerName().toShortString(),
server.getConfiguration().getInt("hbase.catalogjanitor.interval", 300000),
server);
super("CatalogJanitor-" + server.getServerName().toShortString(), server, server
.getConfiguration().getInt("hbase.catalogjanitor.interval", 300000));
this.server = server;
this.services = services;
this.connection = server.getConnection();

View File

@ -35,22 +35,7 @@ import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import io.netty.util.internal.StringUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import java.io.Closeable;
import java.io.IOException;
@ -67,6 +52,22 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
/**
* Class to publish the cluster status to the client. This allows them to know immediately
@ -75,7 +76,7 @@ import java.util.concurrent.ConcurrentMap;
* on the client the different timeouts, as the dead servers will be detected separately.
*/
@InterfaceAudience.Private
public class ClusterStatusPublisher extends Chore {
public class ClusterStatusPublisher extends ScheduledChore {
/**
* The implementation class used to publish the status. Default is null (no publish).
* Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
@ -115,8 +116,8 @@ public class ClusterStatusPublisher extends Chore {
public ClusterStatusPublisher(HMaster master, Configuration conf,
Class<? extends Publisher> publisherClass)
throws IOException {
super("HBase clusterStatusPublisher for " + master.getName(),
conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD), master);
super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt(
STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
this.master = master;
this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
try {

View File

@ -384,7 +384,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
" is not set - not publishing status");
} else {
clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
getChoreService().scheduleChore(clusterStatusPublisherChore);
}
}
activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
@ -726,11 +726,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
// been assigned.
status.setStatus("Starting balancer and catalog janitor");
this.clusterStatusChore = new ClusterStatusChore(this, balancer);
Threads.setDaemonThreadRunning(clusterStatusChore.getThread());
getChoreService().scheduleChore(clusterStatusChore);
this.balancerChore = new BalancerChore(this);
Threads.setDaemonThreadRunning(balancerChore.getThread());
getChoreService().scheduleChore(balancerChore);
this.catalogJanitorChore = new CatalogJanitor(this, this);
Threads.setDaemonThreadRunning(catalogJanitorChore.getThread());
getChoreService().scheduleChore(catalogJanitorChore);
status.setStatus("Starting namespace manager");
initNamespace();
@ -1013,16 +1013,13 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
new LogCleaner(cleanerInterval,
this, conf, getMasterFileSystem().getFileSystem(),
getMasterFileSystem().getOldLogDir());
Threads.setDaemonThreadRunning(logCleaner.getThread(),
getServerName().toShortString() + ".oldLogCleaner");
getChoreService().scheduleChore(logCleaner);
//start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
.getFileSystem(), archiveDir);
Threads.setDaemonThreadRunning(hfileCleaner.getThread(),
getServerName().toShortString() + ".archivedHFileCleaner");
getChoreService().scheduleChore(hfileCleaner);
serviceStarted = true;
if (LOG.isTraceEnabled()) {
LOG.trace("Started service threads");
@ -1051,8 +1048,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
LOG.debug("Stopping service threads");
}
// Clean up and close up shop
if (this.logCleaner!= null) this.logCleaner.interrupt();
if (this.hfileCleaner != null) this.hfileCleaner.interrupt();
if (this.logCleaner != null) this.logCleaner.cancel(true);
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
if (this.quotaManager != null) this.quotaManager.stop();
if (this.activeMasterManager != null) this.activeMasterManager.stop();
if (this.serverManager != null) this.serverManager.stop();
@ -1063,16 +1060,16 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
private void stopChores() {
if (this.balancerChore != null) {
this.balancerChore.interrupt();
this.balancerChore.cancel(true);
}
if (this.clusterStatusChore != null) {
this.clusterStatusChore.interrupt();
this.clusterStatusChore.cancel(true);
}
if (this.catalogJanitorChore != null) {
this.catalogJanitorChore.interrupt();
this.catalogJanitorChore.cancel(true);
}
if (this.clusterStatusPublisherChore != null){
clusterStatusPublisherChore.interrupt();
clusterStatusPublisherChore.cancel(true);
}
}

View File

@ -39,30 +39,31 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import com.google.common.annotations.VisibleForTesting;
@ -103,6 +104,7 @@ public class SplitLogManager {
private final Stoppable stopper;
private final Configuration conf;
private final ChoreService choreService;
public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min
@ -139,6 +141,7 @@ public class SplitLogManager {
this.server = server;
this.conf = conf;
this.stopper = stopper;
this.choreService = new ChoreService(serverName.toString() + "_splitLogManager_");
if (server.getCoordinatedStateManager() != null) {
SplitLogManagerCoordination coordination =
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
@ -155,8 +158,7 @@ public class SplitLogManager {
this.timeoutMonitor =
new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
stopper);
Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
+ ".splitLogManagerTimeoutMonitor");
choreService.scheduleChore(timeoutMonitor);
}
private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
@ -529,8 +531,11 @@ public class SplitLogManager {
}
public void stop() {
if (choreService != null) {
choreService.shutdown();
}
if (timeoutMonitor != null) {
timeoutMonitor.interrupt();
timeoutMonitor.cancel(true);
}
}
@ -684,11 +689,11 @@ public class SplitLogManager {
/**
* Periodically checks all active tasks and resubmits the ones that have timed out
*/
private class TimeoutMonitor extends Chore {
private class TimeoutMonitor extends ScheduledChore {
private long lastLog = 0;
public TimeoutMonitor(final int period, Stoppable stopper) {
super("SplitLogManager Timeout Monitor", period, stopper);
super("SplitLogManager Timeout Monitor", stopper, period);
}
@Override

View File

@ -18,28 +18,27 @@
package org.apache.hadoop.hbase.master.balancer;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.master.HMaster;
import java.io.IOException;
/**
* Chore that will call HMaster.balance{@link org.apache.hadoop.hbase.master.HMaster#balance()} when
* needed.
*/
@InterfaceAudience.Private
public class BalancerChore extends Chore {
public class BalancerChore extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(BalancerChore.class);
private final HMaster master;
public BalancerChore(HMaster master) {
super(master.getServerName() + "-BalancerChore",
master.getConfiguration().getInt("hbase.balancer.period", 300000),
master);
super(master.getServerName() + "-BalancerChore", master, master.getConfiguration().getInt(
"hbase.balancer.period", 300000));
this.master = master;
}

View File

@ -18,28 +18,27 @@
package org.apache.hadoop.hbase.master.balancer;
import java.io.InterruptedIOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import java.io.InterruptedIOException;
/**
* Chore that will feed the balancer the cluster status.
*/
@InterfaceAudience.Private
public class ClusterStatusChore extends Chore {
public class ClusterStatusChore extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(ClusterStatusChore.class);
private final HMaster master;
private final LoadBalancer balancer;
public ClusterStatusChore(HMaster master, LoadBalancer balancer) {
super(master.getServerName() + "-ClusterStatusChore",
master.getConfiguration().getInt("hbase.balancer.statusPeriod", 60000),
master);
super(master.getServerName() + "-ClusterStatusChore", master, master.getConfiguration().getInt(
"hbase.balancer.statusPeriod", 60000));
this.master = master;
this.balancer = balancer;
}

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.ipc.RemoteException;
@ -41,7 +41,7 @@ import com.google.common.collect.Lists;
* Abstract Cleaner that uses a chain of delegates to clean a directory of files
* @param <T> Cleaner delegate class that is dynamically loaded from configuration
*/
public abstract class CleanerChore<T extends FileCleanerDelegate> extends Chore {
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
@ -61,7 +61,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Chore
*/
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey) {
super(name, sleepPeriod, s);
super(name, s, sleepPeriod);
this.fs = fs;
this.oldFileDir = oldFileDir;
this.conf = conf;

View File

@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.annotations.VisibleForTesting;
@ -86,7 +85,7 @@ public class QuotaCache implements Stoppable {
Configuration conf = rsServices.getConfiguration();
int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
refreshChore = new QuotaRefresherChore(period, this);
Threads.setDaemonThreadRunning(refreshChore.getThread());
rsServices.getChoreService().scheduleChore(refreshChore);
}
@Override
@ -198,11 +197,11 @@ public class QuotaCache implements Stoppable {
}
// TODO: Remove this once we have the notification bus
private class QuotaRefresherChore extends Chore {
private class QuotaRefresherChore extends ScheduledChore {
private long lastUpdate = 0;
public QuotaRefresherChore(final int period, final Stoppable stoppable) {
super("QuotaRefresherChore", period, stoppable);
super("QuotaRefresherChore", stoppable, period);
}
@Override

View File

@ -25,6 +25,7 @@ import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
@ -46,7 +47,6 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.net.InetAddress;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
@ -58,7 +58,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
@ -76,10 +77,10 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@ -127,10 +128,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
@ -147,6 +145,9 @@ import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@ -325,15 +326,20 @@ public class HRegionServer extends HasThread implements
MetricsRegionServer metricsRegionServer;
private SpanReceiverHost spanReceiverHost;
/**
* ChoreService used to schedule tasks that we want to run periodically
*/
private final ChoreService choreService;
/*
* Check for compactions requests.
*/
Chore compactionChecker;
ScheduledChore compactionChecker;
/*
* Check for flushes
*/
Chore periodicFlusher;
ScheduledChore periodicFlusher;
protected volatile WALFactory walFactory;
@ -372,7 +378,7 @@ public class HRegionServer extends HasThread implements
private HealthCheckChore healthCheckChore;
/** The nonce manager chore. */
private Chore nonceManagerChore;
private ScheduledChore nonceManagerChore;
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
@ -552,6 +558,7 @@ public class HRegionServer extends HasThread implements
rpcServices.start();
putUpWebUI();
this.walRoller = new LogRoller(this, this);
this.choreService = new ChoreService(getServerName().toString());
}
protected void login(UserProvider user, String host) throws IOException {
@ -779,8 +786,8 @@ public class HRegionServer extends HasThread implements
movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
if (this.nonceManager != null) {
// Create the chore that cleans up nonces.
nonceManagerChore = this.nonceManager.createCleanupChore(this);
// Create the scheduled chore that cleans up nonces.
nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
}
// Setup the Quota Manager
@ -935,17 +942,10 @@ public class HRegionServer extends HasThread implements
if(this.hMemManager != null) this.hMemManager.stop();
if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
if (this.compactionChecker != null)
this.compactionChecker.interrupt();
if (this.healthCheckChore != null) {
this.healthCheckChore.interrupt();
}
if (this.nonceManagerChore != null) {
this.nonceManagerChore.interrupt();
}
if (this.storefileRefresher != null) {
this.storefileRefresher.interrupt();
}
if (this.compactionChecker != null) this.compactionChecker.cancel(true);
if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
// Stop the quota manager
if (rsQuotaManager != null) {
@ -1306,7 +1306,7 @@ public class HRegionServer extends HasThread implements
private void startHeapMemoryManager() {
this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
if (this.hMemManager != null) {
this.hMemManager.start();
this.hMemManager.start(getChoreService());
}
}
@ -1421,7 +1421,7 @@ public class HRegionServer extends HasThread implements
/*
* Inner class that runs on a long period checking if regions need compaction.
*/
private static class CompactionChecker extends Chore {
private static class CompactionChecker extends ScheduledChore {
private final HRegionServer instance;
private final int majorCompactPriority;
private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
@ -1429,7 +1429,7 @@ public class HRegionServer extends HasThread implements
CompactionChecker(final HRegionServer h, final int sleepTime,
final Stoppable stopper) {
super("CompactionChecker", sleepTime, h);
super("CompactionChecker", stopper, sleepTime);
this.instance = h;
LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
@ -1475,12 +1475,12 @@ public class HRegionServer extends HasThread implements
}
}
class PeriodicMemstoreFlusher extends Chore {
class PeriodicMemstoreFlusher extends ScheduledChore {
final HRegionServer server;
final static int RANGE_OF_DELAY = 20000; //millisec
final static int MIN_DELAY_TIME = 3000; //millisec
public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
this.server = server;
}
@ -1621,22 +1621,12 @@ public class HRegionServer extends HasThread implements
Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
uncaughtExceptionHandler);
this.cacheFlusher.start(uncaughtExceptionHandler);
Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() +
".compactionChecker", uncaughtExceptionHandler);
Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), getName() +
".periodicFlusher", uncaughtExceptionHandler);
if (this.healthCheckChore != null) {
Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), getName() + ".healthChecker",
uncaughtExceptionHandler);
}
if (this.nonceManagerChore != null) {
Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), getName() + ".nonceCleaner",
uncaughtExceptionHandler);
}
if (this.storefileRefresher != null) {
Threads.setDaemonThreadRunning(this.storefileRefresher.getThread(), getName() + ".storefileRefresher",
uncaughtExceptionHandler);
}
if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
@ -1729,8 +1719,8 @@ public class HRegionServer extends HasThread implements
// Verify that all threads are alive
if (!(leases.isAlive()
&& cacheFlusher.isAlive() && walRoller.isAlive()
&& this.compactionChecker.isAlive()
&& this.periodicFlusher.isAlive())) {
&& this.compactionChecker.isScheduled()
&& this.periodicFlusher.isScheduled())) {
stop("One or more threads are no longer alive -- stop");
return false;
}
@ -1994,21 +1984,18 @@ public class HRegionServer extends HasThread implements
* have already been called.
*/
protected void stopServiceThreads() {
if (this.nonceManagerChore != null) {
Threads.shutdown(this.nonceManagerChore.getThread());
}
if (this.compactionChecker != null) {
Threads.shutdown(this.compactionChecker.getThread());
}
if (this.periodicFlusher != null) {
Threads.shutdown(this.periodicFlusher.getThread());
}
// clean up the scheduled chores
if (this.choreService != null) choreService.shutdown();
if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
if (this.compactionChecker != null) compactionChecker.cancel(true);
if (this.periodicFlusher != null) periodicFlusher.cancel(true);
if (this.healthCheckChore != null) healthCheckChore.cancel(true);
if (this.storefileRefresher != null) storefileRefresher.cancel(true);
if (this.cacheFlusher != null) {
this.cacheFlusher.join();
}
if (this.healthCheckChore != null) {
Threads.shutdown(this.healthCheckChore.getThread());
}
if (this.spanReceiverHost != null) {
this.spanReceiverHost.closeReceivers();
}
@ -2034,9 +2021,6 @@ public class HRegionServer extends HasThread implements
this.replicationSinkHandler.stopReplicationService();
}
}
if (this.storefileRefresher != null) {
Threads.shutdown(this.storefileRefresher.getThread());
}
}
/**
@ -2435,6 +2419,11 @@ public class HRegionServer extends HasThread implements
return service;
}
@Override
public ChoreService getChoreService() {
return choreService;
}
@Override
public RegionServerQuotaManager getRegionServerQuotaManager() {
return rsQuotaManager;
@ -2944,13 +2933,13 @@ public class HRegionServer extends HasThread implements
/**
* Creates a Chore thread to clean the moved region cache.
*/
protected static class MovedRegionsCleaner extends Chore implements Stoppable {
protected static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
private HRegionServer regionServer;
Stoppable stoppable;
private MovedRegionsCleaner(
HRegionServer regionServer, Stoppable stoppable){
super("MovedRegionsCleaner for region "+regionServer, TIMEOUT_REGION_MOVED, stoppable);
super("MovedRegionsCleaner for region " + regionServer, stoppable, TIMEOUT_REGION_MOVED);
this.regionServer = regionServer;
this.stoppable = stoppable;
}

View File

@ -26,16 +26,16 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.annotations.VisibleForTesting;
@ -183,11 +183,11 @@ public class HeapMemoryManager {
return true;
}
public void start() {
LOG.info("Starting HeapMemoryTuner chore.");
this.heapMemTunerChore = new HeapMemoryTunerChore();
Threads.setDaemonThreadRunning(heapMemTunerChore.getThread());
if (tunerOn) {
public void start(ChoreService service) {
LOG.info("Starting HeapMemoryTuner chore.");
this.heapMemTunerChore = new HeapMemoryTunerChore();
service.scheduleChore(heapMemTunerChore);
if (tunerOn) {
// Register HeapMemoryTuner as a memstore flush listener
memStoreFlusher.registerFlushRequestListener(heapMemTunerChore);
}
@ -196,7 +196,8 @@ public class HeapMemoryManager {
public void stop() {
// The thread is Daemon. Just interrupting the ongoing process.
LOG.info("Stoping HeapMemoryTuner chore.");
this.heapMemTunerChore.interrupt();
this.heapMemTunerChore.cancel(true);
}
// Used by the test cases.
@ -211,7 +212,7 @@ public class HeapMemoryManager {
return this.heapOccupancyPercent;
}
private class HeapMemoryTunerChore extends Chore implements FlushRequestListener {
private class HeapMemoryTunerChore extends ScheduledChore implements FlushRequestListener {
private HeapMemoryTuner heapMemTuner;
private AtomicLong blockedFlushCount = new AtomicLong();
private AtomicLong unblockedFlushCount = new AtomicLong();
@ -220,7 +221,7 @@ public class HeapMemoryManager {
private boolean alarming = false;
public HeapMemoryTunerChore() {
super(server.getServerName() + "-HeapMemoryTunerChore", defaultChorePeriod, server);
super(server.getServerName() + "-HeapMemoryTunerChore", server, defaultChorePeriod);
Class<? extends HeapMemoryTuner> tunerKlass = server.getConfiguration().getClass(
HBASE_RS_HEAP_MEMORY_TUNER_CLASS, DefaultHeapMemoryTuner.class, HeapMemoryTuner.class);
heapMemTuner = ReflectionUtils.newInstance(tunerKlass, server.getConfiguration());
@ -239,7 +240,7 @@ public class HeapMemoryManager {
" is above heap occupancy alarm watermark (" + heapOccupancyLowWatermark + ")");
alarming = true;
}
getSleeper().skipSleepCycle();
triggerNow();
try {
// Need to sleep ourselves since we've told the chore's sleeper
// to skip the next sleep cycle.

View File

@ -18,17 +18,15 @@
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.master.TableLockManager;
@ -37,6 +35,8 @@ import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.Service;
/**
* Services provided by {@link HRegionServer}
*/

View File

@ -18,20 +18,19 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.Date;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.annotations.VisibleForTesting;
@ -247,13 +246,13 @@ public class ServerNonceManager {
}
/**
* Creates a chore that is used to clean up old nonces.
* Creates a scheduled chore that is used to clean up old nonces.
* @param stoppable Stoppable for the chore.
* @return Chore; the chore is not started.
* @return ScheduledChore; the scheduled chore is not started.
*/
public Chore createCleanupChore(Stoppable stoppable) {
public ScheduledChore createCleanupScheduledChore(Stoppable stoppable) {
// By default, it will run every 6 minutes (30 / 5).
return new Chore("nonceCleaner", deleteNonceGracePeriod / 5, stoppable) {
return new ScheduledChore("nonceCleaner", stoppable, deleteNonceGracePeriod / 5) {
@Override
protected void chore() {
cleanUpOldNonces();

View File

@ -25,9 +25,9 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
@ -42,7 +42,7 @@ import org.apache.hadoop.util.StringUtils;
* primary region).
*/
@InterfaceAudience.Private
public class StorefileRefresherChore extends Chore {
public class StorefileRefresherChore extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(StorefileRefresherChore.class);
@ -69,7 +69,7 @@ public class StorefileRefresherChore extends Chore {
public StorefileRefresherChore(int period, boolean onlyMetaRefresh, HRegionServer regionServer,
Stoppable stoppable) {
super("StorefileRefresherChore", period, stoppable);
super("StorefileRefresherChore", stoppable, period);
this.period = period;
this.regionServer = regionServer;
this.hfileTtl = this.regionServer.getConfiguration().getLong(

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@ -181,5 +182,10 @@ public class ReplicationSyncUp extends Configured implements Tool {
public ClusterConnection getConnection() {
return null;
}
@Override
public ChoreService getChoreService() {
return null;
}
}
}

View File

@ -36,12 +36,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
@ -773,8 +775,15 @@ public final class Canary implements Tool {
public static void main(String[] args) throws Exception {
final Configuration conf = HBaseConfiguration.create();
AuthUtil.launchAuthChore(conf);
final ChoreService choreService = new ChoreService("CANARY_TOOL");
final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
if (authChore != null) {
choreService.scheduleChore(authChore);
}
int exitCode = ToolRunner.run(conf, new Canary(), args);
choreService.shutdown();
System.exit(exitCode);
}
}

View File

@ -23,10 +23,11 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
@ -53,6 +54,7 @@ public class ConnectionCache {
private final UserGroupInformation realUser;
private final UserProvider userProvider;
private final Configuration conf;
private final ChoreService choreService;
private final ThreadLocal<String> effectiveUserNames =
new ThreadLocal<String>() {
@ -69,8 +71,8 @@ public class ConnectionCache {
@Override public void stop(String why) { isStopped = true;}
@Override public boolean isStopped() {return isStopped;}
};
Chore cleaner = new Chore("ConnectionCleaner", cleanInterval, stoppable) {
this.choreService = new ChoreService("ConnectionCache");
ScheduledChore cleaner = new ScheduledChore("ConnectionCleaner", stoppable, cleanInterval) {
@Override
protected void chore() {
for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) {
@ -93,7 +95,7 @@ public class ConnectionCache {
}
};
// Start the daemon cleaner chore
Threads.setDaemonThreadRunning(cleaner.getThread());
choreService.scheduleChore(cleaner);
this.realUser = userProvider.getCurrent().getUGI();
this.realUserName = realUser.getShortUserName();
this.userProvider = userProvider;
@ -114,6 +116,13 @@ public class ConnectionCache {
return effectiveUserNames.get();
}
/**
* Called when cache is no longer needed so that it can perform cleanup operations
*/
public void shutdown() {
if (choreService != null) choreService.shutdown();
}
/**
* Caller doesn't close the admin afterwards.
* We need to manage it and close it properly.

View File

@ -46,8 +46,8 @@ import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -242,6 +242,11 @@ public class MockRegionServerServices implements RegionServerServices {
return null;
}
@Override
public ChoreService getChoreService() {
return null;
}
@Override
public void updateRegionFavoredNodesMapping(String encodedRegionName,
List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {

View File

@ -33,10 +33,9 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
@ -45,6 +44,8 @@ import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
@ -77,7 +78,7 @@ public class TestHFileArchiving {
UTIL.startMiniCluster();
// We don't want the cleaner to remove files. The tests do that.
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().interrupt();
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true);
}
private static void setupConf(Configuration conf) {
@ -351,6 +352,7 @@ public class TestHFileArchiving {
@Test
public void testCleaningRace() throws Exception {
final long TEST_TIME = 20 * 1000;
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
@ -369,7 +371,7 @@ public class TestHFileArchiving {
// The cleaner should be looping without long pauses to reproduce the race condition.
HFileCleaner cleaner = new HFileCleaner(1, stoppable, conf, fs, archiveDir);
try {
cleaner.start();
choreService.scheduleChore(cleaner);
// Keep creating/archiving new files while the cleaner is running in the other thread
long startTime = System.currentTimeMillis();
@ -404,7 +406,8 @@ public class TestHFileArchiving {
}
} finally {
stoppable.stop("test end");
cleaner.join();
cleaner.cancel(true);
choreService.shutdown();
fs.delete(rootDir, true);
}
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@ -212,6 +213,7 @@ public class TestZooKeeperTableArchiveClient {
Configuration conf = UTIL.getConfiguration();
// setup the delegate
Stoppable stop = new StoppableImplementation();
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
@ -250,7 +252,7 @@ public class TestZooKeeperTableArchiveClient {
// need to be checked) in 'otherTable' and the files (which should be retained) in the 'table'
CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
// run the cleaner
cleaner.start();
choreService.scheduleChore(cleaner);
// wait for the cleaner to check all the files
finished.await();
// stop the cleaner
@ -412,8 +414,9 @@ public class TestZooKeeperTableArchiveClient {
*/
private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
throws InterruptedException {
final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME");
// run the cleaner
cleaner.start();
choreService.scheduleChore(cleaner);
// wait for the cleaner to check all the files
finished.await();
// stop the cleaner

View File

@ -33,25 +33,24 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
/**
* Test cases for the "load" half of the HFileOutputFormat bulk load
@ -98,7 +97,7 @@ public class TestLoadIncrementalHFiles {
* Test case that creates some regions and loads
* HFiles that fit snugly inside those regions
*/
@Test
@Test(timeout = 60000)
public void testSimpleLoad() throws Exception {
runTest("testSimpleLoad", BloomType.NONE,
new byte[][][] {
@ -111,7 +110,7 @@ public class TestLoadIncrementalHFiles {
* Test case that creates some regions and loads
* HFiles that cross the boundaries of those regions
*/
@Test
@Test(timeout = 60000)
public void testRegionCrossingLoad() throws Exception {
runTest("testRegionCrossingLoad", BloomType.NONE,
new byte[][][] {
@ -123,7 +122,7 @@ public class TestLoadIncrementalHFiles {
/**
* Test loading into a column family that has a ROW bloom filter.
*/
@Test
@Test(timeout = 60000)
public void testRegionCrossingRowBloom() throws Exception {
runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
new byte[][][] {
@ -135,7 +134,7 @@ public class TestLoadIncrementalHFiles {
/**
* Test loading into a column family that has a ROWCOL bloom filter.
*/
@Test
@Test(timeout = 60000)
public void testRegionCrossingRowColBloom() throws Exception {
runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
new byte[][][] {
@ -148,7 +147,7 @@ public class TestLoadIncrementalHFiles {
* Test case that creates some regions and loads HFiles that have
* different region boundaries than the table pre-split.
*/
@Test
@Test(timeout = 60000)
public void testSimpleHFileSplit() throws Exception {
runTest("testHFileSplit", BloomType.NONE,
new byte[][] {
@ -166,7 +165,7 @@ public class TestLoadIncrementalHFiles {
* Test case that creates some regions and loads HFiles that cross the boundaries
* and have different region boundaries than the table pre-split.
*/
@Test
@Test(timeout = 60000)
public void testRegionCrossingHFileSplit() throws Exception {
testRegionCrossingHFileSplit(BloomType.NONE);
}
@ -175,7 +174,7 @@ public class TestLoadIncrementalHFiles {
* Test case that creates some regions and loads HFiles that cross the boundaries
* have a ROW bloom filter and a different region boundaries than the table pre-split.
*/
@Test
@Test(timeout = 60000)
public void testRegionCrossingHFileSplitRowBloom() throws Exception {
testRegionCrossingHFileSplit(BloomType.ROW);
}
@ -184,7 +183,7 @@ public class TestLoadIncrementalHFiles {
* Test case that creates some regions and loads HFiles that cross the boundaries
* have a ROWCOL bloom filter and a different region boundaries than the table pre-split.
*/
@Test
@Test(timeout = 60000)
public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
testRegionCrossingHFileSplit(BloomType.ROWCOL);
}
@ -278,7 +277,7 @@ public class TestLoadIncrementalHFiles {
/**
* Test loading into a column family that does not exist.
*/
@Test
@Test(timeout = 60000)
public void testNonexistentColumnFamilyLoad() throws Exception {
String testName = "testNonexistentColumnFamilyLoad";
byte[][][] hFileRanges = new byte[][][] {
@ -307,7 +306,7 @@ public class TestLoadIncrementalHFiles {
}
}
@Test
@Test(timeout = 60000)
public void testSplitStoreFile() throws IOException {
Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
FileSystem fs = util.getTestFileSystem();
@ -354,7 +353,7 @@ public class TestLoadIncrementalHFiles {
map.put(last, value-1);
}
@Test
@Test(timeout = 60000)
public void testInferBoundaries() {
TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
@ -411,7 +410,7 @@ public class TestLoadIncrementalHFiles {
}
}
@Test
@Test(timeout = 60000)
public void testLoadTooMayHFiles() throws Exception {
Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
FileSystem fs = util.getTestFileSystem();
@ -445,7 +444,7 @@ public class TestLoadIncrementalHFiles {
loader.run(args);
}
@Test
@Test(timeout = 60000)
public void testTableWithCFNameStartWithUnderScore() throws Exception {
Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
FileSystem fs = util.getTestFileSystem();

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
@ -99,8 +100,8 @@ import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -550,6 +551,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return null;
}
@Override
public ChoreService getChoreService() {
return null;
}
@Override
public void updateRegionFavoredNodesMapping(String encodedRegionName,
List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {

View File

@ -28,6 +28,7 @@ import java.util.concurrent.Semaphore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
@ -320,5 +321,10 @@ public class TestActiveMasterManager {
public ActiveMasterManager getActiveMasterManager() {
return activeMasterManager;
}
@Override
public ChoreService getChoreService() {
return null;
}
}
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -53,7 +54,6 @@ import org.apache.hadoop.hbase.TableDescriptor;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableState;
@ -200,6 +200,11 @@ public class TestCatalogJanitor {
@Override
public void stop(String why) {
}
@Override
public ChoreService getChoreService() {
return null;
}
}
/**
@ -235,6 +240,11 @@ public class TestCatalogJanitor {
return null;
}
@Override
public ChoreService getChoreService() {
return null;
}
@Override
public MasterFileSystem getMasterFileSystem() {
return this.mfs;
@ -644,7 +654,7 @@ public class TestCatalogJanitor {
assertTrue(janitor.cleanParent(parent, regions.get(parent)));
services.stop("test finished");
janitor.join();
janitor.cancel(true);
}
/**
@ -712,7 +722,7 @@ public class TestCatalogJanitor {
assertEquals(2, janitor.scan());
services.stop("test finished");
janitor.join();
janitor.cancel(true);
}
/**
@ -875,7 +885,7 @@ public class TestCatalogJanitor {
FSUtils.delete(fs, rootdir, true);
services.stop("Test finished");
server.stop("Test finished");
janitor.join();
janitor.cancel(true);
}
/**
@ -960,7 +970,7 @@ public class TestCatalogJanitor {
// cleanup
services.stop("Test finished");
server.stop("shutdown");
janitor.join();
janitor.cancel(true);
}
private FileStatus[] addMockStoreFiles(int count, MasterServices services, Path storedir)

View File

@ -25,6 +25,7 @@ import java.net.InetAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -93,6 +94,11 @@ public class TestClockSkewDetection {
@Override
public void stop(String why) {
}
@Override
public ChoreService getChoreService() {
return null;
}
}, null, false);
LOG.debug("regionServerStartup 1");

View File

@ -46,6 +46,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -162,6 +163,11 @@ public class TestSplitLogManager {
public MetaTableLocator getMetaTableLocator() {
return null;
}
@Override
public ChoreService getChoreService() {
return null;
}
}
static Stoppable stopper = new Stoppable() {

View File

@ -37,18 +37,18 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@ -346,9 +346,10 @@ public class TestTableLockManager {
int familyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
StoppableImplementation stopper = new StoppableImplementation();
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
//alter table every 10 sec
Chore alterThread = new Chore("Alter Chore", 10000, stopper) {
ScheduledChore alterThread = new ScheduledChore("Alter Chore", stopper, 10000) {
@Override
protected void chore() {
Random random = new Random();
@ -367,7 +368,7 @@ public class TestTableLockManager {
};
//split table every 5 sec
Chore splitThread = new Chore("Split thread", 5000, stopper) {
ScheduledChore splitThread = new ScheduledChore("Split thread", stopper, 5000) {
@Override
public void chore() {
try {
@ -392,8 +393,8 @@ public class TestTableLockManager {
}
};
alterThread.start();
splitThread.start();
choreService.scheduleChore(alterThread);
choreService.scheduleChore(splitThread);
TEST_UTIL.waitTableEnabled(tableName);
while (true) {
List<HRegionInfo> regions = admin.getTableRegions(tableName);
@ -424,6 +425,7 @@ public class TestTableLockManager {
}
admin.close();
choreService.shutdown();
}
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -157,7 +158,6 @@ public class TestHFileCleaner {
LOG.debug("Kept hfiles: " + file.getPath().getName());
}
cleaner.interrupt();
// reset the edge back to the original edge
EnvironmentEdgeManager.injectEdge(originalEdge);
}
@ -248,5 +248,10 @@ public class TestHFileCleaner {
public boolean isStopped() {
return false;
}
@Override
public ChoreService getChoreService() {
return null;
}
}
}

View File

@ -27,14 +27,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -122,8 +122,6 @@ public class TestHFileLinkCleaner {
}
assertFalse("HFile should be deleted", fs.exists(FSUtils.getTableDir(archiveDir, tableName)));
assertFalse("Link should be deleted", fs.exists(FSUtils.getTableDir(archiveDir, tableLinkName)));
cleaner.interrupt();
}
private static Path getFamilyDirPath (final Path rootDir, final TableName table,
@ -183,5 +181,10 @@ public class TestHFileLinkCleaner {
public boolean isStopped() {
return false;
}
@Override
public ChoreService getChoreService() {
return null;
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -193,5 +194,10 @@ public class TestLogsCleaner {
public boolean isStopped() {
return false;
}
@Override
public ChoreService getChoreService() {
return null;
}
}
}

View File

@ -119,7 +119,7 @@ public class TestNamespaceAuditor {
.getMasterQuotaManager().isQuotaEnabled());
}
@Test
@Test(timeout = 60000)
public void testTableOperations() throws Exception {
String nsp = prefix + "_np2";
NamespaceDescriptor nspDesc =

View File

@ -34,11 +34,12 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
@ -198,8 +199,9 @@ public class TestEndToEndSplitTransaction {
Stoppable stopper = new StoppableImplementation();
RegionSplitter regionSplitter = new RegionSplitter(table);
RegionChecker regionChecker = new RegionChecker(conf, stopper, TABLENAME);
final ChoreService choreService = new ChoreService("TEST_SERVER");
regionChecker.start();
choreService.scheduleChore(regionChecker);
regionSplitter.start();
//wait until the splitter is finished
@ -298,17 +300,16 @@ public class TestEndToEndSplitTransaction {
/**
* Checks regions using MetaScanner, MetaTableAccessor and HTable methods
*/
static class RegionChecker extends Chore {
static class RegionChecker extends ScheduledChore {
Connection connection;
Configuration conf;
TableName tableName;
Throwable ex;
RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException {
super("RegionChecker", 10, stopper);
super("RegionChecker", stopper, 10);
this.conf = conf;
this.tableName = tableName;
this.setDaemon(true);
this.connection = ConnectionFactory.createConnection(conf);
}

View File

@ -27,6 +27,7 @@ import java.lang.management.ManagementFactory;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@ -111,7 +112,8 @@ public class TestHeapMemoryManager {
new RegionServerStub(conf));
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
heapMemoryManager.start();
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
@ -151,7 +153,8 @@ public class TestHeapMemoryManager {
new RegionServerStub(conf));
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
heapMemoryManager.start();
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
blockCache.evictBlock(null);
blockCache.evictBlock(null);
blockCache.evictBlock(null);
@ -186,7 +189,8 @@ public class TestHeapMemoryManager {
// Let the system start with default values for memstore heap and block cache size.
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
heapMemoryManager.start();
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
// Now we wants to be in write mode. Set bigger memstore size from CustomHeapMemoryTuner
CustomHeapMemoryTuner.memstoreSize = 0.78f;
CustomHeapMemoryTuner.blockCacheSize = 0.02f;
@ -215,7 +219,8 @@ public class TestHeapMemoryManager {
HeapMemoryTuner.class);
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
heapMemoryManager.start();
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
CustomHeapMemoryTuner.memstoreSize = 0.78f;
CustomHeapMemoryTuner.blockCacheSize = 0.02f;
Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
@ -241,7 +246,8 @@ public class TestHeapMemoryManager {
new RegionServerStub(conf));
long oldMemstoreSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
heapMemoryManager.start();
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
CustomHeapMemoryTuner.memstoreSize = 0.7f;
CustomHeapMemoryTuner.blockCacheSize = 0.3f;
Thread.sleep(1500);
@ -283,7 +289,8 @@ public class TestHeapMemoryManager {
conf));
long oldMemstoreSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
heapMemoryManager.start();
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
CustomHeapMemoryTuner.memstoreSize = 0.4f;
CustomHeapMemoryTuner.blockCacheSize = 0.4f;
Thread.sleep(1500);
@ -490,6 +497,11 @@ public class TestHeapMemoryManager {
public ServerName getServerName() {
return ServerName.valueOf("server1",4000,12345);
}
@Override
public ChoreService getChoreService() {
return null;
}
}
static class CustomHeapMemoryTuner implements HeapMemoryTuner {

View File

@ -19,17 +19,20 @@
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.Threads;
@ -87,7 +90,7 @@ public class TestServerNonceManager {
EnvironmentEdgeManager.injectEdge(edge);
try {
ServerNonceManager nm = createManager(6);
Chore cleanup = nm.createCleanupChore(Mockito.mock(Stoppable.class));
ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
edge.setValue(1);
assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
@ -120,7 +123,7 @@ public class TestServerNonceManager {
EnvironmentEdgeManager.injectEdge(edge);
try {
ServerNonceManager nm = createManager(6);
Chore cleanup = nm.createCleanupChore(Mockito.mock(Stoppable.class));
ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
// Add nonces from WAL, including dups.
edge.setValue(12);
nm.reportOperationFromWal(NO_NONCE, 1, 8);

View File

@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -137,6 +138,11 @@ public class TestSplitLogWorker {
public MetaTableLocator getMetaTableLocator() {
return null;
}
@Override
public ChoreService getChoreService() {
return null;
}
}
private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)

View File

@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -183,5 +184,10 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
public boolean isStopped() {
return this.isStopped;
}
@Override
public ChoreService getChoreService() {
return null;
}
}
}

View File

@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -286,5 +287,10 @@ public class TestReplicationTrackerZKImpl {
public boolean isStopped() {
return this.isStopped;
}
@Override
public ChoreService getChoreService() {
return null;
}
}
}

View File

@ -37,8 +37,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -48,15 +49,10 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
@ -68,6 +64,9 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -478,5 +477,10 @@ public class TestReplicationSourceManager {
public boolean isStopped() {
return false; // To change body of implemented methods use File | Settings | File Templates.
}
@Override
public ChoreService getChoreService() {
return null;
}
}
}

View File

@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.Coprocessor;
@ -319,6 +320,11 @@ public class TestTokenAuthentication {
throw new ServiceException(ioe);
}
}
@Override
public ChoreService getChoreService() {
return null;
}
}
private static HBaseTestingUtility TEST_UTIL;

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
@ -121,4 +122,9 @@ public class MockServer implements Server {
// TODO Auto-generated method stub
return this.aborted;
}
@Override
public ChoreService getChoreService() {
return null;
}
}