HBASE-11072 Abstract WAL splitting from ZK (Sergey Soldatov)

stack 2014-09-04 21:35:27 -07:00
parent c0d81e9ad0
commit 66220e4929
22 changed files with 2802 additions and 1873 deletions

View File

@ -353,7 +353,7 @@ public final class HConstants {
/** Default value for cluster ID */
public static final String CLUSTER_ID_DEFAULT = "default-cluster";
/** Parameter name for # days to keep MVCC values during a major compaction */
public static final String KEEP_SEQID_PERIOD = "hbase.hstore.compaction.keep.seqId.period";
/** At least to keep MVCC values in hfiles for 5 days */
@ -1017,6 +1017,9 @@ public final class HConstants {
public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS =
"hbase.coordinated.state.manager.class";
/** Configuration key for SplitLog manager timeout */
public static final String HBASE_SPLITLOG_MANAGER_TIMEOUT = "hbase.splitlog.manager.timeout";
/**
* Configuration keys for Bucket cache
*/
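
For context, the new HBASE_SPLITLOG_MANAGER_TIMEOUT key is read like any other HConstants configuration key. A minimal hypothetical sketch follows; the 120000 ms fallback is an assumption, since the authoritative default (ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) lives in the coordination class added later in this commit.

// Hypothetical illustration, not part of this commit.
public class SplitLogTimeoutExample {
  public static void main(String[] args) {
    org.apache.hadoop.conf.Configuration conf =
        org.apache.hadoop.hbase.HBaseConfiguration.create();
    // 120000 ms is an assumed fallback, not the library's documented default.
    int timeout = conf.getInt(
        org.apache.hadoop.hbase.HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, 120000);
    System.out.println("splitlog manager timeout ms: " + timeout);
  }
}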

View File

@ -52,7 +52,14 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
@Override
public abstract TableStateManager getTableStateManager() throws InterruptedException,
CoordinatedStateException;
/**
* Method to retrieve coordination for split log worker
*/
public abstract SplitLogWorkerCoordination getSplitLogWorkerCoordination();
/**
* Method to retrieve coordination for split log manager
*/
public abstract SplitLogManagerCoordination getSplitLogManagerCoordination();
/**
* Method to retrieve coordination for split transaction.
*/
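
Callers obtain the new split-log coordinations from the coordinated state manager. A hypothetical usage sketch, assuming an already-initialized manager (mirroring the cast that SplitLogWorker performs later in this commit):

import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;

public class CoordinationLookupSketch {
  // Pulls both split-log coordinations off an initialized state manager.
  static void lookup(BaseCoordinatedStateManager manager) {
    SplitLogWorkerCoordination workerCoordination = manager.getSplitLogWorkerCoordination();
    SplitLogManagerCoordination managerCoordination = manager.getSplitLogManagerCoordination();
    System.out.println(workerCoordination + " / " + managerCoordination);
  }
}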

View File

@ -0,0 +1,221 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coordination;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import com.google.common.annotations.VisibleForTesting;
/**
* Coordination for SplitLogManager. It creates and works with tasks for split log operations.<BR>
* The manager prepares a task by calling {@link #prepareTask} and submits it by
* {@link #submitTask(String)}. After that it periodically checks the number of remaining tasks via
* {@link #remainingTasksInCoordination()} and waits until it becomes zero.
* <P>
* Methods required for the task life cycle: <BR>
* {@link #markRegionsRecovering(ServerName, Set)} marks regions for log replay; used by
* {@link MasterFileSystem} <BR>
* {@link #removeRecoveringRegions(Set, Boolean)} cleans up regions that were previously marked as
* recovering; called after all tasks are processed <BR>
* {@link #removeStaleRecoveringRegions(Set)} removes stale recovering regions; called by
* {@link MasterFileSystem} after the active master is initialized <BR>
* {@link #getLastRecoveryTime()} required for the garbage collector; indicates when the last
* recovery was made <BR>
* {@link #checkTaskStillAvailable(String)} checks that a task is still there <BR>
* {@link #checkTasks()} checks for unassigned tasks and resubmits them
*/
@InterfaceAudience.Private
public interface SplitLogManagerCoordination {
/**
* Detail class that shares data between coordination and split log manager
*/
public static class SplitLogManagerDetails {
final private ConcurrentMap<String, Task> tasks;
final private MasterServices master;
final private Set<String> failedDeletions;
final private ServerName serverName;
public SplitLogManagerDetails(ConcurrentMap<String, Task> tasks, MasterServices master,
Set<String> failedDeletions, ServerName serverName) {
this.tasks = tasks;
this.master = master;
this.failedDeletions = failedDeletions;
this.serverName = serverName;
}
/**
* @return the master value
*/
public MasterServices getMaster() {
return master;
}
/**
* @return map of tasks
*/
public ConcurrentMap<String, Task> getTasks() {
return tasks;
}
/**
* @return a set of failed deletions
*/
public Set<String> getFailedDeletions() {
return failedDeletions;
}
/**
* @return server name
*/
public ServerName getServerName() {
return serverName;
}
}
/**
* Provide the configuration from the SplitLogManager
*/
void setDetails(SplitLogManagerDetails details);
/**
* Returns the configuration that was provided previously
*/
SplitLogManagerDetails getDetails();
/**
* Prepare the new task
* @param taskName name of the task
* @return the task id
*/
String prepareTask(String taskName);
/**
* Mark regions in recovering state for distributed log replay
* @param serverName server name
* @param userRegions set of regions to be marked
* @throws IOException in case of failure
* @throws InterruptedIOException
*/
void markRegionsRecovering(final ServerName serverName, Set<HRegionInfo> userRegions)
throws IOException, InterruptedIOException;
/**
* tells Coordination that it should check for new tasks
*/
void checkTasks();
/**
* Removes recovering regions from coordination
* @param serverNames servers which are just recovered
* @param isMetaRecovery whether the current recovery is for the meta region on
* <code>serverNames</code>
*/
void removeRecoveringRegions(Set<String> serverNames, Boolean isMetaRecovery) throws IOException;
/**
* Return the number of remaining tasks
*/
int remainingTasksInCoordination();
/**
* Check that the task is still there
* @param task node to check
*/
void checkTaskStillAvailable(String task);
/**
* Change the recovery mode.
* @param b the recovery mode state
* @throws InterruptedIOException
* @throws IOException in case of failure
*/
void setRecoveryMode(boolean b) throws InterruptedIOException, IOException;
/**
* Removes known stale servers
* @param knownServers set of previously failed servers
* @throws IOException in case of failure
* @throws InterruptedIOException
*/
void removeStaleRecoveringRegions(Set<String> knownServers) throws IOException,
InterruptedIOException;
/**
* Resubmit the task if it is found unassigned or failed
* @param taskName path related to task
* @param task to resubmit
* @param force whether it should be forced
* @return whether it was successful
*/
boolean resubmitTask(String taskName, Task task, ResubmitDirective force);
/**
* @param taskName to be submitted
*/
void submitTask(String taskName);
/**
* @param taskName to be removed
*/
void deleteTask(String taskName);
/**
* @return whether the log recovery mode is in the replaying state
*/
boolean isReplaying();
/**
* @return whether the log recovery mode is in the splitting state
*/
boolean isSplitting();
/**
* @return the time of last attempt to recover
*/
long getLastRecoveryTime();
/**
* Temporary function, mostly for UTs. In the regular code isReplaying or isSplitting should be
* used.
* @return the current log recovery mode.
*/
RecoveryMode getRecoveryMode();
/**
* Support method to init constants such as timeout. Mostly required for UTs.
* @throws IOException
*/
@VisibleForTesting
void init() throws IOException;
}
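
To make the life cycle described in the interface javadoc concrete, here is a minimal hypothetical sketch of a master-side caller; the helper method and the polling interval are assumptions, not code from this commit:

import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;

public class SplitLogManagerUsageSketch {
  // Drives one WAL-splitting task through the life cycle described above.
  static void splitOneLog(SplitLogManagerCoordination coordination, String walTaskName)
      throws InterruptedException {
    String taskId = coordination.prepareTask(walTaskName);
    coordination.submitTask(taskId);
    // Poll until the coordination engine reports no remaining tasks.
    while (coordination.remainingTasksInCoordination() > 0) {
      coordination.checkTasks(); // resubmits tasks found unassigned or failed
      Thread.sleep(100); // assumed polling interval
    }
  }
}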

View File

@ -0,0 +1,141 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coordination;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
import com.google.common.annotations.VisibleForTesting;
/**
* Coordinated operations for {@link SplitLogWorker} and {@link HLogSplitterHandler}.
* <p>
* Important methods for SplitLogWorker: <BR>
* {@link #isReady()} called from {@link SplitLogWorker#run()} to check whether the coordination is
* ready to supply the tasks <BR>
* {@link #taskLoop()} loops for new tasks until the worker is stopped <BR>
* {@link #isStop()} a flag that indicates whether the worker should finish <BR>
* {@link #registerListener()} called from {@link SplitLogWorker#run()}; may register a listener
* for external changes in coordination (if required)
* <p>
* Important methods for HLogSplitterHandler: <BR>
* {@link #endTask(SplitLogTask, AtomicLong, SplitTaskDetails)} notifies the coordination engine
* that the splitting task has completed.
*/
@InterfaceAudience.Private
public interface SplitLogWorkerCoordination {
/* SplitLogWorker part */
public static final int DEFAULT_MAX_SPLITTERS = 2;
/**
* Initialize internal values. This method should be used when the corresponding SplitLogWorker
* instance is created
* @param server instance of RegionServerServices to work with
* @param conf the current configuration
* @param splitTaskExecutor split executor from SplitLogWorker
* @param worker instance of SplitLogWorker
*/
void init(RegionServerServices server, Configuration conf,
TaskExecutor splitTaskExecutor, SplitLogWorker worker);
/**
* called when Coordination should stop processing tasks and exit
*/
void stopProcessingTasks();
/**
* @return true if the worker should stop processing tasks
*/
boolean isStop();
/**
* Wait for the new tasks and grab one
* @throws InterruptedException if the SplitLogWorker was stopped
*/
void taskLoop() throws InterruptedException;
/**
* marks log file as corrupted
* @param rootDir where to find the log
* @param name of the log
* @param fs file system
*/
void markCorrupted(Path rootDir, String name, FileSystem fs);
/**
* Check whether the log splitter is ready to supply tasks
* @return false if there are no tasks
* @throws InterruptedException if the SplitLogWorker was stopped
*/
boolean isReady() throws InterruptedException;
/**
* Used by unit tests to check how many tasks were processed
* @return number of tasks
*/
@VisibleForTesting
int getTaskReadySeq();
/**
* set the listener for task changes. Implementation specific
*/
void registerListener();
/**
* remove the listener for task changes. Implementation specific
*/
void removeListener();
/* HLogSplitterHandler part */
/**
* Notify coordination engine that splitting task has completed.
* @param slt See {@link SplitLogTask}
* @param ctr counter to be updated
* @param splitTaskDetails details about log split task (specific to coordination engine being
* used).
*/
void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails splitTaskDetails);
/**
* Interface for log-split tasks. Used to carry implementation details in an encapsulated way
* through handlers to the coordination API.
*/
static interface SplitTaskDetails {
/**
* @return full file path in HDFS for the WAL file to be split.
*/
String getWALFile();
}
RegionStoreSequenceIds getRegionFlushedSequenceId(String failedServerName, String key)
throws IOException;
}
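
Read together, the worker-side contract implies a run loop shaped roughly like the following. This is an illustrative sketch under assumed usage, not the SplitLogWorker implementation:

import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;

public class WorkerLoopSketch {
  // Waits until the coordination is ready, then pulls tasks until stopped.
  static void run(SplitLogWorkerCoordination coordination) {
    try {
      coordination.registerListener();
      while (!coordination.isReady() && !coordination.isStop()) {
        // isReady() is expected to pause briefly while tasks are absent
      }
      while (!coordination.isStop()) {
        coordination.taskLoop(); // blocks, grabbing and executing tasks
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // stop was requested
    } finally {
      coordination.removeListener();
    }
  }
}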

View File

@ -37,6 +37,8 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
protected ZooKeeperWatcher watcher;
protected SplitTransactionCoordination splitTransactionCoordination;
protected CloseRegionCoordination closeRegionCoordination;
protected SplitLogWorkerCoordination splitLogWorkerCoordination;
protected SplitLogManagerCoordination splitLogManagerCoordination;
protected OpenRegionCoordination openRegionCoordination;
protected RegionMergeCoordination regionMergeCoordination;
@ -44,7 +46,8 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
public void initialize(Server server) {
this.server = server;
this.watcher = server.getZooKeeper();
splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(this, watcher);
splitLogManagerCoordination = new ZKSplitLogManagerCoordination(this, watcher);
splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher);
closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher);
openRegionCoordination = new ZkOpenRegionCoordination(this, watcher);
@ -66,6 +69,15 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
}
}
@Override
public SplitLogWorkerCoordination getSplitLogWorkerCoordination() {
return splitLogWorkerCoordination;
}
@Override
public SplitLogManagerCoordination getSplitLogManagerCoordination() {
return splitLogManagerCoordination;
}
@Override
public SplitTransactionCoordination getSplitTransactionCoordination() {
return splitTransactionCoordination;

View File

@ -0,0 +1,654 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coordination;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
/**
* ZooKeeper-based implementation of {@link SplitLogWorkerCoordination}. It listens for task
* changes in ZooKeeper (new tasks and task preemption) and hands the work to the split log worker.
*/
@InterfaceAudience.Private
public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
SplitLogWorkerCoordination {
private static final Log LOG = LogFactory.getLog(ZkSplitLogWorkerCoordination.class);
private static final int checkInterval = 5000; // 5 seconds
private static final int FAILED_TO_OWN_TASK = -1;
private SplitLogWorker worker;
private TaskExecutor splitTaskExecutor;
private final Object taskReadyLock = new Object();
volatile int taskReadySeq = 0;
private volatile String currentTask = null;
private int currentVersion;
private volatile boolean shouldStop = false;
private final Object grabTaskLock = new Object();
private boolean workerInGrabTask = false;
private int reportPeriod;
private RegionServerServices server = null;
protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
private int maxConcurrentTasks = 0;
private final ZkCoordinatedStateManager manager;
public ZkSplitLogWorkerCoordination(ZkCoordinatedStateManager zkCoordinatedStateManager,
ZooKeeperWatcher watcher) {
super(watcher);
manager = zkCoordinatedStateManager;
}
/**
* Override handler from {@link ZooKeeperListener}
*/
@Override
public void nodeChildrenChanged(String path) {
if (path.equals(watcher.splitLogZNode)) {
LOG.debug("tasks arrived or departed");
synchronized (taskReadyLock) {
taskReadySeq++;
taskReadyLock.notify();
}
}
}
/**
* Override handler from {@link ZooKeeperListener}
*/
@Override
public void nodeDataChanged(String path) {
// there will be a self generated dataChanged event every time attemptToOwnTask()
// heartbeats the task znode by upping its version
synchronized (grabTaskLock) {
if (workerInGrabTask) {
// currentTask can change
String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) {
getDataSetWatchAsync();
}
}
}
}
/**
* Override setter from {@link SplitLogWorkerCoordination}
*/
@Override
public void init(RegionServerServices server, Configuration conf,
TaskExecutor splitExecutor, SplitLogWorker worker) {
this.server = server;
this.worker = worker;
this.splitTaskExecutor = splitExecutor;
maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
reportPeriod =
conf.getInt("hbase.splitlog.report.period",
conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3);
}
/* Support functions for Zookeeper async callback */
void getDataSetWatchFailure(String path) {
synchronized (grabTaskLock) {
if (workerInGrabTask) {
// currentTask can change but that's ok
String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) {
LOG.info("retrying data watch on " + path);
SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
getDataSetWatchAsync();
} else {
// no point setting a watch on the task which this worker is not
// working upon anymore
}
}
}
}
public void getDataSetWatchAsync() {
watcher.getRecoverableZooKeeper().getZooKeeper()
.getData(currentTask, watcher, new GetDataAsyncCallback(), null);
SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
}
void getDataSetWatchSuccess(String path, byte[] data) {
SplitLogTask slt;
try {
slt = SplitLogTask.parseFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed parse", e);
return;
}
synchronized (grabTaskLock) {
if (workerInGrabTask) {
// currentTask can change but that's ok
String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) {
ServerName serverName = manager.getServer().getServerName();
// have to compare data. cannot compare version because then there
// will be race with attemptToOwnTask()
// cannot just check whether the node has been transitioned to
// UNASSIGNED because by the time this worker sets the data watch
// the node might have made two transitions - from owned by this
// worker to unassigned to owned by another worker
if (!slt.isOwned(serverName) && !slt.isDone(serverName) && !slt.isErr(serverName)
&& !slt.isResigned(serverName)) {
LOG.info("task " + taskpath + " preempted from " + serverName
+ ", current task state and owner=" + slt.toString());
worker.stopTask();
}
}
}
}
}
/**
* try to grab a 'lock' on the task zk node to own and execute the task.
* <p>
* @param path zk node for the task
*/
private void grabTask(String path) {
Stat stat = new Stat();
byte[] data;
synchronized (grabTaskLock) {
currentTask = path;
workerInGrabTask = true;
if (Thread.interrupted()) {
return;
}
}
try {
try {
if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
return;
}
} catch (KeeperException e) {
LOG.warn("Failed to get data for znode " + path, e);
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
return;
}
SplitLogTask slt;
try {
slt = SplitLogTask.parseFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed parse data for znode " + path, e);
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
return;
}
if (!slt.isUnassigned()) {
SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
return;
}
currentVersion =
attemptToOwnTask(true, watcher, server.getServerName(), path,
slt.getMode(), stat.getVersion());
if (currentVersion < 0) {
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
return;
}
if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
splitTaskDetails.setTaskNode(currentTask);
splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion));
endTask(new SplitLogTask.Done(server.getServerName(), slt.getMode()),
SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
return;
}
LOG.info("worker " + server.getServerName() + " acquired task " + path);
SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
getDataSetWatchAsync();
submitTask(path, slt.getMode(), currentVersion, reportPeriod);
// after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks
try {
int sleepTime = RandomUtils.nextInt(500) + 500;
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
LOG.warn("Interrupted while yielding for other region servers", e);
Thread.currentThread().interrupt();
}
} finally {
synchronized (grabTaskLock) {
workerInGrabTask = false;
// clear the interrupt from stopTask() otherwise the next task will
// suffer
Thread.interrupted();
}
}
}
/**
* Submit a log split task to executor service
* @param curTask task to submit
* @param curTaskZKVersion current version of task
*/
void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion,
final int reportPeriod) {
final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
CancelableProgressable reporter = new CancelableProgressable() {
private long last_report_at = 0;
@Override
public boolean progress() {
long t = EnvironmentEdgeManager.currentTime();
if ((t - last_report_at) > reportPeriod) {
last_report_at = t;
int latestZKVersion =
attemptToOwnTask(false, watcher, server.getServerName(), curTask,
mode, zkVersion.intValue());
if (latestZKVersion < 0) {
LOG.warn("Failed to heartbeat the task" + curTask);
return false;
}
zkVersion.setValue(latestZKVersion);
}
return true;
}
};
ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
splitTaskDetails.setTaskNode(curTask);
splitTaskDetails.setCurTaskZKVersion(zkVersion);
HLogSplitterHandler hsh =
new HLogSplitterHandler(server, this, splitTaskDetails, reporter,
this.tasksInProgress, splitTaskExecutor, mode);
server.getExecutorService().submit(hsh);
}
/**
* This function calculates how many splitters could be created based on the expected average
* number of tasks per RS and the hard upper bound (maxConcurrentTasks) set by configuration. <br>
* At any given time, a RS may spawn at most MIN(expected tasks per RS, hard upper bound) splitters
* @param numTasks current total number of available tasks
*/
private int calculateAvailableSplitters(int numTasks) {
// at least one RS (itself) is available
int availableRSs = 1;
try {
List<String> regionServers =
ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
} catch (KeeperException e) {
// do nothing
LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
}
int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least one
// calculate how many more splitters we could spawn
return Math.min(expectedTasksPerRS, maxConcurrentTasks)
- this.tasksInProgress.get();
}
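A quick worked example of the formula above, with assumed numbers: 10 outstanding tasks across 4 live region servers give ceil(10/4) = 3 expected tasks per RS; with the default hard bound of 2, the worker may run min(3, 2) = 2 splitters, minus whatever is already in progress. A standalone sketch:

public class AvailableSplittersSketch {
  // Mirrors calculateAvailableSplitters() above, with the inputs made explicit.
  static int availableSplitters(int numTasks, int availableRSs,
      int maxConcurrentTasks, int tasksInProgress) {
    int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
    expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least one
    return Math.min(expectedTasksPerRS, maxConcurrentTasks) - tasksInProgress;
  }

  public static void main(String[] args) {
    // 10 tasks, 4 region servers, hard bound 2, nothing running yet -> prints 2
    System.out.println(availableSplitters(10, 4, 2, 0));
  }
}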
/**
* Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
* <p>
* This method is also used to periodically heartbeat the task progress by transitioning the node
* from OWNED to OWNED.
* <p>
* @param isFirstTime shows whether it's the first attempt
* @param zkw zk watcher
* @param server name of the current region server
* @param task to own
* @param mode the recovery mode
* @param taskZKVersion version of the task in zk
* @return non-negative integer value when task can be owned by current region server otherwise -1
*/
protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
int latestZKVersion = FAILED_TO_OWN_TASK;
try {
SplitLogTask slt = new SplitLogTask.Owned(server, mode);
Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
if (stat == null) {
LOG.warn("zk.setData() returned null for path " + task);
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
return FAILED_TO_OWN_TASK;
}
latestZKVersion = stat.getVersion();
SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
return latestZKVersion;
} catch (KeeperException e) {
if (!isFirstTime) {
if (e.code().equals(KeeperException.Code.NONODE)) {
LOG.warn("NONODE failed to assert ownership for " + task, e);
} else if (e.code().equals(KeeperException.Code.BADVERSION)) {
LOG.warn("BADVERSION failed to assert ownership for " + task, e);
} else {
LOG.warn("failed to assert ownership for " + task, e);
}
}
} catch (InterruptedException e1) {
LOG.warn("Interrupted while trying to assert ownership of " + task + " "
+ StringUtils.stringifyException(e1));
Thread.currentThread().interrupt();
}
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
return FAILED_TO_OWN_TASK;
}
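The heartbeat above is effectively a compare-and-set on the task znode: setData() with the last known version succeeds only if nobody else has written the node since, so a BADVERSION error tells the worker it lost ownership. A minimal hypothetical sketch against the raw ZooKeeper API (path and payload are placeholders):

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class OwnTaskCasSketch {
  // Returns the new znode version on success, or -1 when ownership was lost.
  static int heartbeat(ZooKeeper zk, String taskPath, byte[] ownedPayload, int knownVersion)
      throws InterruptedException {
    try {
      Stat stat = zk.setData(taskPath, ownedPayload, knownVersion); // CAS on the version
      return stat.getVersion(); // remember for the next heartbeat
    } catch (KeeperException.BadVersionException e) {
      return -1; // someone else wrote the node: the task was preempted
    } catch (KeeperException e) {
      return -1; // NONODE and other failures also mean we do not own it
    }
  }
}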
/**
* Wait for tasks to become available at /hbase/splitlog zknode. Grab a task one at a time. This
* policy puts an upper-limit on the number of simultaneous log splitting that could be happening
* in a cluster.
* <p>
* Synchronization using {@link #taskReadyLock} ensures that it will try to grab every task that
* has been put up
* @throws InterruptedException
*/
@Override
public void taskLoop() throws InterruptedException {
while (!shouldStop) {
int seq_start = taskReadySeq;
List<String> paths = null;
paths = getTaskList();
if (paths == null) {
LOG.warn("Could not get tasks, did someone remove " + watcher.splitLogZNode
+ " ... worker thread exiting.");
return;
}
// pick the meta WAL first
int offset = (int) (Math.random() * paths.size());
for (int i = 0; i < paths.size(); i++) {
if (HLogUtil.isMetaFile(paths.get(i))) {
offset = i;
break;
}
}
int numTasks = paths.size();
for (int i = 0; i < numTasks; i++) {
int idx = (i + offset) % paths.size();
// don't call ZKSplitLog.getNodeName() because that will lead to
// double encoding of the path name
if (this.calculateAvailableSplitters(numTasks) > 0) {
grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
} else {
LOG.debug("Current region server " + server.getServerName() + " has "
+ this.tasksInProgress.get() + " tasks in progress and can't take more.");
break;
}
if (shouldStop) {
return;
}
}
SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
synchronized (taskReadyLock) {
while (seq_start == taskReadySeq) {
taskReadyLock.wait(checkInterval);
if (server != null) {
// check to see if we have stale recovering regions in our internal memory state
Map<String, HRegion> recoveringRegions = server.getRecoveringRegions();
if (!recoveringRegions.isEmpty()) {
// Make a local copy to prevent ConcurrentModificationException when other threads
// modify recoveringRegions
List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
for (String region : tmpCopy) {
String nodePath =
ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
try {
if (ZKUtil.checkExists(watcher, nodePath) == -1) {
HRegion r = recoveringRegions.remove(region);
if (r != null) {
r.setRecovering(false);
}
LOG.debug("Mark recovering region:" + region + " up.");
} else {
// current check is a defensive (or redundant) mechanism to prevent us from
// having stale recovering regions in our internal RS memory state while
// zookeeper (the source of truth) says differently. We stop at the first good one
// because we should not see such an instance in the normal case, so
// checking the first one is good enough.
break;
}
} catch (KeeperException e) {
// ignore zookeeper error
LOG.debug("Got a zookeeper when trying to open a recovering region", e);
break;
}
}
}
}
}
}
}
}
private List<String> getTaskList() throws InterruptedException {
List<String> childrenPaths = null;
long sleepTime = 1000;
// Loop until we get the list of children, or exit if the worker thread was stopped.
while (!shouldStop) {
try {
childrenPaths =
ZKUtil.listChildrenAndWatchForNewChildren(watcher,
watcher.splitLogZNode);
if (childrenPaths != null) {
return childrenPaths;
}
} catch (KeeperException e) {
LOG.warn("Could not get children of znode " + watcher.splitLogZNode, e);
}
LOG.debug("Retry listChildren of znode " + watcher.splitLogZNode
+ " after sleep for " + sleepTime + "ms!");
Thread.sleep(sleepTime);
}
return childrenPaths;
}
@Override
public void markCorrupted(Path rootDir, String name, FileSystem fs) {
ZKSplitLog.markCorrupted(rootDir, name, fs);
}
@Override
public boolean isReady() throws InterruptedException {
int result = -1;
try {
result = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
} catch (KeeperException e) {
// ignore
LOG.warn("Exception when checking for " + watcher.splitLogZNode
+ " ... retrying", e);
}
if (result == -1) {
LOG.info(watcher.splitLogZNode
+ " znode does not exist, waiting for master to create");
Thread.sleep(1000);
}
return (result != -1);
}
@Override
public int getTaskReadySeq() {
return taskReadySeq;
}
@Override
public void registerListener() {
watcher.registerListener(this);
}
@Override
public void removeListener() {
watcher.unregisterListener(this);
}
@Override
public void stopProcessingTasks() {
this.shouldStop = true;
}
@Override
public boolean isStop() {
return shouldStop;
}
@Override
public RegionStoreSequenceIds getRegionFlushedSequenceId(String failedServerName, String key)
throws IOException {
return ZKSplitLog.getRegionFlushedSequenceId(watcher, failedServerName, key);
}
/**
* Asynchronous handler for zk get-data-set-watch on node results.
*/
class GetDataAsyncCallback implements AsyncCallback.DataCallback {
private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
if (rc != 0) {
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
getDataSetWatchFailure(path);
return;
}
data = watcher.getRecoverableZooKeeper().removeMetaData(data);
getDataSetWatchSuccess(path, data);
}
}
/*
* Next part is related to HLogSplitterHandler
*/
/**
* endTask() can fail and the only way to recover out of it is for the {@link SplitLogManager} to
* timeout the task node.
* @param slt
* @param ctr
*/
@Override
public void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails details) {
ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details;
String task = zkDetails.getTaskNode();
int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue();
try {
if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) {
LOG.info("successfully transitioned task " + task + " to final state " + slt);
ctr.incrementAndGet();
return;
}
LOG.warn("failed to transistion task " + task + " to end state " + slt
+ " because of version mismatch ");
} catch (KeeperException.BadVersionException bve) {
LOG.warn("transisition task " + task + " to " + slt + " failed because of version mismatch",
bve);
} catch (KeeperException.NoNodeException e) {
LOG.fatal(
"logic error - end task " + task + " " + slt + " failed because task doesn't exist", e);
} catch (KeeperException e) {
LOG.warn("failed to end task, " + task + " " + slt, e);
}
SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
}
/**
* When the ZK-based implementation wants to complete the task, it needs to know the task znode
* and the current znode cversion (needed for the subsequent update operation).
*/
public static class ZkSplitTaskDetails implements SplitTaskDetails {
private String taskNode;
private MutableInt curTaskZKVersion;
public ZkSplitTaskDetails() {
}
public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) {
this.taskNode = taskNode;
this.curTaskZKVersion = curTaskZKVersion;
}
public String getTaskNode() {
return taskNode;
}
public void setTaskNode(String taskNode) {
this.taskNode = taskNode;
}
public MutableInt getCurTaskZKVersion() {
return curTaskZKVersion;
}
public void setCurTaskZKVersion(MutableInt curTaskZKVersion) {
this.curTaskZKVersion = curTaskZKVersion;
}
@Override
public String getWALFile() {
return ZKSplitLog.getFileName(taskNode);
}
}
}

View File

@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.zookeeper.KeeperException;
/**
* This class abstracts a bunch of operations the HMaster needs to interact with
@ -91,12 +90,14 @@ public class MasterFileSystem {
private final MasterServices services;
final static PathFilter META_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
return HLogUtil.isMetaFile(p);
}
};
final static PathFilter NON_META_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
return !HLogUtil.isMetaFile(p);
}
@ -123,14 +124,10 @@ public class MasterFileSystem {
// set up the archived logs path
this.oldLogDir = createInitialFileSystemLayout();
HFileSystem.addLocationsOrderInterceptor(conf);
- try {
-   this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
-     master.getConfiguration(), master, services,
-     master.getServerName());
- } catch (KeeperException e) {
-   throw new IOException(e);
- }
- this.distributedLogReplay = (this.splitLogManager.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
+ this.splitLogManager =
+   new SplitLogManager(master, master.getConfiguration(), master, services,
+     master.getServerName());
+ this.distributedLogReplay = this.splitLogManager.isLogReplaying();
}
/**
@ -350,11 +347,7 @@ public class MasterFileSystem {
if (regions == null || regions.isEmpty()) {
return;
}
- try {
-   this.splitLogManager.markRegionsRecoveringInZK(serverName, regions);
- } catch (KeeperException e) {
-   throw new IOException(e);
- }
+ this.splitLogManager.markRegionsRecovering(serverName, regions);
}
public void splitLog(final Set<ServerName> serverNames) throws IOException {
@ -362,13 +355,13 @@ public class MasterFileSystem {
}
/**
- * Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegionsFromZK(Set)}
+ * Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegions(Set)}
 * @param failedServers
- * @throws KeeperException
+ * @throws IOException
 */
void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
- throws KeeperException, InterruptedIOException {
+ throws IOException, InterruptedIOException {
- this.splitLogManager.removeStaleRecoveringRegionsFromZK(failedServers);
+ this.splitLogManager.removeStaleRecoveringRegions(failedServers);
}
/**
@ -459,7 +452,7 @@ public class MasterFileSystem {
org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir
.migrateFSTableDescriptorsIfNecessary(fs, rd);
}
// Create tableinfo-s for hbase:meta if not already there.
new FSTableDescriptors(fs, rd).createTableDescriptor(HTableDescriptor.META_TABLEDESC);
@ -650,15 +643,10 @@ public class MasterFileSystem {
/**
 * The function is used in SSH to set recovery mode based on configuration after all outstanding
 * log split tasks drained.
- * @throws KeeperException
- * @throws InterruptedIOException
+ * @throws IOException
 */
public void setLogRecoveryMode() throws IOException {
- try {
    this.splitLogManager.setRecoveryMode(false);
- } catch (KeeperException e) {
-   throw new IOException(e);
- }
}
public RecoveryMode getLogRecoveryMode() {

View File

@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@ -90,7 +91,6 @@ import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -1525,8 +1526,8 @@ public class HRegionServer extends HasThread implements
this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
}
- this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
-   conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorker.DEFAULT_MAX_SPLITTERS));
+ this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
+   "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), getName() + ".logRoller",
uncaughtExceptionHandler);
@ -1579,7 +1580,7 @@ public class HRegionServer extends HasThread implements
sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
- this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
+ this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this);
splitLogWorker.start();
}
@ -2855,7 +2856,7 @@ public class HRegionServer extends HasThread implements
minSeqIdForLogReplay = storeSeqIdForReplay;
}
}
try {
long lastRecordedFlushedSequenceId = -1;
String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
@ -2868,7 +2869,7 @@ public class HRegionServer extends HasThread implements
throw new InterruptedIOException();
}
if (data != null) {
- lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
+ lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
}
if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
@ -2881,11 +2882,11 @@ public class HRegionServer extends HasThread implements
LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for " LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for "
+ previousRSName); + previousRSName);
} else { } else {
LOG.warn("Can't find failed region server for recovering region " + LOG.warn("Can't find failed region server for recovering region " +
region.getEncodedName()); region.getEncodedName());
} }
} catch (NoNodeException ignore) { } catch (NoNodeException ignore) {
LOG.debug("Region " + region.getEncodedName() + LOG.debug("Region " + region.getEncodedName() +
" must have completed recovery because its recovery znode has been removed", ignore); " must have completed recovery because its recovery znode has been removed", ignore);
} }
} }

View File

@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@ -160,6 +159,7 @@ import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
@ -1306,7 +1306,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (previous == null) {
// check if the region to be opened is marked in recovering state in ZK
- if (SplitLogManager.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
+ if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
region.getEncodedName())) {
// check if current region open is for distributedLogReplay. This check is to support
// rolling restart/upgrade where we want to Master/RS see same configuration
@ -1318,7 +1318,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// could happen when turn distributedLogReplay off from on.
List<String> tmpRegions = new ArrayList<String>();
tmpRegions.add(region.getEncodedName());
- SplitLogManager.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(), tmpRegions);
+ ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
+   tmpRegions);
}
}
// If there is no action in progress, we can submit a specific handler.

View File

@ -22,111 +22,69 @@ import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
/** /**
* This worker is spawned in every regionserver (should we also spawn one in * This worker is spawned in every regionserver, including master. The Worker waits for log
* the master?). The Worker waits for log splitting tasks to be put up by the * splitting tasks to be put up by the {@link SplitLogManager} running in the master and races with
* {@link SplitLogManager} running in the master and races with other workers * other workers in other serves to acquire those tasks. The coordination is done via coordination
* in other serves to acquire those tasks. The coordination is done via * engine.
* zookeeper. All the action takes place at /hbase/splitlog znode.
* <p> * <p>
* If a worker has successfully moved the task from state UNASSIGNED to * If a worker has successfully moved the task from state UNASSIGNED to OWNED then it owns the task.
* OWNED then it owns the task. It keeps heart beating the manager by * It keeps heart beating the manager by periodically moving the task from UNASSIGNED to OWNED
* periodically moving the task from UNASSIGNED to OWNED state. On success it * state. On success it moves the task to TASK_DONE. On unrecoverable error it moves task state to
* moves the task to TASK_DONE. On unrecoverable error it moves task state to * ERR. If it cannot continue but wants the master to retry the task then it moves the task state to
* ERR. If it cannot continue but wants the master to retry the task then it * RESIGNED.
* moves the task state to RESIGNED.
* <p> * <p>
* The manager can take a task away from a worker by moving the task from * The manager can take a task away from a worker by moving the task from OWNED to UNASSIGNED. In
* OWNED to UNASSIGNED. In the absence of a global lock there is a * the absence of a global lock there is a unavoidable race here - a worker might have just finished
* unavoidable race here - a worker might have just finished its task when it * its task when it is stripped of its ownership. Here we rely on the idempotency of the log
* is stripped of its ownership. Here we rely on the idempotency of the log
* splitting task for correctness * splitting task for correctness
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class SplitLogWorker extends ZooKeeperListener implements Runnable { public class SplitLogWorker implements Runnable {
  private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);

  Thread worker;
  // thread pool which executes recovery work
  private SplitLogWorkerCoordination coordination;
  private Configuration conf;
  private RegionServerServices server;

  public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
      TaskExecutor splitTaskExecutor) {
    this.server = server;
    this.conf = conf;
    this.coordination =
        ((BaseCoordinatedStateManager) hserver.getCoordinatedStateManager())
            .getSplitLogWorkerCoordination();
    coordination.init(server, conf, splitTaskExecutor, this);
  }
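  // Usage sketch (illustrative only): how a region server could wire up and run the worker
  // after this change. The regionServer and sequenceIdChecker variables are assumptions made
  // for the example, not names introduced by this patch.
  //
  //   SplitLogWorker splitLogWorker =
  //       new SplitLogWorker(regionServer, conf, regionServer, sequenceIdChecker);
  //   splitLogWorker.start();  // spawns the "SplitLogWorker-<servername>" thread
  //   ...
  //   splitLogWorker.stop();   // asks coordination to stop handing out tasks, then interrupts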
  public SplitLogWorker(final Server hserver, final Configuration conf,
      final RegionServerServices server, final LastSequenceId sequenceIdChecker) {
    this(server, conf, server, new TaskExecutor() {
      @Override
      public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
        Path rootdir;
@ -143,7 +101,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
        // encountered a bad non-retry-able persistent error.
        try {
          if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
              fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode)) {
            return Status.PREEMPTED;
          }
        } catch (InterruptedIOException iioe) {
@ -160,9 +118,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
        } else if (cause instanceof InterruptedException) {
          LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
          return Status.RESIGNED;
        }
        LOG.warn("log splitting of " + filename + " failed, returning error", e);
        return Status.ERR;
@ -175,32 +130,22 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
  @Override
  public void run() {
    try {
      LOG.info("SplitLogWorker " + server.getServerName() + " starting");
      coordination.registerListener();
      // pre-initialize a new connection for splitlogworker configuration
      HConnectionManager.getConnection(conf);
      // wait for the coordination engine to be ready
      boolean res = false;
      while (!res && !coordination.isStop()) {
        res = coordination.isReady();
      }
      if (!coordination.isStop()) {
        coordination.taskLoop();
      }
    } catch (Throwable t) {
      if (ExceptionUtil.isInterrupt(t)) {
        LOG.info("SplitLogWorker interrupted. Exiting. " + (coordination.isStop() ? "" :
            " (ERROR: exitWorker is not set, exiting anyway)"));
      } else {
        // only a logical error can cause this. Printing it out
@ -208,394 +153,24 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
        LOG.error("unexpected error ", t);
      }
    } finally {
      coordination.removeListener();
      LOG.info("SplitLogWorker " + server.getServerName() + " exiting");
    }
  }
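  // Note: run() is now coordination-agnostic. Readiness polling and the task loop live behind
  // SplitLogWorkerCoordination, so an alternative (non-ZooKeeper) coordination engine only has
  // to implement isReady(), isStop() and taskLoop() for the worker to keep the same lifecycle.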
/**
 * Wait for tasks to become available at /hbase/splitlog znode. Grab a task
 * one at a time. This policy puts an upper limit on the number of
 * simultaneous log splits that could be happening in a cluster.
 * <p>
 * Synchronization using {@link #taskReadyLock} ensures that it will
 * try to grab every task that has been put up.
 */
private void taskLoop() throws InterruptedException {
while (!exitWorker) {
int seq_start = taskReadySeq;
List<String> paths = getTaskList();
if (paths == null) {
LOG.warn("Could not get tasks, did someone remove " +
this.watcher.splitLogZNode + " ... worker thread exiting.");
return;
}
// pick meta wal firstly
int offset = (int) (Math.random() * paths.size());
for(int i = 0; i < paths.size(); i ++){
if(HLogUtil.isMetaFile(paths.get(i))) {
offset = i;
break;
}
}
int numTasks = paths.size();
for (int i = 0; i < numTasks; i++) {
int idx = (i + offset) % paths.size();
// don't call ZKSplitLog.getNodeName() because that will lead to
// double encoding of the path name
if (this.calculateAvailableSplitters(numTasks) > 0) {
grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
} else {
LOG.debug("Current region server " + this.serverName + " has "
+ this.tasksInProgress.get() + " tasks in progress and can't take more.");
break;
}
if (exitWorker) {
return;
}
}
SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
synchronized (taskReadyLock) {
while (seq_start == taskReadySeq) {
taskReadyLock.wait(checkInterval);
if (this.server != null) {
// check to see if we have stale recovering regions in our internal memory state
Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
if (!recoveringRegions.isEmpty()) {
// Make a local copy to prevent ConcurrentModificationException when other threads
// modify recoveringRegions
List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
for (String region : tmpCopy) {
String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
try {
if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
HRegion r = recoveringRegions.remove(region);
if (r != null) {
r.setRecovering(false);
}
LOG.debug("Mark recovering region:" + region + " up.");
} else {
// current check is a defensive (or redundant) mechanism to prevent us from
// having stale recovering regions in our internal RS memory state while
// ZooKeeper (the source of truth) says differently. We stop at the first good one
// because we should not have more than one such region in the normal case, so
// checking the first one is good enough.
break;
}
} catch (KeeperException e) {
// ignore zookeeper error
LOG.debug("Got a zookeeper when trying to open a recovering region", e);
break;
}
}
}
}
}
}
}
}
/**
* try to grab a 'lock' on the task zk node to own and execute the task.
* <p>
* @param path zk node for the task
*/
private void grabTask(String path) {
Stat stat = new Stat();
byte[] data;
synchronized (grabTaskLock) {
currentTask = path;
workerInGrabTask = true;
if (Thread.interrupted()) {
return;
}
}
try {
try {
if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {
SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
return;
}
} catch (KeeperException e) {
LOG.warn("Failed to get data for znode " + path, e);
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
return;
}
SplitLogTask slt;
try {
slt = SplitLogTask.parseFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed parse data for znode " + path, e);
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
return;
}
if (!slt.isUnassigned()) {
SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
return;
}
currentVersion = attemptToOwnTask(true, watcher, serverName, path, slt.getMode(),
stat.getVersion());
if (currentVersion < 0) {
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
return;
}
if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName, slt.getMode()),
SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion);
return;
}
LOG.info("worker " + serverName + " acquired task " + path);
SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
getDataSetWatchAsync();
submitTask(path, slt.getMode(), currentVersion, this.report_period);
// after a successful submit, sleep a little bit to allow other RSs to grab the remaining tasks
try {
int sleepTime = RandomUtils.nextInt(500) + 500;
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
LOG.warn("Interrupted while yielding for other region servers", e);
Thread.currentThread().interrupt();
}
} finally {
synchronized (grabTaskLock) {
workerInGrabTask = false;
// clear the interrupt from stopTask() otherwise the next task will
// suffer
Thread.interrupted();
}
}
}
/**
 * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
 * <p>
 * This method is also used to periodically heartbeat the task progress by transitioning the node
 * from OWNED to OWNED.
 * <p>
 * @param isFirstTime true when this is the first attempt to own the task, false for heartbeats
 * @param zkw the ZooKeeper watcher to go through
 * @param server name of the region server trying to own the task
 * @param task znode path of the task
 * @param taskZKVersion expected zk version of the task node
 * @return non-negative integer value when task can be owned by current region server, otherwise -1
 */
protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
int latestZKVersion = FAILED_TO_OWN_TASK;
try {
SplitLogTask slt = new SplitLogTask.Owned(server, mode);
Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
if (stat == null) {
LOG.warn("zk.setData() returned null for path " + task);
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
return FAILED_TO_OWN_TASK;
}
latestZKVersion = stat.getVersion();
SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
return latestZKVersion;
} catch (KeeperException e) {
if (!isFirstTime) {
if (e.code().equals(KeeperException.Code.NONODE)) {
LOG.warn("NONODE failed to assert ownership for " + task, e);
} else if (e.code().equals(KeeperException.Code.BADVERSION)) {
LOG.warn("BADVERSION failed to assert ownership for " + task, e);
} else {
LOG.warn("failed to assert ownership for " + task, e);
}
}
} catch (InterruptedException e1) {
LOG.warn("Interrupted while trying to assert ownership of " +
task + " " + StringUtils.stringifyException(e1));
Thread.currentThread().interrupt();
}
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
return FAILED_TO_OWN_TASK;
}
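// Note on the ownership protocol above: setData(task, OWNED, taskZKVersion) is effectively a
// compare-and-swap on the znode version, so a grab or heartbeat succeeds only if nobody else
// transitioned the task in between; a BADVERSION result means the manager or another worker won
// the race and this worker must give the task up.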
/**
 * This function calculates how many splitters this region server could create based on the
 * expected average number of tasks per RS and the hard upper bound (maxConcurrentTasks) set by
 * configuration. <br>
 * At any given time, a RS may spawn at most MIN(expected tasks per RS, hard upper bound)
 * splitters.
 * @param numTasks current total number of available tasks
 */
private int calculateAvailableSplitters(int numTasks) {
// at least one RS (itself) is available
int availableRSs = 1;
try {
List<String> regionServers = ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
} catch (KeeperException e) {
// do nothing
LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
}
int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
// calculate how many more splitters we could spawn
return Math.min(expectedTasksPerRS, this.maxConcurrentTasks) - this.tasksInProgress.get();
}
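// Worked example (numbers are illustrative): with numTasks = 10, 4 live region servers and
// maxConcurrentTasks = 2, expectedTasksPerRS = ceil(10 / 4) = 3, so the cap is min(3, 2) = 2;
// if this RS already has 1 task in progress, it may grab 2 - 1 = 1 more task.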
/**
 * Submit a log split task to the executor service
 * @param curTask task znode path
 * @param curTaskZKVersion current zk version of the task node
 */
void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion,
final int reportPeriod) {
final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
CancelableProgressable reporter = new CancelableProgressable() {
private long last_report_at = 0;
@Override
public boolean progress() {
long t = EnvironmentEdgeManager.currentTime();
if ((t - last_report_at) > reportPeriod) {
last_report_at = t;
int latestZKVersion = attemptToOwnTask(false, watcher, serverName, curTask, mode,
zkVersion.intValue());
if (latestZKVersion < 0) {
LOG.warn("Failed to heartbeat the task" + curTask);
return false;
}
zkVersion.setValue(latestZKVersion);
}
return true;
}
};
HLogSplitterHandler hsh = new HLogSplitterHandler(this.server, curTask, zkVersion, reporter,
this.tasksInProgress, this.splitTaskExecutor, mode);
this.executorService.submit(hsh);
}
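// The reporter above doubles as a lease renewal: HLogSplitter calls progress() while it splits,
// and any call made more than reportPeriod ms after the previous heartbeat re-asserts znode
// ownership via attemptToOwnTask(); returning false cancels the split.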
void getDataSetWatchAsync() {
this.watcher.getRecoverableZooKeeper().getZooKeeper().
getData(currentTask, this.watcher,
new GetDataAsyncCallback(), null);
SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
}
void getDataSetWatchSuccess(String path, byte[] data) {
SplitLogTask slt;
try {
slt = SplitLogTask.parseFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed parse", e);
return;
}
synchronized (grabTaskLock) {
if (workerInGrabTask) {
// currentTask can change but that's ok
String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) {
// have to compare data. cannot compare version because then there
// will be race with attemptToOwnTask()
// cannot just check whether the node has been transitioned to
// UNASSIGNED because by the time this worker sets the data watch
// the node might have made two transitions - from owned by this
// worker to unassigned to owned by another worker
if (! slt.isOwned(this.serverName) &&
! slt.isDone(this.serverName) &&
! slt.isErr(this.serverName) &&
! slt.isResigned(this.serverName)) {
LOG.info("task " + taskpath + " preempted from " +
serverName + ", current task state and owner=" + slt.toString());
stopTask();
}
}
}
}
}
void getDataSetWatchFailure(String path) {
synchronized (grabTaskLock) {
if (workerInGrabTask) {
// currentTask can change but that's ok
String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) {
LOG.info("retrying data watch on " + path);
SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
getDataSetWatchAsync();
} else {
// no point setting a watch on the task which this worker is not
// working upon anymore
}
}
}
}
@Override
public void nodeDataChanged(String path) {
// there will be a self generated dataChanged event every time attemptToOwnTask()
// heartbeats the task znode by upping its version
synchronized (grabTaskLock) {
if (workerInGrabTask) {
// currentTask can change
String taskpath = currentTask;
if (taskpath!= null && taskpath.equals(path)) {
getDataSetWatchAsync();
}
}
}
}
private List<String> getTaskList() throws InterruptedException {
List<String> childrenPaths = null;
long sleepTime = 1000;
// Loop until we get the list of children, or return early if the worker
// thread has been told to exit.
while (!exitWorker) {
try {
childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
this.watcher.splitLogZNode);
if (childrenPaths != null) {
return childrenPaths;
}
} catch (KeeperException e) {
LOG.warn("Could not get children of znode "
+ this.watcher.splitLogZNode, e);
}
LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode
+ " after sleep for " + sleepTime + "ms!");
Thread.sleep(sleepTime);
}
return childrenPaths;
}
@Override
public void nodeChildrenChanged(String path) {
if(path.equals(watcher.splitLogZNode)) {
LOG.debug("tasks arrived or departed");
synchronized (taskReadyLock) {
taskReadySeq++;
taskReadyLock.notify();
}
}
}
/**
 * If the worker is doing a task i.e. splitting a log file then stop the task.
 * It doesn't exit the worker thread.
 */
public void stopTask() {
  LOG.info("Sending interrupt to stop the worker thread");
  worker.interrupt(); // TODO interrupt often gets swallowed, do what else?
}

/**
 * start the SplitLogWorker thread
 */
public void start() {
  worker = new Thread(null, this, "SplitLogWorker-" + server.getServerName());
  worker.start();
}

@ -603,29 +178,10 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
 * stop the SplitLogWorker thread
 */
public void stop() {
  coordination.stopProcessingTasks();
  stopTask();
}
/**
* Asynchronous handler for zk get-data-set-watch on node results.
*/
class GetDataAsyncCallback implements AsyncCallback.DataCallback {
private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
if (rc != 0) {
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
getDataSetWatchFailure(path);
return;
}
data = watcher.getRecoverableZooKeeper().removeMetaData(data);
getDataSetWatchSuccess(path, data);
}
}
/**
 * Objects implementing this interface actually do the task that has been
 * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight
@ -642,4 +198,13 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
  }

  Status exec(String name, RecoveryMode mode, CancelableProgressable p);
}
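// A minimal TaskExecutor sketch (illustrative; real implementations delegate to
// HLogSplitter.splitLogFile() as the constructor above does):
//
//   TaskExecutor noopExecutor = new TaskExecutor() {
//     @Override
//     public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
//       // split the named WAL here, calling p.progress() to keep ownership of the task
//       return Status.DONE;
//     }
//   };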
/**
* Returns the number of tasks processed by coordination.
* This method is used by tests only
*/
@VisibleForTesting
public int getTaskReadySeq() {
return coordination.getTaskReadySeq();
}
} }
@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.regionserver.handler;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -30,17 +28,13 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
import org.apache.hadoop.hbase.util.CancelableProgressable;
/**
 * Handles log splitting of a wal
@ -49,28 +43,24 @@ import org.apache.zookeeper.KeeperException;
public class HLogSplitterHandler extends EventHandler {
  private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class);
  private final ServerName serverName;
  private final CancelableProgressable reporter;
  private final AtomicInteger inProgressTasks;
  private final TaskExecutor splitTaskExecutor;
  private final RecoveryMode mode;
  private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails;
  private final SplitLogWorkerCoordination coordination;

  public HLogSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
      SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
      AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
    super(server, EventType.RS_LOG_REPLAY);
    this.splitTaskDetails = splitDetails;
    this.coordination = coordination;
    this.reporter = reporter;
    this.inProgressTasks = inProgressTasks;
    this.inProgressTasks.incrementAndGet();
    this.serverName = server.getServerName();
    this.splitTaskExecutor = splitTaskExecutor;
    this.mode = mode;
  }
@ -79,20 +69,20 @@ public class HLogSplitterHandler extends EventHandler {
  public void process() throws IOException {
    long startTime = System.currentTimeMillis();
    try {
      Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter);
      switch (status) {
      case DONE:
        coordination.endTask(new SplitLogTask.Done(this.serverName, this.mode),
          SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
        break;
      case PREEMPTED:
        SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
        LOG.warn("task execution preempted " + splitTaskDetails.getWALFile());
        break;
      case ERR:
        if (server != null && !server.isStopped()) {
          coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode),
            SplitLogCounters.tot_wkr_task_err, splitTaskDetails);
          break;
        }
        // if the RS is exiting then there is probably a ton of stuff
@ -100,45 +90,17 @@ public class HLogSplitterHandler extends EventHandler {
        //$FALL-THROUGH$
      case RESIGNED:
        if (server != null && server.isStopped()) {
          LOG.info("task execution interrupted because worker is exiting "
              + splitTaskDetails.toString());
        }
        coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode),
          SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails);
        break;
      }
    } finally {
      LOG.info("worker " + serverName + " done with task " + splitTaskDetails.toString() + " in "
          + (System.currentTimeMillis() - startTime) + "ms");
      this.inProgressTasks.decrementAndGet();
    }
  }
/**
 * endTask() can fail and the only way to recover from it is for the
 * {@link SplitLogManager} to time out the task node.
 * @param slt the final task state to publish
 * @param ctr counter to increment on a successful transition
 */
try {
if (ZKUtil.setData(zkw, task, slt.toByteArray(), taskZKVersion)) {
LOG.info("successfully transitioned task " + task + " to final state " + slt);
ctr.incrementAndGet();
return;
}
LOG.warn("failed to transistion task " + task + " to end state " + slt
+ " because of version mismatch ");
} catch (KeeperException.BadVersionException bve) {
LOG.warn("transisition task " + task + " to " + slt
+ " failed because of version mismatch", bve);
} catch (KeeperException.NoNodeException e) {
LOG.fatal(
"logic error - end task " + task + " " + slt
+ " failed because task doesn't exist", e);
} catch (KeeperException e) {
LOG.warn("failed to end task, " + task + " " + slt, e);
}
SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
}
} }
@ -78,9 +78,10 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -110,7 +111,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.io.MultipleIOException;

import com.google.common.base.Preconditions;
@ -138,8 +138,7 @@ public class HLogSplitter {
  private Set<TableName> disablingOrDisabledTables =
      new HashSet<TableName>();
  private BaseCoordinatedStateManager csm;

  // If an exception is thrown by one of the other threads, it will be
  // stored here.
@ -173,7 +172,7 @@ public class HLogSplitter {
  private final int minBatchSize;

  HLogSplitter(Configuration conf, Path rootDir,
      FileSystem fs, LastSequenceId idChecker,
      CoordinatedStateManager csm, RecoveryMode mode) {
    this.conf = HBaseConfiguration.create(conf);
    String codecClassName = conf
@ -182,8 +181,7 @@ public class HLogSplitter {
    this.rootDir = rootDir;
    this.fs = fs;
    this.sequenceIdChecker = idChecker;
    this.csm = (BaseCoordinatedStateManager) csm;

    entryBuffers = new EntryBuffers(
        this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
@ -195,7 +193,7 @@ public class HLogSplitter {
    this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
    this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
    if (csm != null && this.distributedLogReplay) {
      outputSink = new LogReplayOutputSink(numWriterThreads);
    } else {
      if (this.distributedLogReplay) {
@ -219,15 +217,14 @@ public class HLogSplitter {
   * @param conf
   * @param reporter
   * @param idChecker
   * @param cp coordination state manager
   * @return false if it is interrupted by the progressable.
   * @throws IOException
   */
  public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
      CoordinatedStateManager cp, RecoveryMode mode) throws IOException {
    HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, cp, mode);
    return s.splitLogFile(logfile, reporter);
  }

@ -241,8 +238,8 @@ public class HLogSplitter {
    List<Path> splits = new ArrayList<Path>();
    if (logfiles != null && logfiles.length > 0) {
      for (FileStatus logfile : logfiles) {
        HLogSplitter s =
            new HLogSplitter(conf, rootDir, fs, null, null, RecoveryMode.LOG_SPLITTING);
        if (s.splitLogFile(logfile, null)) {
          finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
          if (s.outputSink.splits != null) {
@ -295,7 +292,7 @@ public class HLogSplitter {
      LOG.warn("Nothing to split in log file " + logPath);
      return true;
    }
    if (csm != null) {
      try {
        TableStateManager tsm = csm.getTableStateManager();
        disablingOrDisabledTables = tsm.getTablesInStates(
@ -320,7 +317,8 @@ public class HLogSplitter {
        if (lastFlushedSequenceId == null) {
          if (this.distributedLogReplay) {
            RegionStoreSequenceIds ids =
                csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
                  key);
            if (ids != null) {
              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
            }
@ -358,7 +356,8 @@ public class HLogSplitter {
      throw iie;
    } catch (CorruptedLogFileException e) {
      LOG.warn("Could not parse, corrupted log file " + logPath, e);
      csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
        logfile.getPath().getName(), fs);
      isCorrupted = true;
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
@ -1368,8 +1367,9 @@ public class HLogSplitter {
    public LogReplayOutputSink(int numWriters) {
      super(numWriters);
      this.waitRegionOnlineTimeOut =
          conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
            ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
      this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
      this.logRecoveredEditsOutputSink.setReporter(reporter);
    }
@ -1590,8 +1590,8 @@ public class HLogSplitter {
      // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
      // update the value for the region
      RegionStoreSequenceIds ids =
          csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
            loc.getRegionInfo().getEncodedName());
      if (ids != null) {
        lastFlushedSequenceId = ids.getLastFlushedSequenceId();
        Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
@ -18,9 +18,11 @@
package org.apache.hadoop.hbase.zookeeper;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -28,8 +30,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.zookeeper.KeeperException;
/** /**
* Common methods and attributes used by {@link SplitLogManager} and {@link SplitLogWorker} * Common methods and attributes used by {@link SplitLogManager} and {@link SplitLogWorker}
@ -120,4 +125,100 @@ public class ZKSplitLog {
return isCorrupt; return isCorrupt;
} }
/*
* Following methods come from SplitLogManager
*/
/**
 * Check whether /hbase/recovering-regions/<current region encoded name> exists. Returns true if
 * it exists and also sets a watch on it.
 * @param zkw the ZooKeeper watcher to go through
 * @param regionEncodedName encoded name of the region
 * @return true when /hbase/recovering-regions/<current region encoded name> exists
 * @throws KeeperException
 */
public static boolean
isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
throws KeeperException {
boolean result = false;
String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName);
byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
if (node != null) {
result = true;
}
return result;
}
/**
* @param bytes - Content of a failed region server or recovering region znode.
* @return long - The last flushed sequence Id for the region server
*/
public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
long lastRecordedFlushedSequenceId = -1l;
try {
lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
} catch (DeserializationException e) {
lastRecordedFlushedSequenceId = -1l;
LOG.warn("Can't parse last flushed sequence Id", e);
}
return lastRecordedFlushedSequenceId;
}
public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
try {
if (regions == null) {
// remove all children under /hbase/recovering-regions
LOG.debug("Garbage collecting all recovering region znodes");
ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
} else {
for (String curRegion : regions) {
String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
ZKUtil.deleteNodeRecursively(watcher, nodePath);
}
}
} catch (KeeperException e) {
LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
}
}
/**
 * This function is used in distributedLogReplay to fetch the last flushed sequence id from ZK
 * @param zkw the ZooKeeper watcher to go through
 * @param serverName the name of the failed region server
 * @param encodedRegionName encoded name of the region
 * @return the last flushed sequence ids recorded in ZK for the region on <code>serverName</code>
 * @throws IOException
*/
public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
String serverName, String encodedRegionName) throws IOException {
// when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
// last flushed sequence Id changes when newly assigned RS flushes writes to the region.
// If the newly assigned RS fails again(a chained RS failures scenario), the last flushed
// sequence Id name space (sequence Id only valid for a particular RS instance), changes
// when different newly assigned RS flushes the region.
// Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
// last flushed sequence Id for each failed RS instance.
RegionStoreSequenceIds result = null;
String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
nodePath = ZKUtil.joinZNode(nodePath, serverName);
try {
byte[] data;
try {
data = ZKUtil.getData(zkw, nodePath);
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
if (data != null) {
result = ZKUtil.parseRegionStoreSequenceIds(data);
}
} catch (KeeperException e) {
throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
+ serverName + "; region=" + encodedRegionName, e);
} catch (DeserializationException e) {
LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
}
return result;
}
} }
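A short usage sketch of the relocated helper above; the server name and encoded region name are
made up for illustration, and error handling is left to the caller:

import java.io.IOException;

import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

public class LastFlushedSequenceIdExample {
  /** Returns the last flushed sequence id recorded for the failed server, or -1 if none. */
  static long lastFlushedFor(ZooKeeperWatcher zkw) throws IOException {
    RegionStoreSequenceIds ids = ZKSplitLog.getRegionFlushedSequenceId(
        zkw, "rs1,60020,1412345678901", "1588230740"); // hypothetical server + encoded region
    return ids == null ? -1L : ids.getLastFlushedSequenceId();
  }
}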
@ -77,6 +77,9 @@ import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.exceptions.OperationConflictException; import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
@ -657,8 +660,8 @@ public class TestDistributedLogSplitting {
      break;
    }

    slm.markRegionsRecovering(firstFailedServer, regionSet);
    slm.markRegionsRecovering(secondFailedServer, regionSet);

    List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw,
        ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName()));
@ -886,7 +889,7 @@ public class TestDistributedLogSplitting {
      break;
    }

    slm.markRegionsRecovering(hrs.getServerName(), regionSet);
    // move the region so that it is opened in recovering state
    final HRegionInfo hri = region;
    final HRegionServer tmpRS = dstRS;
@ -1070,7 +1073,10 @@ public class TestDistributedLogSplitting {
    out.write(0);
    out.write(Bytes.toBytes("corrupted bytes"));
    out.close();
    ZKSplitLogManagerCoordination coordination =
        (ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master
            .getCoordinatedStateManager()).getSplitLogManagerCoordination();
    coordination.setIgnoreDeleteForTesting(true);
    executor = Executors.newSingleThreadExecutor();
    Runnable runnable = new Runnable() {
      @Override
@ -19,11 +19,8 @@
package org.apache.hadoop.hbase.master; package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters; import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
@ -49,22 +46,26 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener; import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -85,13 +86,14 @@ public class TestSplitLogManager {
private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class); private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1"); private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1");
private final ServerManager sm = Mockito.mock(ServerManager.class); private final ServerManager sm = Mockito.mock(ServerManager.class);
private final MasterServices master = Mockito.mock(MasterServices.class); private final MasterServices master = Mockito.mock(MasterServices.class);
static { static {
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
} }
private ZooKeeperWatcher zkw; private ZooKeeperWatcher zkw;
private DummyServer ds;
private static boolean stopped = false; private static boolean stopped = false;
private SplitLogManager slm; private SplitLogManager slm;
private Configuration conf; private Configuration conf;
@ -100,6 +102,68 @@ public class TestSplitLogManager {
private static HBaseTestingUtility TEST_UTIL; private static HBaseTestingUtility TEST_UTIL;
class DummyServer implements Server {
private ZooKeeperWatcher zkw;
private Configuration conf;
private CoordinatedStateManager cm;
public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
this.zkw = zkw;
this.conf = conf;
cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
cm.initialize(this);
}
@Override
public void abort(String why, Throwable e) {
}
@Override
public boolean isAborted() {
return false;
}
@Override
public void stop(String why) {
}
@Override
public boolean isStopped() {
return false;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return zkw;
}
@Override
public ServerName getServerName() {
return null;
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return cm;
}
@Override
public HConnection getShortCircuitConnection() {
return null;
}
@Override
public MetaTableLocator getMetaTableLocator() {
return null;
}
}
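// DummyServer gives SplitLogManager a Server whose CoordinatedStateManager is real (built via
// CoordinatedStateManagerFactory) while everything else is stubbed out; the tests below only
// exercise the coordination wiring, not a running region server.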
static Stoppable stopper = new Stoppable() { static Stoppable stopper = new Stoppable() {
@Override @Override
public void stop(String why) { public void stop(String why) {
@ -110,7 +174,6 @@ public class TestSplitLogManager {
public boolean isStopped() { public boolean isStopped() {
return stopped; return stopped;
} }
}; };
@Before @Before
@ -119,7 +182,10 @@ public class TestSplitLogManager {
TEST_UTIL.startMiniZKCluster(); TEST_UTIL.startMiniZKCluster();
conf = TEST_UTIL.getConfiguration(); conf = TEST_UTIL.getConfiguration();
// Use a different ZK wrapper instance for each test.
zkw =
    new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
ds = new DummyServer(zkw, conf);
ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
ZKUtil.createAndFailSilent(zkw, zkw.baseZNode); ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1); assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
@ -132,18 +198,20 @@ public class TestSplitLogManager {
resetCounters(); resetCounters();
// By default, we let the test manage the error as before, so the server // By default, we let the test manage the error as before, so the server
// does not appear as dead from the master point of view, only from the split log pov. // does not appear as dead from the master point of view, only from the split log pov.
Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true); Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
Mockito.when(master.getServerManager()).thenReturn(sm); Mockito.when(master.getServerManager()).thenReturn(sm);
to = 6000; to = 6000;
conf.setInt("hbase.splitlog.manager.timeout", to); conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to); conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
to = to + 4 * 100; to = to + 4 * 100;
this.mode =
    (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY
        : RecoveryMode.LOG_SPLITTING);
} }
@After @After
@ -173,17 +241,17 @@ public class TestSplitLogManager {
throws Exception { throws Exception {
TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() { TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
@Override @Override
public boolean evaluate() throws Exception { public boolean evaluate() throws Exception {
return (e.eval() != oldval); return (e.eval() != oldval);
} }
}); });
assertEquals(newval, e.eval()); assertEquals(newval, e.eval());
} }
private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
    InterruptedException {
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
NodeCreationListener listener = new NodeCreationListener(zkw, tasknode); NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
zkw.registerListener(listener); zkw.registerListener(listener);
@ -208,7 +276,7 @@ public class TestSplitLogManager {
public void testTaskCreation() throws Exception { public void testTaskCreation() throws Exception {
LOG.info("TestTaskCreation - test the creation of a task in zk"); LOG.info("TestTaskCreation - test the creation of a task in zk");
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -228,7 +296,7 @@ public class TestSplitLogManager {
zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
Task task = slm.findOrCreateOrphanTask(tasknode); Task task = slm.findOrCreateOrphanTask(tasknode);
assertTrue(task.isOrphan()); assertTrue(task.isOrphan());
@ -254,7 +322,7 @@ public class TestSplitLogManager {
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
int version = ZKUtil.checkExists(zkw, tasknode); int version = ZKUtil.checkExists(zkw, tasknode);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
Task task = slm.findOrCreateOrphanTask(tasknode); Task task = slm.findOrCreateOrphanTask(tasknode);
assertTrue(task.isOrphan()); assertTrue(task.isOrphan());
@ -275,9 +343,8 @@ public class TestSplitLogManager {
@Test @Test
public void testMultipleResubmits() throws Exception { public void testMultipleResubmits() throws Exception {
LOG.info("TestMultipleResbmits - no indefinite resubmissions"); LOG.info("TestMultipleResbmits - no indefinite resubmissions");
conf.setInt("hbase.splitlog.max.resubmit", 2); conf.setInt("hbase.splitlog.max.resubmit", 2);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -309,7 +376,7 @@ public class TestSplitLogManager {
public void testRescanCleanup() throws Exception { public void testRescanCleanup() throws Exception {
LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -338,7 +405,7 @@ public class TestSplitLogManager {
public void testTaskDone() throws Exception { public void testTaskDone() throws Exception {
LOG.info("TestTaskDone - cleanup task node once in DONE state"); LOG.info("TestTaskDone - cleanup task node once in DONE state");
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
final ServerName worker1 = ServerName.valueOf("worker1,1,1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1");
@ -358,7 +425,7 @@ public class TestSplitLogManager {
LOG.info("TestTaskErr - cleanup task node once in ERR state"); LOG.info("TestTaskErr - cleanup task node once in ERR state");
conf.setInt("hbase.splitlog.max.resubmit", 0); conf.setInt("hbase.splitlog.max.resubmit", 0);
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -373,14 +440,14 @@ public class TestSplitLogManager {
} }
waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
conf.setInt("hbase.splitlog.max.resubmit", SplitLogManager.DEFAULT_MAX_RESUBMIT); conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
} }
@Test @Test
public void testTaskResigned() throws Exception { public void testTaskResigned() throws Exception {
LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
assertEquals(tot_mgr_resubmit.get(), 0); assertEquals(tot_mgr_resubmit.get(), 0);
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
assertEquals(tot_mgr_resubmit.get(), 0); assertEquals(tot_mgr_resubmit.get(), 0);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -414,7 +481,7 @@ public class TestSplitLogManager {
zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
// submit another task which will stay in unassigned mode // submit another task which will stay in unassigned mode
@ -443,7 +510,7 @@ public class TestSplitLogManager {
LOG.info("testDeadWorker"); LOG.info("testDeadWorker");
conf.setLong("hbase.splitlog.max.resubmit", 0); conf.setLong("hbase.splitlog.max.resubmit", 0);
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -468,7 +535,7 @@ public class TestSplitLogManager {
@Test @Test
public void testWorkerCrash() throws Exception { public void testWorkerCrash() throws Exception {
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -493,7 +560,7 @@ public class TestSplitLogManager {
@Test @Test
public void testEmptyLogDir() throws Exception { public void testEmptyLogDir() throws Exception {
LOG.info("testEmptyLogDir"); LOG.info("testEmptyLogDir");
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
FileSystem fs = TEST_UTIL.getTestFileSystem(); FileSystem fs = TEST_UTIL.getTestFileSystem();
Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
UUID.randomUUID().toString()); UUID.randomUUID().toString());
@ -505,7 +572,7 @@ public class TestSplitLogManager {
@Test (timeout = 60000) @Test (timeout = 60000)
public void testLogFilesAreArchived() throws Exception { public void testLogFilesAreArchived() throws Exception {
LOG.info("testLogFilesAreArchived"); LOG.info("testLogFilesAreArchived");
final SplitLogManager slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); final SplitLogManager slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
FileSystem fs = TEST_UTIL.getTestFileSystem(); FileSystem fs = TEST_UTIL.getTestFileSystem();
Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived"); Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
conf.set(HConstants.HBASE_DIR, dir.toString()); conf.set(HConstants.HBASE_DIR, dir.toString());
@ -554,15 +621,15 @@ public class TestSplitLogManager {
HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L)); ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L));
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
slm.removeStaleRecoveringRegionsFromZK(null); slm.removeStaleRecoveringRegions(null);
List<String> recoveringRegions = List<String> recoveringRegions =
zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false); zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty()); assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
} }
@Test(timeout=60000) @Test(timeout=60000)
public void testGetPreviousRecoveryMode() throws Exception { public void testGetPreviousRecoveryMode() throws Exception {
LOG.info("testGetPreviousRecoveryMode"); LOG.info("testGetPreviousRecoveryMode");
@ -575,12 +642,12 @@ public class TestSplitLogManager {
ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(), ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(ds, testConf, stopper, master, DUMMY_MASTER);
assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING); assertTrue(slm.isLogSplitting());
zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1); zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
slm.setRecoveryMode(false); slm.setRecoveryMode(false);
assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY); assertTrue(slm.isLogReplaying());
} }
} }
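
The pattern in every hunk above: SplitLogManager no longer takes a raw ZooKeeperWatcher as its first argument but a Server (the test's ds) whose CoordinatedStateManager supplies the split-log coordination, and recovery-mode checks move behind the isLogSplitting()/isLogReplaying() predicates. A minimal sketch of the new call shape, not part of the patch; the helper name and the throws clause are assumptions, and the Server must expose an initialized CoordinatedStateManager as the test fixtures do:

package org.apache.hadoop.hbase.master;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;

public class SplitLogManagerSketch {
  // Sketch only. conf/stopper/master/serverName stand in for the fixtures
  // used in the test above.
  static SplitLogManager create(Server server, Configuration conf, Stoppable stopper,
      MasterServices master, ServerName serverName) throws Exception {
    SplitLogManager slm = new SplitLogManager(server, conf, stopper, master, serverName);
    // Recovery mode is now queried through predicates instead of comparing
    // getRecoveryMode() against RecoveryMode constants.
    boolean splitting = slm.isLogSplitting();
    boolean replaying = slm.isLogReplaying();
    return slm;
  }
}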

View File

@@ -19,8 +19,9 @@
 package org.apache.hadoop.hbase.regionserver;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -30,19 +31,23 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -65,11 +70,74 @@ public class TestSplitLogWorker {
 }
 private final static HBaseTestingUtility TEST_UTIL =
 new HBaseTestingUtility();
+private DummyServer ds;
 private ZooKeeperWatcher zkw;
 private SplitLogWorker slw;
 private ExecutorService executorService;
 private RecoveryMode mode;
+class DummyServer implements Server {
+private ZooKeeperWatcher zkw;
+private Configuration conf;
+private CoordinatedStateManager cm;
+public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
+this.zkw = zkw;
+this.conf = conf;
+cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
+cm.initialize(this);
+}
+@Override
+public void abort(String why, Throwable e) {
+}
+@Override
+public boolean isAborted() {
+return false;
+}
+@Override
+public void stop(String why) {
+}
+@Override
+public boolean isStopped() {
+return false;
+}
+@Override
+public Configuration getConfiguration() {
+return conf;
+}
+@Override
+public ZooKeeperWatcher getZooKeeper() {
+return zkw;
+}
+@Override
+public ServerName getServerName() {
+return null;
+}
+@Override
+public CoordinatedStateManager getCoordinatedStateManager() {
+return cm;
+}
+@Override
+public HConnection getShortCircuitConnection() {
+return null;
+}
+@Override
+public MetaTableLocator getMetaTableLocator() {
+return null;
+}
+}
 private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
 throws Exception {
 assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval,
@@ -106,19 +174,22 @@ public class TestSplitLogWorker {
 Configuration conf = TEST_UTIL.getConfiguration();
 zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
 "split-log-worker-tests", null);
+ds = new DummyServer(zkw, conf);
 ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
 ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
-assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
+assertThat(ZKUtil.checkExists(zkw, zkw.baseZNode), not (is(-1)));
 LOG.debug(zkw.baseZNode + " created");
 ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
-assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
+assertThat(ZKUtil.checkExists(zkw, zkw.splitLogZNode), not (is(-1)));
 LOG.debug(zkw.splitLogZNode + " created");
 ZKUtil.createAndFailSilent(zkw, zkw.rsZNode);
-assertTrue(ZKUtil.checkExists(zkw, zkw.rsZNode) != -1);
+assertThat(ZKUtil.checkExists(zkw, zkw.rsZNode), not (is(-1)));
 SplitLogCounters.resetCounters();
 executorService = new ExecutorService("TestSplitLogWorker");
 executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
 this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
 }
@@ -157,12 +228,12 @@ public class TestSplitLogWorker {
 final ServerName RS = ServerName.valueOf("rs,1,1");
 RegionServerServices mockedRS = getRegionServer(RS);
 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
 new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
 Ids.OPEN_ACL_UNSAFE,
 CreateMode.PERSISTENT);
 SplitLogWorker slw =
-new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
 slw.start();
 try {
 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
@@ -170,7 +241,7 @@ public class TestSplitLogWorker {
 SplitLogTask slt = SplitLogTask.parseFrom(bytes);
 assertTrue(slt.isOwned(RS));
 } finally {
 stopSplitLogWorker(slw);
 }
 }
@@ -193,14 +264,14 @@ public class TestSplitLogWorker {
 final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
 final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
 new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 RegionServerServices mockedRS1 = getRegionServer(SVR1);
 RegionServerServices mockedRS2 = getRegionServer(SVR2);
 SplitLogWorker slw1 =
-new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
+new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
 SplitLogWorker slw2 =
-new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
+new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
 slw1.start();
 slw2.start();
 try {
@@ -227,7 +298,7 @@ public class TestSplitLogWorker {
 final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
 RegionServerServices mockedRS = getRegionServer(SRV);
 SplitLogWorker slw =
-new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
 slw.start();
 try {
 Thread.yield(); // let the worker start
@@ -236,11 +307,11 @@ public class TestSplitLogWorker {
 // this time create a task node after starting the splitLogWorker
 zkw.getRecoverableZooKeeper().create(PATH,
 new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
-assertEquals(1, slw.taskReadySeq);
+assertEquals(1, slw.getTaskReadySeq());
 byte [] bytes = ZKUtil.getData(zkw, PATH);
 SplitLogTask slt = SplitLogTask.parseFrom(bytes);
 assertTrue(slt.isOwned(SRV));
@@ -260,14 +331,14 @@ public class TestSplitLogWorker {
 final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
 RegionServerServices mockedRS = getRegionServer(SRV);
 SplitLogWorker slw =
-new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
 slw.start();
 try {
 Thread.yield(); // let the worker start
 Thread.sleep(100);
 waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
 SplitLogTask unassignedManager =
 new SplitLogTask.Unassigned(MANAGER, this.mode);
 zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -287,7 +358,7 @@ public class TestSplitLogWorker {
 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
-assertEquals(2, slw.taskReadySeq);
+assertEquals(2, slw.getTaskReadySeq());
 byte [] bytes = ZKUtil.getData(zkw, PATH2);
 slt = SplitLogTask.parseFrom(bytes);
 assertTrue(slt.isOwned(SRV));
@@ -302,7 +373,7 @@ public class TestSplitLogWorker {
 SplitLogCounters.resetCounters();
 final ServerName SRV = ServerName.valueOf("svr,1,1");
 RegionServerServices mockedRS = getRegionServer(SRV);
-slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
 slw.start();
 Thread.yield(); // let the worker start
 Thread.sleep(100);
@@ -358,14 +429,13 @@ public class TestSplitLogWorker {
 Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
 testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
 RegionServerServices mockedRS = getRegionServer(RS);
 for (int i = 0; i < maxTasks; i++) {
 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
 new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 }
-SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
+SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
 slw.start();
 try {
 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
@@ -408,7 +478,7 @@ public class TestSplitLogWorker {
 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 }
-SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
+SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
 slw.start();
 try {
 int acquiredTasks = 0;
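
The worker side mirrors the manager: SplitLogWorker is constructed from a Server (the DummyServer introduced above) instead of a ZooKeeperWatcher, so the worker obtains its coordination from the server's CoordinatedStateManager, and internal state such as taskReadySeq is now read through getTaskReadySeq(). A sketch of that wiring under the same assumptions; the helper name is illustrative only:

package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Server;

public class SplitLogWorkerSketch {
  // Sketch only. The Server carries an initialized CoordinatedStateManager,
  // exactly as DummyServer does in the test above.
  static SplitLogWorker startWorker(Server server, Configuration conf,
      RegionServerServices services, SplitLogWorker.TaskExecutor executor) {
    SplitLogWorker worker = new SplitLogWorker(server, conf, services, executor);
    worker.start(); // spawns the worker thread; stop it with worker.stop() when done
    return worker;
  }
}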

View File

@@ -18,7 +18,12 @@
 */
 package org.apache.hadoop.hbase.regionserver.wal;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 import java.io.IOException;
 import java.util.NavigableSet;
@@ -27,7 +32,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
@@ -35,8 +44,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import static org.mockito.Mockito.mock;
 /**
 * Simple testing of a few HLog methods.
 */
@@ -45,7 +52,7 @@ public class TestHLogMethods {
 private static final byte[] TEST_REGION = Bytes.toBytes("test_region");;
 private static final TableName TEST_TABLE =
 TableName.valueOf("test_table");
 private final HBaseTestingUtility util = new HBaseTestingUtility();
 /**
@@ -108,27 +115,27 @@ public class TestHLogMethods {
 reb.appendEntry(createTestLogEntry(1));
 assertTrue(reb.heapSize() > 0);
 }
 @Test
 public void testEntrySink() throws Exception {
 Configuration conf = new Configuration();
 RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
 HLogSplitter splitter = new HLogSplitter(
-conf, mock(Path.class), mock(FileSystem.class), null, null, null, mode);
+conf, mock(Path.class), mock(FileSystem.class), null, null, mode);
 EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
 for (int i = 0; i < 1000; i++) {
 HLog.Entry entry = createTestLogEntry(i);
 sink.appendEntry(entry);
 }
 assertTrue(sink.totalBuffered > 0);
 long amountInChunk = sink.totalBuffered;
 // Get a chunk
 RegionEntryBuffer chunk = sink.getChunkToWrite();
 assertEquals(chunk.heapSize(), amountInChunk);
 // Make sure it got marked that a thread is "working on this"
 assertTrue(sink.isRegionCurrentlyWriting(TEST_REGION));
@@ -136,26 +143,26 @@ public class TestHLogMethods {
 for (int i = 0; i < 500; i++) {
 HLog.Entry entry = createTestLogEntry(i);
 sink.appendEntry(entry);
 }
 // Asking for another chunk shouldn't work since the first one
 // is still writing
 assertNull(sink.getChunkToWrite());
 // If we say we're done writing the first chunk, then we should be able
 // to get the second
 sink.doneWriting(chunk);
 RegionEntryBuffer chunk2 = sink.getChunkToWrite();
 assertNotNull(chunk2);
 assertNotSame(chunk, chunk2);
 long amountInChunk2 = sink.totalBuffered;
 // The second chunk had fewer rows than the first
 assertTrue(amountInChunk2 < amountInChunk);
 sink.doneWriting(chunk2);
 assertEquals(0, sink.totalBuffered);
 }
 private HLog.Entry createTestLogEntry(int i) {
 long seq = i;
 long now = i * 1000;
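
In the HLogSplitter hunks the constructor drops from seven arguments to six: the ZooKeeperWatcher parameter disappears, matching the ZK abstraction this commit introduces. A sketch of the new shape as these tests call it; reading the two remaining nulls as the unused sequence-id checker and coordination manager is an assumption inferred from the call sites above:

package org.apache.hadoop.hbase.regionserver.wal;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;

public class HLogSplitterSketch {
  // Sketch only: a same-package helper mirroring the test call sites above.
  // The nulls are the optional collaborators these tests do not exercise.
  static HLogSplitter newSplitter(Configuration conf, Path rootDir, FileSystem fs,
      RecoveryMode mode) {
    return new HLogSplitter(conf, rootDir, fs, null, null, mode);
  }
}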

View File

@@ -138,7 +138,7 @@ public class TestHLogReaderOnSecureHLog {
 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
 Path rootdir = FSUtils.getRootDir(conf);
 try {
-HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode);
+HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode);
 s.splitLogFile(listStatus[0], null);
 Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
 "corrupt");
@@ -181,7 +181,7 @@ public class TestHLogReaderOnSecureHLog {
 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
 Path rootdir = FSUtils.getRootDir(conf);
 try {
-HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode);
+HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode);
 s.splitLogFile(listStatus[0], null);
 Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
 "corrupt");

View File

@@ -809,7 +809,7 @@ public class TestHLogSplit {
 logfiles != null && logfiles.length > 0);
 // Set up a splitter that will throw an IOE on the output side
 HLogSplitter logSplitter = new HLogSplitter(
-conf, HBASEDIR, fs, null, null, null, this.mode) {
+conf, HBASEDIR, fs, null, null, this.mode) {
 protected HLog.Writer createWriter(FileSystem fs,
 Path logfile, Configuration conf) throws IOException {
 HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
@@ -942,7 +942,7 @@ public class TestHLogSplit {
 try {
 conf.setInt("hbase.splitlog.report.period", 1000);
 boolean ret = HLogSplitter.splitLogFile(
-HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null, this.mode);
+HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode);
 assertFalse("Log splitting should failed", ret);
 assertTrue(count.get() > 0);
 } catch (IOException e) {
@@ -1001,7 +1001,7 @@ public class TestHLogSplit {
 // Create a splitter that reads and writes the data without touching disk
 HLogSplitter logSplitter = new HLogSplitter(
-localConf, HBASEDIR, fs, null, null, null, this.mode) {
+localConf, HBASEDIR, fs, null, null, this.mode) {
 /* Produce a mock writer that doesn't write anywhere */
 protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
@@ -1286,7 +1286,7 @@ public class TestHLogSplit {
 logfiles != null && logfiles.length > 0);
 HLogSplitter logSplitter = new HLogSplitter(
-conf, HBASEDIR, fs, null, null, null, this.mode) {
+conf, HBASEDIR, fs, null, null, this.mode) {
 protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
 throws IOException {
 HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);

View File

@@ -885,7 +885,7 @@ public class TestWALReplay {
 wal.close();
 FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
 HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0],
-this.fs, this.conf, null, null, null, null, mode);
+this.fs, this.conf, null, null, null, mode);
 FileStatus[] listStatus1 = this.fs.listStatus(
 new Path(FSUtils.getTableDir(hbaseRootDir, tableName),
 new Path(hri.getEncodedName(), "recovered.edits")));
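
The static entry point changes the same way: HLogSplitter.splitLogFile loses one ZooKeeper-related argument, going from nine parameters to eight, as both the TestHLogSplit and TestWALReplay call sites show. A sketch matching those calls; the roles of the parameters after the reporter are assumptions inferred from the call sites, not confirmed by this diff:

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.CancelableProgressable;

public class SplitLogFileSketch {
  // Sketch only: returns whether the split completed, as the TestHLogSplit
  // call site above checks. The two nulls are collaborators the tests
  // leave unset.
  static boolean split(Path rootDir, FileStatus logfile, FileSystem fs,
      Configuration conf, CancelableProgressable reporter, RecoveryMode mode)
      throws IOException {
    return HLogSplitter.splitLogFile(rootDir, logfile, fs, conf, reporter, null, null, mode);
  }
}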