HBASE-10926 Use global procedure to flush table memstore cache (Jerry He)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1592368 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2014-05-04 16:12:32 +00:00
parent a7ba07f757
commit 4b2a4d10de
15 changed files with 807 additions and 24 deletions

View File

@ -24,6 +24,7 @@ import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -1467,21 +1468,12 @@ public class HBaseAdmin implements Admin {
} else {
final TableName tableName = checkTableExists(
TableName.valueOf(tableNameOrRegionName), ct);
List<Pair<HRegionInfo, ServerName>> pairs =
MetaReader.getTableRegionsAndLocations(ct,
tableName);
for (Pair<HRegionInfo, ServerName> pair: pairs) {
if (pair.getFirst().isOffline()) continue;
if (pair.getSecond() == null) continue;
try {
flush(pair.getSecond(), pair.getFirst());
} catch (NotServingRegionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to flush " + pair.getFirst() + ": " +
StringUtils.stringifyException(e));
}
}
if (isTableDisabled(tableName)) {
LOG.info("Table is disabled: " + tableName.getNameAsString());
return;
}
execProcedure("flush-table-proc", tableName.getNameAsString(),
new HashMap<String, String>());
}
} finally {
cleanupCatalogTracker(ct);

View File

@ -387,4 +387,14 @@ public class BaseMasterObserver implements MasterObserver {
List<HTableDescriptor> descriptors) throws IOException {
}
@Override
public void preTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName) throws IOException {
}
@Override
public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName) throws IOException {
}
}

View File

@ -694,4 +694,22 @@ public interface MasterObserver extends Coprocessor {
*/
void postModifyNamespace(final ObserverContext<MasterCoprocessorEnvironment> ctx,
NamespaceDescriptor ns) throws IOException;
/**
* Called before the table memstore is flushed to disk.
* @param ctx the environment to interact with the framework and master
* @param tableName the name of the table
* @throws IOException
*/
void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final TableName tableName) throws IOException;
/**
* Called after the table memstore is flushed to disk.
* @param ctx the environment to interact with the framework and master
* @param tableName the name of the table
* @throws IOException
*/
void postTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final TableName tableName) throws IOException;
}

View File

@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
@ -428,6 +429,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
this.snapshotManager = new SnapshotManager();
this.mpmHost = new MasterProcedureManagerHost();
this.mpmHost.register(this.snapshotManager);
this.mpmHost.register(new MasterFlushTableProcedureManager());
this.mpmHost.loadProcedures(conf);
this.mpmHost.initialize(this, this.metricsMaster);
}

View File

@ -1568,4 +1568,38 @@ public class MasterCoprocessorHost
}
}
public void preTableFlush(final TableName tableName) throws IOException {
ObserverContext<MasterCoprocessorEnvironment> ctx = null;
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
((MasterObserver)env.getInstance()).preTableFlush(ctx, tableName);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
if (ctx.shouldComplete()) {
break;
}
}
}
}
public void postTableFlush(final TableName tableName) throws IOException {
ObserverContext<MasterCoprocessorEnvironment> ctx = null;
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
((MasterObserver)env.getInstance()).postTableFlush(ctx, tableName);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
if (ctx.shouldComplete()) {
break;
}
}
}
}
}

View File

@ -341,6 +341,16 @@ public class Procedure implements Callable<Void>, ForeignExceptionListener {
waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
}
/**
* Check if the entire procedure has globally completed, or has been aborted.
* @throws ForeignException
*/
public boolean isCompleted() throws ForeignException {
// Rethrow exception if any
monitor.rethrowException();
return (completedLatch.getCount() == 0);
}
/**
* A callback that handles incoming ForeignExceptions.
*/

View File

