From 2ceb875957c117460d1d88dd43db0f60577fde8a Mon Sep 17 00:00:00 2001 From: stack Date: Fri, 29 Aug 2014 16:47:14 -0700 Subject: [PATCH] HBASE-11072 Abstract WAL splitting from ZK (Sergey Soldatov) --- .../SplitLogManagerCoordination.java | 221 ++++ .../SplitLogWorkerCoordination.java | 141 +++ .../ZKSplitLogManagerCoordination.java | 1103 +++++++++++++++++ .../ZkSplitLogWorkerCoordination.java | 654 ++++++++++ 4 files changed, 2119 insertions(+) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java new file mode 100644 index 00000000000..3d9ec884ba2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java @@ -0,0 +1,221 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coordination; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective; +import org.apache.hadoop.hbase.master.SplitLogManager.Task; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Coordination for SplitLogManager. It creates and works with tasks for split log operations
+ * The manager prepares a task by calling {@link #prepareTask} and submits it with
+ * {@link #submitTask(String)}. After that it periodically checks the number of remaining tasks
+ * via {@link #remainingTasksInCoordination()} and waits until it becomes zero.
+ *
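+ * A rough usage sketch from the manager's side ({@code coordination}, {@code details} and the
+ * log path below are illustrative placeholders, not names defined by this interface):
+ *
+ *   coordination.setDetails(details);            // details assembled by the SplitLogManager
+ *   String taskId = coordination.prepareTask(logFilePath);
+ *   coordination.submitTask(taskId);
+ *   while (coordination.remainingTasksInCoordination() > 0) {
+ *     // wait; checkTasks() may be called to nudge workers and resubmit unassigned tasks
+ *   }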

+ * Methods required for the task life cycle:
+ * {@link #markRegionsRecovering(ServerName, Set)} marks regions as recovering for distributed
+ * log replay. Used by {@link MasterFileSystem}
+ * {@link #removeRecoveringRegions(Set, Boolean)} cleans up regions that were previously marked
+ * as recovering. Called after all tasks have been processed
+ * {@link #removeStaleRecoveringRegions(Set)} removes stale recovering regions. Called by
+ * {@link MasterFileSystem} after the active master is initialized
+ * {@link #getLastRecoveryTime()} used by the garbage collector; indicates when the last
+ * recovery was performed
+ * {@link #checkTaskStillAvailable(String)} checks that the task is still there
+ * {@link #checkTasks()} check for unassigned tasks and resubmit them + */ +@InterfaceAudience.Private +public interface SplitLogManagerCoordination { + + /** + * Detail class that shares data between coordination and split log manager + */ + public static class SplitLogManagerDetails { + final private ConcurrentMap tasks; + final private MasterServices master; + final private Set failedDeletions; + final private ServerName serverName; + + public SplitLogManagerDetails(ConcurrentMap tasks, MasterServices master, + Set failedDeletions, ServerName serverName) { + this.tasks = tasks; + this.master = master; + this.failedDeletions = failedDeletions; + this.serverName = serverName; + } + + /** + * @return the master value + */ + public MasterServices getMaster() { + return master; + } + + /** + * @return map of tasks + */ + public ConcurrentMap getTasks() { + return tasks; + } + + /** + * @return a set of failed deletions + */ + public Set getFailedDeletions() { + return failedDeletions; + } + + /** + * @return server name + */ + public ServerName getServerName() { + return serverName; + } + } + + /** + * Provide the configuration from the SplitLogManager + */ + void setDetails(SplitLogManagerDetails details); + + /** + * Returns the configuration that was provided previously + */ + SplitLogManagerDetails getDetails(); + + /** + * Prepare the new task + * @param taskName name of the task + * @return the task id + */ + String prepareTask(String taskName); + + /** + * Mark regions in recovering state for distributed log replay + * @param serverName server name + * @param userRegions set of regions to be marked + * @throws IOException in case of failure + * @throws InterruptedIOException + */ + void markRegionsRecovering(final ServerName serverName, Set userRegions) + throws IOException, InterruptedIOException; + + /** + * tells Coordination that it should check for new tasks + */ + void checkTasks(); + + /** + * It removes recovering regions from Coordination + * @param serverNames servers which are just recovered + * @param isMetaRecovery whether current recovery is for the meta region on + * serverNames + */ + void removeRecoveringRegions(Set serverNames, Boolean isMetaRecovery) throws IOException; + + /** + * Return the number of remaining tasks + */ + int remainingTasksInCoordination(); + + /** + * Check that the task is still there + * @param task node to check + */ + void checkTaskStillAvailable(String task); + + /** + * Change the recovery mode. 
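+ * In the ZooKeeper implementation the flag marks whether the call is made during master
+ * initialization; in that case the mode may be derived from outstanding split log tasks,
+ * otherwise the configured mode is only applied once outstanding tasks have drained.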
+ * @param b the recovery mode state + * @throws InterruptedIOException + * @throws IOException in case of failure + */ + void setRecoveryMode(boolean b) throws InterruptedIOException, IOException; + + /** + * Removes known stale servers + * @param knownServers set of previously failed servers + * @throws IOException in case of failure + * @throws InterruptedIOException + */ + void removeStaleRecoveringRegions(Set knownServers) throws IOException, + InterruptedIOException; + + /** + * Resubmit the task in case if found unassigned or failed + * @param taskName path related to task + * @param task to resubmit + * @param force whether it should be forced + * @return whether it was successful + */ + + boolean resubmitTask(String taskName, Task task, ResubmitDirective force); + + /** + * @param taskName to be submitted + */ + void submitTask(String taskName); + + /** + * @param taskName to be removed + */ + void deleteTask(String taskName); + + /** + * @return shows whether the log recovery mode is in replaying state + */ + boolean isReplaying(); + + /** + * @return shows whether the log recovery mode is in splitting state + */ + boolean isSplitting(); + + /** + * @return the time of last attempt to recover + */ + long getLastRecoveryTime(); + + /** + * Temporary function, mostly for UTs. In the regular code isReplaying or isSplitting should be + * used. + * @return the current log recovery mode. + */ + RecoveryMode getRecoveryMode(); + + /** + * Support method to init constants such as timeout. Mostly required for UTs. + * @throws IOException + */ + @VisibleForTesting + void init() throws IOException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java new file mode 100644 index 00000000000..5341ef646a0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java @@ -0,0 +1,141 @@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.coordination; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.SplitLogTask; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.SplitLogWorker; +import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; +import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Coordinated operations for {@link SplitLogWorker} and {@link HLogSplitterHandler} Important + * methods for SplitLogWorker:
+ * {@link #isReady()} called from {@link SplitLogWorker#run()} to check whether the coordination
+ * is ready to supply tasks
+ * {@link #taskLoop()} waits for new tasks in a loop until the worker is stopped
+ * {@link #isStop()} a flag that indicates whether the worker should finish
+ * {@link #registerListener()} called from {@link SplitLogWorker#run()}; may register a listener
+ * for external changes in the coordination (if required)
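+ *
+ * An illustrative sketch of how a worker might drive these methods (the real loop lives in
+ * {@link SplitLogWorker#run()}; exception handling is omitted and variable names are placeholders):
+ *
+ *   coordination.init(server, conf, splitTaskExecutor, worker);
+ *   coordination.registerListener();
+ *   while (!coordination.isStop()) {
+ *     if (coordination.isReady()) {
+ *       coordination.taskLoop();   // blocks waiting for new tasks and grabs them
+ *     }
+ *   }
+ *   coordination.removeListener();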
+ *
+ * Important methods for HLogSplitterHandler:
+ * {@link #endTask(SplitLogTask, AtomicLong, SplitTaskDetails)} notifies the coordination engine that the
+ * splitting task has completed. + */ +@InterfaceAudience.Private +public interface SplitLogWorkerCoordination { + +/* SplitLogWorker part */ + public static final int DEFAULT_MAX_SPLITTERS = 2; + + /** + * Initialize internal values. This method should be used when corresponding SplitLogWorker + * instance is created + * @param server instance of RegionServerServices to work with + * @param conf is current configuration. + * @param splitTaskExecutor split executor from SplitLogWorker + * @param worker instance of SplitLogWorker + */ + void init(RegionServerServices server, Configuration conf, + TaskExecutor splitTaskExecutor, SplitLogWorker worker); + + /** + * called when Coordination should stop processing tasks and exit + */ + void stopProcessingTasks(); + + /** + * @return the current value of exitWorker + */ + boolean isStop(); + + /** + * Wait for the new tasks and grab one + * @throws InterruptedException if the SplitLogWorker was stopped + */ + void taskLoop() throws InterruptedException; + + /** + * marks log file as corrupted + * @param rootDir where to find the log + * @param name of the log + * @param fs file system + */ + void markCorrupted(Path rootDir, String name, FileSystem fs); + + /** + * Check whether the log splitter is ready to supply tasks + * @return false if there is no tasks + * @throws InterruptedException if the SplitLogWorker was stopped + */ + boolean isReady() throws InterruptedException; + + /** + * Used by unit tests to check how many tasks were processed + * @return number of tasks + */ + @VisibleForTesting + int getTaskReadySeq(); + + /** + * set the listener for task changes. Implementation specific + */ + void registerListener(); + + /** + * remove the listener for task changes. Implementation specific + */ + void removeListener(); + + /* HLogSplitterHandler part */ + + /** + * Notify coordination engine that splitting task has completed. + * @param slt See {@link SplitLogTask} + * @param ctr counter to be updated + * @param splitTaskDetails details about log split task (specific to coordination engine being + * used). + */ + void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails splitTaskDetails); + + /** + * Interface for log-split tasks Used to carry implementation details in encapsulated way through + * Handlers to the coordination API. + */ + static interface SplitTaskDetails { + + /** + * @return full file path in HDFS for the WAL file to be split. + */ + String getWALFile(); + } + + RegionStoreSequenceIds getRegionFlushedSequenceId(String failedServerName, String key) + throws IOException; + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java new file mode 100644 index 00000000000..243a37bd2be --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java @@ -0,0 +1,1103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coordination; + +import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK; +import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE; +import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED; +import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE; +import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS; +import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SplitLogCounters; +import org.apache.hadoop.hbase.SplitLogTask; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective; +import org.apache.hadoop.hbase.master.SplitLogManager.Task; +import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; +import org.apache.hadoop.hbase.regionserver.SplitLogWorker; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; +import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; + +/** + * ZooKeeper based implementation of {@link SplitLogManagerCoordination} + */ +@InterfaceAudience.Private +public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements + SplitLogManagerCoordination { + + public static class ZkSplitLogManagerDetails extends SplitLogManagerDetails { + + ZkSplitLogManagerDetails(ConcurrentMap tasks, 
MasterServices master, + Set failedDeletions, ServerName serverName) { + super(tasks, master, failedDeletions, serverName); + } + } + + public static final int DEFAULT_TIMEOUT = 120000; + public static final int DEFAULT_ZK_RETRIES = 3; + public static final int DEFAULT_MAX_RESUBMIT = 3; + + private static final Log LOG = LogFactory.getLog(SplitLogManagerCoordination.class); + + private Server server; + private long zkretries; + private long resubmitThreshold; + private long timeout; + private TaskFinisher taskFinisher; + + SplitLogManagerDetails details; + + private final Stoppable stopper = null; + + // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check + // whether to GC stale recovering znodes + private volatile long lastRecoveringNodeCreationTime = 0; + private Configuration conf; + public boolean ignoreZKDeleteForTesting = false; + + private RecoveryMode recoveryMode; + + private boolean isDrainingDone = false; + + public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager, + ZooKeeperWatcher watcher) { + super(watcher); + taskFinisher = new TaskFinisher() { + @Override + public Status finish(ServerName workerName, String logfile) { + try { + HLogSplitter.finishSplitLogFile(logfile, manager.getServer().getConfiguration()); + } catch (IOException e) { + LOG.warn("Could not finish splitting of log file " + logfile, e); + return Status.ERR; + } + return Status.DONE; + } + }; + this.server = manager.getServer(); + this.conf = server.getConfiguration(); + } + + @Override + public void init() throws IOException { + this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES); + this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT); + this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, DEFAULT_TIMEOUT); + setRecoveryMode(true); + if (this.watcher != null) { + this.watcher.registerListener(this); + lookForOrphans(); + } + } + + @Override + public String prepareTask(String taskname) { + return ZKSplitLog.getEncodedNodeName(watcher, taskname); + } + + @Override + public int remainingTasksInCoordination() { + int count = 0; + try { + List tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); + if (tasks != null) { + for (String t : tasks) { + if (!ZKSplitLog.isRescanNode(watcher, t)) { + count++; + } + } + } + } catch (KeeperException ke) { + LOG.warn("Failed to check remaining tasks", ke); + count = -1; + } + return count; + } + + /** + * It is possible for a task to stay in UNASSIGNED state indefinitely - say SplitLogManager wants + * to resubmit a task. It forces the task to UNASSIGNED state but it dies before it could create + * the RESCAN task node to signal the SplitLogWorkers to pick up the task. To prevent this + * scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup. + * @param path + */ + private void handleUnassignedTask(String path) { + if (ZKSplitLog.isRescanNode(watcher, path)) { + return; + } + Task task = findOrCreateOrphanTask(path); + if (task.isOrphan() && (task.incarnation == 0)) { + LOG.info("resubmitting unassigned orphan task " + path); + // ignore failure to resubmit. 
The timeout-monitor will handle it later + // albeit in a more crude fashion + resubmitTask(path, task, FORCE); + } + } + + @Override + public void deleteTask(String path) { + deleteNode(path, zkretries); + } + + @Override + public boolean resubmitTask(String path, Task task, ResubmitDirective directive) { + // its ok if this thread misses the update to task.deleted. It will fail later + if (task.status != IN_PROGRESS) { + return false; + } + int version; + if (directive != FORCE) { + // We're going to resubmit: + // 1) immediately if the worker server is now marked as dead + // 2) after a configurable timeout if the server is not marked as dead but has still not + // finished the task. This allows to continue if the worker cannot actually handle it, + // for any reason. + final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update; + final boolean alive = + details.getMaster().getServerManager() != null ? details.getMaster().getServerManager() + .isServerOnline(task.cur_worker_name) : true; + if (alive && time < timeout) { + LOG.trace("Skipping the resubmit of " + task.toString() + " because the server " + + task.cur_worker_name + " is not marked as dead, we waited for " + time + + " while the timeout is " + timeout); + return false; + } + + if (task.unforcedResubmits.get() >= resubmitThreshold) { + if (!task.resubmitThresholdReached) { + task.resubmitThresholdReached = true; + SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet(); + LOG.info("Skipping resubmissions of task " + path + " because threshold " + + resubmitThreshold + " reached"); + } + return false; + } + // race with heartbeat() that might be changing last_version + version = task.last_version; + } else { + SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet(); + version = -1; + } + LOG.info("resubmitting task " + path); + task.incarnation++; + boolean result = resubmit(this.details.getServerName(), path, version); + if (!result) { + task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis()); + return false; + } + // don't count forced resubmits + if (directive != FORCE) { + task.unforcedResubmits.incrementAndGet(); + } + task.setUnassigned(); + rescan(Long.MAX_VALUE); + SplitLogCounters.tot_mgr_resubmit.incrementAndGet(); + return true; + } + + + @Override + public void checkTasks() { + rescan(Long.MAX_VALUE); + }; + + /** + * signal the workers that a task was resubmitted by creating the RESCAN node. + */ + private void rescan(long retries) { + // The RESCAN node will be deleted almost immediately by the + // SplitLogManager as soon as it is created because it is being + // created in the DONE state. This behavior prevents a buildup + // of RESCAN nodes. But there is also a chance that a SplitLogWorker + // might miss the watch-trigger that creation of RESCAN node provides. + // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks + // therefore this behavior is safe. + SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName(), this.recoveryMode); + this.watcher + .getRecoverableZooKeeper() + .getZooKeeper() + .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries)); + } + + @Override + public void submitTask(String path) { + createNode(path, zkretries); + } + + @Override + public void checkTaskStillAvailable(String path) { + // A negative retry count will lead to ignoring all error processing. 
+ this.watcher + .getRecoverableZooKeeper() + .getZooKeeper() + .getData(path, this.watcher, new GetDataAsyncCallback(), + Long.valueOf(-1) /* retry count */); + SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); + } + + /** + * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the + * region server hosting the region can allow reads to the recovered region + * @param recoveredServerNameSet servers which are just recovered + * @param isMetaRecovery whether current recovery is for the meta region on + * serverNames + */ + @Override + public void removeRecoveringRegions(final Set recoveredServerNameSet, + Boolean isMetaRecovery) throws IOException { + final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(); + int count = 0; + try { + List tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); + if (tasks != null) { + for (String t : tasks) { + if (!ZKSplitLog.isRescanNode(watcher, t)) { + count++; + } + } + } + if (count == 0 && this.details.getMaster().isInitialized() + && !this.details.getMaster().getServerManager().areDeadServersInProgress()) { + // no splitting work items left + ZKSplitLog.deleteRecoveringRegionZNodes(watcher, null); + // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at + // this point. + lastRecoveringNodeCreationTime = Long.MAX_VALUE; + } else if (!recoveredServerNameSet.isEmpty()) { + // remove recovering regions which doesn't have any RS associated with it + List regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode); + if (regions != null) { + for (String region : regions) { + if (isMetaRecovery != null) { + if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName)) + || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) { + // skip non-meta regions when recovering the meta region or + // skip the meta region when recovering user regions + continue; + } + } + String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region); + List failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath); + if (failedServers == null || failedServers.isEmpty()) { + ZKUtil.deleteNode(watcher, nodePath); + continue; + } + if (recoveredServerNameSet.containsAll(failedServers)) { + ZKUtil.deleteNodeRecursively(watcher, nodePath); + } else { + for (String failedServer : failedServers) { + if (recoveredServerNameSet.contains(failedServer)) { + String tmpPath = ZKUtil.joinZNode(nodePath, failedServer); + ZKUtil.deleteNode(watcher, tmpPath); + } + } + } + } + } + } + } catch (KeeperException ke) { + LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke); + throw new IOException(ke); + } + } + + private void deleteNode(String path, Long retries) { + SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet(); + // Once a task znode is ready for delete, that is it is in the TASK_DONE + // state, then no one should be writing to it anymore. That is no one + // will be updating the znode version any more. 
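+ // Hence the unconditional delete below: version -1 tells ZooKeeper to match any znode version.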
+ this.watcher.getRecoverableZooKeeper().getZooKeeper() + .delete(path, -1, new DeleteAsyncCallback(), retries); + } + + private void deleteNodeSuccess(String path) { + if (ignoreZKDeleteForTesting) { + return; + } + Task task; + task = details.getTasks().remove(path); + if (task == null) { + if (ZKSplitLog.isRescanNode(watcher, path)) { + SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet(); + } + SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet(); + LOG.debug("deleted task without in memory state " + path); + return; + } + synchronized (task) { + task.status = DELETED; + task.notify(); + } + SplitLogCounters.tot_mgr_task_deleted.incrementAndGet(); + } + + private void deleteNodeFailure(String path) { + LOG.info("Failed to delete node " + path + " and will retry soon."); + return; + } + + private void createRescanSuccess(String path) { + SplitLogCounters.tot_mgr_rescan.incrementAndGet(); + getDataSetWatch(path, zkretries); + } + + private void createRescanFailure() { + LOG.fatal("logic failure, rescan failure must not happen"); + } + + /** + * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions + * @param statusCode integer value of a ZooKeeper exception code + * @param action description message about the retried action + * @return true when need to abandon retries otherwise false + */ + private boolean needAbandonRetries(int statusCode, String action) { + if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) { + LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for " + + "action=" + action); + return true; + } + return false; + } + + private void createNode(String path, Long retry_count) { + SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), this.recoveryMode); + ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), + retry_count); + SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet(); + return; + } + + private void createNodeSuccess(String path) { + LOG.debug("put up splitlog task at znode " + path); + getDataSetWatch(path, zkretries); + } + + private void createNodeFailure(String path) { + // TODO the Manager should split the log locally instead of giving up + LOG.warn("failed to create task node" + path); + setDone(path, FAILURE); + } + + private void getDataSetWatch(String path, Long retry_count) { + this.watcher.getRecoverableZooKeeper().getZooKeeper() + .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count); + SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); + } + + + private void getDataSetWatchSuccess(String path, byte[] data, int version) + throws DeserializationException { + if (data == null) { + if (version == Integer.MIN_VALUE) { + // assume all done. The task znode suddenly disappeared. 
+ setDone(path, SUCCESS); + return; + } + SplitLogCounters.tot_mgr_null_data.incrementAndGet(); + LOG.fatal("logic error - got null data " + path); + setDone(path, FAILURE); + return; + } + data = this.watcher.getRecoverableZooKeeper().removeMetaData(data); + SplitLogTask slt = SplitLogTask.parseFrom(data); + if (slt.isUnassigned()) { + LOG.debug("task not yet acquired " + path + " ver = " + version); + handleUnassignedTask(path); + } else if (slt.isOwned()) { + heartbeat(path, version, slt.getServerName()); + } else if (slt.isResigned()) { + LOG.info("task " + path + " entered state: " + slt.toString()); + resubmitOrFail(path, FORCE); + } else if (slt.isDone()) { + LOG.info("task " + path + " entered state: " + slt.toString()); + if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) { + if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) { + setDone(path, SUCCESS); + } else { + resubmitOrFail(path, CHECK); + } + } else { + setDone(path, SUCCESS); + } + } else if (slt.isErr()) { + LOG.info("task " + path + " entered state: " + slt.toString()); + resubmitOrFail(path, CHECK); + } else { + LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + + slt.toString()); + setDone(path, FAILURE); + } + } + + private void resubmitOrFail(String path, ResubmitDirective directive) { + if (resubmitTask(path, findOrCreateOrphanTask(path), directive) == false) { + setDone(path, FAILURE); + } + } + + private void getDataSetWatchFailure(String path) { + LOG.warn("failed to set data watch " + path); + setDone(path, FAILURE); + } + + private void setDone(String path, TerminationStatus status) { + Task task = details.getTasks().get(path); + if (task == null) { + if (!ZKSplitLog.isRescanNode(watcher, path)) { + SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet(); + LOG.debug("unacquired orphan task is done " + path); + } + } else { + synchronized (task) { + if (task.status == IN_PROGRESS) { + if (status == SUCCESS) { + SplitLogCounters.tot_mgr_log_split_success.incrementAndGet(); + LOG.info("Done splitting " + path); + } else { + SplitLogCounters.tot_mgr_log_split_err.incrementAndGet(); + LOG.warn("Error splitting " + path); + } + task.status = status; + if (task.batch != null) { + synchronized (task.batch) { + if (status == SUCCESS) { + task.batch.done++; + } else { + task.batch.error++; + } + task.batch.notify(); + } + } + } + } + } + // delete the task node in zk. It's an async + // call and no one is blocked waiting for this node to be deleted. All + // task names are unique (log.) there is no risk of deleting + // a future task. 
+ // if a deletion fails, TimeoutMonitor will retry the same deletion later + deleteNode(path, zkretries); + return; + } + + Task findOrCreateOrphanTask(String path) { + Task orphanTask = new Task(); + Task task; + task = details.getTasks().putIfAbsent(path, orphanTask); + if (task == null) { + LOG.info("creating orphan task " + path); + SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet(); + task = orphanTask; + } + return task; + } + + private void heartbeat(String path, int new_version, ServerName workerName) { + Task task = findOrCreateOrphanTask(path); + if (new_version != task.last_version) { + if (task.isUnassigned()) { + LOG.info("task " + path + " acquired by " + workerName); + } + task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, workerName); + SplitLogCounters.tot_mgr_heartbeat.incrementAndGet(); + } else { + // duplicate heartbeats - heartbeats w/o zk node version + // changing - are possible. The timeout thread does + // getDataSetWatch() just to check whether a node still + // exists or not + } + return; + } + + private void lookForOrphans() { + List orphans; + try { + orphans = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.splitLogZNode); + if (orphans == null) { + LOG.warn("could not get children of " + this.watcher.splitLogZNode); + return; + } + } catch (KeeperException e) { + LOG.warn("could not get children of " + this.watcher.splitLogZNode + " " + + StringUtils.stringifyException(e)); + return; + } + int rescan_nodes = 0; + for (String path : orphans) { + String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path); + if (ZKSplitLog.isRescanNode(watcher, nodepath)) { + rescan_nodes++; + LOG.debug("found orphan rescan node " + path); + } else { + LOG.info("found orphan task " + path); + } + getDataSetWatch(nodepath, zkretries); + } + LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " + rescan_nodes + + " rescan nodes"); + } + + /** + * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] 
for + * all regions of the passed in region servers + * @param serverName the name of a region server + * @param userRegions user regiones assigned on the region server + */ + @Override + public void markRegionsRecovering(final ServerName serverName, Set userRegions) + throws IOException, InterruptedIOException { + this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis(); + for (HRegionInfo region : userRegions) { + String regionEncodeName = region.getEncodedName(); + long retries = this.zkretries; + + do { + String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName); + long lastRecordedFlushedSequenceId = -1; + try { + long lastSequenceId = + this.details.getMaster().getServerManager() + .getLastFlushedSequenceId(regionEncodeName.getBytes()); + + /* + * znode layout: .../region_id[last known flushed sequence id]/failed server[last known + * flushed sequence id for the server] + */ + byte[] data = ZKUtil.getData(this.watcher, nodePath); + if (data == null) { + ZKUtil + .createSetData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId)); + } else { + lastRecordedFlushedSequenceId = + ZKSplitLog.parseLastFlushedSequenceIdFrom(data); + if (lastRecordedFlushedSequenceId < lastSequenceId) { + // update last flushed sequence id in the region level + ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId)); + } + } + // go one level deeper with server name + nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName()); + if (lastSequenceId <= lastRecordedFlushedSequenceId) { + // the newly assigned RS failed even before any flush to the region + lastSequenceId = lastRecordedFlushedSequenceId; + } + ZKUtil.createSetData(this.watcher, nodePath, + ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null)); + LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server " + + serverName); + + // break retry loop + break; + } catch (KeeperException e) { + // ignore ZooKeeper exceptions inside retry loop + if (retries <= 1) { + throw new IOException(e); + } + // wait a little bit for retry + try { + Thread.sleep(20); + } catch (InterruptedException e1) { + throw new InterruptedIOException(); + } + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } while ((--retries) > 0 && (!this.stopper.isStopped())); + } + } + + @Override + public void nodeDataChanged(String path) { + Task task; + task = details.getTasks().get(path); + if (task != null || ZKSplitLog.isRescanNode(watcher, path)) { + if (task != null) { + task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis()); + } + getDataSetWatch(path, zkretries); + } + } + + /** + * ZooKeeper implementation of + * {@link SplitLogManagerCoordination#removeStaleRecoveringRegions(Set)} + */ + @Override + public void removeStaleRecoveringRegions(final Set knownFailedServers) + throws IOException, InterruptedIOException { + + try { + List tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); + if (tasks != null) { + for (String t : tasks) { + byte[] data; + try { + data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t)); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + if (data != null) { + SplitLogTask slt = null; + try { + slt = SplitLogTask.parseFrom(data); + } catch (DeserializationException e) { + LOG.warn("Failed parse data for znode " + t, e); + } + if (slt != null && slt.isDone()) { + continue; + } + } + // decode the file name + t = 
ZKSplitLog.getFileName(t); + ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t)); + if (serverName != null) { + knownFailedServers.add(serverName.getServerName()); + } else { + LOG.warn("Found invalid WAL log file name:" + t); + } + } + } + + // remove recovering regions which doesn't have any RS associated with it + List regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode); + if (regions != null) { + for (String region : regions) { + String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region); + List regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath); + if (regionFailedServers == null || regionFailedServers.isEmpty()) { + ZKUtil.deleteNode(watcher, nodePath); + continue; + } + boolean needMoreRecovery = false; + for (String tmpFailedServer : regionFailedServers) { + if (knownFailedServers.contains(tmpFailedServer)) { + needMoreRecovery = true; + break; + } + } + if (!needMoreRecovery) { + ZKUtil.deleteNodeRecursively(watcher, nodePath); + } + } + } + } catch (KeeperException e) { + throw new IOException(e); + } + } + + @Override + public boolean isReplaying() { + return this.recoveryMode == RecoveryMode.LOG_REPLAY; + } + + @Override + public boolean isSplitting() { + return this.recoveryMode == RecoveryMode.LOG_SPLITTING; + } + + /** + * This function is to set recovery mode from outstanding split log tasks from before or current + * configuration setting + * @param isForInitialization + * @throws IOException + */ + @Override + public void setRecoveryMode(boolean isForInitialization) throws IOException { + if (this.isDrainingDone) { + // when there is no outstanding splitlogtask after master start up, we already have up to date + // recovery mode + return; + } + if (this.watcher == null) { + // when watcher is null(testing code) and recovery mode can only be LOG_SPLITTING + this.isDrainingDone = true; + this.recoveryMode = RecoveryMode.LOG_SPLITTING; + return; + } + boolean hasSplitLogTask = false; + boolean hasRecoveringRegions = false; + RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN; + RecoveryMode recoveryModeInConfig = + (isDistributedLogReplay(conf)) ? 
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING; + + // Firstly check if there are outstanding recovering regions + try { + List regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode); + if (regions != null && !regions.isEmpty()) { + hasRecoveringRegions = true; + previousRecoveryMode = RecoveryMode.LOG_REPLAY; + } + if (previousRecoveryMode == RecoveryMode.UNKNOWN) { + // Secondly check if there are outstanding split log task + List tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); + if (tasks != null && !tasks.isEmpty()) { + hasSplitLogTask = true; + if (isForInitialization) { + // during initialization, try to get recovery mode from splitlogtask + for (String task : tasks) { + try { + byte[] data = + ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, task)); + if (data == null) continue; + SplitLogTask slt = SplitLogTask.parseFrom(data); + previousRecoveryMode = slt.getMode(); + if (previousRecoveryMode == RecoveryMode.UNKNOWN) { + // created by old code base where we don't set recovery mode in splitlogtask + // we can safely set to LOG_SPLITTING because we're in master initialization code + // before SSH is enabled & there is no outstanding recovering regions + previousRecoveryMode = RecoveryMode.LOG_SPLITTING; + } + break; + } catch (DeserializationException e) { + LOG.warn("Failed parse data for znode " + task, e); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + } + } + } + } catch (KeeperException e) { + throw new IOException(e); + } + + synchronized (this) { + if (this.isDrainingDone) { + return; + } + if (!hasSplitLogTask && !hasRecoveringRegions) { + this.isDrainingDone = true; + this.recoveryMode = recoveryModeInConfig; + return; + } else if (!isForInitialization) { + // splitlogtask hasn't drained yet, keep existing recovery mode + return; + } + + if (previousRecoveryMode != RecoveryMode.UNKNOWN) { + this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig); + this.recoveryMode = previousRecoveryMode; + } else { + this.recoveryMode = recoveryModeInConfig; + } + } + } + + /** + * Returns if distributed log replay is turned on or not + * @param conf + * @return true when distributed log replay is turned on + */ + private boolean isDistributedLogReplay(Configuration conf) { + boolean dlr = + conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, + HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG); + int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION); + if (LOG.isDebugEnabled()) { + LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version); + } + // For distributed log replay, hfile version must be 3 at least; we need tag support. 
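+ // e.g. when HConstants.DISTRIBUTED_LOG_REPLAY_KEY is true and the configured HFile format
+ // version is at least 3, this returns true and setRecoveryMode() selects LOG_REPLAY;
+ // otherwise the recovery mode falls back to LOG_SPLITTING.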
+ return dlr && (version >= 3); + } + + private boolean resubmit(ServerName serverName, String path, int version) { + try { + // blocking zk call but this is done from the timeout thread + SplitLogTask slt = + new SplitLogTask.Unassigned(this.details.getServerName(), this.recoveryMode); + if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) { + LOG.debug("failed to resubmit task " + path + " version changed"); + return false; + } + } catch (NoNodeException e) { + LOG.warn("failed to resubmit because znode doesn't exist " + path + + " task done (or forced done by removing the znode)"); + try { + getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); + } catch (DeserializationException e1) { + LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1); + return false; + } + return false; + } catch (KeeperException.BadVersionException e) { + LOG.debug("failed to resubmit task " + path + " version changed"); + return false; + } catch (KeeperException e) { + SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet(); + LOG.warn("failed to resubmit " + path, e); + return false; + } + return true; + } + + + /** + * {@link SplitLogManager} can use objects implementing this interface to finish off a partially + * done task by {@link SplitLogWorker}. This provides a serialization point at the end of the task + * processing. Must be restartable and idempotent. + */ + public interface TaskFinisher { + /** + * status that can be returned finish() + */ + enum Status { + /** + * task completed successfully + */ + DONE(), + /** + * task completed with error + */ + ERR(); + } + + /** + * finish the partially done task. workername provides clue to where the partial results of the + * partially done tasks are present. taskname is the name of the task that was put up in + * zookeeper. + *

+ * @param workerName + * @param taskname + * @return DONE if task completed successfully, ERR otherwise + */ + Status finish(ServerName workerName, String taskname); + } + + /** + * Asynchronous handler for zk create node results. Retries on failures. + */ + public class CreateAsyncCallback implements AsyncCallback.StringCallback { + private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + SplitLogCounters.tot_mgr_node_create_result.incrementAndGet(); + if (rc != 0) { + if (needAbandonRetries(rc, "Create znode " + path)) { + createNodeFailure(path); + return; + } + if (rc == KeeperException.Code.NODEEXISTS.intValue()) { + // What if there is a delete pending against this pre-existing + // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE + // state. Only operations that will be carried out on this node by + // this manager are get-znode-data, task-finisher and delete-znode. + // And all code pieces correctly handle the case of suddenly + // disappearing task-znode. + LOG.debug("found pre-existing znode " + path); + SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet(); + } else { + Long retry_count = (Long) ctx; + LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + path + + " remaining retries=" + retry_count); + if (retry_count == 0) { + SplitLogCounters.tot_mgr_node_create_err.incrementAndGet(); + createNodeFailure(path); + } else { + SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet(); + createNode(path, retry_count - 1); + } + return; + } + } + createNodeSuccess(path); + } + } + + /** + * Asynchronous handler for zk get-data-set-watch on node results. Retries on failures. + */ + public class GetDataAsyncCallback implements AsyncCallback.DataCallback { + private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + SplitLogCounters.tot_mgr_get_data_result.incrementAndGet(); + if (rc != 0) { + if (needAbandonRetries(rc, "GetData from znode " + path)) { + return; + } + if (rc == KeeperException.Code.NONODE.intValue()) { + SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet(); + LOG.warn("task znode " + path + " vanished or not created yet."); + // ignore since we should not end up in a case where there is in-memory task, + // but no znode. The only case is between the time task is created in-memory + // and the znode is created. See HBASE-11217. + return; + } + Long retry_count = (Long) ctx; + + if (retry_count < 0) { + LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path + + ". Ignoring error. No error handling. No retrying."); + return; + } + LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path + + " remaining retries=" + retry_count); + if (retry_count == 0) { + SplitLogCounters.tot_mgr_get_data_err.incrementAndGet(); + getDataSetWatchFailure(path); + } else { + SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet(); + getDataSetWatch(path, retry_count - 1); + } + return; + } + try { + getDataSetWatchSuccess(path, data, stat.getVersion()); + } catch (DeserializationException e) { + LOG.warn("Deserialization problem", e); + } + return; + } + } + + /** + * Asynchronous handler for zk delete node results. Retries on failures. 
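+ * Deletions that are abandoned or run out of retries are added to the failed-deletions set
+ * (see {@link SplitLogManagerDetails#getFailedDeletions()}) so the manager can retry them later.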
+ */ + public class DeleteAsyncCallback implements AsyncCallback.VoidCallback { + private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx) { + SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet(); + if (rc != 0) { + if (needAbandonRetries(rc, "Delete znode " + path)) { + details.getFailedDeletions().add(path); + return; + } + if (rc != KeeperException.Code.NONODE.intValue()) { + SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet(); + Long retry_count = (Long) ctx; + LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path + + " remaining retries=" + retry_count); + if (retry_count == 0) { + LOG.warn("delete failed " + path); + details.getFailedDeletions().add(path); + deleteNodeFailure(path); + } else { + deleteNode(path, retry_count - 1); + } + return; + } else { + LOG.info(path + " does not exist. Either was created but deleted behind our" + + " back by another pending delete OR was deleted" + + " in earlier retry rounds. zkretries = " + ctx); + } + } else { + LOG.debug("deleted " + path); + } + deleteNodeSuccess(path); + } + } + + /** + * Asynchronous handler for zk create RESCAN-node results. Retries on failures. + *

+ * A RESCAN node is created using PERSISTENT_SEQUENTIAL flag. It is a signal for all the + * {@link SplitLogWorker}s to rescan for new tasks. + */ + public class CreateRescanAsyncCallback implements AsyncCallback.StringCallback { + private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + if (rc != 0) { + if (needAbandonRetries(rc, "CreateRescan znode " + path)) { + return; + } + Long retry_count = (Long) ctx; + LOG.warn("rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries=" + + retry_count); + if (retry_count == 0) { + createRescanFailure(); + } else { + rescan(retry_count - 1); + } + return; + } + // path is the original arg, name is the actual name that was created + createRescanSuccess(name); + } + } + + @Override + public void setDetails(SplitLogManagerDetails details) { + this.details = details; + } + + @Override + public SplitLogManagerDetails getDetails() { + return details; + } + + @Override + public RecoveryMode getRecoveryMode() { + return recoveryMode; + } + + @Override + public long getLastRecoveryTime() { + return lastRecoveringNodeCreationTime; + } + + /** + * Temporary function that is used by unit tests only + */ + public void setIgnoreDeleteForTesting(boolean b) { + ignoreZKDeleteForTesting = b; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java new file mode 100644 index 00000000000..3989211b537 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java @@ -0,0 +1,654 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.coordination; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.lang.math.RandomUtils; +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SplitLogCounters; +import org.apache.hadoop.hbase.SplitLogTask; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.SplitLogWorker; +import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; +import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler; +import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +/** + * ZooKeeper based implementation of {@link SplitLogWorkerCoordination} + * It listen for changes in ZooKeeper and + * + */ +@InterfaceAudience.Private +public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements + SplitLogWorkerCoordination { + + private static final Log LOG = LogFactory.getLog(ZkSplitLogWorkerCoordination.class); + + private static final int checkInterval = 5000; // 5 seconds + private static final int FAILED_TO_OWN_TASK = -1; + + private SplitLogWorker worker; + + private TaskExecutor splitTaskExecutor; + + private final Object taskReadyLock = new Object(); + volatile int taskReadySeq = 0; + private volatile String currentTask = null; + private int currentVersion; + private volatile boolean shouldStop = false; + private final Object grabTaskLock = new Object(); + private boolean workerInGrabTask = false; + private int reportPeriod; + private RegionServerServices server = null; + protected final AtomicInteger tasksInProgress = new AtomicInteger(0); + private int maxConcurrentTasks = 0; + + private final ZkCoordinatedStateManager manager; + + public ZkSplitLogWorkerCoordination(ZkCoordinatedStateManager zkCoordinatedStateManager, + ZooKeeperWatcher watcher) { + super(watcher); + manager = zkCoordinatedStateManager; + + } + + /** + * Override handler from {@link ZooKeeperListener} + */ + @Override + public void nodeChildrenChanged(String path) { + if (path.equals(watcher.splitLogZNode)) { + LOG.debug("tasks arrived or departed"); + synchronized 
(taskReadyLock) { + taskReadySeq++; + taskReadyLock.notify(); + } + } + } + + /** + * Override handler from {@link ZooKeeperListener} + */ + @Override + public void nodeDataChanged(String path) { + // there will be a self generated dataChanged event every time attemptToOwnTask() + // heartbeats the task znode by upping its version + synchronized (grabTaskLock) { + if (workerInGrabTask) { + // currentTask can change + String taskpath = currentTask; + if (taskpath != null && taskpath.equals(path)) { + getDataSetWatchAsync(); + } + } + } + } + + /** + * Override setter from {@link SplitLogWorkerCoordination} + */ + @Override + public void init(RegionServerServices server, Configuration conf, + TaskExecutor splitExecutor, SplitLogWorker worker) { + this.server = server; + this.worker = worker; + this.splitTaskExecutor = splitExecutor; + maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS); + reportPeriod = + conf.getInt("hbase.splitlog.report.period", + conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, + ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3); + } + + /* Support functions for Zookeeper async callback */ + + void getDataSetWatchFailure(String path) { + synchronized (grabTaskLock) { + if (workerInGrabTask) { + // currentTask can change but that's ok + String taskpath = currentTask; + if (taskpath != null && taskpath.equals(path)) { + LOG.info("retrying data watch on " + path); + SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet(); + getDataSetWatchAsync(); + } else { + // no point setting a watch on the task which this worker is not + // working upon anymore + } + } + } + } + + public void getDataSetWatchAsync() { + watcher.getRecoverableZooKeeper().getZooKeeper() + .getData(currentTask, watcher, new GetDataAsyncCallback(), null); + SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet(); + } + + void getDataSetWatchSuccess(String path, byte[] data) { + SplitLogTask slt; + try { + slt = SplitLogTask.parseFrom(data); + } catch (DeserializationException e) { + LOG.warn("Failed parse", e); + return; + } + synchronized (grabTaskLock) { + if (workerInGrabTask) { + // currentTask can change but that's ok + String taskpath = currentTask; + if (taskpath != null && taskpath.equals(path)) { + ServerName serverName = manager.getServer().getServerName(); + // have to compare data. cannot compare version because then there + // will be race with attemptToOwnTask() + // cannot just check whether the node has been transitioned to + // UNASSIGNED because by the time this worker sets the data watch + // the node might have made two transitions - from owned by this + // worker to unassigned to owned by another worker + if (!slt.isOwned(serverName) && !slt.isDone(serverName) && !slt.isErr(serverName) + && !slt.isResigned(serverName)) { + LOG.info("task " + taskpath + " preempted from " + serverName + + ", current task state and owner=" + slt.toString()); + worker.stopTask(); + } + } + } + } + } + + /** + * try to grab a 'lock' on the task zk node to own and execute the task. + *

+ * @param path zk node for the task + */ + private void grabTask(String path) { + Stat stat = new Stat(); + byte[] data; + synchronized (grabTaskLock) { + currentTask = path; + workerInGrabTask = true; + if (Thread.interrupted()) { + return; + } + } + try { + try { + if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) { + SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet(); + return; + } + } catch (KeeperException e) { + LOG.warn("Failed to get data for znode " + path, e); + SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); + return; + } + SplitLogTask slt; + try { + slt = SplitLogTask.parseFrom(data); + } catch (DeserializationException e) { + LOG.warn("Failed parse data for znode " + path, e); + SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); + return; + } + if (!slt.isUnassigned()) { + SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet(); + return; + } + + currentVersion = + attemptToOwnTask(true, watcher, server.getServerName(), path, + slt.getMode(), stat.getVersion()); + if (currentVersion < 0) { + SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); + return; + } + + if (ZKSplitLog.isRescanNode(watcher, currentTask)) { + ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails = + new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails(); + splitTaskDetails.setTaskNode(currentTask); + splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion)); + + endTask(new SplitLogTask.Done(server.getServerName(), slt.getMode()), + SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails); + return; + } + + LOG.info("worker " + server.getServerName() + " acquired task " + path); + SplitLogCounters.tot_wkr_task_acquired.incrementAndGet(); + getDataSetWatchAsync(); + + submitTask(path, slt.getMode(), currentVersion, reportPeriod); + + // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks + try { + int sleepTime = RandomUtils.nextInt(500) + 500; + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + LOG.warn("Interrupted while yielding for other region servers", e); + Thread.currentThread().interrupt(); + } + } finally { + synchronized (grabTaskLock) { + workerInGrabTask = false; + // clear the interrupt from stopTask() otherwise the next task will + // suffer + Thread.interrupted(); + } + } + } + + /** + * Submit a log split task to executor service + * @param curTask task to submit + * @param curTaskZKVersion current version of task + */ + void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion, + final int reportPeriod) { + final MutableInt zkVersion = new MutableInt(curTaskZKVersion); + + CancelableProgressable reporter = new CancelableProgressable() { + private long last_report_at = 0; + + @Override + public boolean progress() { + long t = EnvironmentEdgeManager.currentTimeMillis(); + if ((t - last_report_at) > reportPeriod) { + last_report_at = t; + int latestZKVersion = + attemptToOwnTask(false, watcher, server.getServerName(), curTask, + mode, zkVersion.intValue()); + if (latestZKVersion < 0) { + LOG.warn("Failed to heartbeat the task" + curTask); + return false; + } + zkVersion.setValue(latestZKVersion); + } + return true; + } + }; + ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails = + new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails(); + splitTaskDetails.setTaskNode(curTask); + splitTaskDetails.setCurTaskZKVersion(zkVersion); + + HLogSplitterHandler 
hsh =
+ new HLogSplitterHandler(server, this, splitTaskDetails, reporter,
+ this.tasksInProgress, splitTaskExecutor, mode);
+ server.getExecutorService().submit(hsh);
+ }
+
+ /**
+ * Calculates how many more splitters this region server may spawn, based on the expected average
+ * number of tasks per RS and the hard upper bound (maxConcurrentTasks) set by configuration.
+ * At any given time, an RS runs at most MIN(Expected Tasks/RS, Hard Upper Bound) splitters.
+ * @param numTasks current total number of available tasks
+ * @return number of additional splitters this region server may start
+ */
+ private int calculateAvailableSplitters(int numTasks) {
+ // at least one RS (itself) is available
+ int availableRSs = 1;
+ try {
+ List<String> regionServers =
+ ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
+ availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
+ } catch (KeeperException e) {
+ // do nothing
+ LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
+ }
+
+ int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
+ expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least one
+ // calculate how many more splitters we could spawn
+ return Math.min(expectedTasksPerRS, maxConcurrentTasks)
+ - this.tasksInProgress.get();
+ }
+
+ /**
+ * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
+ *

+ * This method is also used to periodically heartbeat the task progress by transitioning the node
+ * from OWNED to OWNED; rewriting the same OWNED state bumps the znode's data version, which acts
+ * as the heartbeat signal.
+ *

+ * @param isFirstTime shows whether it's the first attempt.
+ * @param zkw zk watcher
+ * @param server name of the current region server
+ * @param task znode path of the task to own
+ * @param mode recovery mode of the task
+ * @param taskZKVersion version of the task znode in zk
+ * @return non-negative znode version if the task is now owned by this region server, otherwise -1
+ */
+ protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
+ ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
+ int latestZKVersion = FAILED_TO_OWN_TASK;
+ try {
+ SplitLogTask slt = new SplitLogTask.Owned(server, mode);
+ Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
+ if (stat == null) {
+ LOG.warn("zk.setData() returned null for path " + task);
+ SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
+ return FAILED_TO_OWN_TASK;
+ }
+ latestZKVersion = stat.getVersion();
+ SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
+ return latestZKVersion;
+ } catch (KeeperException e) {
+ if (!isFirstTime) {
+ if (e.code().equals(KeeperException.Code.NONODE)) {
+ LOG.warn("NONODE failed to assert ownership for " + task, e);
+ } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
+ LOG.warn("BADVERSION failed to assert ownership for " + task, e);
+ } else {
+ LOG.warn("failed to assert ownership for " + task, e);
+ }
+ }
+ } catch (InterruptedException e1) {
+ LOG.warn("Interrupted while trying to assert ownership of " + task + " "
+ + StringUtils.stringifyException(e1));
+ Thread.currentThread().interrupt();
+ }
+ SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
+ return FAILED_TO_OWN_TASK;
+ }
+
+ /**
+ * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task one at a time. This
+ * policy puts an upper-limit on the number of simultaneous log splitting tasks that could be
+ * happening in a cluster.
+ *

+ * Synchronization using {@link #taskReadyLock} ensures that it will try to grab every task that
+ * has been put up.
+ * @throws InterruptedException
+ */
+ @Override
+ public void taskLoop() throws InterruptedException {
+ while (!shouldStop) {
+ int seq_start = taskReadySeq;
+ List<String> paths = null;
+ paths = getTaskList();
+ if (paths == null) {
+ LOG.warn("Could not get tasks, did someone remove " + watcher.splitLogZNode
+ + " ... worker thread exiting.");
+ return;
+ }
+ // pick the meta wal first, if there is one
+ int offset = (int) (Math.random() * paths.size());
+ for (int i = 0; i < paths.size(); i++) {
+ if (HLogUtil.isMetaFile(paths.get(i))) {
+ offset = i;
+ break;
+ }
+ }
+ int numTasks = paths.size();
+ for (int i = 0; i < numTasks; i++) {
+ int idx = (i + offset) % paths.size();
+ // don't call ZKSplitLog.getNodeName() because that will lead to
+ // double encoding of the path name
+ if (this.calculateAvailableSplitters(numTasks) > 0) {
+ grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
+ } else {
+ LOG.debug("Current region server " + server.getServerName() + " has "
+ + this.tasksInProgress.get() + " tasks in progress and can't take more.");
+ break;
+ }
+ if (shouldStop) {
+ return;
+ }
+ }
+ SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
+ synchronized (taskReadyLock) {
+ while (seq_start == taskReadySeq) {
+ taskReadyLock.wait(checkInterval);
+ if (server != null) {
+ // check to see if we have stale recovering regions in our internal memory state
+ Map<String, HRegion> recoveringRegions = server.getRecoveringRegions();
+ if (!recoveringRegions.isEmpty()) {
+ // Make a local copy to prevent ConcurrentModificationException when other threads
+ // modify recoveringRegions
+ List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
+ for (String region : tmpCopy) {
+ String nodePath =
+ ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
+ try {
+ if (ZKUtil.checkExists(watcher, nodePath) == -1) {
+ HRegion r = recoveringRegions.remove(region);
+ if (r != null) {
+ r.setRecovering(false);
+ }
+ LOG.debug("Mark recovering region:" + region + " up.");
+ } else {
+ // this check is a defensive (or redundant) mechanism to prevent us from
+ // having stale recovering regions in our internal RS memory state while
+ // zookeeper (the source of truth) says differently. We stop at the first good
+ // one because in the normal case there should not be even one such stale
+ // entry, so checking the first one is good enough.
+ break;
+ }
+ } catch (KeeperException e) {
+ // ignore zookeeper error
+ LOG.debug("Got a zookeeper exception when checking a recovering region", e);
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private List<String> getTaskList() throws InterruptedException {
+ List<String> childrenPaths = null;
+ long sleepTime = 1000;
+ // Keep looping until the list of children is retrieved, or
+ // until the worker has been told to stop.
+ while (!shouldStop) { + try { + childrenPaths = + ZKUtil.listChildrenAndWatchForNewChildren(watcher, + watcher.splitLogZNode); + if (childrenPaths != null) { + return childrenPaths; + } + } catch (KeeperException e) { + LOG.warn("Could not get children of znode " + watcher.splitLogZNode, e); + } + LOG.debug("Retry listChildren of znode " + watcher.splitLogZNode + + " after sleep for " + sleepTime + "ms!"); + Thread.sleep(sleepTime); + } + return childrenPaths; + } + + @Override + public void markCorrupted(Path rootDir, String name, FileSystem fs) { + ZKSplitLog.markCorrupted(rootDir, name, fs); + } + + @Override + public boolean isReady() throws InterruptedException { + int result = -1; + try { + result = ZKUtil.checkExists(watcher, watcher.splitLogZNode); + } catch (KeeperException e) { + // ignore + LOG.warn("Exception when checking for " + watcher.splitLogZNode + + " ... retrying", e); + } + if (result == -1) { + LOG.info(watcher.splitLogZNode + + " znode does not exist, waiting for master to create"); + Thread.sleep(1000); + } + return (result != -1); + } + + @Override + public int getTaskReadySeq() { + return taskReadySeq; + } + + @Override + public void registerListener() { + watcher.registerListener(this); + } + + @Override + public void removeListener() { + watcher.unregisterListener(this); + } + + + @Override + public void stopProcessingTasks() { + this.shouldStop = true; + + } + + @Override + public boolean isStop() { + return shouldStop; + } + + @Override + public RegionStoreSequenceIds getRegionFlushedSequenceId(String failedServerName, String key) + throws IOException { + return ZKSplitLog.getRegionFlushedSequenceId(watcher, failedServerName, key); + } + + /** + * Asynchronous handler for zk get-data-set-watch on node results. + */ + class GetDataAsyncCallback implements AsyncCallback.DataCallback { + private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + SplitLogCounters.tot_wkr_get_data_result.incrementAndGet(); + if (rc != 0) { + LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path); + getDataSetWatchFailure(path); + return; + } + data = watcher.getRecoverableZooKeeper().removeMetaData(data); + getDataSetWatchSuccess(path, data); + } + } + + /* + * Next part is related to HLogSplitterHandler + */ + /** + * endTask() can fail and the only way to recover out of it is for the {@link SplitLogManager} to + * timeout the task node. 
+ * @param slt final state to transition the task znode to
+ * @param ctr counter to increment on a successful transition
+ */
+ @Override
+ public void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails details) {
+ ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details;
+ String task = zkDetails.getTaskNode();
+ int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue();
+ try {
+ if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) {
+ LOG.info("successfully transitioned task " + task + " to final state " + slt);
+ ctr.incrementAndGet();
+ return;
+ }
+ LOG.warn("failed to transition task " + task + " to end state " + slt
+ + " because of version mismatch");
+ } catch (KeeperException.BadVersionException bve) {
+ LOG.warn("transition of task " + task + " to " + slt + " failed because of version mismatch",
+ bve);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.fatal(
+ "logic error - end task " + task + " " + slt + " failed because task doesn't exist", e);
+ } catch (KeeperException e) {
+ LOG.warn("failed to end task, " + task + " " + slt, e);
+ }
+ SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
+ }
+
+ /**
+ * When ZK-based implementation wants to complete the task, it needs to know the task znode and
+ * the current znode data version (needed for the subsequent conditional update).
+ */
+ public static class ZkSplitTaskDetails implements SplitTaskDetails {
+ private String taskNode;
+ private MutableInt curTaskZKVersion;
+
+ public ZkSplitTaskDetails() {
+ }
+
+ public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) {
+ this.taskNode = taskNode;
+ this.curTaskZKVersion = curTaskZKVersion;
+ }
+
+ public String getTaskNode() {
+ return taskNode;
+ }
+
+ public void setTaskNode(String taskNode) {
+ this.taskNode = taskNode;
+ }
+
+ public MutableInt getCurTaskZKVersion() {
+ return curTaskZKVersion;
+ }
+
+ public void setCurTaskZKVersion(MutableInt curTaskZKVersion) {
+ this.curTaskZKVersion = curTaskZKVersion;
+ }
+
+ @Override
+ public String getWALFile() {
+ return ZKSplitLog.getFileName(taskNode);
+ }
+ }
+
+}
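
A note for reviewers: attemptToOwnTask() and endTask() above both rely on ZooKeeper's conditional
setData(path, data, expectedVersion), which succeeds only while the znode is still at the data version
the worker last observed; a BADVERSION failure means another worker won the race, and a successful write
returns the new version to use for the next heartbeat or the final transition. Below is a minimal,
self-contained sketch of that compare-and-swap pattern against a plain org.apache.zookeeper.ZooKeeper
client; the class and method names are illustrative and do not come from the patch.

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZnodeCasSketch {
  /**
   * Write newData to path only if the znode is still at expectedVersion.
   * @return the new data version on success, or -1 if another client changed the znode first
   */
  static int casSetData(ZooKeeper zk, String path, byte[] newData, int expectedVersion)
      throws KeeperException, InterruptedException {
    try {
      // setData() is rejected with BADVERSION unless the current data version matches.
      Stat stat = zk.setData(path, newData, expectedVersion);
      return stat.getVersion(); // remember this for the next heartbeat or the final transition
    } catch (KeeperException.BadVersionException e) {
      return -1; // lost the race; the caller should give up or re-read the task
    }
  }
}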
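
Similarly, the throttle in calculateAvailableSplitters() amounts to
min(ceil(numTasks / liveRegionServers), hbase.regionserver.wal.max.splitters) minus the tasks already in
progress; for example, 10 outstanding tasks over 4 live region servers with a cap of 3 allow a worker to
start at most min(ceil(10/4), 3) = 3 splitters. The following standalone restatement of that arithmetic
uses made-up names for illustration only.

public final class SplitterThrottleSketch {
  /** Mirrors the arithmetic of calculateAvailableSplitters() in the patch above. */
  static int availableSplitters(int numTasks, int liveRegionServers, int maxConcurrentTasks,
      int tasksInProgress) {
    int rs = Math.max(1, liveRegionServers);           // at least this region server itself
    int expectedTasksPerRS = (numTasks + rs - 1) / rs; // ceil(numTasks / rs)
    expectedTasksPerRS = Math.max(1, expectedTasksPerRS);
    return Math.min(expectedTasksPerRS, maxConcurrentTasks) - tasksInProgress;
  }

  public static void main(String[] args) {
    // 10 tasks, 4 live region servers, a cap of 3, nothing running yet -> 3
    System.out.println(availableSplitters(10, 4, 3, 0));
  }
}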
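
Finally, nodeChildrenChanged() and taskLoop() cooperate through a sequence-number-guarded wait: the watcher
thread bumps taskReadySeq and notifies, while the worker captures the sequence before scanning the task list
and blocks only while that captured value is still current, so a notification that fires between the snapshot
and the wait() cannot be missed. A stripped-down sketch of just that pattern, again with illustrative names:

public class SeqGuardedWaitSketch {
  private final Object lock = new Object();
  private int seq = 0;

  /** Watcher side: record that the task list changed and wake the worker. */
  void onTasksChanged() {
    synchronized (lock) {
      seq++;
      lock.notify();
    }
  }

  /** Worker side: block until the sequence number moves past the captured snapshot. */
  void awaitChange(int seqSnapshot, long pollMillis) throws InterruptedException {
    synchronized (lock) {
      // If a change already happened since the snapshot, the condition is false and we
      // return immediately instead of waiting for a notification that has already fired.
      while (seq == seqSnapshot) {
        lock.wait(pollMillis);
      }
    }
  }
}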