diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
new file mode 100644
index 00000000000..3d9ec884ba2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
@@ -0,0 +1,221 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coordination;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
+import org.apache.hadoop.hbase.master.SplitLogManager.Task;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Coordination for SplitLogManager. It creates and works with tasks for split log operations.
+ * The manager prepares a task by calling {@link #prepareTask} and submits it via
+ * {@link #submitTask(String)}. After that it periodically checks the number of remaining tasks via
+ * {@link #remainingTasksInCoordination()} and waits until it becomes zero.
+ *
+ * Methods required for the task life cycle:
+ * {@link #markRegionsRecovering(ServerName, Set)} marks regions for log replay. Used by
+ * {@link MasterFileSystem}
+ * {@link #removeRecoveringRegions(Set, Boolean)} cleans up regions that were previously marked as
+ * recovering. Called after all tasks have been processed
+ * {@link #removeStaleRecoveringRegions(Set)} removes stale recovering regions. Called by
+ * {@link MasterFileSystem} after the active master is initialized
+ * {@link #getLastRecoveryTime()} required for garbage collection; indicates when the last
+ * recovery was performed
+ * {@link #checkTaskStillAvailable(String)} checks that a task is still present
+ * {@link #checkTasks()} checks for unassigned tasks and resubmits them
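+ *
+ * <p>A rough manager-side sketch of the flow described above (illustrative only;
+ * {@code coordination} and {@code logPath} are hypothetical local names, not part of this API):
+ * <pre>{@code
+ * String taskId = coordination.prepareTask(logPath);
+ * coordination.submitTask(taskId);
+ * while (coordination.remainingTasksInCoordination() > 0) {
+ *   coordination.checkTasks(); // ask the coordination engine to resubmit unassigned tasks
+ *   Thread.sleep(1000);        // hypothetical polling interval
+ * }
+ * }</pre>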
+ */
+@InterfaceAudience.Private
+public interface SplitLogManagerCoordination {
+
+ /**
+ * Detail class that shares data between coordination and split log manager
+ */
+ public static class SplitLogManagerDetails {
+ private final ConcurrentMap<String, Task> tasks;
+ private final MasterServices master;
+ private final Set<String> failedDeletions;
+ private final ServerName serverName;
+
+ public SplitLogManagerDetails(ConcurrentMap<String, Task> tasks, MasterServices master,
+ Set<String> failedDeletions, ServerName serverName) {
+ this.tasks = tasks;
+ this.master = master;
+ this.failedDeletions = failedDeletions;
+ this.serverName = serverName;
+ }
+
+ /**
+ * @return the master value
+ */
+ public MasterServices getMaster() {
+ return master;
+ }
+
+ /**
+ * @return map of tasks
+ */
+ public ConcurrentMap<String, Task> getTasks() {
+ return tasks;
+ }
+
+ /**
+ * @return a set of failed deletions
+ */
+ public Set<String> getFailedDeletions() {
+ return failedDeletions;
+ }
+
+ /**
+ * @return server name
+ */
+ public ServerName getServerName() {
+ return serverName;
+ }
+ }
+
+ /**
+ * Provide the details shared by the SplitLogManager
+ */
+ void setDetails(SplitLogManagerDetails details);
+
+ /**
+ * Returns the details that were provided previously
+ */
+ SplitLogManagerDetails getDetails();
+
+ /**
+ * Prepare the new task
+ * @param taskName name of the task
+ * @return the task id
+ */
+ String prepareTask(String taskName);
+
+ /**
+ * Mark regions in recovering state for distributed log replay
+ * @param serverName server name
+ * @param userRegions set of regions to be marked
+ * @throws IOException in case of failure
+ * @throws InterruptedIOException
+ */
+ void markRegionsRecovering(final ServerName serverName, Set<HRegionInfo> userRegions)
+ throws IOException, InterruptedIOException;
+
+ /**
+ * Tells the coordination engine that it should check for new tasks
+ */
+ void checkTasks();
+
+ /**
+ * Removes recovering regions from the coordination engine
+ * @param serverNames servers which have just been recovered
+ * @param isMetaRecovery whether the current recovery is for the meta region on
+ * serverNames
+ */
+ void removeRecoveringRegions(Set<String> serverNames, Boolean isMetaRecovery) throws IOException;
+
+ /**
+ * Return the number of remaining tasks
+ */
+ int remainingTasksInCoordination();
+
+ /**
+ * Check that the task is still there
+ * @param task node to check
+ */
+ void checkTaskStillAvailable(String task);
+
+ /**
+ * Change the recovery mode.
+ * @param b whether the mode is being set during master initialization
+ * @throws InterruptedIOException
+ * @throws IOException in case of failure
+ */
+ void setRecoveryMode(boolean b) throws InterruptedIOException, IOException;
+
+ /**
+ * Removes stale recovering regions for the given set of known failed servers
+ * @param knownServers set of previously failed servers
+ * @throws IOException in case of failure
+ * @throws InterruptedIOException
+ */
+ void removeStaleRecoveringRegions(Set<String> knownServers) throws IOException,
+ InterruptedIOException;
+
+ /**
+ * Resubmit the task if it is found unassigned or failed
+ * @param taskName path related to the task
+ * @param task to resubmit
+ * @param force whether the resubmit should be forced
+ * @return whether the resubmit was successful
+ */
+ boolean resubmitTask(String taskName, Task task, ResubmitDirective force);
+
+ /**
+ * @param taskName to be submitted
+ */
+ void submitTask(String taskName);
+
+ /**
+ * @param taskName to be removed
+ */
+ void deleteTask(String taskName);
+
+ /**
+ * @return whether the log recovery mode is in the replaying state
+ */
+ boolean isReplaying();
+
+ /**
+ * @return whether the log recovery mode is in the splitting state
+ */
+ boolean isSplitting();
+
+ /**
+ * @return the time of last attempt to recover
+ */
+ long getLastRecoveryTime();
+
+ /**
+ * Temporary function, mostly for UTs. In the regular code isReplaying or isSplitting should be
+ * used.
+ * @return the current log recovery mode.
+ */
+ RecoveryMode getRecoveryMode();
+
+ /**
+ * Support method to init constants such as timeout. Mostly required for UTs.
+ * @throws IOException
+ */
+ @VisibleForTesting
+ void init() throws IOException;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
new file mode 100644
index 00000000000..5341ef646a0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
@@ -0,0 +1,141 @@
+ /**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coordination;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
+import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Coordinated operations for {@link SplitLogWorker} and {@link HLogSplitterHandler}.
+ *
+ * Important methods for SplitLogWorker:
+ * {@link #isReady()} called from {@link SplitLogWorker#run()} to check whether the coordination is
+ * ready to supply tasks
+ * {@link #taskLoop()} loops over new tasks until the worker is stopped
+ * {@link #isStop()} a flag that indicates whether the worker should finish
+ * {@link #registerListener()} called from {@link SplitLogWorker#run()} and may register a listener
+ * for external changes in coordination (if required)
+ *
+ * Important methods for HLogSplitterHandler:
+ * {@link #endTask(SplitLogTask, AtomicLong, SplitTaskDetails)} notifies the coordination engine
+ * that the splitting task has completed.
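+ *
+ * <p>A rough worker-side sketch of how these calls fit together (illustrative only, not the
+ * actual {@link SplitLogWorker#run()} body; {@code coordination} is a hypothetical local name):
+ * <pre>{@code
+ * coordination.init(server, conf, splitTaskExecutor, worker);
+ * coordination.registerListener();
+ * try {
+ *   while (!coordination.isStop()) {
+ *     if (coordination.isReady()) {
+ *       coordination.taskLoop(); // blocks, grabbing and executing tasks, until stopped
+ *     }
+ *   }
+ * } finally {
+ *   coordination.removeListener();
+ * }
+ * }</pre>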
+ */
+@InterfaceAudience.Private
+public interface SplitLogWorkerCoordination {
+
+/* SplitLogWorker part */
+ public static final int DEFAULT_MAX_SPLITTERS = 2;
+
+ /**
+ * Initialize internal values. This method should be used when corresponding SplitLogWorker
+ * instance is created
+ * @param server instance of RegionServerServices to work with
+ * @param conf is current configuration.
+ * @param splitTaskExecutor split executor from SplitLogWorker
+ * @param worker instance of SplitLogWorker
+ */
+ void init(RegionServerServices server, Configuration conf,
+ TaskExecutor splitTaskExecutor, SplitLogWorker worker);
+
+ /**
+ * called when Coordination should stop processing tasks and exit
+ */
+ void stopProcessingTasks();
+
+ /**
+ * @return the current value of exitWorker
+ */
+ boolean isStop();
+
+ /**
+ * Wait for the new tasks and grab one
+ * @throws InterruptedException if the SplitLogWorker was stopped
+ */
+ void taskLoop() throws InterruptedException;
+
+ /**
+ * marks log file as corrupted
+ * @param rootDir where to find the log
+ * @param name of the log
+ * @param fs file system
+ */
+ void markCorrupted(Path rootDir, String name, FileSystem fs);
+
+ /**
+ * Check whether the log splitter is ready to supply tasks
+ * @return false if there are no tasks
+ * @throws InterruptedException if the SplitLogWorker was stopped
+ */
+ boolean isReady() throws InterruptedException;
+
+ /**
+ * Used by unit tests to check how many tasks were processed
+ * @return number of tasks
+ */
+ @VisibleForTesting
+ int getTaskReadySeq();
+
+ /**
+ * set the listener for task changes. Implementation specific
+ */
+ void registerListener();
+
+ /**
+ * remove the listener for task changes. Implementation specific
+ */
+ void removeListener();
+
+ /* HLogSplitterHandler part */
+
+ /**
+ * Notify coordination engine that splitting task has completed.
+ * @param slt See {@link SplitLogTask}
+ * @param ctr counter to be updated
+ * @param splitTaskDetails details about log split task (specific to coordination engine being
+ * used).
+ */
+ void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails splitTaskDetails);
+
+ /**
+ * Interface for log-split tasks. Used to carry implementation details in an encapsulated way
+ * through handlers to the coordination API.
+ */
+ static interface SplitTaskDetails {
+
+ /**
+ * @return full file path in HDFS for the WAL file to be split.
+ */
+ String getWALFile();
+ }
+
+ /**
+ * Get the last flushed sequence ids recorded in coordination for a region of a failed server
+ * (used by distributed log replay)
+ * @param failedServerName name of the failed region server
+ * @param key encoded name of the region
+ * @return the last flushed {@link RegionStoreSequenceIds} recorded for the region
+ * @throws IOException in case of failure
+ */
+ RegionStoreSequenceIds getRegionFlushedSequenceId(String failedServerName, String key)
+ throws IOException;
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
new file mode 100644
index 00000000000..243a37bd2be
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -0,0 +1,1103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coordination;
+
+import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
+import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
+import org.apache.hadoop.hbase.master.SplitLogManager.Task;
+import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * ZooKeeper based implementation of {@link SplitLogManagerCoordination}
+ */
+@InterfaceAudience.Private
+public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
+ SplitLogManagerCoordination {
+
+ public static class ZkSplitLogManagerDetails extends SplitLogManagerDetails {
+
+ ZkSplitLogManagerDetails(ConcurrentMap<String, Task> tasks, MasterServices master,
+ Set<String> failedDeletions, ServerName serverName) {
+ super(tasks, master, failedDeletions, serverName);
+ }
+ }
+
+ public static final int DEFAULT_TIMEOUT = 120000;
+ public static final int DEFAULT_ZK_RETRIES = 3;
+ public static final int DEFAULT_MAX_RESUBMIT = 3;
+
+ private static final Log LOG = LogFactory.getLog(SplitLogManagerCoordination.class);
+
+ private Server server;
+ private long zkretries;
+ private long resubmitThreshold;
+ private long timeout;
+ private TaskFinisher taskFinisher;
+
+ SplitLogManagerDetails details;
+
+ private final Stoppable stopper = null;
+
+ // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check
+ // whether to GC stale recovering znodes
+ private volatile long lastRecoveringNodeCreationTime = 0;
+ private Configuration conf;
+ public boolean ignoreZKDeleteForTesting = false;
+
+ private RecoveryMode recoveryMode;
+
+ private boolean isDrainingDone = false;
+
+ public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager,
+ ZooKeeperWatcher watcher) {
+ super(watcher);
+ taskFinisher = new TaskFinisher() {
+ @Override
+ public Status finish(ServerName workerName, String logfile) {
+ try {
+ HLogSplitter.finishSplitLogFile(logfile, manager.getServer().getConfiguration());
+ } catch (IOException e) {
+ LOG.warn("Could not finish splitting of log file " + logfile, e);
+ return Status.ERR;
+ }
+ return Status.DONE;
+ }
+ };
+ this.server = manager.getServer();
+ this.conf = server.getConfiguration();
+ }
+
+ @Override
+ public void init() throws IOException {
+ this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
+ this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
+ this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, DEFAULT_TIMEOUT);
+ setRecoveryMode(true);
+ if (this.watcher != null) {
+ this.watcher.registerListener(this);
+ lookForOrphans();
+ }
+ }
+
+ @Override
+ public String prepareTask(String taskname) {
+ return ZKSplitLog.getEncodedNodeName(watcher, taskname);
+ }
+
+ @Override
+ public int remainingTasksInCoordination() {
+ int count = 0;
+ try {
+ List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
+ if (tasks != null) {
+ for (String t : tasks) {
+ if (!ZKSplitLog.isRescanNode(watcher, t)) {
+ count++;
+ }
+ }
+ }
+ } catch (KeeperException ke) {
+ LOG.warn("Failed to check remaining tasks", ke);
+ count = -1;
+ }
+ return count;
+ }
+
+ /**
+ * It is possible for a task to stay in UNASSIGNED state indefinitely - say SplitLogManager wants
+ * to resubmit a task. It forces the task to UNASSIGNED state but it dies before it could create
+ * the RESCAN task node to signal the SplitLogWorkers to pick up the task. To prevent this
+ * scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup.
+ * @param path
+ */
+ private void handleUnassignedTask(String path) {
+ if (ZKSplitLog.isRescanNode(watcher, path)) {
+ return;
+ }
+ Task task = findOrCreateOrphanTask(path);
+ if (task.isOrphan() && (task.incarnation == 0)) {
+ LOG.info("resubmitting unassigned orphan task " + path);
+ // ignore failure to resubmit. The timeout-monitor will handle it later
+ // albeit in a more crude fashion
+ resubmitTask(path, task, FORCE);
+ }
+ }
+
+ @Override
+ public void deleteTask(String path) {
+ deleteNode(path, zkretries);
+ }
+
+ @Override
+ public boolean resubmitTask(String path, Task task, ResubmitDirective directive) {
+ // it's OK if this thread misses the update to task.deleted. It will fail later
+ if (task.status != IN_PROGRESS) {
+ return false;
+ }
+ int version;
+ if (directive != FORCE) {
+ // We're going to resubmit:
+ // 1) immediately if the worker server is now marked as dead
+ // 2) after a configurable timeout if the server is not marked as dead but has still not
+ // finished the task. This allows to continue if the worker cannot actually handle it,
+ // for any reason.
+ final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update;
+ final boolean alive =
+ details.getMaster().getServerManager() != null ? details.getMaster().getServerManager()
+ .isServerOnline(task.cur_worker_name) : true;
+ if (alive && time < timeout) {
+ LOG.trace("Skipping the resubmit of " + task.toString() + " because the server "
+ + task.cur_worker_name + " is not marked as dead, we waited for " + time
+ + " while the timeout is " + timeout);
+ return false;
+ }
+
+ if (task.unforcedResubmits.get() >= resubmitThreshold) {
+ if (!task.resubmitThresholdReached) {
+ task.resubmitThresholdReached = true;
+ SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
+ LOG.info("Skipping resubmissions of task " + path + " because threshold "
+ + resubmitThreshold + " reached");
+ }
+ return false;
+ }
+ // race with heartbeat() that might be changing last_version
+ version = task.last_version;
+ } else {
+ SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet();
+ version = -1;
+ }
+ LOG.info("resubmitting task " + path);
+ task.incarnation++;
+ boolean result = resubmit(this.details.getServerName(), path, version);
+ if (!result) {
+ task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
+ return false;
+ }
+ // don't count forced resubmits
+ if (directive != FORCE) {
+ task.unforcedResubmits.incrementAndGet();
+ }
+ task.setUnassigned();
+ rescan(Long.MAX_VALUE);
+ SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
+ return true;
+ }
+
+
+ @Override
+ public void checkTasks() {
+ rescan(Long.MAX_VALUE);
+ };
+
+ /**
+ * signal the workers that a task was resubmitted by creating the RESCAN node.
+ */
+ private void rescan(long retries) {
+ // The RESCAN node will be deleted almost immediately by the
+ // SplitLogManager as soon as it is created because it is being
+ // created in the DONE state. This behavior prevents a buildup
+ // of RESCAN nodes. But there is also a chance that a SplitLogWorker
+ // might miss the watch-trigger that creation of RESCAN node provides.
+ // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
+ // therefore this behavior is safe.
+ SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName(), this.recoveryMode);
+ this.watcher
+ .getRecoverableZooKeeper()
+ .getZooKeeper()
+ .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries));
+ }
+
+ @Override
+ public void submitTask(String path) {
+ createNode(path, zkretries);
+ }
+
+ @Override
+ public void checkTaskStillAvailable(String path) {
+ // A negative retry count will lead to ignoring all error processing.
+ this.watcher
+ .getRecoverableZooKeeper()
+ .getZooKeeper()
+ .getData(path, this.watcher, new GetDataAsyncCallback(),
+ Long.valueOf(-1) /* retry count */);
+ SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
+ }
+
+ /**
+ * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
+ * region server hosting the region can allow reads to the recovered region
+ * @param recoveredServerNameSet servers which are just recovered
+ * @param isMetaRecovery whether current recovery is for the meta region on
+ * serverNames
+ */
+ @Override
+ public void removeRecoveringRegions(final Set<String> recoveredServerNameSet,
+ Boolean isMetaRecovery) throws IOException {
+ final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName();
+ int count = 0;
+ try {
+ List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
+ if (tasks != null) {
+ for (String t : tasks) {
+ if (!ZKSplitLog.isRescanNode(watcher, t)) {
+ count++;
+ }
+ }
+ }
+ if (count == 0 && this.details.getMaster().isInitialized()
+ && !this.details.getMaster().getServerManager().areDeadServersInProgress()) {
+ // no splitting work items left
+ ZKSplitLog.deleteRecoveringRegionZNodes(watcher, null);
+ // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at
+ // this point.
+ lastRecoveringNodeCreationTime = Long.MAX_VALUE;
+ } else if (!recoveredServerNameSet.isEmpty()) {
+ // remove recovering regions which don't have any RS associated with them
+ List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
+ if (regions != null) {
+ for (String region : regions) {
+ if (isMetaRecovery != null) {
+ if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName))
+ || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) {
+ // skip non-meta regions when recovering the meta region or
+ // skip the meta region when recovering user regions
+ continue;
+ }
+ }
+ String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
+ List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
+ if (failedServers == null || failedServers.isEmpty()) {
+ ZKUtil.deleteNode(watcher, nodePath);
+ continue;
+ }
+ if (recoveredServerNameSet.containsAll(failedServers)) {
+ ZKUtil.deleteNodeRecursively(watcher, nodePath);
+ } else {
+ for (String failedServer : failedServers) {
+ if (recoveredServerNameSet.contains(failedServer)) {
+ String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
+ ZKUtil.deleteNode(watcher, tmpPath);
+ }
+ }
+ }
+ }
+ }
+ }
+ } catch (KeeperException ke) {
+ LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
+ throw new IOException(ke);
+ }
+ }
+
+ private void deleteNode(String path, Long retries) {
+ SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
+ // Once a task znode is ready for delete, that is it is in the TASK_DONE
+ // state, then no one should be writing to it anymore. That is no one
+ // will be updating the znode version any more.
+ this.watcher.getRecoverableZooKeeper().getZooKeeper()
+ .delete(path, -1, new DeleteAsyncCallback(), retries);
+ }
+
+ private void deleteNodeSuccess(String path) {
+ if (ignoreZKDeleteForTesting) {
+ return;
+ }
+ Task task;
+ task = details.getTasks().remove(path);
+ if (task == null) {
+ if (ZKSplitLog.isRescanNode(watcher, path)) {
+ SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
+ }
+ SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
+ LOG.debug("deleted task without in memory state " + path);
+ return;
+ }
+ synchronized (task) {
+ task.status = DELETED;
+ task.notify();
+ }
+ SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
+ }
+
+ private void deleteNodeFailure(String path) {
+ LOG.info("Failed to delete node " + path + " and will retry soon.");
+ return;
+ }
+
+ private void createRescanSuccess(String path) {
+ SplitLogCounters.tot_mgr_rescan.incrementAndGet();
+ getDataSetWatch(path, zkretries);
+ }
+
+ private void createRescanFailure() {
+ LOG.fatal("logic failure, rescan failure must not happen");
+ }
+
+ /**
+ * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
+ * @param statusCode integer value of a ZooKeeper exception code
+ * @param action description message about the retried action
+ * @return true when need to abandon retries otherwise false
+ */
+ private boolean needAbandonRetries(int statusCode, String action) {
+ if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
+ LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
+ + "action=" + action);
+ return true;
+ }
+ return false;
+ }
+
+ private void createNode(String path, Long retry_count) {
+ SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), this.recoveryMode);
+ ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
+ retry_count);
+ SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
+ return;
+ }
+
+ private void createNodeSuccess(String path) {
+ LOG.debug("put up splitlog task at znode " + path);
+ getDataSetWatch(path, zkretries);
+ }
+
+ private void createNodeFailure(String path) {
+ // TODO the Manager should split the log locally instead of giving up
+ LOG.warn("failed to create task node" + path);
+ setDone(path, FAILURE);
+ }
+
+ private void getDataSetWatch(String path, Long retry_count) {
+ this.watcher.getRecoverableZooKeeper().getZooKeeper()
+ .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count);
+ SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
+ }
+
+
+ private void getDataSetWatchSuccess(String path, byte[] data, int version)
+ throws DeserializationException {
+ if (data == null) {
+ if (version == Integer.MIN_VALUE) {
+ // assume all done. The task znode suddenly disappeared.
+ setDone(path, SUCCESS);
+ return;
+ }
+ SplitLogCounters.tot_mgr_null_data.incrementAndGet();
+ LOG.fatal("logic error - got null data " + path);
+ setDone(path, FAILURE);
+ return;
+ }
+ data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
+ SplitLogTask slt = SplitLogTask.parseFrom(data);
+ if (slt.isUnassigned()) {
+ LOG.debug("task not yet acquired " + path + " ver = " + version);
+ handleUnassignedTask(path);
+ } else if (slt.isOwned()) {
+ heartbeat(path, version, slt.getServerName());
+ } else if (slt.isResigned()) {
+ LOG.info("task " + path + " entered state: " + slt.toString());
+ resubmitOrFail(path, FORCE);
+ } else if (slt.isDone()) {
+ LOG.info("task " + path + " entered state: " + slt.toString());
+ if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
+ if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
+ setDone(path, SUCCESS);
+ } else {
+ resubmitOrFail(path, CHECK);
+ }
+ } else {
+ setDone(path, SUCCESS);
+ }
+ } else if (slt.isErr()) {
+ LOG.info("task " + path + " entered state: " + slt.toString());
+ resubmitOrFail(path, CHECK);
+ } else {
+ LOG.fatal("logic error - unexpected zk state for path = " + path + " data = "
+ + slt.toString());
+ setDone(path, FAILURE);
+ }
+ }
+
+ private void resubmitOrFail(String path, ResubmitDirective directive) {
+ if (resubmitTask(path, findOrCreateOrphanTask(path), directive) == false) {
+ setDone(path, FAILURE);
+ }
+ }
+
+ private void getDataSetWatchFailure(String path) {
+ LOG.warn("failed to set data watch " + path);
+ setDone(path, FAILURE);
+ }
+
+ private void setDone(String path, TerminationStatus status) {
+ Task task = details.getTasks().get(path);
+ if (task == null) {
+ if (!ZKSplitLog.isRescanNode(watcher, path)) {
+ SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
+ LOG.debug("unacquired orphan task is done " + path);
+ }
+ } else {
+ synchronized (task) {
+ if (task.status == IN_PROGRESS) {
+ if (status == SUCCESS) {
+ SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
+ LOG.info("Done splitting " + path);
+ } else {
+ SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
+ LOG.warn("Error splitting " + path);
+ }
+ task.status = status;
+ if (task.batch != null) {
+ synchronized (task.batch) {
+ if (status == SUCCESS) {
+ task.batch.done++;
+ } else {
+ task.batch.error++;
+ }
+ task.batch.notify();
+ }
+ }
+ }
+ }
+ }
+ // delete the task node in zk. It's an async
+ // call and no one is blocked waiting for this node to be deleted. All
+ // task names are unique (log.) there is no risk of deleting
+ // a future task.
+ // if a deletion fails, TimeoutMonitor will retry the same deletion later
+ deleteNode(path, zkretries);
+ return;
+ }
+
+ Task findOrCreateOrphanTask(String path) {
+ Task orphanTask = new Task();
+ Task task;
+ task = details.getTasks().putIfAbsent(path, orphanTask);
+ if (task == null) {
+ LOG.info("creating orphan task " + path);
+ SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
+ task = orphanTask;
+ }
+ return task;
+ }
+
+ private void heartbeat(String path, int new_version, ServerName workerName) {
+ Task task = findOrCreateOrphanTask(path);
+ if (new_version != task.last_version) {
+ if (task.isUnassigned()) {
+ LOG.info("task " + path + " acquired by " + workerName);
+ }
+ task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, workerName);
+ SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
+ } else {
+ // duplicate heartbeats - heartbeats w/o zk node version
+ // changing - are possible. The timeout thread does
+ // getDataSetWatch() just to check whether a node still
+ // exists or not
+ }
+ return;
+ }
+
+ private void lookForOrphans() {
+ List<String> orphans;
+ try {
+ orphans = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.splitLogZNode);
+ if (orphans == null) {
+ LOG.warn("could not get children of " + this.watcher.splitLogZNode);
+ return;
+ }
+ } catch (KeeperException e) {
+ LOG.warn("could not get children of " + this.watcher.splitLogZNode + " "
+ + StringUtils.stringifyException(e));
+ return;
+ }
+ int rescan_nodes = 0;
+ for (String path : orphans) {
+ String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
+ if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
+ rescan_nodes++;
+ LOG.debug("found orphan rescan node " + path);
+ } else {
+ LOG.info("found orphan task " + path);
+ }
+ getDataSetWatch(nodepath, zkretries);
+ }
+ LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " + rescan_nodes
+ + " rescan nodes");
+ }
+
+ /**
+ * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for
+ * all regions of the passed-in region server
+ * @param serverName the name of a region server
+ * @param userRegions user regions assigned on the region server
+ */
+ @Override
+ public void markRegionsRecovering(final ServerName serverName, Set<HRegionInfo> userRegions)
+ throws IOException, InterruptedIOException {
+ this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();
+ for (HRegionInfo region : userRegions) {
+ String regionEncodeName = region.getEncodedName();
+ long retries = this.zkretries;
+
+ do {
+ String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
+ long lastRecordedFlushedSequenceId = -1;
+ try {
+ long lastSequenceId =
+ this.details.getMaster().getServerManager()
+ .getLastFlushedSequenceId(regionEncodeName.getBytes());
+
+ /*
+ * znode layout: .../region_id[last known flushed sequence id]/failed server[last known
+ * flushed sequence id for the server]
+ */
+ byte[] data = ZKUtil.getData(this.watcher, nodePath);
+ if (data == null) {
+ ZKUtil
+ .createSetData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
+ } else {
+ lastRecordedFlushedSequenceId =
+ ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
+ if (lastRecordedFlushedSequenceId < lastSequenceId) {
+ // update last flushed sequence id in the region level
+ ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
+ }
+ }
+ // go one level deeper with server name
+ nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
+ if (lastSequenceId <= lastRecordedFlushedSequenceId) {
+ // the newly assigned RS failed even before any flush to the region
+ lastSequenceId = lastRecordedFlushedSequenceId;
+ }
+ ZKUtil.createSetData(this.watcher, nodePath,
+ ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
+ LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server "
+ + serverName);
+
+ // break retry loop
+ break;
+ } catch (KeeperException e) {
+ // ignore ZooKeeper exceptions inside retry loop
+ if (retries <= 1) {
+ throw new IOException(e);
+ }
+ // wait a little bit for retry
+ try {
+ Thread.sleep(20);
+ } catch (InterruptedException e1) {
+ throw new InterruptedIOException();
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ } while ((--retries) > 0 && (!this.stopper.isStopped()));
+ }
+ }
+
+ @Override
+ public void nodeDataChanged(String path) {
+ Task task;
+ task = details.getTasks().get(path);
+ if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
+ if (task != null) {
+ task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
+ }
+ getDataSetWatch(path, zkretries);
+ }
+ }
+
+ /**
+ * ZooKeeper implementation of
+ * {@link SplitLogManagerCoordination#removeStaleRecoveringRegions(Set)}
+ */
+ @Override
+ public void removeStaleRecoveringRegions(final Set<String> knownFailedServers)
+ throws IOException, InterruptedIOException {
+
+ try {
+ List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
+ if (tasks != null) {
+ for (String t : tasks) {
+ byte[] data;
+ try {
+ data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ if (data != null) {
+ SplitLogTask slt = null;
+ try {
+ slt = SplitLogTask.parseFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse data for znode " + t, e);
+ }
+ if (slt != null && slt.isDone()) {
+ continue;
+ }
+ }
+ // decode the file name
+ t = ZKSplitLog.getFileName(t);
+ ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t));
+ if (serverName != null) {
+ knownFailedServers.add(serverName.getServerName());
+ } else {
+ LOG.warn("Found invalid WAL log file name:" + t);
+ }
+ }
+ }
+
+ // remove recovering regions which don't have any RS associated with them
+ List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
+ if (regions != null) {
+ for (String region : regions) {
+ String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
+ List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
+ if (regionFailedServers == null || regionFailedServers.isEmpty()) {
+ ZKUtil.deleteNode(watcher, nodePath);
+ continue;
+ }
+ boolean needMoreRecovery = false;
+ for (String tmpFailedServer : regionFailedServers) {
+ if (knownFailedServers.contains(tmpFailedServer)) {
+ needMoreRecovery = true;
+ break;
+ }
+ }
+ if (!needMoreRecovery) {
+ ZKUtil.deleteNodeRecursively(watcher, nodePath);
+ }
+ }
+ }
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public boolean isReplaying() {
+ return this.recoveryMode == RecoveryMode.LOG_REPLAY;
+ }
+
+ @Override
+ public boolean isSplitting() {
+ return this.recoveryMode == RecoveryMode.LOG_SPLITTING;
+ }
+
+ /**
+ * Sets the recovery mode based on outstanding split log tasks from a previous run, or on the
+ * current configuration setting
+ * @param isForInitialization whether the mode is being set during master initialization
+ * @throws IOException in case of failure
+ */
+ @Override
+ public void setRecoveryMode(boolean isForInitialization) throws IOException {
+ if (this.isDrainingDone) {
+ // when there is no outstanding splitlogtask after master start up, we already have up to date
+ // recovery mode
+ return;
+ }
+ if (this.watcher == null) {
+ // when the watcher is null (testing code), the recovery mode can only be LOG_SPLITTING
+ this.isDrainingDone = true;
+ this.recoveryMode = RecoveryMode.LOG_SPLITTING;
+ return;
+ }
+ boolean hasSplitLogTask = false;
+ boolean hasRecoveringRegions = false;
+ RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
+ RecoveryMode recoveryModeInConfig =
+ (isDistributedLogReplay(conf)) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
+
+ // First, check whether there are outstanding recovering regions
+ try {
+ List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
+ if (regions != null && !regions.isEmpty()) {
+ hasRecoveringRegions = true;
+ previousRecoveryMode = RecoveryMode.LOG_REPLAY;
+ }
+ if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
+ // Second, check whether there are outstanding split log tasks
+ List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
+ if (tasks != null && !tasks.isEmpty()) {
+ hasSplitLogTask = true;
+ if (isForInitialization) {
+ // during initialization, try to get recovery mode from splitlogtask
+ for (String task : tasks) {
+ try {
+ byte[] data =
+ ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, task));
+ if (data == null) continue;
+ SplitLogTask slt = SplitLogTask.parseFrom(data);
+ previousRecoveryMode = slt.getMode();
+ if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
+ // created by old code base where we don't set recovery mode in splitlogtask
+ // we can safely set to LOG_SPLITTING because we're in master initialization code
+ // before SSH is enabled & there is no outstanding recovering regions
+ previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
+ }
+ break;
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse data for znode " + task, e);
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ }
+ }
+ }
+ }
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+
+ synchronized (this) {
+ if (this.isDrainingDone) {
+ return;
+ }
+ if (!hasSplitLogTask && !hasRecoveringRegions) {
+ this.isDrainingDone = true;
+ this.recoveryMode = recoveryModeInConfig;
+ return;
+ } else if (!isForInitialization) {
+ // splitlogtask hasn't drained yet, keep existing recovery mode
+ return;
+ }
+
+ if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
+ this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
+ this.recoveryMode = previousRecoveryMode;
+ } else {
+ this.recoveryMode = recoveryModeInConfig;
+ }
+ }
+ }
+
+ /**
+ * Returns whether distributed log replay is turned on
+ * @param conf current configuration
+ * @return true when distributed log replay is turned on
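+ * <p>
+ * A sketch of enabling it programmatically (illustrative only; {@code conf} is a hypothetical
+ * {@link Configuration} instance):
+ * <pre>{@code
+ * conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+ * conf.setInt(HFile.FORMAT_VERSION_KEY, 3); // HFile v3 is needed for tag support
+ * }</pre>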
+ */
+ private boolean isDistributedLogReplay(Configuration conf) {
+ boolean dlr =
+ conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
+ HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+ int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
+ }
+ // For distributed log replay, hfile version must be 3 at least; we need tag support.
+ return dlr && (version >= 3);
+ }
+
+ private boolean resubmit(ServerName serverName, String path, int version) {
+ try {
+ // blocking zk call but this is done from the timeout thread
+ SplitLogTask slt =
+ new SplitLogTask.Unassigned(this.details.getServerName(), this.recoveryMode);
+ if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
+ LOG.debug("failed to resubmit task " + path + " version changed");
+ return false;
+ }
+ } catch (NoNodeException e) {
+ LOG.warn("failed to resubmit because znode doesn't exist " + path
+ + " task done (or forced done by removing the znode)");
+ try {
+ getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+ } catch (DeserializationException e1) {
+ LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
+ return false;
+ }
+ return false;
+ } catch (KeeperException.BadVersionException e) {
+ LOG.debug("failed to resubmit task " + path + " version changed");
+ return false;
+ } catch (KeeperException e) {
+ SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
+ LOG.warn("failed to resubmit " + path, e);
+ return false;
+ }
+ return true;
+ }
+
+
+ /**
+ * {@link SplitLogManager} can use objects implementing this interface to finish off a task left
+ * partially done by a {@link SplitLogWorker}. This provides a serialization point at the end of
+ * the task processing. Must be restartable and idempotent.
+ */
+ public interface TaskFinisher {
+ /**
+ * status that can be returned by finish()
+ */
+ enum Status {
+ /**
+ * task completed successfully
+ */
+ DONE(),
+ /**
+ * task completed with error
+ */
+ ERR();
+ }
+
+ /**
+ * Finish the partially done task. The worker name provides a clue to where the partial results
+ * of the partially done task are present. The task name is the name of the task that was put up
+ * in ZooKeeper.
+ *
+ * @param workerName name of the worker that did the partial work
+ * @param taskname name of the task that was put up in ZooKeeper
+ * @return DONE if the task completed successfully, ERR otherwise
+ */
+ Status finish(ServerName workerName, String taskname);
+ }
+
+ /**
+ * Asynchronous handler for zk create node results. Retries on failures.
+ */
+ public class CreateAsyncCallback implements AsyncCallback.StringCallback {
+ private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, String name) {
+ SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
+ if (rc != 0) {
+ if (needAbandonRetries(rc, "Create znode " + path)) {
+ createNodeFailure(path);
+ return;
+ }
+ if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
+ // What if there is a delete pending against this pre-existing
+ // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
+ // state. Only operations that will be carried out on this node by
+ // this manager are get-znode-data, task-finisher and delete-znode.
+ // And all code pieces correctly handle the case of suddenly
+ // disappearing task-znode.
+ LOG.debug("found pre-existing znode " + path);
+ SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
+ } else {
+ Long retry_count = (Long) ctx;
+ LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + path
+ + " remaining retries=" + retry_count);
+ if (retry_count == 0) {
+ SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
+ createNodeFailure(path);
+ } else {
+ SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
+ createNode(path, retry_count - 1);
+ }
+ return;
+ }
+ }
+ createNodeSuccess(path);
+ }
+ }
+
+ /**
+ * Asynchronous handler for zk get-data-set-watch on node results. Retries on failures.
+ */
+ public class GetDataAsyncCallback implements AsyncCallback.DataCallback {
+ private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
+ if (rc != 0) {
+ if (needAbandonRetries(rc, "GetData from znode " + path)) {
+ return;
+ }
+ if (rc == KeeperException.Code.NONODE.intValue()) {
+ SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
+ LOG.warn("task znode " + path + " vanished or not created yet.");
+ // ignore since we should not end up in a case where there is in-memory task,
+ // but no znode. The only case is between the time task is created in-memory
+ // and the znode is created. See HBASE-11217.
+ return;
+ }
+ Long retry_count = (Long) ctx;
+
+ if (retry_count < 0) {
+ LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path
+ + ". Ignoring error. No error handling. No retrying.");
+ return;
+ }
+ LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path
+ + " remaining retries=" + retry_count);
+ if (retry_count == 0) {
+ SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
+ getDataSetWatchFailure(path);
+ } else {
+ SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
+ getDataSetWatch(path, retry_count - 1);
+ }
+ return;
+ }
+ try {
+ getDataSetWatchSuccess(path, data, stat.getVersion());
+ } catch (DeserializationException e) {
+ LOG.warn("Deserialization problem", e);
+ }
+ return;
+ }
+ }
+
+ /**
+ * Asynchronous handler for zk delete node results. Retries on failures.
+ */
+ public class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
+ private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
+
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
+ if (rc != 0) {
+ if (needAbandonRetries(rc, "Delete znode " + path)) {
+ details.getFailedDeletions().add(path);
+ return;
+ }
+ if (rc != KeeperException.Code.NONODE.intValue()) {
+ SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
+ Long retry_count = (Long) ctx;
+ LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path
+ + " remaining retries=" + retry_count);
+ if (retry_count == 0) {
+ LOG.warn("delete failed " + path);
+ details.getFailedDeletions().add(path);
+ deleteNodeFailure(path);
+ } else {
+ deleteNode(path, retry_count - 1);
+ }
+ return;
+ } else {
+ LOG.info(path + " does not exist. Either was created but deleted behind our"
+ + " back by another pending delete OR was deleted"
+ + " in earlier retry rounds. zkretries = " + ctx);
+ }
+ } else {
+ LOG.debug("deleted " + path);
+ }
+ deleteNodeSuccess(path);
+ }
+ }
+
+ /**
+ * Asynchronous handler for zk create RESCAN-node results. Retries on failures.
+ *
+ * A RESCAN node is created using the EPHEMERAL_SEQUENTIAL flag. It is a signal for all the
+ * {@link SplitLogWorker}s to rescan for new tasks.
+ */
+ public class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
+ private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, String name) {
+ if (rc != 0) {
+ if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
+ return;
+ }
+ Long retry_count = (Long) ctx;
+ LOG.warn("rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries="
+ + retry_count);
+ if (retry_count == 0) {
+ createRescanFailure();
+ } else {
+ rescan(retry_count - 1);
+ }
+ return;
+ }
+ // path is the original arg, name is the actual name that was created
+ createRescanSuccess(name);
+ }
+ }
+
+ @Override
+ public void setDetails(SplitLogManagerDetails details) {
+ this.details = details;
+ }
+
+ @Override
+ public SplitLogManagerDetails getDetails() {
+ return details;
+ }
+
+ @Override
+ public RecoveryMode getRecoveryMode() {
+ return recoveryMode;
+ }
+
+ @Override
+ public long getLastRecoveryTime() {
+ return lastRecoveringNodeCreationTime;
+ }
+
+ /**
+ * Temporary function that is used by unit tests only
+ */
+ public void setIgnoreDeleteForTesting(boolean b) {
+ ignoreZKDeleteForTesting = b;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
new file mode 100644
index 00000000000..3989211b537
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -0,0 +1,654 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coordination;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
+import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * ZooKeeper based implementation of {@link SplitLogWorkerCoordination}.
+ * It listens for task-related changes in ZooKeeper and hands split log tasks to the
+ * {@link SplitLogWorker}.
+ */
+@InterfaceAudience.Private
+public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
+ SplitLogWorkerCoordination {
+
+ private static final Log LOG = LogFactory.getLog(ZkSplitLogWorkerCoordination.class);
+
+ private static final int checkInterval = 5000; // 5 seconds
+ private static final int FAILED_TO_OWN_TASK = -1;
+
+ private SplitLogWorker worker;
+
+ private TaskExecutor splitTaskExecutor;
+
+ private final Object taskReadyLock = new Object();
+ volatile int taskReadySeq = 0;
+ private volatile String currentTask = null;
+ private int currentVersion;
+ private volatile boolean shouldStop = false;
+ private final Object grabTaskLock = new Object();
+ private boolean workerInGrabTask = false;
+ private int reportPeriod;
+ private RegionServerServices server = null;
+ protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
+ private int maxConcurrentTasks = 0;
+
+ private final ZkCoordinatedStateManager manager;
+
+ public ZkSplitLogWorkerCoordination(ZkCoordinatedStateManager zkCoordinatedStateManager,
+ ZooKeeperWatcher watcher) {
+ super(watcher);
+ manager = zkCoordinatedStateManager;
+
+ }
+
+ /**
+ * Override handler from {@link ZooKeeperListener}
+ */
+ @Override
+ public void nodeChildrenChanged(String path) {
+ if (path.equals(watcher.splitLogZNode)) {
+ LOG.debug("tasks arrived or departed");
+ synchronized (taskReadyLock) {
+ taskReadySeq++;
+ taskReadyLock.notify();
+ }
+ }
+ }
+
+ /**
+ * Override handler from {@link ZooKeeperListener}
+ */
+ @Override
+ public void nodeDataChanged(String path) {
+ // there will be a self generated dataChanged event every time attemptToOwnTask()
+ // heartbeats the task znode by upping its version
+ synchronized (grabTaskLock) {
+ if (workerInGrabTask) {
+ // currentTask can change
+ String taskpath = currentTask;
+ if (taskpath != null && taskpath.equals(path)) {
+ getDataSetWatchAsync();
+ }
+ }
+ }
+ }
+
+ /**
+ * Override initialization method from {@link SplitLogWorkerCoordination}
+ */
+ @Override
+ public void init(RegionServerServices server, Configuration conf,
+ TaskExecutor splitExecutor, SplitLogWorker worker) {
+ this.server = server;
+ this.worker = worker;
+ this.splitTaskExecutor = splitExecutor;
+ maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
+ reportPeriod =
+ conf.getInt("hbase.splitlog.report.period",
+ conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
+ ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3);
+ }
+
+ /* Support functions for Zookeeper async callback */
+
+ void getDataSetWatchFailure(String path) {
+ synchronized (grabTaskLock) {
+ if (workerInGrabTask) {
+ // currentTask can change but that's ok
+ String taskpath = currentTask;
+ if (taskpath != null && taskpath.equals(path)) {
+ LOG.info("retrying data watch on " + path);
+ SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
+ getDataSetWatchAsync();
+ } else {
+ // no point setting a watch on the task which this worker is not
+ // working upon anymore
+ }
+ }
+ }
+ }
+
+ public void getDataSetWatchAsync() {
+ watcher.getRecoverableZooKeeper().getZooKeeper()
+ .getData(currentTask, watcher, new GetDataAsyncCallback(), null);
+ SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
+ }
+
+ void getDataSetWatchSuccess(String path, byte[] data) {
+ SplitLogTask slt;
+ try {
+ slt = SplitLogTask.parseFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse", e);
+ return;
+ }
+ synchronized (grabTaskLock) {
+ if (workerInGrabTask) {
+ // currentTask can change but that's ok
+ String taskpath = currentTask;
+ if (taskpath != null && taskpath.equals(path)) {
+ ServerName serverName = manager.getServer().getServerName();
+ // have to compare data. cannot compare version because then there
+ // will be race with attemptToOwnTask()
+ // cannot just check whether the node has been transitioned to
+ // UNASSIGNED because by the time this worker sets the data watch
+ // the node might have made two transitions - from owned by this
+ // worker to unassigned to owned by another worker
+ if (!slt.isOwned(serverName) && !slt.isDone(serverName) && !slt.isErr(serverName)
+ && !slt.isResigned(serverName)) {
+ LOG.info("task " + taskpath + " preempted from " + serverName
+ + ", current task state and owner=" + slt.toString());
+ worker.stopTask();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * try to grab a 'lock' on the task zk node to own and execute the task.
+ *
+ * @param path zk node for the task
+ */
+ private void grabTask(String path) {
+ Stat stat = new Stat();
+ byte[] data;
+ synchronized (grabTaskLock) {
+ currentTask = path;
+ workerInGrabTask = true;
+ if (Thread.interrupted()) {
+ return;
+ }
+ }
+ try {
+ try {
+ if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
+ SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
+ return;
+ }
+ } catch (KeeperException e) {
+ LOG.warn("Failed to get data for znode " + path, e);
+ SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
+ return;
+ }
+ SplitLogTask slt;
+ try {
+ slt = SplitLogTask.parseFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse data for znode " + path, e);
+ SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
+ return;
+ }
+ if (!slt.isUnassigned()) {
+ SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
+ return;
+ }
+
+ currentVersion =
+ attemptToOwnTask(true, watcher, server.getServerName(), path,
+ slt.getMode(), stat.getVersion());
+ if (currentVersion < 0) {
+ SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
+ return;
+ }
+
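+ // A rescan node is a marker task the split log manager creates to nudge workers into
+ // rescanning the task list; there is no WAL to split, so just transition it to DONE.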
+ if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
+ ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
+ new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
+ splitTaskDetails.setTaskNode(currentTask);
+ splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion));
+
+ endTask(new SplitLogTask.Done(server.getServerName(), slt.getMode()),
+ SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
+ return;
+ }
+
+ LOG.info("worker " + server.getServerName() + " acquired task " + path);
+ SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
+ getDataSetWatchAsync();
+
+ submitTask(path, slt.getMode(), currentVersion, reportPeriod);
+
+ // after a successful submit, sleep a little bit to allow other RSs to grab the rest of the tasks
+ try {
+ int sleepTime = RandomUtils.nextInt(500) + 500;
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while yielding for other region servers", e);
+ Thread.currentThread().interrupt();
+ }
+ } finally {
+ synchronized (grabTaskLock) {
+ workerInGrabTask = false;
+ // clear the interrupt from stopTask() otherwise the next task will
+ // suffer
+ Thread.interrupted();
+ }
+ }
+ }
+
+ /**
+ * Submit a log split task to executor service
+ * @param curTask task to submit
+ * @param mode recovery mode of the task
+ * @param curTaskZKVersion current version of task
+ * @param reportPeriod how often the executor should report progress on the task
+ */
+ void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion,
+ final int reportPeriod) {
+ final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
+
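+ // The reporter periodically re-asserts ownership of the task znode (a heartbeat) so the
+ // manager does not resubmit the task; returning false cancels the split.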
+ CancelableProgressable reporter = new CancelableProgressable() {
+ private long last_report_at = 0;
+
+ @Override
+ public boolean progress() {
+ long t = EnvironmentEdgeManager.currentTimeMillis();
+ if ((t - last_report_at) > reportPeriod) {
+ last_report_at = t;
+ int latestZKVersion =
+ attemptToOwnTask(false, watcher, server.getServerName(), curTask,
+ mode, zkVersion.intValue());
+ if (latestZKVersion < 0) {
+ LOG.warn("Failed to heartbeat the task" + curTask);
+ return false;
+ }
+ zkVersion.setValue(latestZKVersion);
+ }
+ return true;
+ }
+ };
+ ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
+ new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
+ splitTaskDetails.setTaskNode(curTask);
+ splitTaskDetails.setCurTaskZKVersion(zkVersion);
+
+ HLogSplitterHandler hsh =
+ new HLogSplitterHandler(server, this, splitTaskDetails, reporter,
+ this.tasksInProgress, splitTaskExecutor, mode);
+ server.getExecutorService().submit(hsh);
+ }
+
+ /**
+ * This function calculates how many more splitters this region server could create based on the
+ * expected average number of tasks per RS and the hard upper bound (maxConcurrentTasks) set by
+ * configuration. At any given time a RS is allowed to run at most
+ * MIN(expected tasks per RS, hard upper bound) splitters.
+ * @param numTasks current total number of available tasks
+ * @return how many more splitters this RS may spawn (zero or negative means none)
+ */
+ private int calculateAvailableSplitters(int numTasks) {
+ // at least one RS (itself) is available
+ int availableRSs = 1;
+ try {
+ List<String> regionServers =
+ ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
+ availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
+ } catch (KeeperException e) {
+ // do nothing
+ LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
+ }
+
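+ // ceiling division, e.g. 10 tasks across 4 region servers -> 3 expected tasks per RS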
+ int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
+ expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
+ // calculate how many more splitters we could spawn
+ return Math.min(expectedTasksPerRS, maxConcurrentTasks)
+ - this.tasksInProgress.get();
+ }
+
+ /**
+ * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
+ *
+ * This method is also used to periodically heartbeat the task progress by transitioning the node
+ * from OWNED to OWNED.
+ *
+ * @param isFirstTime shows whether it's the first attempt.
+ * @param zkw zk watcher
+ * @param server name of the current region server
+ * @param task task node to own
+ * @param mode recovery mode of the task
+ * @param taskZKVersion version of the task in zk
+ * @return non-negative integer value when task can be owned by current region server otherwise -1
+ */
+ protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
+ ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
+ int latestZKVersion = FAILED_TO_OWN_TASK;
+ try {
+ SplitLogTask slt = new SplitLogTask.Owned(server, mode);
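+ // conditional setData acts as a compare-and-swap: it only succeeds if nobody else changed
+ // the znode since taskZKVersion, which makes the ownership claim (and heartbeat) safe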
+ Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
+ if (stat == null) {
+ LOG.warn("zk.setData() returned null for path " + task);
+ SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
+ return FAILED_TO_OWN_TASK;
+ }
+ latestZKVersion = stat.getVersion();
+ SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
+ return latestZKVersion;
+ } catch (KeeperException e) {
+ if (!isFirstTime) {
+ if (e.code().equals(KeeperException.Code.NONODE)) {
+ LOG.warn("NONODE failed to assert ownership for " + task, e);
+ } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
+ LOG.warn("BADVERSION failed to assert ownership for " + task, e);
+ } else {
+ LOG.warn("failed to assert ownership for " + task, e);
+ }
+ }
+ } catch (InterruptedException e1) {
+ LOG.warn("Interrupted while trying to assert ownership of " + task + " "
+ + StringUtils.stringifyException(e1));
+ Thread.currentThread().interrupt();
+ }
+ SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
+ return FAILED_TO_OWN_TASK;
+ }
+
+ /**
+ * Wait for tasks to become available at the /hbase/splitlog znode. Grab a task one at a time.
+ * This policy puts an upper limit on the number of log splitting tasks that can be running
+ * simultaneously in a cluster.
+ *
+ * Synchronization using {@link #taskReadyLock} ensures that it will try to grab every task that
+ * has been put up
+ * @throws InterruptedException
+ */
+ @Override
+ public void taskLoop() throws InterruptedException {
+ while (!shouldStop) {
+ int seq_start = taskReadySeq;
+ List<String> paths = getTaskList();
+ if (paths == null) {
+ LOG.warn("Could not get tasks, did someone remove " + watcher.splitLogZNode
+ + " ... worker thread exiting.");
+ return;
+ }
+ // prefer the meta WAL if one is present; otherwise start from a random offset so workers
+ // do not all race for the same task
+ int offset = (int) (Math.random() * paths.size());
+ for (int i = 0; i < paths.size(); i++) {
+ if (HLogUtil.isMetaFile(paths.get(i))) {
+ offset = i;
+ break;
+ }
+ }
+ int numTasks = paths.size();
+ for (int i = 0; i < numTasks; i++) {
+ int idx = (i + offset) % paths.size();
+ // don't call ZKSplitLog.getNodeName() because that will lead to
+ // double encoding of the path name
+ if (this.calculateAvailableSplitters(numTasks) > 0) {
+ grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
+ } else {
+ LOG.debug("Current region server " + server.getServerName() + " has "
+ + this.tasksInProgress.get() + " tasks in progress and can't take more.");
+ break;
+ }
+ if (shouldStop) {
+ return;
+ }
+ }
+ SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
+ synchronized (taskReadyLock) {
+ while (seq_start == taskReadySeq) {
+ taskReadyLock.wait(checkInterval);
+ if (server != null) {
+ // check to see if we have stale recovering regions in our internal memory state
+ Map<String, HRegion> recoveringRegions = server.getRecoveringRegions();
+ if (!recoveringRegions.isEmpty()) {
+ // Make a local copy to prevent ConcurrentModificationException when other threads
+ // modify recoveringRegions
+ List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
+ for (String region : tmpCopy) {
+ String nodePath =
+ ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
+ try {
+ if (ZKUtil.checkExists(watcher, nodePath) == -1) {
+ HRegion r = recoveringRegions.remove(region);
+ if (r != null) {
+ r.setRecovering(false);
+ }
+ LOG.debug("Mark recovering region:" + region + " up.");
+ } else {
+ // This check is a defensive (or redundant) mechanism to prevent stale recovering
+ // regions in our internal RS memory state while zookeeper (the source of truth) says
+ // differently. We stop at the first still-recovering region because in the normal case
+ // there should be none, so checking the first one is good enough.
+ break;
+ }
+ } catch (KeeperException e) {
+ // ignore zookeeper error
+ LOG.debug("Got a zookeeper when trying to open a recovering region", e);
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private List<String> getTaskList() throws InterruptedException {
+ List<String> childrenPaths = null;
+ long sleepTime = 1000;
+ // Loop until we get the list of children, or bail out if the worker thread has been stopped.
+ while (!shouldStop) {
+ try {
+ childrenPaths =
+ ZKUtil.listChildrenAndWatchForNewChildren(watcher,
+ watcher.splitLogZNode);
+ if (childrenPaths != null) {
+ return childrenPaths;
+ }
+ } catch (KeeperException e) {
+ LOG.warn("Could not get children of znode " + watcher.splitLogZNode, e);
+ }
+ LOG.debug("Retry listChildren of znode " + watcher.splitLogZNode
+ + " after sleep for " + sleepTime + "ms!");
+ Thread.sleep(sleepTime);
+ }
+ return childrenPaths;
+ }
+
+ @Override
+ public void markCorrupted(Path rootDir, String name, FileSystem fs) {
+ ZKSplitLog.markCorrupted(rootDir, name, fs);
+ }
+
+ @Override
+ public boolean isReady() throws InterruptedException {
+ int result = -1;
+ try {
+ result = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
+ } catch (KeeperException e) {
+ // ignore
+ LOG.warn("Exception when checking for " + watcher.splitLogZNode
+ + " ... retrying", e);
+ }
+ if (result == -1) {
+ LOG.info(watcher.splitLogZNode
+ + " znode does not exist, waiting for master to create");
+ Thread.sleep(1000);
+ }
+ return (result != -1);
+ }
+
+ @Override
+ public int getTaskReadySeq() {
+ return taskReadySeq;
+ }
+
+ @Override
+ public void registerListener() {
+ watcher.registerListener(this);
+ }
+
+ @Override
+ public void removeListener() {
+ watcher.unregisterListener(this);
+ }
+
+ @Override
+ public void stopProcessingTasks() {
+ this.shouldStop = true;
+ }
+
+ @Override
+ public boolean isStop() {
+ return shouldStop;
+ }
+
+ @Override
+ public RegionStoreSequenceIds getRegionFlushedSequenceId(String failedServerName, String key)
+ throws IOException {
+ return ZKSplitLog.getRegionFlushedSequenceId(watcher, failedServerName, key);
+ }
+
+ /**
+ * Asynchronous handler for zk get-data-set-watch on node results.
+ */
+ class GetDataAsyncCallback implements AsyncCallback.DataCallback {
+ private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
+ if (rc != 0) {
+ LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
+ getDataSetWatchFailure(path);
+ return;
+ }
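+ // strip the retry metadata that RecoverableZooKeeper adds to the payload before parsing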
+ data = watcher.getRecoverableZooKeeper().removeMetaData(data);
+ getDataSetWatchSuccess(path, data);
+ }
+ }
+
+ /*
+ * Next part is related to HLogSplitterHandler
+ */
+ /**
+ * endTask() can fail and the only way to recover out of it is for the {@link SplitLogManager} to
+ * timeout the task node.
+ * @param slt final state to transition the task znode to
+ * @param ctr counter to increment on a successful transition
+ */
+ @Override
+ public void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails details) {
+ ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details;
+ String task = zkDetails.getTaskNode();
+ int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue();
+ try {
+ if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) {
+ LOG.info("successfully transitioned task " + task + " to final state " + slt);
+ ctr.incrementAndGet();
+ return;
+ }
+ LOG.warn("failed to transistion task " + task + " to end state " + slt
+ + " because of version mismatch ");
+ } catch (KeeperException.BadVersionException bve) {
+ LOG.warn("transisition task " + task + " to " + slt + " failed because of version mismatch",
+ bve);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.fatal(
+ "logic error - end task " + task + " " + slt + " failed because task doesn't exist", e);
+ } catch (KeeperException e) {
+ LOG.warn("failed to end task, " + task + " " + slt, e);
+ }
+ SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
+ }
+
+ /**
+ * When the ZK-based implementation wants to complete a task, it needs to know the task znode and
+ * the current znode version (needed for the subsequent update operation).
+ */
+ public static class ZkSplitTaskDetails implements SplitTaskDetails {
+ private String taskNode;
+ private MutableInt curTaskZKVersion;
+
+ public ZkSplitTaskDetails() {
+ }
+
+ public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) {
+ this.taskNode = taskNode;
+ this.curTaskZKVersion = curTaskZKVersion;
+ }
+
+ public String getTaskNode() {
+ return taskNode;
+ }
+
+ public void setTaskNode(String taskNode) {
+ this.taskNode = taskNode;
+ }
+
+ public MutableInt getCurTaskZKVersion() {
+ return curTaskZKVersion;
+ }
+
+ public void setCurTaskZKVersion(MutableInt curTaskZKVersion) {
+ this.curTaskZKVersion = curTaskZKVersion;
+ }
+
+ @Override
+ public String getWALFile() {
+ return ZKSplitLog.getFileName(taskNode);
+ }
+ }
+
+}