@ -149,12 +149,21 @@ public class ProcedureCoordinator {
Procedure oldProc = procedures.get(procName);
if (oldProc != null) {
// procedures are always eventually completed on both successful and failed execution
if (oldProc.completedLatch.getCount() != 0) {
LOG.warn("Procedure " + procName + " currently running. Rejecting new request");
return false;
try {
if (!oldProc.isCompleted()) {
LOG.warn("Procedure " + procName + " currently running. Rejecting new request");
return false;
}
else {
LOG.debug("Procedure " + procName
+ " was in running list but was completed. Accepting new attempt.");
procedures.remove(procName);
}
} catch (ForeignException e) {
LOG.debug("Procedure " + procName
+ " was in running list but has exception. Accepting new attempt.");
procedures.remove(procName);
}
LOG.debug("Procedure " + procName + " was in running list but was completed. Accepting new attempt.");
procedures.remove(procName);
}
}
@ -233,14 +242,19 @@ public class ProcedureCoordinator {
/**
* Kick off the named procedure
* Currently only one procedure with the same type and name is allowed to run at a time.
* @param procName name of the procedure to start
* @param procArgs arguments for the procedure
* @param expectedMembers expected members to start
* @return handle to the running procedure, if it was started correctly, <tt>null</tt> otherwise
* @throws RejectedExecutionException if there are no more available threads to run the procedure
* @return handle to the running procedure, if it was started correctly,
* <tt>null</tt> otherwise.
* Null could be due to submitting a procedure multiple times
* (or one with the same name), or runtime exception.
* Check the procedure's monitor that holds a reference to the exception
* that caused the failure.
*/
public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
List<String> expectedMembers) throws RejectedExecutionException {
List<String> expectedMembers) {
Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers);
if (!this.submitProcedure(proc)) {
LOG.error("Failed to submit procedure '" + procName + "'");
@ -303,4 +317,4 @@ public class ProcedureCoordinator {
public Set<String> getProcedureNames() {
return new HashSet<String>(procedures.keySet());
}
}
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
@ -70,6 +71,8 @@ public class RegionServerProcedureManagerHost extends
loadUserProcedures(conf, REGIONSERVER_PROCEDURE_CONF_KEY);
// load the default snapshot manager
procedures.add(new RegionServerSnapshotManager());
// load the default flush region procedure manager
procedures.add(new RegionServerFlushTableProcedureManager());
}
}

View File

