From 4b2a4d10de0b48f8ceb33ae33d1af4f1a081170e Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Sun, 4 May 2014 16:12:32 +0000 Subject: [PATCH] HBASE-10926 Use global procedure to flush table memstore cache (Jerry He) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1592368 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/client/HBaseAdmin.java | 20 +- .../hbase/coprocessor/BaseMasterObserver.java | 10 + .../hbase/coprocessor/MasterObserver.java | 18 + .../apache/hadoop/hbase/master/HMaster.java | 2 + .../hbase/master/MasterCoprocessorHost.java | 34 ++ .../hadoop/hbase/procedure/Procedure.java | 10 + .../hbase/procedure/ProcedureCoordinator.java | 32 +- .../RegionServerProcedureManagerHost.java | 3 + .../hadoop/hbase/procedure/Subprocedure.java | 4 +- .../flush/FlushTableSubprocedure.java | 139 ++++++++ .../MasterFlushTableProcedureManager.java | 196 +++++++++++ ...egionServerFlushTableProcedureManager.java | 332 ++++++++++++++++++ .../security/access/AccessController.java | 11 + .../visibility/VisibilityController.java | 10 + .../hbase/coprocessor/TestMasterObserver.java | 10 + 15 files changed, 807 insertions(+), 24 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 759822a6190..989876fc5d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -24,6 +24,7 @@ import java.io.InterruptedIOException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -1467,21 +1468,12 @@ public class HBaseAdmin implements Admin { } else { final TableName tableName = checkTableExists( TableName.valueOf(tableNameOrRegionName), ct); - List> pairs = - MetaReader.getTableRegionsAndLocations(ct, - tableName); - for (Pair pair: pairs) { - if (pair.getFirst().isOffline()) continue; - if (pair.getSecond() == null) continue; - try { - flush(pair.getSecond(), pair.getFirst()); - } catch (NotServingRegionException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to flush " + pair.getFirst() + ": " + - StringUtils.stringifyException(e)); - } - } + if (isTableDisabled(tableName)) { + LOG.info("Table is disabled: " + tableName.getNameAsString()); + return; } + execProcedure("flush-table-proc", tableName.getNameAsString(), + new HashMap()); } } finally { cleanupCatalogTracker(ct); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java index 4739da8f2c7..2e834bf3eb8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java @@ -387,4 +387,14 @@ public class BaseMasterObserver implements MasterObserver { List descriptors) throws IOException { } + @Override + public void preTableFlush(ObserverContext ctx, + TableName tableName) throws IOException { + } + + @Override + public void postTableFlush(ObserverContext ctx, + TableName tableName) throws IOException { + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java index edcce380b6f..a266157f412 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java @@ -694,4 +694,22 @@ public interface MasterObserver extends Coprocessor { */ void postModifyNamespace(final ObserverContext ctx, NamespaceDescriptor ns) throws IOException; + + /** + * Called before the table memstore is flushed to disk. + * @param ctx the environment to interact with the framework and master + * @param tableName the name of the table + * @throws IOException + */ + void preTableFlush(final ObserverContext ctx, + final TableName tableName) throws IOException; + + /** + * Called after the table memstore is flushed to disk. + * @param ctx the environment to interact with the framework and master + * @param tableName the name of the table + * @throws IOException + */ + void postTableFlush(final ObserverContext ctx, + final TableName tableName) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 396c15be015..2d6a6e7736f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; +import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RSRpcServices; @@ -428,6 +429,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { this.snapshotManager = new SnapshotManager(); this.mpmHost = new MasterProcedureManagerHost(); this.mpmHost.register(this.snapshotManager); + this.mpmHost.register(new MasterFlushTableProcedureManager()); this.mpmHost.loadProcedures(conf); this.mpmHost.initialize(this, this.metricsMaster); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index e87d6149aa7..2950456d3c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -1568,4 +1568,38 @@ public class MasterCoprocessorHost } } + public void preTableFlush(final TableName tableName) throws IOException { + ObserverContext ctx = null; + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((MasterObserver)env.getInstance()).preTableFlush(ctx, tableName); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } + + public void postTableFlush(final TableName tableName) throws IOException { + ObserverContext ctx = null; + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((MasterObserver)env.getInstance()).postTableFlush(ctx, tableName); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java index 37eae44126c..fd15a481c07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java @@ -341,6 +341,16 @@ public class Procedure implements Callable, ForeignExceptionListener { waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed"); } + /** + * Check if the entire procedure has globally completed, or has been aborted. + * @throws ForeignException + */ + public boolean isCompleted() throws ForeignException { + // Rethrow exception if any + monitor.rethrowException(); + return (completedLatch.getCount() == 0); + } + /** * A callback that handles incoming ForeignExceptions. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java index 6744167527b..516365d0180 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java @@ -149,12 +149,21 @@ public class ProcedureCoordinator { Procedure oldProc = procedures.get(procName); if (oldProc != null) { // procedures are always eventually completed on both successful and failed execution - if (oldProc.completedLatch.getCount() != 0) { - LOG.warn("Procedure " + procName + " currently running. Rejecting new request"); - return false; + try { + if (!oldProc.isCompleted()) { + LOG.warn("Procedure " + procName + " currently running. Rejecting new request"); + return false; + } + else { + LOG.debug("Procedure " + procName + + " was in running list but was completed. Accepting new attempt."); + procedures.remove(procName); + } + } catch (ForeignException e) { + LOG.debug("Procedure " + procName + + " was in running list but has exception. Accepting new attempt."); + procedures.remove(procName); } - LOG.debug("Procedure " + procName + " was in running list but was completed. Accepting new attempt."); - procedures.remove(procName); } } @@ -233,14 +242,19 @@ public class ProcedureCoordinator { /** * Kick off the named procedure + * Currently only one procedure with the same type and name is allowed to run at a time. * @param procName name of the procedure to start * @param procArgs arguments for the procedure * @param expectedMembers expected members to start - * @return handle to the running procedure, if it was started correctly, null otherwise - * @throws RejectedExecutionException if there are no more available threads to run the procedure + * @return handle to the running procedure, if it was started correctly, + * null otherwise. + * Null could be due to submitting a procedure multiple times + * (or one with the same name), or runtime exception. + * Check the procedure's monitor that holds a reference to the exception + * that caused the failure. */ public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, - List expectedMembers) throws RejectedExecutionException { + List expectedMembers) { Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers); if (!this.submitProcedure(proc)) { LOG.error("Failed to submit procedure '" + procName + "'"); @@ -303,4 +317,4 @@ public class ProcedureCoordinator { public Set getProcedureNames() { return new HashSet(procedures.keySet()); } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java index 6aa567abf5f..00b5100c226 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager; @@ -70,6 +71,8 @@ public class RegionServerProcedureManagerHost extends loadUserProcedures(conf, REGIONSERVER_PROCEDURE_CONF_KEY); // load the default snapshot manager procedures.add(new RegionServerSnapshotManager()); + // load the default flush region procedure manager + procedures.add(new RegionServerFlushTableProcedureManager()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java index 9d1b9bf6fa4..1c03f3d95dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java @@ -160,6 +160,7 @@ abstract public class Subprocedure implements Callable { LOG.debug("Subprocedure '" + barrierName + "' starting 'acquire' stage"); acquireBarrier(); LOG.debug("Subprocedure '" + barrierName + "' locally acquired"); + rethrowException(); // vote yes to coordinator about being prepared rpcs.sendMemberAcquired(this); @@ -180,6 +181,7 @@ abstract public class Subprocedure implements Callable { LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator."); insideBarrier(); LOG.debug("Subprocedure '" + barrierName + "' locally completed"); + rethrowException(); // Ack that the member has executed and released local barrier rpcs.sendMemberCompleted(this); @@ -327,4 +329,4 @@ abstract public class Subprocedure implements Callable { @Override public void cleanup(Exception e) {} }; -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java new file mode 100644 index 00000000000..370f18189ea --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure.flush; + +import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.errorhandling.ForeignException; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.procedure.ProcedureMember; +import org.apache.hadoop.hbase.procedure.Subprocedure; +import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool; +import org.apache.hadoop.hbase.regionserver.HRegion; + +/** + * This flush region implementation uses the distributed procedure framework to flush + * table regions. + * Its acquireBarrier stage does nothing. Its insideBarrier stage flushes the regions. + */ +@InterfaceAudience.Private +public class FlushTableSubprocedure extends Subprocedure { + private static final Log LOG = LogFactory.getLog(FlushTableSubprocedure.class); + + private final String table; + private final List regions; + private final FlushTableSubprocedurePool taskManager; + + public FlushTableSubprocedure(ProcedureMember member, + ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, + List regions, String table, + FlushTableSubprocedurePool taskManager) { + super(member, table, errorListener, wakeFrequency, timeout); + this.table = table; + this.regions = regions; + this.taskManager = taskManager; + } + + private class RegionFlushTask implements Callable { + HRegion region; + RegionFlushTask(HRegion region) { + this.region = region; + } + + @Override + public Void call() throws Exception { + LOG.debug("Starting region operation on " + region); + region.startRegionOperation(); + try { + LOG.debug("Flush region " + region.toString() + " started..."); + region.flushcache(); + } finally { + LOG.debug("Closing region operation on " + region); + region.closeRegionOperation(); + } + return null; + } + } + + private void flushRegions() throws ForeignException { + if (regions.isEmpty()) { + // No regions on this RS, we are basically done. + return; + } + + monitor.rethrowException(); + + // assert that the taskManager is empty. + if (taskManager.hasTasks()) { + throw new IllegalStateException("Attempting to flush " + + table + " but we currently have outstanding tasks"); + } + + // Add all hfiles already existing in region. + for (HRegion region : regions) { + // submit one task per region for parallelize by region. + taskManager.submitTask(new RegionFlushTask(region)); + monitor.rethrowException(); + } + + // wait for everything to complete. + LOG.debug("Flush region tasks submitted for " + regions.size() + " regions"); + try { + taskManager.waitForOutstandingTasks(); + } catch (InterruptedException e) { + throw new ForeignException(getMemberName(), e); + } + } + + /** + * Flush the online regions on this rs for the target table. + */ + @Override + public void acquireBarrier() throws ForeignException { + flushRegions(); + } + + @Override + public void insideBarrier() throws ForeignException { + // No-Op + } + + /** + * Cancel threads if they haven't finished. + */ + @Override + public void cleanup(Exception e) { + LOG.info("Aborting all flush region subprocedure task threads for '" + + table + "' due to error", e); + try { + taskManager.cancelTasks(); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } + } + + public void releaseBarrier() { + // NO OP + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java new file mode 100644 index 00000000000..16456c3b4fc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure.flush; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.errorhandling.ForeignException; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.MetricsMaster; +import org.apache.hadoop.hbase.procedure.MasterProcedureManager; +import org.apache.hadoop.hbase.procedure.Procedure; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.zookeeper.KeeperException; + +import com.google.common.collect.Lists; + +public class MasterFlushTableProcedureManager extends MasterProcedureManager { + + public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; + + private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis"; + private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000; + private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis"; + private static final int FLUSH_WAKE_MILLIS_DEFAULT = 500; + + private static final String FLUSH_PROC_POOL_THREADS_KEY = + "hbase.flush.procedure.master.threads"; + private static final int FLUSH_PROC_POOL_THREADS_DEFAULT = 1; + + private static final Log LOG = LogFactory.getLog(MasterFlushTableProcedureManager.class); + + private MasterServices master; + private ProcedureCoordinator coordinator; + private Map procMap = new HashMap(); + private boolean stopped; + + public MasterFlushTableProcedureManager() {}; + + @Override + public void stop(String why) { + LOG.info("stop: " + why); + this.stopped = true; + } + + @Override + public boolean isStopped() { + return this.stopped; + } + + @Override + public void initialize(MasterServices master, MetricsMaster metricsMaster) + throws KeeperException, IOException, UnsupportedOperationException { + this.master = master; + + // get the configuration for the coordinator + Configuration conf = master.getConfiguration(); + long wakeFrequency = conf.getInt(FLUSH_WAKE_MILLIS_KEY, FLUSH_WAKE_MILLIS_DEFAULT); + long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT); + int threads = conf.getInt(FLUSH_PROC_POOL_THREADS_KEY, FLUSH_PROC_POOL_THREADS_DEFAULT); + + // setup the procedure coordinator + String name = master.getServerName().toString(); + ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, threads); + ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs( + master.getZooKeeper(), getProcedureSignature(), name); + + this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); + } + + @Override + public String getProcedureSignature() { + return FLUSH_TABLE_PROCEDURE_SIGNATURE; + } + + @Override + public void execProcedure(ProcedureDescription desc) throws IOException { + + TableName tableName = TableName.valueOf(desc.getInstance()); + + // call pre coproc hook + MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preTableFlush(tableName); + } + + // Get the list of region servers that host the online regions for table. + // We use the procedure instance name to carry the table name from the client. + // It is possible that regions may move after we get the region server list. + // Each region server will get its own online regions for the table. + // We may still miss regions that need to be flushed. + List> regionsAndLocations = null; + try { + regionsAndLocations = + MetaReader.getTableRegionsAndLocations(this.master.getCatalogTracker(), + TableName.valueOf(desc.getInstance()), false); + } catch (InterruptedException e1) { + String msg = "Failed to get regions for '" + desc.getInstance() + "'"; + LOG.error(msg); + throw new IOException(msg, e1); + + } + Set regionServers = new HashSet(regionsAndLocations.size()); + for (Pair region : regionsAndLocations) { + if (region != null && region.getFirst() != null && region.getSecond() != null) { + HRegionInfo hri = region.getFirst(); + if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue; + regionServers.add(region.getSecond().toString()); + } + } + + ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); + + // Kick of the global procedure from the master coordinator to the region servers. + // We rely on the existing Distributed Procedure framework to prevent any concurrent + // procedure with the same name. + Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), + new byte[0], Lists.newArrayList(regionServers)); + monitor.rethrowException(); + if (proc == null) { + String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '" + + desc.getInstance() + "'. " + "Another flush procedure is running?"; + LOG.error(msg); + throw new IOException(msg); + } + + procMap.put(tableName, proc); + + try { + // wait for the procedure to complete. A timer thread is kicked off that should cancel this + // if it takes too long. + proc.waitForCompleted(); + LOG.info("Done waiting - exec procedure " + desc.getSignature() + " for '" + + desc.getInstance() + "'"); + LOG.info("Master flush table procedure is successful!"); + } catch (InterruptedException e) { + ForeignException ee = + new ForeignException("Interrupted while waiting for flush table procdure to finish", e); + monitor.receive(ee); + Thread.currentThread().interrupt(); + } catch (ForeignException e) { + ForeignException ee = + new ForeignException("Exception while waiting for flush table procdure to finish", e); + monitor.receive(ee); + } + monitor.rethrowException(); + } + + @Override + public synchronized boolean isProcedureDone(ProcedureDescription desc) throws IOException { + // Procedure instance name is the table name. + TableName tableName = TableName.valueOf(desc.getInstance()); + Procedure proc = procMap.get(tableName); + if (proc == null) { + // The procedure has not even been started yet. + // The client would request the procedure and call isProcedureDone(). + // The HBaseAdmin.execProcedure() wraps both request and isProcedureDone(). + return false; + } + // We reply on the existing Distributed Procedure framework to give us the status. + return proc.isCompleted(); + } + +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java new file mode 100644 index 00000000000..9f0f674325b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure.flush; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.errorhandling.ForeignException; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.procedure.ProcedureMember; +import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; +import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; +import org.apache.hadoop.hbase.procedure.Subprocedure; +import org.apache.hadoop.hbase.procedure.SubprocedureFactory; +import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * This manager class handles flushing of the regions for table on a {@link HRegionServer}. + */ +@InterfaceAudience.Private +public class RegionServerFlushTableProcedureManager extends RegionServerProcedureManager { + private static final Log LOG = LogFactory.getLog(RegionServerFlushTableProcedureManager.class); + + private static final String CONCURENT_FLUSH_TASKS_KEY = + "hbase.flush.procedure.region.concurrentTasks"; + private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3; + + public static final String FLUSH_REQUEST_THREADS_KEY = + "hbase.flush.procedure.region.pool.threads"; + public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10; + + public static final String FLUSH_TIMEOUT_MILLIS_KEY = + "hbase.flush.procedure.region.timeout"; + public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000; + + public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY = + "hbase.flush.procedure.region.wakefrequency"; + private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500; + + private RegionServerServices rss; + private ProcedureMemberRpcs memberRpcs; + private ProcedureMember member; + + /** + * Exposed for testing. + * @param conf HBase configuration. + * @param server region server. + * @param memberRpc use specified memberRpc instance + * @param procMember use specified ProcedureMember + */ + RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server, + ProcedureMemberRpcs memberRpc, ProcedureMember procMember) { + this.rss = server; + this.memberRpcs = memberRpc; + this.member = procMember; + } + + public RegionServerFlushTableProcedureManager() {} + + /** + * Start accepting flush table requests. + */ + @Override + public void start() { + LOG.debug("Start region server flush procedure manager " + rss.getServerName().toString()); + this.memberRpcs.start(rss.getServerName().toString(), member); + } + + /** + * Close this and all running tasks + * @param force forcefully stop all running tasks + * @throws IOException + */ + @Override + public void stop(boolean force) throws IOException { + String mode = force ? "abruptly" : "gracefully"; + LOG.info("Stopping region server flush procedure manager " + mode + "."); + + try { + this.member.close(); + } finally { + this.memberRpcs.close(); + } + } + + /** + * If in a running state, creates the specified subprocedure to flush table regions. + * + * Because this gets the local list of regions to flush and not the set the master had, + * there is a possibility of a race where regions may be missed. + * + * @param table + * @return Subprocedure to submit to the ProcedureMemeber. + */ + public Subprocedure buildSubprocedure(String table) { + + // don't run the subprocedure if the parent is stop(ping) + if (rss.isStopping() || rss.isStopped()) { + throw new IllegalStateException("Can't start flush region subprocedure on RS: " + + rss.getServerName() + ", because stopping/stopped!"); + } + + // check to see if this server is hosting any regions for the table + List involvedRegions; + try { + involvedRegions = getRegionsToFlush(table); + } catch (IOException e1) { + throw new IllegalStateException("Failed to figure out if there is region to flush.", e1); + } + + // We need to run the subprocedure even if we have no relevant regions. The coordinator + // expects participation in the procedure and without sending message the master procedure + // will hang and fail. + + LOG.debug("Launching subprocedure to flush regions for " + table); + ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(table); + Configuration conf = rss.getConfiguration(); + long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, + FLUSH_TIMEOUT_MILLIS_DEFAULT); + long wakeMillis = conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY, + FLUSH_REQUEST_WAKE_MILLIS_DEFAULT); + + FlushTableSubprocedurePool taskManager = + new FlushTableSubprocedurePool(rss.getServerName().toString(), conf); + return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, + timeoutMillis, involvedRegions, table, taskManager); + } + + /** + * Get the list of regions to flush for the table on this server + * + * It is possible that if a region moves somewhere between the calls + * we'll miss the region. + * + * @param table + * @return the list of online regions. Empty list is returned if no regions. + * @throws IOException + */ + private List getRegionsToFlush(String table) throws IOException { + return rss.getOnlineRegions(TableName.valueOf(table)); + } + + public class FlushTableSubprocedureBuilder implements SubprocedureFactory { + + @Override + public Subprocedure buildSubprocedure(String name, byte[] data) { + // The name of the procedure instance from the master is the table name. + return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name); + } + + } + + /** + * We use the FlushTableSubprocedurePool, a class specific thread pool instead of + * {@link org.apache.hadoop.hbase.executor.ExecutorService}. + * + * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of + * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation + * failures. + */ + static class FlushTableSubprocedurePool { + private final ExecutorCompletionService taskPool; + private final ThreadPoolExecutor executor; + private volatile boolean stopped; + private final List> futures = new ArrayList>(); + private final String name; + + FlushTableSubprocedurePool(String name, Configuration conf) { + // configure the executor service + long keepAlive = conf.getLong( + RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY, + RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT); + int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS); + this.name = name; + executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), new DaemonThreadFactory("rs(" + + name + ")-flush-proc-pool")); + taskPool = new ExecutorCompletionService(executor); + } + + boolean hasTasks() { + return futures.size() != 0; + } + + /** + * Submit a task to the pool. + * + * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}. + */ + void submitTask(final Callable task) { + Future f = this.taskPool.submit(task); + futures.add(f); + } + + /** + * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}. + * This *must* be called after all tasks are submitted via submitTask. + * + * @return true on success, false otherwise + * @throws InterruptedException + */ + boolean waitForOutstandingTasks() throws ForeignException, InterruptedException { + LOG.debug("Waiting for local region flush to finish."); + + int sz = futures.size(); + try { + // Using the completion service to process the futures. + for (int i = 0; i < sz; i++) { + Future f = taskPool.take(); + f.get(); + if (!futures.remove(f)) { + LOG.warn("unexpected future" + f); + } + LOG.debug("Completed " + (i+1) + "/" + sz + " local region flush tasks."); + } + LOG.debug("Completed " + sz + " local region flush tasks."); + return true; + } catch (InterruptedException e) { + LOG.warn("Got InterruptedException in FlushSubprocedurePool", e); + if (!stopped) { + Thread.currentThread().interrupt(); + throw new ForeignException("FlushSubprocedurePool", e); + } + // we are stopped so we can just exit. + } catch (ExecutionException e) { + if (e.getCause() instanceof ForeignException) { + LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e); + throw (ForeignException)e.getCause(); + } + LOG.warn("Got Exception in FlushSubprocedurePool", e); + throw new ForeignException(name, e.getCause()); + } finally { + cancelTasks(); + } + return false; + } + + /** + * This attempts to cancel out all pending and in progress tasks (interruptions issues) + * @throws InterruptedException + */ + void cancelTasks() throws InterruptedException { + Collection> tasks = futures; + LOG.debug("cancelling " + tasks.size() + " flush region tasks " + name); + for (Future f: tasks) { + f.cancel(false); + } + + // evict remaining tasks and futures from taskPool. + while (!futures.isEmpty()) { + // block to remove cancelled futures; + LOG.warn("Removing cancelled elements from taskPool"); + futures.remove(taskPool.take()); + } + stop(); + } + + /** + * Abruptly shutdown the thread pool. Call when exiting a region server. + */ + void stop() { + if (this.stopped) return; + + this.stopped = true; + this.executor.shutdownNow(); + } + } + + /** + * Initialize this region server flush procedure manager + * Uses a zookeeper based member controller. + * @param rss region server + * @throws KeeperException if the zookeeper cannot be reached + */ + @Override + public void initialize(RegionServerServices rss) throws KeeperException { + this.rss = rss; + ZooKeeperWatcher zkw = rss.getZooKeeper(); + this.memberRpcs = new ZKProcedureMemberRpcs(zkw, + MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE); + + Configuration conf = rss.getConfiguration(); + long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT); + int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT); + + // create the actual flush table procedure member + ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), + opThreads, keepAlive); + this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder()); + } + + @Override + public String getProcedureSignature() { + return MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 9b94c426927..c08ec18eba7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -1167,6 +1167,17 @@ public class AccessController extends BaseRegionObserver NamespaceDescriptor ns) throws IOException { } + @Override + public void preTableFlush(final ObserverContext ctx, + final TableName tableName) throws IOException { + requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE); + } + + @Override + public void postTableFlush(final ObserverContext ctx, + final TableName tableName) throws IOException { + } + /* ---- RegionObserver implementation ---- */ @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java index cccb3ff6d62..a3442117fcf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java @@ -555,6 +555,16 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb NamespaceDescriptor ns) throws IOException { } + @Override + public void preTableFlush(ObserverContext ctx, + TableName tableName) throws IOException { + } + + @Override + public void postTableFlush(ObserverContext ctx, + TableName tableName) throws IOException { + } + @Override public void preMasterInitialization(ObserverContext ctx) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java index 4e25d8a2265..5749abaa97c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java @@ -941,6 +941,16 @@ public class TestMasterObserver { public boolean wasGetTableDescriptorsCalled() { return preGetTableDescriptorsCalled && postGetTableDescriptorsCalled; } + + @Override + public void preTableFlush(ObserverContext ctx, + TableName tableName) throws IOException { + } + + @Override + public void postTableFlush(ObserverContext ctx, + TableName tableName) throws IOException { + } } private static HBaseTestingUtility UTIL = new HBaseTestingUtility();