diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 24fa8e5df0e..b08ff46a923 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -353,7 +353,7 @@ public final class HConstants { /** Default value for cluster ID */ public static final String CLUSTER_ID_DEFAULT = "default-cluster"; - + /** Parameter name for # days to keep MVCC values during a major compaction */ public static final String KEEP_SEQID_PERIOD = "hbase.hstore.compaction.keep.seqId.period"; /** At least to keep MVCC values in hfiles for 5 days */ @@ -1017,6 +1017,9 @@ public final class HConstants { public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS = "hbase.coordinated.state.manager.class"; + /** Configuration key for SplitLog manager timeout */ + public static final String HBASE_SPLITLOG_MANAGER_TIMEOUT = "hbase.splitlog.manager.timeout"; + /** * Configuration keys for Bucket cache */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java index 1891941f93d..295cefec5ec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java @@ -52,4 +52,13 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan @Override public abstract TableStateManager getTableStateManager() throws InterruptedException, CoordinatedStateException; + + /** + * Method to retrieve coordination for split log worker + */ + public abstract SplitLogWorkerCoordination getSplitLogWorkerCoordination(); + /** + * Method to retrieve coordination for split log manager + */ + public abstract SplitLogManagerCoordination getSplitLogManagerCoordination(); } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java index 4d62e5414ff..264e692a769 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.CoordinatedStateException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableStateManager; +import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -32,11 +33,16 @@ import org.apache.zookeeper.KeeperException; public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { protected Server server; protected ZooKeeperWatcher watcher; + protected SplitLogWorkerCoordination splitLogWorkerCoordination; + protected SplitLogManagerCoordination splitLogManagerCoordination; @Override public void initialize(Server server) { this.server = server; this.watcher = server.getZooKeeper(); + splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(this, watcher); + splitLogManagerCoordination = new 
ZKSplitLogManagerCoordination(this, watcher); + } @Override @@ -53,4 +59,13 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { throw new CoordinatedStateException(e); } } + + @Override + public SplitLogWorkerCoordination getSplitLogWorkerCoordination() { + return splitLogWorkerCoordination; + } + @Override + public SplitLogManagerCoordination getSplitLogManagerCoordination() { + return splitLogManagerCoordination; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 9b3a9484cd5..456447a0af9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.ipc.RemoteException; -import org.apache.zookeeper.KeeperException; /** * This class abstracts a bunch of operations the HMaster needs to interact with @@ -91,12 +90,14 @@ public class MasterFileSystem { private final MasterServices services; final static PathFilter META_FILTER = new PathFilter() { + @Override public boolean accept(Path p) { return HLogUtil.isMetaFile(p); } }; final static PathFilter NON_META_FILTER = new PathFilter() { + @Override public boolean accept(Path p) { return !HLogUtil.isMetaFile(p); } @@ -123,14 +124,10 @@ public class MasterFileSystem { // set up the archived logs path this.oldLogDir = createInitialFileSystemLayout(); HFileSystem.addLocationsOrderInterceptor(conf); - try { - this.splitLogManager = new SplitLogManager(master.getZooKeeper(), - master.getConfiguration(), master, services, - master.getServerName()); - } catch (KeeperException e) { - throw new IOException(e); - } - this.distributedLogReplay = (this.splitLogManager.getRecoveryMode() == RecoveryMode.LOG_REPLAY); + this.splitLogManager = + new SplitLogManager(master, master.getConfiguration(), master, services, + master.getServerName()); + this.distributedLogReplay = this.splitLogManager.isLogReplaying(); } /** @@ -350,11 +347,7 @@ public class MasterFileSystem { if (regions == null || regions.isEmpty()) { return; } - try { - this.splitLogManager.markRegionsRecoveringInZK(serverName, regions); - } catch (KeeperException e) { - throw new IOException(e); - } + this.splitLogManager.markRegionsRecovering(serverName, regions); } public void splitLog(final Set serverNames) throws IOException { @@ -362,13 +355,13 @@ public class MasterFileSystem { } /** - * Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegionsFromZK(Set)} + * Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegions(Set)} * @param failedServers - * @throws KeeperException + * @throws IOException */ void removeStaleRecoveringRegionsFromZK(final Set failedServers) - throws KeeperException, InterruptedIOException { - this.splitLogManager.removeStaleRecoveringRegionsFromZK(failedServers); + throws IOException, InterruptedIOException { + this.splitLogManager.removeStaleRecoveringRegions(failedServers); } /** @@ -459,7 +452,7 @@ public class MasterFileSystem { org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir .migrateFSTableDescriptorsIfNecessary(fs, rd); } - + // Create tableinfo-s for hbase:meta if not already there. 
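An illustrative aside (not part of the patch): all of the new code paths reach the split-log machinery through the cluster's CoordinatedStateManager instead of talking to ZooKeeper directly. A minimal sketch of that accessor pattern, using only the types and accessors introduced in the hunks above; the surrounding server wiring is assumed:

    import org.apache.hadoop.hbase.Server;
    import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
    import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
    import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;

    public final class CoordinationAccessSketch {
      // Master-side callers (SplitLogManager, MasterFileSystem) take the manager-side view.
      static SplitLogManagerCoordination managerSide(Server server) {
        return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination();
      }

      // Worker-side callers use the matching accessor added to BaseCoordinatedStateManager.
      static SplitLogWorkerCoordination workerSide(Server server) {
        return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogWorkerCoordination();
      }
    }

With ZkCoordinatedStateManager configured, both accessors return the ZK-backed implementations created in its initialize() method above.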
new FSTableDescriptors(fs, rd).createTableDescriptor(HTableDescriptor.META_TABLEDESC); @@ -651,15 +644,10 @@ public class MasterFileSystem { /** * The function is used in SSH to set recovery mode based on configuration after all outstanding * log split tasks drained. - * @throws KeeperException - * @throws InterruptedIOException + * @throws IOException */ public void setLogRecoveryMode() throws IOException { - try { this.splitLogManager.setRecoveryMode(false); - } catch (KeeperException e) { - throw new IOException(e); - } } public RecoveryMode getLogRecoveryMode() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index b9414cd5db2..012e9a082c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -46,54 +46,40 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.Chore; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; -import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination; +import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.SplitLogWorker; -import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.util.StringUtils; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; /** * Distributes the task of log splitting to the available region servers. - * Coordination happens via zookeeper. For every log file that has to be split a - * znode is created under /hbase/splitlog. SplitLogWorkers race to grab a task. + * Coordination happens via coordination engine. For every log file that has to be split a + * task is created. SplitLogWorkers race to grab a task. * - *
<p>
SplitLogManager monitors the task znodes that it creates using the + *
<p>
SplitLogManager monitors the tasks that it creates using the * timeoutMonitor thread. If a task's progress is slow then - * {@link #resubmit(String, Task, ResubmitDirective)} will take away the task from the owner - * {@link SplitLogWorker} and the task will be up for grabs again. When the task is done then the - * task's znode is deleted by SplitLogManager. + * {@link SplitLogManagerCoordination#checkTasks} will take away the + * task from the owner {@link SplitLogWorker} and the task will be up for grabs again. When the + * task is done then it is deleted by SplitLogManager. * *
<p>
Clients call {@link #splitLogDistributed(Path)} to split a region server's * log files. The caller thread waits in this method until all the log files * have been split. * - *
<p>
All the zookeeper calls made by this class are asynchronous. This is mainly + *
<p>
All the coordination calls made by this class are asynchronous. This is mainly * to help reduce response time seen by the callers. * *
<p>
There is race in this design between the SplitLogManager and the @@ -107,30 +93,19 @@ import org.apache.zookeeper.data.Stat; * can delete the re-submission. */ @InterfaceAudience.Private -public class SplitLogManager extends ZooKeeperListener { +public class SplitLogManager { private static final Log LOG = LogFactory.getLog(SplitLogManager.class); - public static final int DEFAULT_TIMEOUT = 120000; - public static final int DEFAULT_ZK_RETRIES = 3; - public static final int DEFAULT_MAX_RESUBMIT = 3; - public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min + private Server server; private final Stoppable stopper; - private final MasterServices master; - private final ServerName serverName; - private final TaskFinisher taskFinisher; private FileSystem fs; private Configuration conf; - private long zkretries; - private long resubmit_threshold; - private long timeout; + public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min + private long unassignedTimeout; private long lastTaskCreateTime = Long.MAX_VALUE; - public boolean ignoreZKDeleteForTesting = false; - private volatile long lastRecoveringNodeCreationTime = 0; - // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check - // whether to GC stale recovering znodes private long checkRecoveringTimeThreshold = 15000; // 15 seconds private final List, Boolean>> failedRecoveringRegionDeletions = Collections .synchronizedList(new ArrayList, Boolean>>()); @@ -141,94 +116,45 @@ public class SplitLogManager extends ZooKeeperListener { */ protected final ReentrantLock recoveringRegionLock = new ReentrantLock(); - private volatile RecoveryMode recoveryMode; - private volatile boolean isDrainingDone = false; - private final ConcurrentMap tasks = new ConcurrentHashMap(); private TimeoutMonitor timeoutMonitor; private volatile Set deadWorkers = null; private final Object deadWorkersLock = new Object(); - private Set failedDeletions = null; - - /** - * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, - * Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf)} - * that provides a task finisher for copying recovered edits to their final destination. - * The task finisher has to be robust because it can be arbitrarily restarted or called - * multiple times. - * - * @param zkw the ZK watcher - * @param conf the HBase configuration - * @param stopper the stoppable in case anything is wrong - * @param master the master services - * @param serverName the master server name - * @throws KeeperException - * @throws InterruptedIOException - */ - public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, - Stoppable stopper, MasterServices master, ServerName serverName) - throws InterruptedIOException, KeeperException { - this(zkw, conf, stopper, master, serverName, new TaskFinisher() { - @Override - public Status finish(ServerName workerName, String logfile) { - try { - HLogSplitter.finishSplitLogFile(logfile, conf); - } catch (IOException e) { - LOG.warn("Could not finish splitting of log file " + logfile, e); - return Status.ERR; - } - return Status.DONE; - } - }); - } - /** * Its OK to construct this object even when region-servers are not online. It does lookup the - * orphan tasks in zk but it doesn't block waiting for them to be done. - * @param zkw the ZK watcher + * orphan tasks in coordination engine but it doesn't block waiting for them to be done. 
+ * @param server the server instance * @param conf the HBase configuration * @param stopper the stoppable in case anything is wrong * @param master the master services * @param serverName the master server name - * @param tf task finisher - * @throws KeeperException - * @throws InterruptedIOException + * @throws IOException */ - public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, Stoppable stopper, - MasterServices master, ServerName serverName, TaskFinisher tf) throws InterruptedIOException, - KeeperException { - super(zkw); - this.taskFinisher = tf; + public SplitLogManager(Server server, Configuration conf, Stoppable stopper, + MasterServices master, ServerName serverName) throws IOException { + this.server = server; this.conf = conf; this.stopper = stopper; - this.master = master; - this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES); - this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT); - this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT); - this.unassignedTimeout = - conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); - - // Determine recovery mode - setRecoveryMode(true); - - LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout + - ", distributedLogReplay=" + (this.recoveryMode == RecoveryMode.LOG_REPLAY)); - - this.serverName = serverName; - this.timeoutMonitor = new TimeoutMonitor( - conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper); - - this.failedDeletions = Collections.synchronizedSet(new HashSet()); - - Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName - + ".splitLogManagerTimeoutMonitor"); - // Watcher can be null during tests with Mock'd servers. - if (this.watcher != null) { - this.watcher.registerListener(this); - lookForOrphans(); + if (server.getCoordinatedStateManager() != null) { + SplitLogManagerCoordination coordination = + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination(); + Set failedDeletions = Collections.synchronizedSet(new HashSet()); + SplitLogManagerDetails details = + new SplitLogManagerDetails(tasks, master, failedDeletions, serverName); + coordination.init(); + coordination.setDetails(details); + // Determine recovery mode } + this.unassignedTimeout = + conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); + this.timeoutMonitor = + new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), + stopper); + Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName + + ".splitLogManagerTimeoutMonitor"); } private FileStatus[] getFileList(List logDirs, PathFilter filter) throws IOException { @@ -252,10 +178,8 @@ public class SplitLogManager extends ZooKeeperListener { } /** - * @param logDir - * one region sever hlog dir path in .logs - * @throws IOException - * if there was an error while splitting any log file + * @param logDir one region sever hlog dir path in .logs + * @throws IOException if there was an error while splitting any log file * @return cumulative size of the logfiles split * @throws IOException */ @@ -266,11 +190,9 @@ public class SplitLogManager extends ZooKeeperListener { } /** - * The caller will block until all the log files of the given region server - * have been processed - successfully split or an error is encountered - by an - * available worker region server. 
This method must only be called after the - * region servers have been brought online. - * + * The caller will block until all the log files of the given region server have been processed - + * successfully split or an error is encountered - by an available worker region server. This + * method must only be called after the region servers have been brought online. * @param logDirs List of log dirs to split * @throws IOException If there was an error while splitting any log file * @return cumulative size of the logfiles split @@ -295,11 +217,9 @@ public class SplitLogManager extends ZooKeeperListener { } /** - * The caller will block until all the hbase:meta log files of the given region server - * have been processed - successfully split or an error is encountered - by an - * available worker region server. This method must only be called after the - * region servers have been brought online. - * + * The caller will block until all the hbase:meta log files of the given region server have been + * processed - successfully split or an error is encountered - by an available worker region + * server. This method must only be called after the region servers have been brought online. * @param logDirs List of log dirs to split * @param filter the Path filter to select specific files for considering * @throws IOException If there was an error while splitting any log file @@ -307,8 +227,8 @@ public class SplitLogManager extends ZooKeeperListener { */ public long splitLogDistributed(final Set serverNames, final List logDirs, PathFilter filter) throws IOException { - MonitoredTask status = TaskMonitor.get().createStatus( - "Doing distributed log split in " + logDirs); + MonitoredTask status = + TaskMonitor.get().createStatus("Doing distributed log split in " + logDirs); FileStatus[] logfiles = getFileList(logDirs, filter); status.setStatus("Checking directory contents..."); LOG.debug("Scheduling batch of logs to split"); @@ -331,25 +251,24 @@ public class SplitLogManager extends ZooKeeperListener { } } waitForSplittingCompletion(batch, status); - // remove recovering regions from ZK + // remove recovering regions if (filter == MasterFileSystem.META_FILTER /* reference comparison */) { // we split meta regions and user regions separately therefore logfiles are either all for // meta or user regions but won't for both( we could have mixed situations in tests) isMetaRecovery = true; } - this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery); + removeRecoveringRegions(serverNames, isMetaRecovery); if (batch.done != batch.installed) { batch.isDead = true; SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet(); - LOG.warn("error while splitting logs in " + logDirs + - " installed = " + batch.installed + " but only " + batch.done + " done"); - String msg = "error or interrupted while splitting logs in " - + logDirs + " Task = " + batch; + LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed + + " but only " + batch.done + " done"); + String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch; status.abort(msg); throw new IOException(msg); } - for(Path logDir: logDirs){ + for (Path logDir : logDirs) { status.setStatus("Cleaning up log directory..."); try { if (fs.exists(logDir) && !fs.delete(logDir, false)) { @@ -358,39 +277,39 @@ public class SplitLogManager extends ZooKeeperListener { } catch (IOException ioe) { FileStatus[] files = fs.listStatus(logDir); if (files != null && files.length > 0) { - LOG.warn("returning 
success without actually splitting and " + - "deleting all the log files in path " + logDir); + LOG.warn("returning success without actually splitting and " + + "deleting all the log files in path " + logDir); } else { LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); } } SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet(); } - String msg = "finished splitting (more than or equal to) " + totalSize + - " bytes in " + batch.installed + " log files in " + logDirs + " in " + - (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms"; + String msg = + "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed + + " log files in " + logDirs + " in " + + (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms"; status.markComplete(msg); LOG.info(msg); return totalSize; } /** - * Add a task entry to splitlog znode if it is not already there. - * + * Add a task entry to coordination if it is not already there. * @param taskname the path of the log to be split * @param batch the batch this task belongs to * @return true if a new entry is created, false if it is already there. */ boolean enqueueSplitTask(String taskname, TaskBatch batch) { - SplitLogCounters.tot_mgr_log_split_start.incrementAndGet(); - // This is a znode path under the splitlog dir with the rest of the path made up of an - // url encoding of the passed in log to split. - String path = ZKSplitLog.getEncodedNodeName(watcher, taskname); lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis(); - Task oldtask = createTaskIfAbsent(path, batch); + String task = + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().prepareTask(taskname); + Task oldtask = createTaskIfAbsent(task, batch); if (oldtask == null) { - // publish the task in zk - createNode(path, zkretries); + // publish the task in the coordination engine + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().submitTask(task); return true; } return false; @@ -400,26 +319,25 @@ public class SplitLogManager extends ZooKeeperListener { synchronized (batch) { while ((batch.done + batch.error) != batch.installed) { try { - status.setStatus("Waiting for distributed tasks to finish. " - + " scheduled=" + batch.installed - + " done=" + batch.done - + " error=" + batch.error); + status.setStatus("Waiting for distributed tasks to finish. " + " scheduled=" + + batch.installed + " done=" + batch.done + " error=" + batch.error); int remaining = batch.installed - (batch.done + batch.error); int actual = activeTasks(batch); if (remaining != actual) { - LOG.warn("Expected " + remaining - + " active tasks, but actually there are " + actual); + LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual); } - int remainingInZK = remainingTasksInZK(); - if (remainingInZK >= 0 && actual > remainingInZK) { - LOG.warn("Expected at least" + actual - + " tasks in ZK, but actually there are " + remainingInZK); + int remainingTasks = + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().remainingTasksInCoordination(); + if (remainingTasks >= 0 && actual > remainingTasks) { + LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are " + + remainingTasks); } - if (remainingInZK == 0 || actual == 0) { - LOG.warn("No more task remaining (ZK or task map), splitting " - + "should have completed. 
Remaining tasks in ZK " + remainingInZK - + ", active tasks in map " + actual); - if (remainingInZK == 0 && actual == 0) { + if (remainingTasks == 0 || actual == 0) { + LOG.warn("No more task remaining, splitting " + + "should have completed. Remaining tasks is " + remainingTasks + + ", active tasks in map " + actual); + if (remainingTasks == 0 && actual == 0) { return; } } @@ -439,31 +357,13 @@ public class SplitLogManager extends ZooKeeperListener { private int activeTasks(final TaskBatch batch) { int count = 0; - for (Task t: tasks.values()) { + for (Task t : tasks.values()) { if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) { count++; } } return count; - } - private int remainingTasksInZK() { - int count = 0; - try { - List tasks = - ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); - if (tasks != null) { - for (String t: tasks) { - if (!ZKSplitLog.isRescanNode(watcher, t)) { - count++; - } - } - } - } catch (KeeperException ke) { - LOG.warn("Failed to check remaining tasks", ke); - count = -1; - } - return count; } /** @@ -473,15 +373,12 @@ public class SplitLogManager extends ZooKeeperListener { * @param isMetaRecovery whether current recovery is for the meta region on * serverNames */ - private void - removeRecoveringRegionsFromZK(final Set serverNames, Boolean isMetaRecovery) { - if (this.recoveryMode != RecoveryMode.LOG_REPLAY) { + private void removeRecoveringRegions(final Set serverNames, Boolean isMetaRecovery) { + if (!isLogReplaying()) { // the function is only used in WALEdit direct replay mode return; } - final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(); - int count = 0; Set recoveredServerNameSet = new HashSet(); if (serverNames != null) { for (ServerName tmpServerName : serverNames) { @@ -491,56 +388,11 @@ public class SplitLogManager extends ZooKeeperListener { try { this.recoveringRegionLock.lock(); - - List tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); - if (tasks != null) { - for (String t : tasks) { - if (!ZKSplitLog.isRescanNode(watcher, t)) { - count++; - } - } - } - if (count == 0 && this.master.isInitialized() - && !this.master.getServerManager().areDeadServersInProgress()) { - // no splitting work items left - deleteRecoveringRegionZNodes(watcher, null); - // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at - // this point. 
- lastRecoveringNodeCreationTime = Long.MAX_VALUE; - } else if (!recoveredServerNameSet.isEmpty()) { - // remove recovering regions which doesn't have any RS associated with it - List regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode); - if (regions != null) { - for (String region : regions) { - if(isMetaRecovery != null) { - if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName)) - || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) { - // skip non-meta regions when recovering the meta region or - // skip the meta region when recovering user regions - continue; - } - } - String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region); - List failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath); - if (failedServers == null || failedServers.isEmpty()) { - ZKUtil.deleteNode(watcher, nodePath); - continue; - } - if (recoveredServerNameSet.containsAll(failedServers)) { - ZKUtil.deleteNodeRecursively(watcher, nodePath); - } else { - for (String failedServer : failedServers) { - if (recoveredServerNameSet.contains(failedServer)) { - String tmpPath = ZKUtil.joinZNode(nodePath, failedServer); - ZKUtil.deleteNode(watcher, tmpPath); - } - } - } - } - } - } - } catch (KeeperException ke) { - LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke); + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet, + isMetaRecovery); + } catch (IOException e) { + LOG.warn("removeRecoveringRegions got exception. Will retry", e); if (serverNames != null && !serverNames.isEmpty()) { this.failedRecoveringRegionDeletions.add(new Pair, Boolean>(serverNames, isMetaRecovery)); @@ -554,11 +406,10 @@ public class SplitLogManager extends ZooKeeperListener { * It removes stale recovering regions under /hbase/recovering-regions/[encoded region name] * during master initialization phase. 
* @param failedServers A set of known failed servers - * @throws KeeperException + * @throws IOException */ - void removeStaleRecoveringRegionsFromZK(final Set failedServers) - throws KeeperException, InterruptedIOException { - + void removeStaleRecoveringRegions(final Set failedServers) throws IOException, + InterruptedIOException { Set knownFailedServers = new HashSet(); if (failedServers != null) { for (ServerName tmpServerName : failedServers) { @@ -568,406 +419,13 @@ public class SplitLogManager extends ZooKeeperListener { this.recoveringRegionLock.lock(); try { - List tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); - if (tasks != null) { - for (String t : tasks) { - byte[] data; - try { - data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t)); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - if (data != null) { - SplitLogTask slt = null; - try { - slt = SplitLogTask.parseFrom(data); - } catch (DeserializationException e) { - LOG.warn("Failed parse data for znode " + t, e); - } - if (slt != null && slt.isDone()) { - continue; - } - } - // decode the file name - t = ZKSplitLog.getFileName(t); - ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t)); - if (serverName != null) { - knownFailedServers.add(serverName.getServerName()); - } else { - LOG.warn("Found invalid WAL log file name:" + t); - } - } - } - - // remove recovering regions which doesn't have any RS associated with it - List regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode); - if (regions != null) { - for (String region : regions) { - String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region); - List regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath); - if (regionFailedServers == null || regionFailedServers.isEmpty()) { - ZKUtil.deleteNode(watcher, nodePath); - continue; - } - boolean needMoreRecovery = false; - for (String tmpFailedServer : regionFailedServers) { - if (knownFailedServers.contains(tmpFailedServer)) { - needMoreRecovery = true; - break; - } - } - if (!needMoreRecovery) { - ZKUtil.deleteNodeRecursively(watcher, nodePath); - } - } - } + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers); } finally { this.recoveringRegionLock.unlock(); } } - public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List regions) { - try { - if (regions == null) { - // remove all children under /home/recovering-regions - LOG.debug("Garbage collecting all recovering region znodes"); - ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode); - } else { - for (String curRegion : regions) { - String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion); - ZKUtil.deleteNodeRecursively(watcher, nodePath); - } - } - } catch (KeeperException e) { - LOG.warn("Cannot remove recovering regions from ZooKeeper", e); - } - } - - private void setDone(String path, TerminationStatus status) { - Task task = tasks.get(path); - if (task == null) { - if (!ZKSplitLog.isRescanNode(watcher, path)) { - SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet(); - LOG.debug("unacquired orphan task is done " + path); - } - } else { - synchronized (task) { - if (task.status == IN_PROGRESS) { - if (status == SUCCESS) { - SplitLogCounters.tot_mgr_log_split_success.incrementAndGet(); - LOG.info("Done splitting " + path); - } else { - 
SplitLogCounters.tot_mgr_log_split_err.incrementAndGet(); - LOG.warn("Error splitting " + path); - } - task.status = status; - if (task.batch != null) { - synchronized (task.batch) { - if (status == SUCCESS) { - task.batch.done++; - } else { - task.batch.error++; - } - task.batch.notify(); - } - } - } - } - } - // delete the task node in zk. It's an async - // call and no one is blocked waiting for this node to be deleted. All - // task names are unique (log.) there is no risk of deleting - // a future task. - // if a deletion fails, TimeoutMonitor will retry the same deletion later - deleteNode(path, zkretries); - return; - } - - private void createNode(String path, Long retry_count) { - SplitLogTask slt = new SplitLogTask.Unassigned(serverName, this.recoveryMode); - ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count); - SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet(); - return; - } - - private void createNodeSuccess(String path) { - LOG.debug("put up splitlog task at znode " + path); - getDataSetWatch(path, zkretries); - } - - private void createNodeFailure(String path) { - // TODO the Manager should split the log locally instead of giving up - LOG.warn("failed to create task node" + path); - setDone(path, FAILURE); - } - - - private void getDataSetWatch(String path, Long retry_count) { - this.watcher.getRecoverableZooKeeper().getZooKeeper(). - getData(path, this.watcher, - new GetDataAsyncCallback(), retry_count); - SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); - } - - private void tryGetDataSetWatch(String path) { - // A negative retry count will lead to ignoring all error processing. - this.watcher.getRecoverableZooKeeper().getZooKeeper(). - getData(path, this.watcher, - new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */); - SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); - } - - private void getDataSetWatchSuccess(String path, byte[] data, int version) - throws DeserializationException { - if (data == null) { - if (version == Integer.MIN_VALUE) { - // assume all done. The task znode suddenly disappeared. 
- setDone(path, SUCCESS); - return; - } - SplitLogCounters.tot_mgr_null_data.incrementAndGet(); - LOG.fatal("logic error - got null data " + path); - setDone(path, FAILURE); - return; - } - data = this.watcher.getRecoverableZooKeeper().removeMetaData(data); - SplitLogTask slt = SplitLogTask.parseFrom(data); - if (slt.isUnassigned()) { - LOG.debug("task not yet acquired " + path + " ver = " + version); - handleUnassignedTask(path); - } else if (slt.isOwned()) { - heartbeat(path, version, slt.getServerName()); - } else if (slt.isResigned()) { - LOG.info("task " + path + " entered state: " + slt.toString()); - resubmitOrFail(path, FORCE); - } else if (slt.isDone()) { - LOG.info("task " + path + " entered state: " + slt.toString()); - if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) { - if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) { - setDone(path, SUCCESS); - } else { - resubmitOrFail(path, CHECK); - } - } else { - setDone(path, SUCCESS); - } - } else if (slt.isErr()) { - LOG.info("task " + path + " entered state: " + slt.toString()); - resubmitOrFail(path, CHECK); - } else { - LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + slt.toString()); - setDone(path, FAILURE); - } - } - - private void getDataSetWatchFailure(String path) { - LOG.warn("failed to set data watch " + path); - setDone(path, FAILURE); - } - - /** - * It is possible for a task to stay in UNASSIGNED state indefinitely - say - * SplitLogManager wants to resubmit a task. It forces the task to UNASSIGNED - * state but it dies before it could create the RESCAN task node to signal - * the SplitLogWorkers to pick up the task. To prevent this scenario the - * SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup. - * - * @param path - */ - private void handleUnassignedTask(String path) { - if (ZKSplitLog.isRescanNode(watcher, path)) { - return; - } - Task task = findOrCreateOrphanTask(path); - if (task.isOrphan() && (task.incarnation == 0)) { - LOG.info("resubmitting unassigned orphan task " + path); - // ignore failure to resubmit. The timeout-monitor will handle it later - // albeit in a more crude fashion - resubmit(path, task, FORCE); - } - } - - /** - * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions - * @param statusCode integer value of a ZooKeeper exception code - * @param action description message about the retried action - * @return true when need to abandon retries otherwise false - */ - private boolean needAbandonRetries(int statusCode, String action) { - if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) { - LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for " - + "action=" + action); - return true; - } - return false; - } - - private void heartbeat(String path, int new_version, ServerName workerName) { - Task task = findOrCreateOrphanTask(path); - if (new_version != task.last_version) { - if (task.isUnassigned()) { - LOG.info("task " + path + " acquired by " + workerName); - } - task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, workerName); - SplitLogCounters.tot_mgr_heartbeat.incrementAndGet(); - } else { - // duplicate heartbeats - heartbeats w/o zk node version - // changing - are possible. 
The timeout thread does - // getDataSetWatch() just to check whether a node still - // exists or not - } - return; - } - - private boolean resubmit(String path, Task task, ResubmitDirective directive) { - // its ok if this thread misses the update to task.deleted. It will fail later - if (task.status != IN_PROGRESS) { - return false; - } - int version; - if (directive != FORCE) { - // We're going to resubmit: - // 1) immediately if the worker server is now marked as dead - // 2) after a configurable timeout if the server is not marked as dead but has still not - // finished the task. This allows to continue if the worker cannot actually handle it, - // for any reason. - final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update; - final boolean alive = master.getServerManager() != null ? - master.getServerManager().isServerOnline(task.cur_worker_name) : true; - if (alive && time < timeout) { - LOG.trace("Skipping the resubmit of " + task.toString() + " because the server " + - task.cur_worker_name + " is not marked as dead, we waited for " + time + - " while the timeout is " + timeout); - return false; - } - if (task.unforcedResubmits.get() >= resubmit_threshold) { - if (!task.resubmitThresholdReached) { - task.resubmitThresholdReached = true; - SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet(); - LOG.info("Skipping resubmissions of task " + path + - " because threshold " + resubmit_threshold + " reached"); - } - return false; - } - // race with heartbeat() that might be changing last_version - version = task.last_version; - } else { - SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet(); - version = -1; - } - LOG.info("resubmitting task " + path); - task.incarnation++; - try { - // blocking zk call but this is done from the timeout thread - SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName, this.recoveryMode); - if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) { - LOG.debug("failed to resubmit task " + path + - " version changed"); - task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis()); - return false; - } - } catch (NoNodeException e) { - LOG.warn("failed to resubmit because znode doesn't exist " + path + - " task done (or forced done by removing the znode)"); - try { - getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); - } catch (DeserializationException e1) { - LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1); - task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis()); - return false; - } - return false; - } catch (KeeperException.BadVersionException e) { - LOG.debug("failed to resubmit task " + path + " version changed"); - task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis()); - return false; - } catch (KeeperException e) { - SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet(); - LOG.warn("failed to resubmit " + path, e); - return false; - } - // don't count forced resubmits - if (directive != FORCE) { - task.unforcedResubmits.incrementAndGet(); - } - task.setUnassigned(); - createRescanNode(Long.MAX_VALUE); - SplitLogCounters.tot_mgr_resubmit.incrementAndGet(); - return true; - } - - private void resubmitOrFail(String path, ResubmitDirective directive) { - if (resubmit(path, findOrCreateOrphanTask(path), directive) == false) { - setDone(path, FAILURE); - } - } - - private void deleteNode(String path, Long retries) { - SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet(); - // Once a task znode is 
ready for delete, that is it is in the TASK_DONE - // state, then no one should be writing to it anymore. That is no one - // will be updating the znode version any more. - this.watcher.getRecoverableZooKeeper().getZooKeeper(). - delete(path, -1, new DeleteAsyncCallback(), - retries); - } - - private void deleteNodeSuccess(String path) { - if (ignoreZKDeleteForTesting) { - return; - } - Task task; - task = tasks.remove(path); - if (task == null) { - if (ZKSplitLog.isRescanNode(watcher, path)) { - SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet(); - } - SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet(); - LOG.debug("deleted task without in memory state " + path); - return; - } - synchronized (task) { - task.status = DELETED; - task.notify(); - } - SplitLogCounters.tot_mgr_task_deleted.incrementAndGet(); - } - - private void deleteNodeFailure(String path) { - LOG.info("Failed to delete node " + path + " and will retry soon."); - return; - } - - /** - * signal the workers that a task was resubmitted by creating the - * RESCAN node. - * @throws KeeperException - */ - private void createRescanNode(long retries) { - // The RESCAN node will be deleted almost immediately by the - // SplitLogManager as soon as it is created because it is being - // created in the DONE state. This behavior prevents a buildup - // of RESCAN nodes. But there is also a chance that a SplitLogWorker - // might miss the watch-trigger that creation of RESCAN node provides. - // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks - // therefore this behavior is safe. - lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis(); - SplitLogTask slt = new SplitLogTask.Done(this.serverName, this.recoveryMode); - this.watcher.getRecoverableZooKeeper().getZooKeeper(). - create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, - new CreateRescanAsyncCallback(), Long.valueOf(retries)); - } - - private void createRescanSuccess(String path) { - SplitLogCounters.tot_mgr_rescan.incrementAndGet(); - getDataSetWatch(path, zkretries); - } - - private void createRescanFailure() { - LOG.fatal("logic failure, rescan failure must not happen"); - } - /** * @param path * @param batch @@ -982,7 +440,7 @@ public class SplitLogManager extends ZooKeeperListener { oldtask = tasks.putIfAbsent(path, newtask); if (oldtask == null) { batch.installed++; - return null; + return null; } // new task was not used. synchronized (oldtask) { @@ -1013,16 +471,15 @@ public class SplitLogManager extends ZooKeeperListener { } } if (oldtask.status != DELETED) { - LOG.warn("Failure because previously failed task" + - " state still present. Waiting for znode delete callback" + - " path=" + path); + LOG.warn("Failure because previously failed task" + + " state still present. Waiting for znode delete callback" + " path=" + path); return oldtask; } // reinsert the newTask and it must succeed this time Task t = tasks.putIfAbsent(path, newtask); if (t == null) { batch.installed++; - return null; + return null; } LOG.fatal("Logic error. 
Deleted task still present in tasks map"); assert false : "Deleted task still present in tasks map"; @@ -1045,308 +502,86 @@ public class SplitLogManager extends ZooKeeperListener { return task; } - @Override - public void nodeDataChanged(String path) { - Task task; - task = tasks.get(path); - if (task != null || ZKSplitLog.isRescanNode(watcher, path)) { - if (task != null) { - task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis()); - } - getDataSetWatch(path, zkretries); - } - } - public void stop() { if (timeoutMonitor != null) { timeoutMonitor.interrupt(); } } - private void lookForOrphans() { - List orphans; - try { - orphans = ZKUtil.listChildrenNoWatch(this.watcher, - this.watcher.splitLogZNode); - if (orphans == null) { - LOG.warn("could not get children of " + this.watcher.splitLogZNode); - return; + void handleDeadWorker(ServerName workerName) { + // resubmit the tasks on the TimeoutMonitor thread. Makes it easier + // to reason about concurrency. Makes it easier to retry. + synchronized (deadWorkersLock) { + if (deadWorkers == null) { + deadWorkers = new HashSet(100); } - } catch (KeeperException e) { - LOG.warn("could not get children of " + this.watcher.splitLogZNode + - " " + StringUtils.stringifyException(e)); - return; + deadWorkers.add(workerName); } - int rescan_nodes = 0; - for (String path : orphans) { - String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path); - if (ZKSplitLog.isRescanNode(watcher, nodepath)) { - rescan_nodes++; - LOG.debug("found orphan rescan node " + path); - } else { - LOG.info("found orphan task " + path); + LOG.info("dead splitlog worker " + workerName); + } + + void handleDeadWorkers(Set serverNames) { + synchronized (deadWorkersLock) { + if (deadWorkers == null) { + deadWorkers = new HashSet(100); } - getDataSetWatch(nodepath, zkretries); + deadWorkers.addAll(serverNames); } - LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " + - rescan_nodes + " rescan nodes"); + LOG.info("dead splitlog workers " + serverNames); } /** - * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] 
for - * all regions of the passed in region servers - * @param serverName the name of a region server - * @param userRegions user regiones assigned on the region server + * This function is to set recovery mode from outstanding split log tasks from before or current + * configuration setting + * @param isForInitialization + * @throws IOException throws if it's impossible to set recovery mode */ - void markRegionsRecoveringInZK(final ServerName serverName, Set userRegions) - throws KeeperException, InterruptedIOException { - if (userRegions == null || (this.recoveryMode != RecoveryMode.LOG_REPLAY)) { + public void setRecoveryMode(boolean isForInitialization) throws IOException { + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().setRecoveryMode(isForInitialization); + + } + + public void markRegionsRecovering(ServerName server, Set userRegions) + throws InterruptedIOException, IOException { + if (userRegions == null || (!isLogReplaying())) { return; } - try { this.recoveringRegionLock.lock(); - // mark that we're creating recovering znodes - this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis(); - - for (HRegionInfo region : userRegions) { - String regionEncodeName = region.getEncodedName(); - long retries = this.zkretries; - - do { - String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName); - long lastRecordedFlushedSequenceId = -1; - try { - long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId( - regionEncodeName.getBytes()); - - /* - * znode layout: .../region_id[last known flushed sequence id]/failed server[last known - * flushed sequence id for the server] - */ - byte[] data = ZKUtil.getData(this.watcher, nodePath); - if (data == null) { - ZKUtil.createSetData(this.watcher, nodePath, - ZKUtil.positionToByteArray(lastSequenceId)); - } else { - lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data); - if (lastRecordedFlushedSequenceId < lastSequenceId) { - // update last flushed sequence id in the region level - ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId)); - } - } - // go one level deeper with server name - nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName()); - if (lastSequenceId <= lastRecordedFlushedSequenceId) { - // the newly assigned RS failed even before any flush to the region - lastSequenceId = lastRecordedFlushedSequenceId; - } - ZKUtil.createSetData(this.watcher, nodePath, - ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null)); - LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server " - + serverName); - - // break retry loop - break; - } catch (KeeperException e) { - // ignore ZooKeeper exceptions inside retry loop - if (retries <= 1) { - throw e; - } - // wait a little bit for retry - try { - Thread.sleep(20); - } catch (InterruptedException e1) { - throw new InterruptedIOException(); - } - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - } while ((--retries) > 0 && (!this.stopper.isStopped())); - } + // mark that we're creating recovering regions + ((BaseCoordinatedStateManager) this.server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions); } finally { this.recoveringRegionLock.unlock(); } + } /** - * @param bytes - Content of a failed region server or recovering region znode. 
- * @return long - The last flushed sequence Id for the region server + * @return whether log is replaying */ - public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) { - long lastRecordedFlushedSequenceId = -1l; - try { - lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes); - } catch (DeserializationException e) { - lastRecordedFlushedSequenceId = -1l; - LOG.warn("Can't parse last flushed sequence Id", e); - } - return lastRecordedFlushedSequenceId; + public boolean isLogReplaying() { + if (server.getCoordinatedStateManager() == null) return false; + return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().isReplaying(); } /** - * check if /hbase/recovering-regions/ exists. Returns true if exists - * and set watcher as well. - * @param zkw - * @param regionEncodedName region encode name - * @return true when /hbase/recovering-regions/ exists - * @throws KeeperException + * @return whether log is splitting */ - public static boolean - isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName) - throws KeeperException { - boolean result = false; - String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName); - - byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath); - if (node != null) { - result = true; - } - return result; + public boolean isLogSplitting() { + if (server.getCoordinatedStateManager() == null) return false; + return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().isSplitting(); } /** - * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK - * @param zkw - * @param serverName - * @param encodedRegionName - * @return the last flushed sequence ids recorded in ZK of the region for serverName - * @throws IOException + * @return the current log recovery mode */ - public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw, - String serverName, String encodedRegionName) throws IOException { - // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits, - // last flushed sequence Id changes when newly assigned RS flushes writes to the region. - // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed - // sequence Id name space (sequence Id only valid for a particular RS instance), changes - // when different newly assigned RS flushes the region. - // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of - // last flushed sequence Id for each failed RS instance. 
- RegionStoreSequenceIds result = null; - String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName); - nodePath = ZKUtil.joinZNode(nodePath, serverName); - try { - byte[] data; - try { - data = ZKUtil.getData(zkw, nodePath); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - if (data != null) { - result = ZKUtil.parseRegionStoreSequenceIds(data); - } - } catch (KeeperException e) { - throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server=" - + serverName + "; region=" + encodedRegionName, e); - } catch (DeserializationException e) { - LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e); - } - return result; - } - - /** - * This function is to set recovery mode from outstanding split log tasks from before or - * current configuration setting - * @param isForInitialization - * @throws KeeperException - * @throws InterruptedIOException - */ - public void setRecoveryMode(boolean isForInitialization) throws KeeperException, - InterruptedIOException { - if(this.isDrainingDone) { - // when there is no outstanding splitlogtask after master start up, we already have up to date - // recovery mode - return; - } - if(this.watcher == null) { - // when watcher is null(testing code) and recovery mode can only be LOG_SPLITTING - this.isDrainingDone = true; - this.recoveryMode = RecoveryMode.LOG_SPLITTING; - return; - } - boolean hasSplitLogTask = false; - boolean hasRecoveringRegions = false; - RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN; - RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ? - RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING; - - // Firstly check if there are outstanding recovering regions - List regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode); - if (regions != null && !regions.isEmpty()) { - hasRecoveringRegions = true; - previousRecoveryMode = RecoveryMode.LOG_REPLAY; - } - if (previousRecoveryMode == RecoveryMode.UNKNOWN) { - // Secondly check if there are outstanding split log task - List tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); - if (tasks != null && !tasks.isEmpty()) { - hasSplitLogTask = true; - if (isForInitialization) { - // during initialization, try to get recovery mode from splitlogtask - for (String task : tasks) { - try { - byte[] data = ZKUtil.getData(this.watcher, - ZKUtil.joinZNode(watcher.splitLogZNode, task)); - if (data == null) continue; - SplitLogTask slt = SplitLogTask.parseFrom(data); - previousRecoveryMode = slt.getMode(); - if (previousRecoveryMode == RecoveryMode.UNKNOWN) { - // created by old code base where we don't set recovery mode in splitlogtask - // we can safely set to LOG_SPLITTING because we're in master initialization code - // before SSH is enabled & there is no outstanding recovering regions - previousRecoveryMode = RecoveryMode.LOG_SPLITTING; - } - break; - } catch (DeserializationException e) { - LOG.warn("Failed parse data for znode " + task, e); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - } - } - } - } - - synchronized(this) { - if(this.isDrainingDone) { - return; - } - if (!hasSplitLogTask && !hasRecoveringRegions) { - this.isDrainingDone = true; - this.recoveryMode = recoveryModeInConfig; - return; - } else if (!isForInitialization) { - // splitlogtask hasn't drained yet, keep existing recovery mode - return; - } - - if (previousRecoveryMode != RecoveryMode.UNKNOWN) { - this.isDrainingDone = (previousRecoveryMode == 
recoveryModeInConfig); - this.recoveryMode = previousRecoveryMode; - } else { - this.recoveryMode = recoveryModeInConfig; - } - } - } - public RecoveryMode getRecoveryMode() { - return this.recoveryMode; - } - - /** - * Returns if distributed log replay is turned on or not - * @param conf - * @return true when distributed log replay is turned on - */ - private boolean isDistributedLogReplay(Configuration conf) { - boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, - HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG); - int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION); - if (LOG.isDebugEnabled()) { - LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version); - } - // For distributed log replay, hfile version must be 3 at least; we need tag support. - return dlr && (version >= 3); + return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().getRecoveryMode(); } /** @@ -1355,11 +590,12 @@ public class SplitLogManager extends ZooKeeperListener { *

* All access is synchronized. */ - static class TaskBatch { - int installed = 0; - int done = 0; - int error = 0; - volatile boolean isDead = false; + @InterfaceAudience.Private + public static class TaskBatch { + public int installed = 0; + public int done = 0; + public int error = 0; + public volatile boolean isDead = false; @Override public String toString() { @@ -1370,28 +606,25 @@ public class SplitLogManager extends ZooKeeperListener { /** * in memory state of an active task. */ - static class Task { - volatile long last_update; - volatile int last_version; - volatile ServerName cur_worker_name; - volatile TaskBatch batch; - volatile TerminationStatus status; - volatile int incarnation; - final AtomicInteger unforcedResubmits = new AtomicInteger(); - volatile boolean resubmitThresholdReached; + @InterfaceAudience.Private + public static class Task { + public volatile long last_update; + public volatile int last_version; + public volatile ServerName cur_worker_name; + public volatile TaskBatch batch; + public volatile TerminationStatus status; + public volatile int incarnation; + public final AtomicInteger unforcedResubmits = new AtomicInteger(); + public volatile boolean resubmitThresholdReached; @Override public String toString() { - return ("last_update = " + last_update + - " last_version = " + last_version + - " cur_worker_name = " + cur_worker_name + - " status = " + status + - " incarnation = " + incarnation + - " resubmits = " + unforcedResubmits.get() + - " batch = " + batch); + return ("last_update = " + last_update + " last_version = " + last_version + + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = " + + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch); } - Task() { + public Task() { incarnation = 0; last_version = -1; status = IN_PROGRESS; @@ -1422,31 +655,8 @@ public class SplitLogManager extends ZooKeeperListener { } } - void handleDeadWorker(ServerName workerName) { - // resubmit the tasks on the TimeoutMonitor thread. Makes it easier - // to reason about concurrency. Makes it easier to retry. 
- synchronized (deadWorkersLock) { - if (deadWorkers == null) { - deadWorkers = new HashSet(100); - } - deadWorkers.add(workerName); - } - LOG.info("dead splitlog worker " + workerName); - } - - void handleDeadWorkers(Set serverNames) { - synchronized (deadWorkersLock) { - if (deadWorkers == null) { - deadWorkers = new HashSet(100); - } - deadWorkers.addAll(serverNames); - } - LOG.info("dead splitlog workers " + serverNames); - } - /** - * Periodically checks all active tasks and resubmits the ones that have timed - * out + * Periodically checks all active tasks and resubmits the ones that have timed out */ private class TimeoutMonitor extends Chore { private long lastLog = 0; @@ -1485,14 +695,16 @@ public class SplitLogManager extends ZooKeeperListener { found_assigned_task = true; if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet(); - if (resubmit(path, task, FORCE)) { + if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) { resubmitted++; } else { handleDeadWorker(cur_worker); - LOG.warn("Failed to resubmit task " + path + " owned by dead " + - cur_worker + ", will retry."); + LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker + + ", will retry."); } - } else if (resubmit(path, task, CHECK)) { + } else if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) { resubmitted++; } } @@ -1515,39 +727,46 @@ public class SplitLogManager extends ZooKeeperListener { // manager will be indefinitely creating RESCAN nodes. TODO may be the // master should spawn both a manager and a worker thread to guarantee // that there is always one worker in the system - if (tot > 0 && !found_assigned_task && - ((EnvironmentEdgeManager.currentTimeMillis() - lastTaskCreateTime) > - unassignedTimeout)) { + if (tot > 0 + && !found_assigned_task + && ((EnvironmentEdgeManager.currentTimeMillis() - lastTaskCreateTime) > unassignedTimeout)) { for (Map.Entry e : tasks.entrySet()) { - String path = e.getKey(); + String key = e.getKey(); Task task = e.getValue(); // we have to do task.isUnassigned() check again because tasks might // have been asynchronously assigned. There is no locking required // for these checks ... 
it is OK even if tryGetDataSetWatch() is - // called unnecessarily for a task + // called unnecessarily for a taskpath if (task.isUnassigned() && (task.status != FAILURE)) { // We just touch the znode to make sure its still there - tryGetDataSetWatch(path); + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().checkTaskStillAvailable(key); } } - createRescanNode(Long.MAX_VALUE); + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().checkTasks(); SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet(); LOG.debug("resubmitting unassigned task(s) after timeout"); } - + Set failedDeletions = + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().getDetails().getFailedDeletions(); // Retry previously failed deletes if (failedDeletions.size() > 0) { List tmpPaths = new ArrayList(failedDeletions); for (String tmpPath : tmpPaths) { // deleteNode is an async call - deleteNode(tmpPath, zkretries); + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().deleteTask(tmpPath); } failedDeletions.removeAll(tmpPaths); } - // Garbage collect left-over /hbase/recovering-regions/... znode - long timeInterval = EnvironmentEdgeManager.currentTimeMillis() - - lastRecoveringNodeCreationTime; + // Garbage collect left-over + long timeInterval = + EnvironmentEdgeManager.currentTimeMillis() + - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().getLastRecoveryTime(); if (!failedRecoveringRegionDeletions.isEmpty() || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) { // inside the function there have more checks before GC anything @@ -1556,223 +775,24 @@ public class SplitLogManager extends ZooKeeperListener { new ArrayList, Boolean>>(failedRecoveringRegionDeletions); failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions); for (Pair, Boolean> failedDeletion : previouslyFailedDeletions) { - removeRecoveringRegionsFromZK(failedDeletion.getFirst(), failedDeletion.getSecond()); + removeRecoveringRegions(failedDeletion.getFirst(), failedDeletion.getSecond()); } } else { - removeRecoveringRegionsFromZK(null, null); + removeRecoveringRegions(null, null); } } } } - /** - * Asynchronous handler for zk create node results. - * Retries on failures. - */ - class CreateAsyncCallback implements AsyncCallback.StringCallback { - private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class); - - @Override - public void processResult(int rc, String path, Object ctx, String name) { - SplitLogCounters.tot_mgr_node_create_result.incrementAndGet(); - if (rc != 0) { - if (needAbandonRetries(rc, "Create znode " + path)) { - createNodeFailure(path); - return; - } - if (rc == KeeperException.Code.NODEEXISTS.intValue()) { - // What if there is a delete pending against this pre-existing - // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE - // state. Only operations that will be carried out on this node by - // this manager are get-znode-data, task-finisher and delete-znode. - // And all code pieces correctly handle the case of suddenly - // disappearing task-znode. 
- LOG.debug("found pre-existing znode " + path); - SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet(); - } else { - Long retry_count = (Long)ctx; - LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + - path + " remaining retries=" + retry_count); - if (retry_count == 0) { - SplitLogCounters.tot_mgr_node_create_err.incrementAndGet(); - createNodeFailure(path); - } else { - SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet(); - createNode(path, retry_count - 1); - } - return; - } - } - createNodeSuccess(path); - } + public enum ResubmitDirective { + CHECK(), FORCE(); } - /** - * Asynchronous handler for zk get-data-set-watch on node results. - * Retries on failures. - */ - class GetDataAsyncCallback implements AsyncCallback.DataCallback { - private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); - - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, - Stat stat) { - SplitLogCounters.tot_mgr_get_data_result.incrementAndGet(); - if (rc != 0) { - if (needAbandonRetries(rc, "GetData from znode " + path)) { - return; - } - if (rc == KeeperException.Code.NONODE.intValue()) { - SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet(); - LOG.warn("task znode " + path + " vanished or not created yet."); - // ignore since we should not end up in a case where there is in-memory task, - // but no znode. The only case is between the time task is created in-memory - // and the znode is created. See HBASE-11217. - return; - } - Long retry_count = (Long) ctx; - - if (retry_count < 0) { - LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + - path + ". Ignoring error. No error handling. No retrying."); - return; - } - LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + - path + " remaining retries=" + retry_count); - if (retry_count == 0) { - SplitLogCounters.tot_mgr_get_data_err.incrementAndGet(); - getDataSetWatchFailure(path); - } else { - SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet(); - getDataSetWatch(path, retry_count - 1); - } - return; - } - try { - getDataSetWatchSuccess(path, data, stat.getVersion()); - } catch (DeserializationException e) { - LOG.warn("Deserialization problem", e); - } - return; - } - } - - /** - * Asynchronous handler for zk delete node results. - * Retries on failures. - */ - class DeleteAsyncCallback implements AsyncCallback.VoidCallback { - private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class); - - @Override - public void processResult(int rc, String path, Object ctx) { - SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet(); - if (rc != 0) { - if (needAbandonRetries(rc, "Delete znode " + path)) { - failedDeletions.add(path); - return; - } - if (rc != KeeperException.Code.NONODE.intValue()) { - SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet(); - Long retry_count = (Long) ctx; - LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + - path + " remaining retries=" + retry_count); - if (retry_count == 0) { - LOG.warn("delete failed " + path); - failedDeletions.add(path); - deleteNodeFailure(path); - } else { - deleteNode(path, retry_count - 1); - } - return; - } else { - LOG.info(path + - " does not exist. Either was created but deleted behind our" + - " back by another pending delete OR was deleted" + - " in earlier retry rounds. zkretries = " + (Long) ctx); - } - } else { - LOG.debug("deleted " + path); - } - deleteNodeSuccess(path); - } - } - - /** - * Asynchronous handler for zk create RESCAN-node results. 
- * Retries on failures. - *

- * A RESCAN node is created using PERSISTENT_SEQUENTIAL flag. It is a signal - * for all the {@link SplitLogWorker}s to rescan for new tasks. - */ - class CreateRescanAsyncCallback implements AsyncCallback.StringCallback { - private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class); - - @Override - public void processResult(int rc, String path, Object ctx, String name) { - if (rc != 0) { - if (needAbandonRetries(rc, "CreateRescan znode " + path)) { - return; - } - Long retry_count = (Long)ctx; - LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path + - " remaining retries=" + retry_count); - if (retry_count == 0) { - createRescanFailure(); - } else { - createRescanNode(retry_count - 1); - } - return; - } - // path is the original arg, name is the actual name that was created - createRescanSuccess(name); - } - } - - /** - * {@link SplitLogManager} can use objects implementing this interface to - * finish off a partially done task by {@link SplitLogWorker}. This provides - * a serialization point at the end of the task processing. Must be - * restartable and idempotent. - */ - public interface TaskFinisher { - /** - * status that can be returned finish() - */ - enum Status { - /** - * task completed successfully - */ - DONE(), - /** - * task completed with error - */ - ERR(); - } - /** - * finish the partially done task. workername provides clue to where the - * partial results of the partially done tasks are present. taskname is the - * name of the task that was put up in zookeeper. - *

- * @param workerName - * @param taskname - * @return DONE if task completed successfully, ERR otherwise - */ - Status finish(ServerName workerName, String taskname); - } - - enum ResubmitDirective { - CHECK(), - FORCE(); - } - - enum TerminationStatus { - IN_PROGRESS("in_progress"), - SUCCESS("success"), - FAILURE("failure"), - DELETED("deleted"); + public enum TerminationStatus { + IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted"); String statusMsg; + TerminationStatus(String msg) { statusMsg = msg; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index deb5ed12ded..b8746a05af4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; @@ -89,7 +90,6 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.RegionState.State; -import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -136,6 +136,7 @@ import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -1523,8 +1524,8 @@ public class HRegionServer extends HasThread implements this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); } - this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, - conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorker.DEFAULT_MAX_SPLITTERS)); + this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( + "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); @@ -1577,7 +1578,7 @@ public class HRegionServer extends HasThread implements sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); - this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this); + this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this); splitLogWorker.start(); } @@ -2867,7 
+2868,7 @@ public class HRegionServer extends HasThread implements minSeqIdForLogReplay = storeSeqIdForReplay; } } - + try { long lastRecordedFlushedSequenceId = -1; String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, @@ -2880,7 +2881,7 @@ public class HRegionServer extends HasThread implements throw new InterruptedIOException(); } if (data != null) { - lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data); + lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data); } if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) { ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay)); @@ -2893,11 +2894,11 @@ public class HRegionServer extends HasThread implements LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for " + previousRSName); } else { - LOG.warn("Can't find failed region server for recovering region " + + LOG.warn("Can't find failed region server for recovering region " + region.getEncodedName()); } } catch (NoNodeException ignore) { - LOG.debug("Region " + region.getEncodedName() + + LOG.debug("Region " + region.getEncodedName() + " must have completed recovery because its recovery znode has been removed", ignore); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 0f89a8ba2f4..023040de090 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -82,7 +82,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; @@ -159,6 +158,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.net.DNS; import org.apache.zookeeper.KeeperException; @@ -1294,7 +1294,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (previous == null) { // check if the region to be opened is marked in recovering state in ZK - if (SplitLogManager.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(), + if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(), region.getEncodedName())) { // check if current region open is for distributedLogReplay. This check is to support // rolling restart/upgrade where we want to Master/RS see same configuration @@ -1306,7 +1306,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // could happen when turn distributedLogReplay off from on. List tmpRegions = new ArrayList(); tmpRegions.add(region.getEncodedName()); - SplitLogManager.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(), tmpRegions); + ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(), + tmpRegions); } } // If there is no action in progress, we can submit a specific handler. 
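The hunks above and the SplitLogWorker rewrite that follows repeat one access pattern: cast the server's generic CoordinatedStateManager to BaseCoordinatedStateManager, ask it for the worker-side or manager-side split-log coordination, and call the ZooKeeper-specific helpers that moved to ZKSplitLog. Below is a minimal illustrative sketch of that pattern, not part of the patch; it assumes a Server that exposes getCoordinatedStateManager() and getZooKeeper() as the surrounding code does, that SplitLogManagerCoordination lives in the same coordination package as SplitLogWorkerCoordination, and the sketch class and method names are invented here:

import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.zookeeper.KeeperException;

public final class SplitLogCoordinationSketch {
  private SplitLogCoordinationSketch() {
  }

  /** Worker-side coordination, obtained the way the new SplitLogWorker constructor does. */
  public static SplitLogWorkerCoordination workerCoordination(Server server) {
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogWorkerCoordination();
  }

  /** Manager-side coordination, obtained the way the TimeoutMonitor hunks above do. */
  public static SplitLogManagerCoordination managerCoordination(Server server) {
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination();
  }

  /** ZooKeeper-specific check that moved from SplitLogManager to ZKSplitLog. */
  public static boolean isRecovering(Server server, String encodedRegionName)
      throws KeeperException {
    return ZKSplitLog.isRegionMarkedRecoveringInZK(server.getZooKeeper(), encodedRegionName);
  }
}

The point of the indirection is that callers such as the TimeoutMonitor above no longer name ZooKeeper types directly; only the coordination classes and ZKSplitLog do.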
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 6ade09907b6..3c3d2a28621 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -22,111 +22,69 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.net.ConnectException; import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang.math.RandomUtils; -import org.apache.commons.lang.mutable.MutableInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.NotServingRegionException; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.SplitLogCounters; -import org.apache.hadoop.hbase.SplitLogTask; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RetriesExhaustedException; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.master.SplitLogManager; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; -import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.util.StringUtils; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; + +import com.google.common.annotations.VisibleForTesting; /** - * This worker is spawned in every regionserver (should we also spawn one in - * the master?). The Worker waits for log splitting tasks to be put up by the - * {@link SplitLogManager} running in the master and races with other workers - * in other serves to acquire those tasks. The coordination is done via - * zookeeper. All the action takes place at /hbase/splitlog znode. + * This worker is spawned in every regionserver, including master. The Worker waits for log + * splitting tasks to be put up by the {@link SplitLogManager} running in the master and races with + * other workers in other servers to acquire those tasks. The coordination is done via the coordination + * engine. *

- * If a worker has successfully moved the task from state UNASSIGNED to - * OWNED then it owns the task. It keeps heart beating the manager by - * periodically moving the task from UNASSIGNED to OWNED state. On success it - * moves the task to TASK_DONE. On unrecoverable error it moves task state to - * ERR. If it cannot continue but wants the master to retry the task then it - * moves the task state to RESIGNED. + * If a worker has successfully moved the task from state UNASSIGNED to OWNED then it owns the task. + * It keeps heart beating the manager by periodically moving the task from UNASSIGNED to OWNED + * state. On success it moves the task to TASK_DONE. On unrecoverable error it moves task state to + * ERR. If it cannot continue but wants the master to retry the task then it moves the task state to + * RESIGNED. *

- * The manager can take a task away from a worker by moving the task from - * OWNED to UNASSIGNED. In the absence of a global lock there is a - * unavoidable race here - a worker might have just finished its task when it - * is stripped of its ownership. Here we rely on the idempotency of the log + * The manager can take a task away from a worker by moving the task from OWNED to UNASSIGNED. In + * the absence of a global lock there is a unavoidable race here - a worker might have just finished + * its task when it is stripped of its ownership. Here we rely on the idempotency of the log * splitting task for correctness */ @InterfaceAudience.Private -public class SplitLogWorker extends ZooKeeperListener implements Runnable { - public static final int DEFAULT_MAX_SPLITTERS = 2; +public class SplitLogWorker implements Runnable { private static final Log LOG = LogFactory.getLog(SplitLogWorker.class); - private static final int checkInterval = 5000; // 5 seconds - private static final int FAILED_TO_OWN_TASK = -1; Thread worker; - private final ServerName serverName; - private final TaskExecutor splitTaskExecutor; // thread pool which executes recovery work - private final ExecutorService executorService; - - private final Object taskReadyLock = new Object(); - volatile int taskReadySeq = 0; - private volatile String currentTask = null; - private int currentVersion; - private volatile boolean exitWorker; - private final Object grabTaskLock = new Object(); - private boolean workerInGrabTask = false; - private final int report_period; - private RegionServerServices server = null; - private Configuration conf = null; - protected final AtomicInteger tasksInProgress = new AtomicInteger(0); - private int maxConcurrentTasks = 0; - - public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server, + private SplitLogWorkerCoordination coordination; + private Configuration conf; + private RegionServerServices server; + public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server, TaskExecutor splitTaskExecutor) { - super(watcher); this.server = server; - this.serverName = server.getServerName(); - this.splitTaskExecutor = splitTaskExecutor; - report_period = conf.getInt("hbase.splitlog.report.period", - conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3); this.conf = conf; - this.executorService = this.server.getExecutorService(); - this.maxConcurrentTasks = - conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS); + this.coordination = + ((BaseCoordinatedStateManager) hserver.getCoordinatedStateManager()) + .getSplitLogWorkerCoordination(); + this.server = server; + coordination.init(server, conf, splitTaskExecutor, this); } - public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf, + public SplitLogWorker(final Server hserver, final Configuration conf, final RegionServerServices server, final LastSequenceId sequenceIdChecker) { - this(watcher, conf, server, new TaskExecutor() { + this(server, conf, server, new TaskExecutor() { @Override public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) { Path rootdir; @@ -143,7 +101,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { // encountered a bad non-retry-able persistent error. 
try { if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)), - fs, conf, p, sequenceIdChecker, watcher, server.getCoordinatedStateManager(), mode)) { + fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode)) { return Status.PREEMPTED; } } catch (InterruptedIOException iioe) { @@ -151,8 +109,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { return Status.RESIGNED; } catch (IOException e) { Throwable cause = e.getCause(); - if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException - || cause instanceof ConnectException + if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException + || cause instanceof ConnectException || cause instanceof SocketTimeoutException)) { LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, " + "resigning", e); @@ -160,9 +118,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { } else if (cause instanceof InterruptedException) { LOG.warn("log splitting of " + filename + " interrupted, resigning", e); return Status.RESIGNED; - } else if(cause instanceof KeeperException) { - LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e); - return Status.RESIGNED; } LOG.warn("log splitting of " + filename + " failed, returning error", e); return Status.ERR; @@ -175,32 +130,22 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { @Override public void run() { try { - LOG.info("SplitLogWorker " + this.serverName + " starting"); - this.watcher.registerListener(this); + LOG.info("SplitLogWorker " + server.getServerName() + " starting"); + coordination.registerListener(); // pre-initialize a new connection for splitlogworker configuration HConnectionManager.getConnection(conf); - // wait for master to create the splitLogZnode - int res = -1; - while (res == -1 && !exitWorker) { - try { - res = ZKUtil.checkExists(watcher, watcher.splitLogZNode); - } catch (KeeperException e) { - // ignore - LOG.warn("Exception when checking for " + watcher.splitLogZNode + " ... retrying", e); - } - if (res == -1) { - LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create"); - Thread.sleep(1000); - } + // wait for Coordination Engine is ready + boolean res = false; + while (!res && !coordination.isStop()) { + res = coordination.isReady(); } - - if (!exitWorker) { - taskLoop(); + if (!coordination.isStop()) { + coordination.taskLoop(); } } catch (Throwable t) { if (ExceptionUtil.isInterrupt(t)) { - LOG.info("SplitLogWorker interrupted. Exiting. " + (exitWorker ? "" : + LOG.info("SplitLogWorker interrupted. Exiting. " + (coordination.isStop() ? "" : " (ERROR: exitWorker is not set, exiting anyway)")); } else { // only a logical error can cause here. Printing it out @@ -208,394 +153,24 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { LOG.error("unexpected error ", t); } } finally { - LOG.info("SplitLogWorker " + this.serverName + " exiting"); + coordination.removeListener(); + LOG.info("SplitLogWorker " + server.getServerName() + " exiting"); } } - - /** - * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task - * one at a time. This policy puts an upper-limit on the number of - * simultaneous log splitting that could be happening in a cluster. - *

- * Synchronization using {@link #taskReadyLock} ensures that it will - * try to grab every task that has been put up - */ - private void taskLoop() throws InterruptedException { - while (!exitWorker) { - int seq_start = taskReadySeq; - List paths = getTaskList(); - if (paths == null) { - LOG.warn("Could not get tasks, did someone remove " + - this.watcher.splitLogZNode + " ... worker thread exiting."); - return; - } - // pick meta wal firstly - int offset = (int) (Math.random() * paths.size()); - for(int i = 0; i < paths.size(); i ++){ - if(HLogUtil.isMetaFile(paths.get(i))) { - offset = i; - break; - } - } - int numTasks = paths.size(); - for (int i = 0; i < numTasks; i++) { - int idx = (i + offset) % paths.size(); - // don't call ZKSplitLog.getNodeName() because that will lead to - // double encoding of the path name - if (this.calculateAvailableSplitters(numTasks) > 0) { - grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx))); - } else { - LOG.debug("Current region server " + this.serverName + " has " - + this.tasksInProgress.get() + " tasks in progress and can't take more."); - break; - } - if (exitWorker) { - return; - } - } - SplitLogCounters.tot_wkr_task_grabing.incrementAndGet(); - synchronized (taskReadyLock) { - while (seq_start == taskReadySeq) { - taskReadyLock.wait(checkInterval); - if (this.server != null) { - // check to see if we have stale recovering regions in our internal memory state - Map recoveringRegions = this.server.getRecoveringRegions(); - if (!recoveringRegions.isEmpty()) { - // Make a local copy to prevent ConcurrentModificationException when other threads - // modify recoveringRegions - List tmpCopy = new ArrayList(recoveringRegions.keySet()); - for (String region : tmpCopy) { - String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region); - try { - if (ZKUtil.checkExists(this.watcher, nodePath) == -1) { - HRegion r = recoveringRegions.remove(region); - if (r != null) { - r.setRecovering(false); - } - LOG.debug("Mark recovering region:" + region + " up."); - } else { - // current check is a defensive(or redundant) mechanism to prevent us from - // having stale recovering regions in our internal RS memory state while - // zookeeper(source of truth) says differently. We stop at the first good one - // because we should not have a single instance such as this in normal case so - // check the first one is good enough. - break; - } - } catch (KeeperException e) { - // ignore zookeeper error - LOG.debug("Got a zookeeper when trying to open a recovering region", e); - break; - } - } - } - } - } - } - } - } - - /** - * try to grab a 'lock' on the task zk node to own and execute the task. - *

- * @param path zk node for the task - */ - private void grabTask(String path) { - Stat stat = new Stat(); - byte[] data; - synchronized (grabTaskLock) { - currentTask = path; - workerInGrabTask = true; - if (Thread.interrupted()) { - return; - } - } - try { - try { - if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) { - SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet(); - return; - } - } catch (KeeperException e) { - LOG.warn("Failed to get data for znode " + path, e); - SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); - return; - } - SplitLogTask slt; - try { - slt = SplitLogTask.parseFrom(data); - } catch (DeserializationException e) { - LOG.warn("Failed parse data for znode " + path, e); - SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); - return; - } - if (!slt.isUnassigned()) { - SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet(); - return; - } - - currentVersion = attemptToOwnTask(true, watcher, serverName, path, slt.getMode(), - stat.getVersion()); - if (currentVersion < 0) { - SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); - return; - } - - if (ZKSplitLog.isRescanNode(watcher, currentTask)) { - HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName, slt.getMode()), - SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion); - return; - } - - LOG.info("worker " + serverName + " acquired task " + path); - SplitLogCounters.tot_wkr_task_acquired.incrementAndGet(); - getDataSetWatchAsync(); - - submitTask(path, slt.getMode(), currentVersion, this.report_period); - - // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks - try { - int sleepTime = RandomUtils.nextInt(500) + 500; - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - LOG.warn("Interrupted while yielding for other region servers", e); - Thread.currentThread().interrupt(); - } - } finally { - synchronized (grabTaskLock) { - workerInGrabTask = false; - // clear the interrupt from stopTask() otherwise the next task will - // suffer - Thread.interrupted(); - } - } - } - - - /** - * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED. - *

- * This method is also used to periodically heartbeat the task progress by transitioning the node - * from OWNED to OWNED. - *

- * @param isFirstTime - * @param zkw - * @param server - * @param task - * @param taskZKVersion - * @return non-negative integer value when task can be owned by current region server otherwise -1 - */ - protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw, - ServerName server, String task, RecoveryMode mode, int taskZKVersion) { - int latestZKVersion = FAILED_TO_OWN_TASK; - try { - SplitLogTask slt = new SplitLogTask.Owned(server, mode); - Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion); - if (stat == null) { - LOG.warn("zk.setData() returned null for path " + task); - SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); - return FAILED_TO_OWN_TASK; - } - latestZKVersion = stat.getVersion(); - SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet(); - return latestZKVersion; - } catch (KeeperException e) { - if (!isFirstTime) { - if (e.code().equals(KeeperException.Code.NONODE)) { - LOG.warn("NONODE failed to assert ownership for " + task, e); - } else if (e.code().equals(KeeperException.Code.BADVERSION)) { - LOG.warn("BADVERSION failed to assert ownership for " + task, e); - } else { - LOG.warn("failed to assert ownership for " + task, e); - } - } - } catch (InterruptedException e1) { - LOG.warn("Interrupted while trying to assert ownership of " + - task + " " + StringUtils.stringifyException(e1)); - Thread.currentThread().interrupt(); - } - SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); - return FAILED_TO_OWN_TASK; - } - - /** - * This function calculates how many splitters it could create based on expected average tasks per - * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration.
- * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound) - * @param numTasks current total number of available tasks - */ - private int calculateAvailableSplitters(int numTasks) { - // at lease one RS(itself) available - int availableRSs = 1; - try { - List regionServers = ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode); - availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size()); - } catch (KeeperException e) { - // do nothing - LOG.debug("getAvailableRegionServers got ZooKeeper exception", e); - } - - int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1); - expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one - // calculate how many more splitters we could spawn - return Math.min(expectedTasksPerRS, this.maxConcurrentTasks) - this.tasksInProgress.get(); - } - - /** - * Submit a log split task to executor service - * @param curTask - * @param curTaskZKVersion - */ - void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion, - final int reportPeriod) { - final MutableInt zkVersion = new MutableInt(curTaskZKVersion); - - CancelableProgressable reporter = new CancelableProgressable() { - private long last_report_at = 0; - - @Override - public boolean progress() { - long t = EnvironmentEdgeManager.currentTimeMillis(); - if ((t - last_report_at) > reportPeriod) { - last_report_at = t; - int latestZKVersion = attemptToOwnTask(false, watcher, serverName, curTask, mode, - zkVersion.intValue()); - if (latestZKVersion < 0) { - LOG.warn("Failed to heartbeat the task" + curTask); - return false; - } - zkVersion.setValue(latestZKVersion); - } - return true; - } - }; - - HLogSplitterHandler hsh = new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, - this.tasksInProgress, this.splitTaskExecutor, mode); - this.executorService.submit(hsh); - } - - void getDataSetWatchAsync() { - this.watcher.getRecoverableZooKeeper().getZooKeeper(). - getData(currentTask, this.watcher, - new GetDataAsyncCallback(), null); - SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet(); - } - - void getDataSetWatchSuccess(String path, byte[] data) { - SplitLogTask slt; - try { - slt = SplitLogTask.parseFrom(data); - } catch (DeserializationException e) { - LOG.warn("Failed parse", e); - return; - } - synchronized (grabTaskLock) { - if (workerInGrabTask) { - // currentTask can change but that's ok - String taskpath = currentTask; - if (taskpath != null && taskpath.equals(path)) { - // have to compare data. cannot compare version because then there - // will be race with attemptToOwnTask() - // cannot just check whether the node has been transitioned to - // UNASSIGNED because by the time this worker sets the data watch - // the node might have made two transitions - from owned by this - // worker to unassigned to owned by another worker - if (! slt.isOwned(this.serverName) && - ! slt.isDone(this.serverName) && - ! slt.isErr(this.serverName) && - ! 
slt.isResigned(this.serverName)) { - LOG.info("task " + taskpath + " preempted from " + - serverName + ", current task state and owner=" + slt.toString()); - stopTask(); - } - } - } - } - } - - void getDataSetWatchFailure(String path) { - synchronized (grabTaskLock) { - if (workerInGrabTask) { - // currentTask can change but that's ok - String taskpath = currentTask; - if (taskpath != null && taskpath.equals(path)) { - LOG.info("retrying data watch on " + path); - SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet(); - getDataSetWatchAsync(); - } else { - // no point setting a watch on the task which this worker is not - // working upon anymore - } - } - } - } - - @Override - public void nodeDataChanged(String path) { - // there will be a self generated dataChanged event every time attemptToOwnTask() - // heartbeats the task znode by upping its version - synchronized (grabTaskLock) { - if (workerInGrabTask) { - // currentTask can change - String taskpath = currentTask; - if (taskpath!= null && taskpath.equals(path)) { - getDataSetWatchAsync(); - } - } - } - } - - - private List getTaskList() throws InterruptedException { - List childrenPaths = null; - long sleepTime = 1000; - // It will be in loop till it gets the list of children or - // it will come out if worker thread exited. - while (!exitWorker) { - try { - childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, - this.watcher.splitLogZNode); - if (childrenPaths != null) { - return childrenPaths; - } - } catch (KeeperException e) { - LOG.warn("Could not get children of znode " - + this.watcher.splitLogZNode, e); - } - LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode - + " after sleep for " + sleepTime + "ms!"); - Thread.sleep(sleepTime); - } - return childrenPaths; - } - - @Override - public void nodeChildrenChanged(String path) { - if(path.equals(watcher.splitLogZNode)) { - LOG.debug("tasks arrived or departed"); - synchronized (taskReadyLock) { - taskReadySeq++; - taskReadyLock.notify(); - } - } - } - /** * If the worker is doing a task i.e. splitting a log file then stop the task. * It doesn't exit the worker thread. */ - void stopTask() { + public void stopTask() { LOG.info("Sending interrupt to stop the worker thread"); worker.interrupt(); // TODO interrupt often gets swallowed, do what else? } - /** * start the SplitLogWorker thread */ public void start() { - worker = new Thread(null, this, "SplitLogWorker-" + serverName); - exitWorker = false; + worker = new Thread(null, this, "SplitLogWorker-" + server.getServerName()); worker.start(); } @@ -603,29 +178,10 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { * stop the SplitLogWorker thread */ public void stop() { - exitWorker = true; + coordination.stopProcessingTasks(); stopTask(); } - /** - * Asynchronous handler for zk get-data-set-watch on node results. 
- */ - class GetDataAsyncCallback implements AsyncCallback.DataCallback { - private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); - - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - SplitLogCounters.tot_wkr_get_data_result.incrementAndGet(); - if (rc != 0) { - LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path); - getDataSetWatchFailure(path); - return; - } - data = watcher.getRecoverableZooKeeper().removeMetaData(data); - getDataSetWatchSuccess(path, data); - } - } - /** * Objects implementing this interface actually do the task that has been * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight @@ -642,4 +198,13 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { } Status exec(String name, RecoveryMode mode, CancelableProgressable p); } + + /** + * Returns the number of tasks processed by coordination. + * This method is used by tests only + */ + @VisibleForTesting + public int getTaskReadySeq() { + return coordination.getTaskReadySeq(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java index 9bfdeed2cdd..06d21d90d47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.regionserver.handler; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.lang.mutable.MutableInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -30,17 +28,13 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; +import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; /** * Handles log splitting a wal @@ -49,28 +43,24 @@ import org.apache.zookeeper.KeeperException; public class HLogSplitterHandler extends EventHandler { private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class); private final ServerName serverName; - private final String curTask; - private final String wal; - private final ZooKeeperWatcher zkw; private final CancelableProgressable reporter; private final AtomicInteger inProgressTasks; - private final MutableInt curTaskZKVersion; private final TaskExecutor splitTaskExecutor; private final RecoveryMode mode; + private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails; + private 
final SplitLogWorkerCoordination coordination; - public HLogSplitterHandler(final Server server, String curTask, - final MutableInt curTaskZKVersion, - CancelableProgressable reporter, + + public HLogSplitterHandler(final Server server, SplitLogWorkerCoordination coordination, + SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter, AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) { super(server, EventType.RS_LOG_REPLAY); - this.curTask = curTask; - this.wal = ZKSplitLog.getFileName(curTask); + this.splitTaskDetails = splitDetails; + this.coordination = coordination; this.reporter = reporter; this.inProgressTasks = inProgressTasks; this.inProgressTasks.incrementAndGet(); this.serverName = server.getServerName(); - this.zkw = server.getZooKeeper(); - this.curTaskZKVersion = curTaskZKVersion; this.splitTaskExecutor = splitTaskExecutor; this.mode = mode; } @@ -79,20 +69,20 @@ public class HLogSplitterHandler extends EventHandler { public void process() throws IOException { long startTime = System.currentTimeMillis(); try { - Status status = this.splitTaskExecutor.exec(wal, mode, reporter); + Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter); switch (status) { case DONE: - endTask(zkw, new SplitLogTask.Done(this.serverName, this.mode), - SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue()); + coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode), + SplitLogCounters.tot_wkr_task_done, splitTaskDetails); break; case PREEMPTED: SplitLogCounters.tot_wkr_preempt_task.incrementAndGet(); - LOG.warn("task execution prempted " + wal); + LOG.warn("task execution prempted " + splitTaskDetails.getWALFile()); break; case ERR: if (server != null && !server.isStopped()) { - endTask(zkw, new SplitLogTask.Err(this.serverName, this.mode), - SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue()); + coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode), + SplitLogCounters.tot_wkr_task_err, splitTaskDetails); break; } // if the RS is exiting then there is probably a tons of stuff @@ -100,45 +90,17 @@ public class HLogSplitterHandler extends EventHandler { //$FALL-THROUGH$ case RESIGNED: if (server != null && server.isStopped()) { - LOG.info("task execution interrupted because worker is exiting " + curTask); + LOG.info("task execution interrupted because worker is exiting " + + splitTaskDetails.toString()); } - endTask(zkw, new SplitLogTask.Resigned(this.serverName, this.mode), - SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue()); + coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode), + SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails); break; } } finally { - LOG.info("worker " + serverName + " done with task " + curTask + " in " + LOG.info("worker " + serverName + " done with task " + splitTaskDetails.toString() + " in " + (System.currentTimeMillis() - startTime) + "ms"); this.inProgressTasks.decrementAndGet(); } } - - /** - * endTask() can fail and the only way to recover out of it is for the - * {@link SplitLogManager} to timeout the task node. 
- * @param slt - * @param ctr - */ - public static void endTask(ZooKeeperWatcher zkw, SplitLogTask slt, AtomicLong ctr, String task, - int taskZKVersion) { - try { - if (ZKUtil.setData(zkw, task, slt.toByteArray(), taskZKVersion)) { - LOG.info("successfully transitioned task " + task + " to final state " + slt); - ctr.incrementAndGet(); - return; - } - LOG.warn("failed to transistion task " + task + " to end state " + slt - + " because of version mismatch "); - } catch (KeeperException.BadVersionException bve) { - LOG.warn("transisition task " + task + " to " + slt - + " failed because of version mismatch", bve); - } catch (KeeperException.NoNodeException e) { - LOG.fatal( - "logic error - end task " + task + " " + slt - + " failed because task doesn't exist", e); - } catch (KeeperException e) { - LOG.warn("failed to end task, " + task + " " + slt, e); - } - SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet(); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 2df9f509837..67b936fe3dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -77,9 +77,10 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.io.HeapSize; -import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -109,7 +110,6 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.ipc.RemoteException; @@ -139,8 +139,7 @@ public class HLogSplitter { private Set disablingOrDisabledTables = new HashSet(); - private ZooKeeperWatcher watcher; - private CoordinatedStateManager csm; + private BaseCoordinatedStateManager csm; private MonitoredTask status; @@ -166,7 +165,7 @@ public class HLogSplitter { private final int minBatchSize; HLogSplitter(Configuration conf, Path rootDir, - FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw, + FileSystem fs, LastSequenceId idChecker, CoordinatedStateManager csm, RecoveryMode mode) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf @@ -175,8 +174,7 @@ public class HLogSplitter { this.rootDir = rootDir; this.fs = fs; this.sequenceIdChecker = idChecker; - this.watcher = zkw; - this.csm = csm; + this.csm = (BaseCoordinatedStateManager)csm; this.controller = new PipelineController(); entryBuffers = new EntryBuffers(controller, @@ -189,7 +187,7 @@ public class HLogSplitter { this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode); this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); - if (zkw != null && csm 
!= null && this.distributedLogReplay) { + if (csm != null && this.distributedLogReplay) { outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads); } else { if (this.distributedLogReplay) { @@ -213,15 +211,14 @@ public class HLogSplitter { * @param conf * @param reporter * @param idChecker - * @param zkw ZooKeeperWatcher if it's null, we will back to the old-style log splitting where we - * dump out recoved.edits files for regions to replay on. + * @param cp coordination state manager * @return false if it is interrupted by the progress-able. * @throws IOException */ public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, - ZooKeeperWatcher zkw, CoordinatedStateManager cp, RecoveryMode mode) throws IOException { - HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, cp, mode); + CoordinatedStateManager cp, RecoveryMode mode) throws IOException { + HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, cp, mode); return s.splitLogFile(logfile, reporter); } @@ -235,8 +232,8 @@ public class HLogSplitter { List splits = new ArrayList(); if (logfiles != null && logfiles.length > 0) { for (FileStatus logfile: logfiles) { - HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null, - RecoveryMode.LOG_SPLITTING); + HLogSplitter s = + new HLogSplitter(conf, rootDir, fs, null, null, RecoveryMode.LOG_SPLITTING); if (s.splitLogFile(logfile, null)) { finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); if (s.outputSink.splits != null) { @@ -289,7 +286,7 @@ public class HLogSplitter { LOG.warn("Nothing to split in log file " + logPath); return true; } - if(watcher != null && csm != null) { + if(csm != null) { try { TableStateManager tsm = csm.getTableStateManager(); disablingOrDisabledTables = tsm.getTablesInStates( @@ -314,7 +311,8 @@ public class HLogSplitter { if (lastFlushedSequenceId == null) { if (this.distributedLogReplay) { RegionStoreSequenceIds ids = - SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, key); + csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, + key); if (ids != null) { lastFlushedSequenceId = ids.getLastFlushedSequenceId(); } @@ -352,7 +350,8 @@ public class HLogSplitter { throw iie; } catch (CorruptedLogFileException e) { LOG.warn("Could not parse, corrupted log file " + logPath, e); - ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs); + csm.getSplitLogWorkerCoordination().markCorrupted(rootDir, + logfile.getPath().getName(), fs); isCorrupted = true; } catch (IOException e) { e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; @@ -1417,8 +1416,9 @@ public class HLogSplitter { public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) { super(controller, entryBuffers, numWriters); - this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout", - SplitLogManager.DEFAULT_TIMEOUT); + this.waitRegionOnlineTimeOut = + conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, + ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT); this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriters); this.logRecoveredEditsOutputSink.setReporter(reporter); @@ -1640,8 +1640,8 @@ public class HLogSplitter { // retrieve last flushed sequence Id from ZK. 
Because region postOpenDeployTasks will // update the value for the region RegionStoreSequenceIds ids = - SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc - .getRegionInfo().getEncodedName()); + csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, + loc.getRegionInfo().getEncodedName()); if (ids != null) { lastFlushedSequenceId = ids.getLastFlushedSequenceId(); Map storeIds = new TreeMap(Bytes.BYTES_COMPARATOR); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java index 943b94465cd..ac6042fdd34 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hbase.zookeeper; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.net.URLEncoder; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,8 +30,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.regionserver.SplitLogWorker; +import org.apache.zookeeper.KeeperException; /** * Common methods and attributes used by {@link SplitLogManager} and {@link SplitLogWorker} @@ -120,4 +125,100 @@ public class ZKSplitLog { return isCorrupt; } + /* + * Following methods come from SplitLogManager + */ + + /** + * Check if /hbase/recovering-regions/<regionEncodedName> exists. Returns true if it exists + * and sets a watcher as well. + * @param zkw + * @param regionEncodedName region encoded name + * @return true when /hbase/recovering-regions/<regionEncodedName> exists + * @throws KeeperException + */ + public static boolean + isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName) + throws KeeperException { + boolean result = false; + String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName); + + byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath); + if (node != null) { + result = true; + } + return result; + } + + /** + * @param bytes - Content of a failed region server or recovering region znode.
+ * @return long - The last flushed sequence Id for the region server + */ + public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) { + long lastRecordedFlushedSequenceId = -1L; + try { + lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes); + } catch (DeserializationException e) { + lastRecordedFlushedSequenceId = -1L; + LOG.warn("Can't parse last flushed sequence Id", e); + } + return lastRecordedFlushedSequenceId; + } + + public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) { + try { + if (regions == null) { + // remove all children under /hbase/recovering-regions + LOG.debug("Garbage collecting all recovering region znodes"); + ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode); + } else { + for (String curRegion : regions) { + String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion); + ZKUtil.deleteNodeRecursively(watcher, nodePath); + } + } + } catch (KeeperException e) { + LOG.warn("Cannot remove recovering regions from ZooKeeper", e); + } + } + + /** + * This function is used in distributedLogReplay to fetch the last flushed sequence id of a region from ZK + * @param zkw + * @param serverName + * @param encodedRegionName + * @return the last flushed sequence ids recorded in ZK for the region on serverName + * @throws IOException + */ + + public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw, + String serverName, String encodedRegionName) throws IOException { + // When SplitLogWorker recovers a region by directly replaying unflushed WAL edits, + // the last flushed sequence Id changes when the newly assigned RS flushes writes to the region. + // If the newly assigned RS fails again (a chained RS failure scenario), the last flushed + // sequence Id namespace changes (a sequence Id is only valid for a particular RS instance) + // whenever a different newly assigned RS flushes the region. + // Therefore, in this mode we need to fetch the last sequence Ids from ZK, where we keep a + // history of the last flushed sequence Id for each failed RS instance.
+ RegionStoreSequenceIds result = null; + String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName); + nodePath = ZKUtil.joinZNode(nodePath, serverName); + try { + byte[] data; + try { + data = ZKUtil.getData(zkw, nodePath); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + if (data != null) { + result = ZKUtil.parseRegionStoreSequenceIds(data); + } + } catch (KeeperException e) { + throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server=" + + serverName + "; region=" + encodedRegionName, e); + } catch (DeserializationException e) { + LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e); + } + return result; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 289b6302487..c51428ebdef 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -77,6 +77,9 @@ import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination; +import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.exceptions.OperationConflictException; import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; @@ -651,8 +654,8 @@ public class TestDistributedLogSplitting { break; } - slm.markRegionsRecoveringInZK(firstFailedServer, regionSet); - slm.markRegionsRecoveringInZK(secondFailedServer, regionSet); + slm.markRegionsRecovering(firstFailedServer, regionSet); + slm.markRegionsRecovering(secondFailedServer, regionSet); List recoveringRegions = ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName())); @@ -880,7 +883,7 @@ public class TestDistributedLogSplitting { break; } - slm.markRegionsRecoveringInZK(hrs.getServerName(), regionSet); + slm.markRegionsRecovering(hrs.getServerName(), regionSet); // move region in order for the region opened in recovering state final HRegionInfo hri = region; final HRegionServer tmpRS = dstRS; @@ -1064,7 +1067,10 @@ public class TestDistributedLogSplitting { out.write(0); out.write(Bytes.toBytes("corrupted bytes")); out.close(); - slm.ignoreZKDeleteForTesting = true; + ZKSplitLogManagerCoordination coordination = + (ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master + .getCoordinatedStateManager()).getSplitLogManagerCoordination(); + coordination.setIgnoreDeleteForTesting(true); executor = Executors.newSingleThreadExecutor(); Runnable runnable = new Runnable() { @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index ceb6adad36a..125cacda13e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -19,11 +19,8 @@ package 
org.apache.hadoop.hbase.master; import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted; @@ -48,22 +45,26 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.regionserver.SplitLogWorker; import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -84,13 +85,14 @@ public class TestSplitLogManager { private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class); private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1"); private final ServerManager sm = Mockito.mock(ServerManager.class); - private final MasterServices master = Mockito.mock(MasterServices.class); + private final MasterServices master = Mockito.mock(MasterServices.class); static { Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); } private ZooKeeperWatcher zkw; + private DummyServer ds; private static boolean stopped = false; private SplitLogManager slm; private Configuration conf; @@ -99,6 +101,68 @@ public class TestSplitLogManager { private static HBaseTestingUtility TEST_UTIL; + class DummyServer implements Server { + private ZooKeeperWatcher zkw; + private Configuration conf; + private CoordinatedStateManager cm; + + public DummyServer(ZooKeeperWatcher zkw, Configuration conf) { + this.zkw = zkw; + this.conf = conf; + cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); + cm.initialize(this); + } + + @Override + public void abort(String why, Throwable e) { + } + + @Override + public boolean isAborted() { + 
return false; + } + + @Override + public void stop(String why) { + } + + @Override + public boolean isStopped() { + return false; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return zkw; + } + + @Override + public ServerName getServerName() { + return null; + } + + @Override + public CoordinatedStateManager getCoordinatedStateManager() { + return cm; + } + + @Override + public HConnection getShortCircuitConnection() { + return null; + } + + @Override + public MetaTableLocator getMetaTableLocator() { + return null; + } + + } + static Stoppable stopper = new Stoppable() { @Override public void stop(String why) { @@ -109,7 +173,6 @@ public class TestSplitLogManager { public boolean isStopped() { return stopped; } - }; @Before @@ -118,7 +181,10 @@ public class TestSplitLogManager { TEST_UTIL.startMiniZKCluster(); conf = TEST_UTIL.getConfiguration(); // Use a different ZK wrapper instance for each tests. - zkw = new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null); + zkw = + new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null); + ds = new DummyServer(zkw, conf); + ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); ZKUtil.createAndFailSilent(zkw, zkw.baseZNode); assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1); @@ -131,18 +197,20 @@ public class TestSplitLogManager { resetCounters(); // By default, we let the test manage the error as before, so the server - // does not appear as dead from the master point of view, only from the split log pov. + // does not appear as dead from the master point of view, only from the split log pov. Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true); Mockito.when(master.getServerManager()).thenReturn(sm); to = 6000; - conf.setInt("hbase.splitlog.manager.timeout", to); + conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to); conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to); + conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); to = to + 4 * 100; - - this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? - RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); + + this.mode = + (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
RecoveryMode.LOG_REPLAY + : RecoveryMode.LOG_SPLITTING); } @After @@ -171,17 +239,17 @@ public class TestSplitLogManager { throws Exception { TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - return (e.eval() != oldval); - } + @Override + public boolean evaluate() throws Exception { + return (e.eval() != oldval); + } }); assertEquals(newval, e.eval()); } - private String submitTaskAndWait(TaskBatch batch, String name) - throws KeeperException, InterruptedException { + private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException, + InterruptedException { String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name); NodeCreationListener listener = new NodeCreationListener(zkw, tasknode); zkw.registerListener(listener); @@ -206,7 +274,7 @@ public class TestSplitLogManager { public void testTaskCreation() throws Exception { LOG.info("TestTaskCreation - test the creation of a task in zk"); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -226,7 +294,7 @@ public class TestSplitLogManager { zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); @@ -252,7 +320,7 @@ public class TestSplitLogManager { CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); @@ -273,9 +341,8 @@ public class TestSplitLogManager { @Test public void testMultipleResubmits() throws Exception { LOG.info("TestMultipleResbmits - no indefinite resubmissions"); - conf.setInt("hbase.splitlog.max.resubmit", 2); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -307,7 +374,7 @@ public class TestSplitLogManager { public void testRescanCleanup() throws Exception { LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -336,7 +403,7 @@ public class TestSplitLogManager { public void testTaskDone() throws Exception { LOG.info("TestTaskDone - cleanup task node once in DONE state"); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); @@ -356,7 +423,7 @@ public class TestSplitLogManager { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); - slm 
= new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -371,14 +438,14 @@ public class TestSplitLogManager { } waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); - conf.setInt("hbase.splitlog.max.resubmit", SplitLogManager.DEFAULT_MAX_RESUBMIT); + conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT); } @Test public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); assertEquals(tot_mgr_resubmit.get(), 0); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); assertEquals(tot_mgr_resubmit.get(), 0); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -412,7 +479,7 @@ public class TestSplitLogManager { zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); // submit another task which will stay in unassigned mode @@ -441,7 +508,7 @@ public class TestSplitLogManager { LOG.info("testDeadWorker"); conf.setLong("hbase.splitlog.max.resubmit", 0); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -466,7 +533,7 @@ public class TestSplitLogManager { @Test public void testWorkerCrash() throws Exception { - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -491,7 +558,7 @@ public class TestSplitLogManager { @Test public void testEmptyLogDir() throws Exception { LOG.info("testEmptyLogDir"); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); FileSystem fs = TEST_UTIL.getTestFileSystem(); Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString()); @@ -514,15 +581,15 @@ public class TestSplitLogManager { HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L)); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); - slm.removeStaleRecoveringRegionsFromZK(null); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm.removeStaleRecoveringRegions(null); List recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false); assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty()); } - + @Test(timeout=60000) public void testGetPreviousRecoveryMode() throws Exception { LOG.info("testGetPreviousRecoveryMode"); @@ -535,12 +602,12 @@ public class TestSplitLogManager { ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER); - assertTrue(slm.getRecoveryMode() == 
RecoveryMode.LOG_SPLITTING); - + slm = new SplitLogManager(ds, testConf, stopper, master, DUMMY_MASTER); + assertTrue(slm.isLogSplitting()); + zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1); slm.setRecoveryMode(false); - assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY); + assertTrue(slm.isLogReplaying()); } - + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index dcb1e88355c..5caa544cbba 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -19,8 +19,9 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertThat; +import static org.hamcrest.CoreMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -30,19 +31,23 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -65,11 +70,74 @@ public class TestSplitLogWorker { } private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private DummyServer ds; private ZooKeeperWatcher zkw; private SplitLogWorker slw; private ExecutorService executorService; private RecoveryMode mode; + class DummyServer implements Server { + private ZooKeeperWatcher zkw; + private Configuration conf; + private CoordinatedStateManager cm; + + public DummyServer(ZooKeeperWatcher zkw, Configuration conf) { + this.zkw = zkw; + this.conf = conf; + cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); + cm.initialize(this); + } + + @Override + public void abort(String why, Throwable e) { + } + + @Override + public boolean isAborted() { + return false; + } + + @Override + public void stop(String why) { + } + + @Override + public boolean isStopped() { + return false; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return zkw; + } + + @Override + public ServerName 
getServerName() { + return null; + } + + @Override + public CoordinatedStateManager getCoordinatedStateManager() { + return cm; + } + + @Override + public HConnection getShortCircuitConnection() { + return null; + } + + @Override + public MetaTableLocator getMetaTableLocator() { + return null; + } + + } + private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems) throws Exception { assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval, @@ -106,19 +174,22 @@ public class TestSplitLogWorker { Configuration conf = TEST_UTIL.getConfiguration(); zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null); + ds = new DummyServer(zkw, conf); ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); ZKUtil.createAndFailSilent(zkw, zkw.baseZNode); - assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1); + assertThat(ZKUtil.checkExists(zkw, zkw.baseZNode), not (is(-1))); LOG.debug(zkw.baseZNode + " created"); ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode); - assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1); + assertThat(ZKUtil.checkExists(zkw, zkw.splitLogZNode), not (is(-1))); + LOG.debug(zkw.splitLogZNode + " created"); ZKUtil.createAndFailSilent(zkw, zkw.rsZNode); - assertTrue(ZKUtil.checkExists(zkw, zkw.rsZNode) != -1); + assertThat(ZKUtil.checkExists(zkw, zkw.rsZNode), not (is(-1))); + SplitLogCounters.resetCounters(); executorService = new ExecutorService("TestSplitLogWorker"); executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10); - this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? + this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); } @@ -157,12 +228,12 @@ public class TestSplitLogWorker { final ServerName RS = ServerName.valueOf("rs,1,1"); RegionServerServices mockedRS = getRegionServer(RS); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS), - new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), + new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); SplitLogWorker slw = - new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); + new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); try { waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); @@ -170,7 +241,7 @@ public class TestSplitLogWorker { SplitLogTask slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(RS)); } finally { - stopSplitLogWorker(slw); + stopSplitLogWorker(slw); } } @@ -193,14 +264,14 @@ public class TestSplitLogWorker { final ServerName SVR1 = ServerName.valueOf("svr1,1,1"); final ServerName SVR2 = ServerName.valueOf("svr2,1,1"); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT), - new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), + new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); RegionServerServices mockedRS1 = getRegionServer(SVR1); RegionServerServices mockedRS2 = getRegionServer(SVR2); SplitLogWorker slw1 = - new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask); + new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask); SplitLogWorker slw2 = - new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask); + 
new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask); slw1.start(); slw2.start(); try { @@ -227,7 +298,7 @@ public class TestSplitLogWorker { final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"); RegionServerServices mockedRS = getRegionServer(SRV); SplitLogWorker slw = - new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); + new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); try { Thread.yield(); // let the worker start @@ -236,11 +307,11 @@ public class TestSplitLogWorker { // this time create a task node after starting the splitLogWorker zkw.getRecoverableZooKeeper().create(PATH, - new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), + new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); - assertEquals(1, slw.taskReadySeq); + assertEquals(1, slw.getTaskReadySeq()); byte [] bytes = ZKUtil.getData(zkw, PATH); SplitLogTask slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(SRV)); @@ -260,14 +331,14 @@ public class TestSplitLogWorker { final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"); RegionServerServices mockedRS = getRegionServer(SRV); SplitLogWorker slw = - new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); + new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); try { Thread.yield(); // let the worker start Thread.sleep(100); waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME); - SplitLogTask unassignedManager = + SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER, this.mode); zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -287,7 +358,7 @@ public class TestSplitLogWorker { waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME); waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME); - assertEquals(2, slw.taskReadySeq); + assertEquals(2, slw.getTaskReadySeq()); byte [] bytes = ZKUtil.getData(zkw, PATH2); slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(SRV)); @@ -302,7 +373,7 @@ public class TestSplitLogWorker { SplitLogCounters.resetCounters(); final ServerName SRV = ServerName.valueOf("svr,1,1"); RegionServerServices mockedRS = getRegionServer(SRV); - slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); + slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); Thread.yield(); // let the worker start Thread.sleep(100); @@ -358,14 +429,13 @@ public class TestSplitLogWorker { Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks); RegionServerServices mockedRS = getRegionServer(RS); - for (int i = 0; i < maxTasks; i++) { zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } - SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask); + SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask); slw.start(); try { waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME); @@ -408,7 +478,7 @@ 
public class TestSplitLogWorker { Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } - SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask); + SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask); slw.start(); try { int acquiredTasks = 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java index a133deabf1b..1c70fb5ba06 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java @@ -138,7 +138,7 @@ public class TestHLogReaderOnSecureHLog { RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); Path rootdir = FSUtils.getRootDir(conf); try { - HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode); + HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode); s.splitLogFile(listStatus[0], null); Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), "corrupt"); @@ -181,7 +181,7 @@ public class TestHLogReaderOnSecureHLog { RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); Path rootdir = FSUtils.getRootDir(conf); try { - HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode); + HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode); s.splitLogFile(listStatus[0], null); Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), "corrupt"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java index e7997de4884..8faf6093528 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -809,7 +809,7 @@ public class TestHLogSplit { logfiles != null && logfiles.length > 0); // Set up a splitter that will throw an IOE on the output side HLogSplitter logSplitter = new HLogSplitter( - conf, HBASEDIR, fs, null, null, null, this.mode) { + conf, HBASEDIR, fs, null, null, this.mode) { protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class); @@ -942,7 +942,7 @@ public class TestHLogSplit { try { conf.setInt("hbase.splitlog.report.period", 1000); boolean ret = HLogSplitter.splitLogFile( - HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null, this.mode); + HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode); assertFalse("Log splitting should failed", ret); assertTrue(count.get() > 0); } catch (IOException e) { @@ -1001,7 +1001,7 @@ public class TestHLogSplit { // Create a splitter that reads and writes the data without touching disk HLogSplitter logSplitter = new HLogSplitter( - localConf, HBASEDIR, fs, null, null, null, this.mode) { + localConf, HBASEDIR, fs, null, null, this.mode) { /* Produce a mock writer that doesn't write anywhere */ protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) @@ -1222,7 +1222,7 @@ public class TestHLogSplit { logfiles != null && logfiles.length > 0); HLogSplitter logSplitter = new HLogSplitter( - conf, HBASEDIR, fs, null, null, null, 
this.mode) { + conf, HBASEDIR, fs, null, null, this.mode) { protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 0edad8bfdc7..4132a5c0f6d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -885,7 +885,7 @@ public class TestWALReplay { wal.close(); FileStatus[] listStatus = this.fs.listStatus(wal.getDir()); HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], - this.fs, this.conf, null, null, null, null, mode); + this.fs, this.conf, null, null, null, mode); FileStatus[] listStatus1 = this.fs.listStatus( new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(), "recovered.edits")));
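
Note on the API direction of this patch (not part of the patch itself): split-log ZooKeeper plumbing is now reached through the coordinated state manager rather than a raw ZooKeeperWatcher. A minimal sketch of how a caller holding a Server reference would look up the new coordination objects; the helper class name is hypothetical, while the accessors and the downcast mirror the TestDistributedLogSplitting hunk above:

    import org.apache.hadoop.hbase.Server;
    import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
    import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
    import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;

    // Hypothetical helper for illustration only.
    public final class SplitLogCoordinationLookup {
      private SplitLogCoordinationLookup() {
      }

      // The split-log accessors live on BaseCoordinatedStateManager, so callers downcast,
      // exactly as the patched TestDistributedLogSplitting does.
      public static SplitLogManagerCoordination managerCoordination(Server server) {
        return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination();
      }

      public static SplitLogWorkerCoordination workerCoordination(Server server) {
        return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogWorkerCoordination();
      }
    }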
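The ZK helpers added to ZKSplitLog (getRegionFlushedSequenceId, parseLastFlushedSequenceIdFrom, deleteRecoveringRegionZNodes) remain static and watcher-based. A sketch of a recovery-side lookup, assuming an already-constructed ZooKeeperWatcher; the class and method names here are invented for illustration:

    import java.io.IOException;

    import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
    import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

    // Illustration only: last flushed sequence id recorded for a region being
    // recovered from a failed region server, or -1 if nothing was recorded.
    public final class RecoveredSequenceIds {
      private RecoveredSequenceIds() {
      }

      public static long lastFlushedSequenceId(ZooKeeperWatcher zkw, String failedServerName,
          String encodedRegionName) throws IOException {
        RegionStoreSequenceIds ids =
            ZKSplitLog.getRegionFlushedSequenceId(zkw, failedServerName, encodedRegionName);
        return ids == null ? -1L : ids.getLastFlushedSequenceId();
      }
    }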
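Because the manager timeout is now keyed by HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, code and tests should read it through the constant rather than the old string literal. A small sketch, assuming ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT as the fallback in the same way the LogReplayOutputSink hunk does:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;

    public class SplitLogTimeoutExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Same lookup the patched LogReplayOutputSink performs: constant key, coordination default.
        int timeoutMs = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
            ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
        System.out.println("split log manager timeout = " + timeoutMs + " ms");
      }
    }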
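Finally, the static HLogSplitter.splitLogFile entry point loses its ZooKeeperWatcher parameter; callers pass a CoordinatedStateManager instead, or null for plain log splitting as the updated tests do. A sketch under those assumptions; the root dir and WAL path are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
    import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;

    public class SplitOneLogExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Path rootDir = new Path("/hbase");                        // placeholder root dir
        FileStatus logfile = fs.getFileStatus(new Path(args[0])); // WAL file to split
        // reporter, idChecker and the coordination manager may all be null for
        // old-style splitting, mirroring the updated call sites in the tests above.
        boolean done = HLogSplitter.splitLogFile(rootDir, logfile, fs, conf,
            null, null, null, RecoveryMode.LOG_SPLITTING);
        System.out.println("split finished: " + done);
      }
    }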