@ -160,6 +160,7 @@ abstract public class Subprocedure implements Callable<Void> {
LOG.debug("Subprocedure '" + barrierName + "' starting 'acquire' stage");
acquireBarrier();
LOG.debug("Subprocedure '" + barrierName + "' locally acquired");
rethrowException();
// vote yes to coordinator about being prepared
rpcs.sendMemberAcquired(this);
@ -180,6 +181,7 @@ abstract public class Subprocedure implements Callable<Void> {
LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator.");
insideBarrier();
LOG.debug("Subprocedure '" + barrierName + "' locally completed");
rethrowException();
// Ack that the member has executed and released local barrier
rpcs.sendMemberCompleted(this);
@ -327,4 +329,4 @@ abstract public class Subprocedure implements Callable<Void> {
@Override
public void cleanup(Exception e) {}
};
}
}

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure.flush;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
import org.apache.hadoop.hbase.regionserver.HRegion;
/**
* This flush region implementation uses the distributed procedure framework to flush
* table regions.
* Its acquireBarrier stage does nothing. Its insideBarrier stage flushes the regions.
*/
@InterfaceAudience.Private
public class FlushTableSubprocedure extends Subprocedure {
private static final Log LOG = LogFactory.getLog(FlushTableSubprocedure.class);
private final String table;
private final List<HRegion> regions;
private final FlushTableSubprocedurePool taskManager;
public FlushTableSubprocedure(ProcedureMember member,
ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
List<HRegion> regions, String table,
FlushTableSubprocedurePool taskManager) {
super(member, table, errorListener, wakeFrequency, timeout);
this.table = table;
this.regions = regions;
this.taskManager = taskManager;
}
private class RegionFlushTask implements Callable<Void> {
HRegion region;
RegionFlushTask(HRegion region) {
this.region = region;
}
@Override
public Void call() throws Exception {
LOG.debug("Starting region operation on " + region);
region.startRegionOperation();
try {
LOG.debug("Flush region " + region.toString() + " started...");
region.flushcache();
} finally {
LOG.debug("Closing region operation on " + region);
region.closeRegionOperation();
}
return null;
}
}
private void flushRegions() throws ForeignException {
if (regions.isEmpty()) {
// No regions on this RS, we are basically done.
return;
}
monitor.rethrowException();
// assert that the taskManager is empty.
if (taskManager.hasTasks()) {
throw new IllegalStateException("Attempting to flush "
+ table + " but we currently have outstanding tasks");
}
// Add all hfiles already existing in region.
for (HRegion region : regions) {
// submit one task per region for parallelize by region.
taskManager.submitTask(new RegionFlushTask(region));
monitor.rethrowException();
}
// wait for everything to complete.
LOG.debug("Flush region tasks submitted for " + regions.size() + " regions");
try {
taskManager.waitForOutstandingTasks();
} catch (InterruptedException e) {
throw new ForeignException(getMemberName(), e);
}
}
/**
* Flush the online regions on this rs for the target table.
*/
@Override
public void acquireBarrier() throws ForeignException {
flushRegions();
}
@Override
public void insideBarrier() throws ForeignException {
// No-Op
}
/**
* Cancel threads if they haven't finished.
*/
@Override
public void cleanup(Exception e) {
LOG.info("Aborting all flush region subprocedure task threads for '"
+ table + "' due to error", e);
try {
taskManager.cancelTasks();
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
}
public void releaseBarrier() {
// NO OP
}
}

View File

@ -0,0 +1,196 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure.flush;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure.Procedure;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
import com.google.common.collect.Lists;
public class MasterFlushTableProcedureManager extends MasterProcedureManager {
public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis";
private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis";
private static final int FLUSH_WAKE_MILLIS_DEFAULT = 500;
private static final String FLUSH_PROC_POOL_THREADS_KEY =
"hbase.flush.procedure.master.threads";
private static final int FLUSH_PROC_POOL_THREADS_DEFAULT = 1;
private static final Log LOG = LogFactory.getLog(MasterFlushTableProcedureManager.class);
private MasterServices master;
private ProcedureCoordinator coordinator;
private Map<TableName, Procedure> procMap = new HashMap<TableName, Procedure>();
private boolean stopped;
public MasterFlushTableProcedureManager() {};
@Override
public void stop(String why) {
LOG.info("stop: " + why);
this.stopped = true;
}
@Override
public boolean isStopped() {
return this.stopped;
}
@Override
public void initialize(MasterServices master, MetricsMaster metricsMaster)
throws KeeperException, IOException, UnsupportedOperationException {
this.master = master;
// get the configuration for the coordinator
Configuration conf = master.getConfiguration();
long wakeFrequency = conf.getInt(FLUSH_WAKE_MILLIS_KEY, FLUSH_WAKE_MILLIS_DEFAULT);
long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
int threads = conf.getInt(FLUSH_PROC_POOL_THREADS_KEY, FLUSH_PROC_POOL_THREADS_DEFAULT);
// setup the procedure coordinator
String name = master.getServerName().toString();
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, threads);
ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
master.getZooKeeper(), getProcedureSignature(), name);
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
}
@Override
public String getProcedureSignature() {
return FLUSH_TABLE_PROCEDURE_SIGNATURE;
}
@Override
public void execProcedure(ProcedureDescription desc) throws IOException {
TableName tableName = TableName.valueOf(desc.getInstance());
// call pre coproc hook
MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preTableFlush(tableName);
}
// Get the list of region servers that host the online regions for table.
// We use the procedure instance name to carry the table name from the client.
// It is possible that regions may move after we get the region server list.
// Each region server will get its own online regions for the table.
// We may still miss regions that need to be flushed.
List<Pair<HRegionInfo, ServerName>> regionsAndLocations = null;
try {
regionsAndLocations =
MetaReader.getTableRegionsAndLocations(this.master.getCatalogTracker(),
TableName.valueOf(desc.getInstance()), false);
} catch (InterruptedException e1) {
String msg = "Failed to get regions for '" + desc.getInstance() + "'";
LOG.error(msg);
throw new IOException(msg, e1);
}
Set<String> regionServers = new HashSet<String>(regionsAndLocations.size());
for (Pair<HRegionInfo, ServerName> region : regionsAndLocations) {
if (region != null && region.getFirst() != null && region.getSecond() != null) {
HRegionInfo hri = region.getFirst();
if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
regionServers.add(region.getSecond().toString());
}
}
ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
// Kick of the global procedure from the master coordinator to the region servers.
// We rely on the existing Distributed Procedure framework to prevent any concurrent
// procedure with the same name.
Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(),
new byte[0], Lists.newArrayList(regionServers));
monitor.rethrowException();
if (proc == null) {
String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '"
+ desc.getInstance() + "'. " + "Another flush procedure is running?";
LOG.error(msg);
throw new IOException(msg);
}
procMap.put(tableName, proc);
try {
// wait for the procedure to complete. A timer thread is kicked off that should cancel this
// if it takes too long.
proc.waitForCompleted();
LOG.info("Done waiting - exec procedure " + desc.getSignature() + " for '"
+ desc.getInstance() + "'");
LOG.info("Master flush table procedure is successful!");
} catch (InterruptedException e) {
ForeignException ee =
new ForeignException("Interrupted while waiting for flush table procdure to finish", e);
monitor.receive(ee);
Thread.currentThread().interrupt();
} catch (ForeignException e) {
ForeignException ee =
new ForeignException("Exception while waiting for flush table procdure to finish", e);
monitor.receive(ee);
}
monitor.rethrowException();
}
@Override
public synchronized boolean isProcedureDone(ProcedureDescription desc) throws IOException {
// Procedure instance name is the table name.
TableName tableName = TableName.valueOf(desc.getInstance());
Procedure proc = procMap.get(tableName);
if (proc == null) {
// The procedure has not even been started yet.
// The client would request the procedure and call isProcedureDone().
// The HBaseAdmin.execProcedure() wraps both request and isProcedureDone().
return false;
}
// We reply on the existing Distributed Procedure framework to give us the status.
return proc.isCompleted();
}
}

View File

@ -0,0 +1,332 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure.flush;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* This manager class handles flushing of the regions for table on a {@link HRegionServer}.
*/
@InterfaceAudience.Private
public class RegionServerFlushTableProcedureManager extends RegionServerProcedureManager {
private static final Log LOG = LogFactory.getLog(RegionServerFlushTableProcedureManager.class);
private static final String CONCURENT_FLUSH_TASKS_KEY =
"hbase.flush.procedure.region.concurrentTasks";
private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3;
public static final String FLUSH_REQUEST_THREADS_KEY =
"hbase.flush.procedure.region.pool.threads";
public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10;
public static final String FLUSH_TIMEOUT_MILLIS_KEY =
"hbase.flush.procedure.region.timeout";
public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY =
"hbase.flush.procedure.region.wakefrequency";
private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500;
private RegionServerServices rss;
private ProcedureMemberRpcs memberRpcs;
private ProcedureMember member;
/**
* Exposed for testing.
* @param conf HBase configuration.
* @param server region server.
* @param memberRpc use specified memberRpc instance
* @param procMember use specified ProcedureMember
*/
RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server,
ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
this.rss = server;
this.memberRpcs = memberRpc;
this.member = procMember;
}
public RegionServerFlushTableProcedureManager() {}
/**
* Start accepting flush table requests.
*/
@Override
public void start() {
LOG.debug("Start region server flush procedure manager " + rss.getServerName().toString());
this.memberRpcs.start(rss.getServerName().toString(), member);
}
/**
* Close <tt>this</tt> and all running tasks
* @param force forcefully stop all running tasks
* @throws IOException
*/
@Override
public void stop(boolean force) throws IOException {
String mode = force ? "abruptly" : "gracefully";
LOG.info("Stopping region server flush procedure manager " + mode + ".");
try {
this.member.close();
} finally {
this.memberRpcs.close();
}
}
/**
* If in a running state, creates the specified subprocedure to flush table regions.
*
* Because this gets the local list of regions to flush and not the set the master had,
* there is a possibility of a race where regions may be missed.
*
* @param table
* @return Subprocedure to submit to the ProcedureMemeber.
*/
public Subprocedure buildSubprocedure(String table) {
// don't run the subprocedure if the parent is stop(ping)
if (rss.isStopping() || rss.isStopped()) {
throw new IllegalStateException("Can't start flush region subprocedure on RS: "
+ rss.getServerName() + ", because stopping/stopped!");
}
// check to see if this server is hosting any regions for the table
List<HRegion> involvedRegions;
try {
involvedRegions = getRegionsToFlush(table);
} catch (IOException e1) {
throw new IllegalStateException("Failed to figure out if there is region to flush.", e1);
}
// We need to run the subprocedure even if we have no relevant regions. The coordinator
// expects participation in the procedure and without sending message the master procedure
// will hang and fail.
LOG.debug("Launching subprocedure to flush regions for " + table);
ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(table);
Configuration conf = rss.getConfiguration();
long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY,
FLUSH_TIMEOUT_MILLIS_DEFAULT);
long wakeMillis = conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY,
FLUSH_REQUEST_WAKE_MILLIS_DEFAULT);
FlushTableSubprocedurePool taskManager =
new FlushTableSubprocedurePool(rss.getServerName().toString(), conf);
return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
timeoutMillis, involvedRegions, table, taskManager);
}
/**
* Get the list of regions to flush for the table on this server
*
* It is possible that if a region moves somewhere between the calls
* we'll miss the region.
*
* @param table
* @return the list of online regions. Empty list is returned if no regions.
* @throws IOException
*/
private List<HRegion> getRegionsToFlush(String table) throws IOException {
return rss.getOnlineRegions(TableName.valueOf(table));
}
public class FlushTableSubprocedureBuilder implements SubprocedureFactory {
@Override
public Subprocedure buildSubprocedure(String name, byte[] data) {
// The name of the procedure instance from the master is the table name.
return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name);
}
}
/**
* We use the FlushTableSubprocedurePool, a class specific thread pool instead of
* {@link org.apache.hadoop.hbase.executor.ExecutorService}.
*
* It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of
* completed tasks which lets us efficiently cancel pending tasks upon the earliest operation
* failures.
*/
static class FlushTableSubprocedurePool {
private final ExecutorCompletionService<Void> taskPool;
private final ThreadPoolExecutor executor;
private volatile boolean stopped;
private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
private final String name;
FlushTableSubprocedurePool(String name, Configuration conf) {
// configure the executor service
long keepAlive = conf.getLong(
RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
this.name = name;
executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs("
+ name + ")-flush-proc-pool"));
taskPool = new ExecutorCompletionService<Void>(executor);
}
boolean hasTasks() {
return futures.size() != 0;
}
/**
* Submit a task to the pool.
*
* NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}.
*/
void submitTask(final Callable<Void> task) {
Future<Void> f = this.taskPool.submit(task);
futures.add(f);
}
/**
* Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
* This *must* be called after all tasks are submitted via submitTask.
*
* @return <tt>true</tt> on success, <tt>false</tt> otherwise
* @throws InterruptedException
*/
boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
LOG.debug("Waiting for local region flush to finish.");
int sz = futures.size();
try {
// Using the completion service to process the futures.
for (int i = 0; i < sz; i++) {
Future<Void> f = taskPool.take();
f.get();
if (!futures.remove(f)) {
LOG.warn("unexpected future" + f);
}
LOG.debug("Completed " + (i+1) + "/" + sz + " local region flush tasks.");
}
LOG.debug("Completed " + sz + " local region flush tasks.");
return true;
} catch (InterruptedException e) {
LOG.warn("Got InterruptedException in FlushSubprocedurePool", e);
if (!stopped) {
Thread.currentThread().interrupt();
throw new ForeignException("FlushSubprocedurePool", e);
}
// we are stopped so we can just exit.
} catch (ExecutionException e) {
if (e.getCause() instanceof ForeignException) {
LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e);
throw (ForeignException)e.getCause();
}
LOG.warn("Got Exception in FlushSubprocedurePool", e);
throw new ForeignException(name, e.getCause());
} finally {
cancelTasks();
}
return false;
}
/**
* This attempts to cancel out all pending and in progress tasks (interruptions issues)
* @throws InterruptedException
*/
void cancelTasks() throws InterruptedException {
Collection<Future<Void>> tasks = futures;
LOG.debug("cancelling " + tasks.size() + " flush region tasks " + name);
for (Future<Void> f: tasks) {
f.cancel(false);
}
// evict remaining tasks and futures from taskPool.
while (!futures.isEmpty()) {
// block to remove cancelled futures;
LOG.warn("Removing cancelled elements from taskPool");
futures.remove(taskPool.take());
}
stop();
}
/**
* Abruptly shutdown the thread pool. Call when exiting a region server.
*/
void stop() {
if (this.stopped) return;
this.stopped = true;
this.executor.shutdownNow();
}
}
/**
* Initialize this region server flush procedure manager
* Uses a zookeeper based member controller.
* @param rss region server
* @throws KeeperException if the zookeeper cannot be reached
*/
@Override
public void initialize(RegionServerServices rss) throws KeeperException {
this.rss = rss;
ZooKeeperWatcher zkw = rss.getZooKeeper();
this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE);
Configuration conf = rss.getConfiguration();
long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT);
// create the actual flush table procedure member
ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
opThreads, keepAlive);
this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder());
}
@Override
public String getProcedureSignature() {
return MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE;
}
}

View File

@ -1167,6 +1167,17 @@ public class AccessController extends BaseRegionObserver
NamespaceDescriptor ns) throws IOException {
}
@Override
public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final TableName tableName) throws IOException {
requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
}
@Override
public void postTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final TableName tableName) throws IOException {
}
/* ---- RegionObserver implementation ---- */
@Override

View File

@ -555,6 +555,16 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb
NamespaceDescriptor ns) throws IOException {
}
@Override
public void preTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName) throws IOException {
}
@Override
public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName) throws IOException {
}
@Override
public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {

View File

@ -941,6 +941,16 @@ public class TestMasterObserver {
public boolean wasGetTableDescriptorsCalled() {
return preGetTableDescriptorsCalled && postGetTableDescriptorsCalled;
}
@Override
public void preTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName) throws IOException {
}
@Override
public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName) throws IOException {
}
}
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();