HBASE-16551 Cleanup SplitLogManager and CatalogJanitor

This commit is contained in:
Matteo Bertozzi 2016-09-20 10:33:39 -07:00
parent 08d9a2b662
commit 181fed4450
8 changed files with 119 additions and 305 deletions

View File

@ -62,14 +62,12 @@ public interface SplitLogManagerCoordination {
final private ConcurrentMap<String, Task> tasks; final private ConcurrentMap<String, Task> tasks;
final private MasterServices master; final private MasterServices master;
final private Set<String> failedDeletions; final private Set<String> failedDeletions;
final private ServerName serverName;
public SplitLogManagerDetails(ConcurrentMap<String, Task> tasks, MasterServices master, public SplitLogManagerDetails(ConcurrentMap<String, Task> tasks, MasterServices master,
Set<String> failedDeletions, ServerName serverName) { Set<String> failedDeletions) {
this.tasks = tasks; this.tasks = tasks;
this.master = master; this.master = master;
this.failedDeletions = failedDeletions; this.failedDeletions = failedDeletions;
this.serverName = serverName;
} }
/** /**
@ -97,7 +95,7 @@ public interface SplitLogManagerCoordination {
* @return server name * @return server name
*/ */
public ServerName getServerName() { public ServerName getServerName() {
return serverName; return master.getServerName();
} }
} }

View File

@ -31,7 +31,6 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -40,14 +39,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status; import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective; import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus; import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
@ -75,32 +72,25 @@ import org.apache.zookeeper.data.Stat;
public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
SplitLogManagerCoordination { SplitLogManagerCoordination {
public static class ZkSplitLogManagerDetails extends SplitLogManagerDetails {
ZkSplitLogManagerDetails(ConcurrentMap<String, Task> tasks, MasterServices master,
Set<String> failedDeletions, ServerName serverName) {
super(tasks, master, failedDeletions, serverName);
}
}
public static final int DEFAULT_TIMEOUT = 120000; public static final int DEFAULT_TIMEOUT = 120000;
public static final int DEFAULT_ZK_RETRIES = 3; public static final int DEFAULT_ZK_RETRIES = 3;
public static final int DEFAULT_MAX_RESUBMIT = 3; public static final int DEFAULT_MAX_RESUBMIT = 3;
private static final Log LOG = LogFactory.getLog(SplitLogManagerCoordination.class); private static final Log LOG = LogFactory.getLog(SplitLogManagerCoordination.class);
private Server server; private final TaskFinisher taskFinisher;
private final Configuration conf;
private long zkretries; private long zkretries;
private long resubmitThreshold; private long resubmitThreshold;
private long timeout; private long timeout;
private TaskFinisher taskFinisher;
SplitLogManagerDetails details; SplitLogManagerDetails details;
// When lastRecoveringNodeCreationTime is older than the following threshold, we'll check // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check
// whether to GC stale recovering znodes // whether to GC stale recovering znodes
private volatile long lastRecoveringNodeCreationTime = 0; private volatile long lastRecoveringNodeCreationTime = 0;
private Configuration conf;
public boolean ignoreZKDeleteForTesting = false; public boolean ignoreZKDeleteForTesting = false;
private RecoveryMode recoveryMode; private RecoveryMode recoveryMode;
@ -122,8 +112,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
return Status.DONE; return Status.DONE;
} }
}; };
this.server = manager.getServer(); this.conf = manager.getServer().getConfiguration();
this.conf = server.getConfiguration();
} }
@Override @Override

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -58,18 +57,17 @@ import org.apache.hadoop.hbase.util.Triple;
@InterfaceAudience.Private @InterfaceAudience.Private
public class CatalogJanitor extends ScheduledChore { public class CatalogJanitor extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(CatalogJanitor.class.getName()); private static final Log LOG = LogFactory.getLog(CatalogJanitor.class.getName());
private final Server server;
private final AtomicBoolean alreadyRunning = new AtomicBoolean(false);
private final AtomicBoolean enabled = new AtomicBoolean(true);
private final MasterServices services; private final MasterServices services;
private AtomicBoolean enabled = new AtomicBoolean(true);
private AtomicBoolean alreadyRunning = new AtomicBoolean(false);
private final Connection connection; private final Connection connection;
CatalogJanitor(final Server server, final MasterServices services) { CatalogJanitor(final MasterServices services) {
super("CatalogJanitor-" + server.getServerName().toShortString(), server, server super("CatalogJanitor-" + services.getServerName().toShortString(), services,
.getConfiguration().getInt("hbase.catalogjanitor.interval", 300000)); services.getConfiguration().getInt("hbase.catalogjanitor.interval", 300000));
this.server = server;
this.services = services; this.services = services;
this.connection = server.getConnection(); this.connection = services.getConnection();
} }
@Override @Override
@ -214,8 +212,7 @@ public class CatalogJanitor extends ScheduledChore {
+ " from fs because merged region no longer holds references"); + " from fs because merged region no longer holds references");
HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, regionA); HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, regionA);
HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, regionB); HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, regionB);
MetaTableAccessor.deleteMergeQualifiers(server.getConnection(), MetaTableAccessor.deleteMergeQualifiers(services.getConnection(), mergedRegion);
mergedRegion);
return true; return true;
} }
return false; return false;
@ -414,7 +411,7 @@ public class CatalogJanitor extends ScheduledChore {
try { try {
regionFs = HRegionFileSystem.openRegionFromFileSystem( regionFs = HRegionFileSystem.openRegionFromFileSystem(
this.services.getConfiguration(), fs, tabledir, daughter, true); this.services.getConfiguration(), fs, tabledir, daughter, true);
for (HColumnDescriptor family: parentDescriptor.getFamilies()) { for (HColumnDescriptor family: parentDescriptor.getFamilies()) {
if ((references = regionFs.hasReferences(family.getNameAsString()))) { if ((references = regionFs.hasReferences(family.getNameAsString()))) {
break; break;

View File

@ -788,7 +788,7 @@ public class HMaster extends HRegionServer implements MasterServices {
getChoreService().scheduleChore(balancerChore); getChoreService().scheduleChore(balancerChore);
this.normalizerChore = new RegionNormalizerChore(this); this.normalizerChore = new RegionNormalizerChore(this);
getChoreService().scheduleChore(normalizerChore); getChoreService().scheduleChore(normalizerChore);
this.catalogJanitorChore = new CatalogJanitor(this, this); this.catalogJanitorChore = new CatalogJanitor(this);
getChoreService().scheduleChore(catalogJanitorChore); getChoreService().scheduleChore(catalogJanitorChore);
// Do Metrics periodically // Do Metrics periodically

View File

@ -100,8 +100,7 @@ public class MasterWalManager {
this.conf = conf; this.conf = conf;
this.rootDir = rootDir; this.rootDir = rootDir;
this.services = services; this.services = services;
this.splitLogManager = new SplitLogManager(services, conf, this.splitLogManager = new SplitLogManager(services, conf);
services, services, services.getServerName());
this.distributedLogReplay = this.splitLogManager.isLogReplaying(); this.distributedLogReplay = this.splitLogManager.isLogReplaying();
this.oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); this.oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);

View File

@ -48,10 +48,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
@ -100,9 +98,8 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
public class SplitLogManager { public class SplitLogManager {
private static final Log LOG = LogFactory.getLog(SplitLogManager.class); private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
private Server server; private final MasterServices server;
private final Stoppable stopper;
private final Configuration conf; private final Configuration conf;
private final ChoreService choreService; private final ChoreService choreService;
@ -129,26 +126,19 @@ public class SplitLogManager {
/** /**
* Its OK to construct this object even when region-servers are not online. It does lookup the * Its OK to construct this object even when region-servers are not online. It does lookup the
* orphan tasks in coordination engine but it doesn't block waiting for them to be done. * orphan tasks in coordination engine but it doesn't block waiting for them to be done.
* @param server the server instance
* @param conf the HBase configuration
* @param stopper the stoppable in case anything is wrong
* @param master the master services * @param master the master services
* @param serverName the master server name * @param conf the HBase configuration
* @throws IOException * @throws IOException
*/ */
public SplitLogManager(Server server, Configuration conf, Stoppable stopper, public SplitLogManager(MasterServices master, Configuration conf)
MasterServices master, ServerName serverName) throws IOException { throws IOException {
this.server = server; this.server = master;
this.conf = conf; this.conf = conf;
this.stopper = stopper; this.choreService = new ChoreService(master.getServerName() + "_splitLogManager_");
this.choreService = new ChoreService(serverName.toString() + "_splitLogManager_");
if (server.getCoordinatedStateManager() != null) { if (server.getCoordinatedStateManager() != null) {
SplitLogManagerCoordination coordination = SplitLogManagerCoordination coordination = getSplitLogManagerCoordination();
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
.getSplitLogManagerCoordination();
Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>()); Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
SplitLogManagerDetails details = SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions);
new SplitLogManagerDetails(tasks, master, failedDeletions, serverName);
coordination.setDetails(details); coordination.setDetails(details);
coordination.init(); coordination.init();
// Determine recovery mode // Determine recovery mode
@ -157,10 +147,15 @@ public class SplitLogManager {
conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
this.timeoutMonitor = this.timeoutMonitor =
new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
stopper); master);
choreService.scheduleChore(timeoutMonitor); choreService.scheduleChore(timeoutMonitor);
} }
private SplitLogManagerCoordination getSplitLogManagerCoordination() {
return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
.getSplitLogManagerCoordination();
}
private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException { private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
return getFileList(conf, logDirs, filter); return getFileList(conf, logDirs, filter);
} }
@ -325,14 +320,11 @@ public class SplitLogManager {
*/ */
boolean enqueueSplitTask(String taskname, TaskBatch batch) { boolean enqueueSplitTask(String taskname, TaskBatch batch) {
lastTaskCreateTime = EnvironmentEdgeManager.currentTime(); lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
String task = String task = getSplitLogManagerCoordination().prepareTask(taskname);
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
.getSplitLogManagerCoordination().prepareTask(taskname);
Task oldtask = createTaskIfAbsent(task, batch); Task oldtask = createTaskIfAbsent(task, batch);
if (oldtask == null) { if (oldtask == null) {
// publish the task in the coordination engine // publish the task in the coordination engine
((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) getSplitLogManagerCoordination().submitTask(task);
.getSplitLogManagerCoordination().submitTask(task);
return true; return true;
} }
return false; return false;
@ -349,9 +341,7 @@ public class SplitLogManager {
if (remaining != actual) { if (remaining != actual) {
LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual); LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
} }
int remainingTasks = int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination();
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
.getSplitLogManagerCoordination().remainingTasksInCoordination();
if (remainingTasks >= 0 && actual > remainingTasks) { if (remainingTasks >= 0 && actual > remainingTasks) {
LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are " LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are "
+ remainingTasks); + remainingTasks);
@ -365,7 +355,7 @@ public class SplitLogManager {
} }
} }
batch.wait(100); batch.wait(100);
if (stopper.isStopped()) { if (server.isStopped()) {
LOG.warn("Stopped while waiting for log splits to be completed"); LOG.warn("Stopped while waiting for log splits to be completed");
return; return;
} }
@ -414,9 +404,8 @@ public class SplitLogManager {
this.recoveringRegionLock.lock(); this.recoveringRegionLock.lock();
try { try {
((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) getSplitLogManagerCoordination().removeRecoveringRegions(
.getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet, recoveredServerNameSet, isMetaRecovery);
isMetaRecovery);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("removeRecoveringRegions got exception. Will retry", e); LOG.warn("removeRecoveringRegions got exception. Will retry", e);
if (serverNames != null && !serverNames.isEmpty()) { if (serverNames != null && !serverNames.isEmpty()) {
@ -445,8 +434,7 @@ public class SplitLogManager {
this.recoveringRegionLock.lock(); this.recoveringRegionLock.lock();
try { try {
((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers);
.getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers);
} finally { } finally {
this.recoveringRegionLock.unlock(); this.recoveringRegionLock.unlock();
} }
@ -566,9 +554,7 @@ public class SplitLogManager {
* @throws IOException throws if it's impossible to set recovery mode * @throws IOException throws if it's impossible to set recovery mode
*/ */
public void setRecoveryMode(boolean isForInitialization) throws IOException { public void setRecoveryMode(boolean isForInitialization) throws IOException {
((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) getSplitLogManagerCoordination().setRecoveryMode(isForInitialization);
.getSplitLogManagerCoordination().setRecoveryMode(isForInitialization);
} }
public void markRegionsRecovering(ServerName server, Set<HRegionInfo> userRegions) public void markRegionsRecovering(ServerName server, Set<HRegionInfo> userRegions)
@ -579,8 +565,7 @@ public class SplitLogManager {
try { try {
this.recoveringRegionLock.lock(); this.recoveringRegionLock.lock();
// mark that we're creating recovering regions // mark that we're creating recovering regions
((BaseCoordinatedStateManager) this.server.getCoordinatedStateManager()) getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions);
.getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions);
} finally { } finally {
this.recoveringRegionLock.unlock(); this.recoveringRegionLock.unlock();
} }
@ -591,9 +576,8 @@ public class SplitLogManager {
* @return whether log is replaying * @return whether log is replaying
*/ */
public boolean isLogReplaying() { public boolean isLogReplaying() {
CoordinatedStateManager m = server.getCoordinatedStateManager(); if (server.getCoordinatedStateManager() == null) return false;
if (m == null) return false; return getSplitLogManagerCoordination().isReplaying();
return ((BaseCoordinatedStateManager)m).getSplitLogManagerCoordination().isReplaying();
} }
/** /**
@ -601,16 +585,14 @@ public class SplitLogManager {
*/ */
public boolean isLogSplitting() { public boolean isLogSplitting() {
if (server.getCoordinatedStateManager() == null) return false; if (server.getCoordinatedStateManager() == null) return false;
return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) return getSplitLogManagerCoordination().isSplitting();
.getSplitLogManagerCoordination().isSplitting();
} }
/** /**
* @return the current log recovery mode * @return the current log recovery mode
*/ */
public RecoveryMode getRecoveryMode() { public RecoveryMode getRecoveryMode() {
return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) return getSplitLogManagerCoordination().getRecoveryMode();
.getSplitLogManagerCoordination().getRecoveryMode();
} }
/** /**
@ -695,6 +677,8 @@ public class SplitLogManager {
@Override @Override
protected void chore() { protected void chore() {
if (server.getCoordinatedStateManager() == null) return;
int resubmitted = 0; int resubmitted = 0;
int unassigned = 0; int unassigned = 0;
int tot = 0; int tot = 0;
@ -723,16 +707,14 @@ public class SplitLogManager {
found_assigned_task = true; found_assigned_task = true;
if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet(); SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
.getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
resubmitted++; resubmitted++;
} else { } else {
handleDeadWorker(cur_worker); handleDeadWorker(cur_worker);
LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
+ ", will retry."); + ", will retry.");
} }
} else if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
.getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
resubmitted++; resubmitted++;
} }
} }
@ -767,25 +749,21 @@ public class SplitLogManager {
// called unnecessarily for a taskpath // called unnecessarily for a taskpath
if (task.isUnassigned() && (task.status != FAILURE)) { if (task.isUnassigned() && (task.status != FAILURE)) {
// We just touch the znode to make sure its still there // We just touch the znode to make sure its still there
((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) getSplitLogManagerCoordination().checkTaskStillAvailable(key);
.getSplitLogManagerCoordination().checkTaskStillAvailable(key);
} }
} }
((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) getSplitLogManagerCoordination().checkTasks();
.getSplitLogManagerCoordination().checkTasks();
SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet(); SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
LOG.debug("resubmitting unassigned task(s) after timeout"); LOG.debug("resubmitting unassigned task(s) after timeout");
} }
Set<String> failedDeletions = Set<String> failedDeletions =
((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) getSplitLogManagerCoordination().getDetails().getFailedDeletions();
.getSplitLogManagerCoordination().getDetails().getFailedDeletions();
// Retry previously failed deletes // Retry previously failed deletes
if (failedDeletions.size() > 0) { if (failedDeletions.size() > 0) {
List<String> tmpPaths = new ArrayList<String>(failedDeletions); List<String> tmpPaths = new ArrayList<String>(failedDeletions);
for (String tmpPath : tmpPaths) { for (String tmpPath : tmpPaths) {
// deleteNode is an async call // deleteNode is an async call
((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) getSplitLogManagerCoordination().deleteTask(tmpPath);
.getSplitLogManagerCoordination().deleteTask(tmpPath);
} }
failedDeletions.removeAll(tmpPaths); failedDeletions.removeAll(tmpPaths);
} }
@ -793,8 +771,7 @@ public class SplitLogManager {
// Garbage collect left-over // Garbage collect left-over
long timeInterval = long timeInterval =
EnvironmentEdgeManager.currentTime() EnvironmentEdgeManager.currentTime()
- ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - getSplitLogManagerCoordination().getLastRecoveryTime();
.getSplitLogManagerCoordination().getLastRecoveryTime();
if (!failedRecoveringRegionDeletions.isEmpty() if (!failedRecoveringRegionDeletions.isEmpty()
|| (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) { || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
// inside the function there have more checks before GC anything // inside the function there have more checks before GC anything

View File

@ -32,12 +32,10 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
@ -45,8 +43,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaMockingUtil; import org.apache.hadoop.hbase.MetaMockingUtil;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -75,8 +71,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Triple; import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -91,16 +85,16 @@ public class TestCatalogJanitor {
private static final Log LOG = LogFactory.getLog(TestCatalogJanitor.class); private static final Log LOG = LogFactory.getLog(TestCatalogJanitor.class);
/** /**
* Pseudo server for below tests. * Mock MasterServices for tests below.
* Be sure to call stop on the way out else could leave some mess around.
*/ */
class MockServer implements Server { class MockMasterServices extends MockNoopMasterServices {
private final ClusterConnection connection; private final ClusterConnection connection;
private final Configuration c; private final MasterFileSystem mfs;
private final AssignmentManager asm;
MockMasterServices(final HBaseTestingUtility htu) throws IOException {
super(htu.getConfiguration());
MockServer(final HBaseTestingUtility htu)
throws NotAllMetaRegionsOnlineException, IOException, InterruptedException {
this.c = htu.getConfiguration();
ClientProtos.ClientService.BlockingInterface ri = ClientProtos.ClientService.BlockingInterface ri =
Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
MutateResponse.Builder builder = MutateResponse.newBuilder(); MutateResponse.Builder builder = MutateResponse.newBuilder();
@ -128,93 +122,16 @@ public class TestCatalogJanitor {
// ClusterConnection return the HRI. Have the HRI return a few mocked up responses // ClusterConnection return the HRI. Have the HRI return a few mocked up responses
// to make our test work. // to make our test work.
this.connection = this.connection =
HConnectionTestingUtility.getMockedConnectionAndDecorate(this.c, HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(),
Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri, Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri,
ServerName.valueOf("example.org,12345,6789"), ServerName.valueOf("example.org,12345,6789"),
HRegionInfo.FIRST_META_REGIONINFO); HRegionInfo.FIRST_META_REGIONINFO);
// Set hbase.rootdir into test dir. // Set hbase.rootdir into test dir.
FileSystem.get(this.c); FileSystem.get(getConfiguration());
Path rootdir = FSUtils.getRootDir(this.c); Path rootdir = FSUtils.getRootDir(getConfiguration());
FSUtils.setRootDir(this.c, rootdir); FSUtils.setRootDir(getConfiguration(), rootdir);
Mockito.mock(AdminProtos.AdminService.BlockingInterface.class); Mockito.mock(AdminProtos.AdminService.BlockingInterface.class);
}
@Override
public ClusterConnection getConnection() {
return this.connection;
}
@Override
public MetaTableLocator getMetaTableLocator() {
return null;
}
@Override
public Configuration getConfiguration() {
return this.c;
}
@Override
public ServerName getServerName() {
return ServerName.valueOf("mockserver.example.org", 1234, -1L);
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return null;
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
BaseCoordinatedStateManager m = Mockito.mock(BaseCoordinatedStateManager.class);
SplitLogManagerCoordination c = Mockito.mock(SplitLogManagerCoordination.class);
Mockito.when(m.getSplitLogManagerCoordination()).thenReturn(c);
SplitLogManagerDetails d = Mockito.mock(SplitLogManagerDetails.class);
Mockito.when(c.getDetails()).thenReturn(d);
return m;
}
@Override
public void abort(String why, Throwable e) {
//no-op
}
@Override
public boolean isAborted() {
return false;
}
@Override
public boolean isStopped() {
return false;
}
@Override
public void stop(String why) {
}
@Override
public ChoreService getChoreService() {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
}
/**
* Mock MasterServices for tests below.
*/
class MockMasterServices extends MockNoopMasterServices {
private final MasterFileSystem mfs;
private final AssignmentManager asm;
private final Server server;
MockMasterServices(final Server server) throws IOException {
this.server = server;
this.mfs = new MasterFileSystem(this); this.mfs = new MasterFileSystem(this);
this.asm = Mockito.mock(AssignmentManager.class); this.asm = Mockito.mock(AssignmentManager.class);
} }
@ -230,13 +147,23 @@ public class TestCatalogJanitor {
} }
@Override @Override
public Configuration getConfiguration() { public ClusterConnection getConnection() {
return server.getConfiguration(); return this.connection;
} }
@Override @Override
public ServerName getServerName() { public ServerName getServerName() {
return server.getServerName(); return ServerName.valueOf("mockserver.example.org", 1234, -1L);
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
BaseCoordinatedStateManager m = Mockito.mock(BaseCoordinatedStateManager.class);
SplitLogManagerCoordination c = Mockito.mock(SplitLogManagerCoordination.class);
Mockito.when(m.getSplitLogManagerCoordination()).thenReturn(c);
SplitLogManagerDetails d = Mockito.mock(SplitLogManagerDetails.class);
Mockito.when(c.getDetails()).thenReturn(d);
return m;
} }
@Override @Override
@ -290,10 +217,9 @@ public class TestCatalogJanitor {
public void testCleanParent() throws IOException, InterruptedException { public void testCleanParent() throws IOException, InterruptedException {
HBaseTestingUtility htu = new HBaseTestingUtility(); HBaseTestingUtility htu = new HBaseTestingUtility();
setRootDirAndCleanIt(htu, "testCleanParent"); setRootDirAndCleanIt(htu, "testCleanParent");
Server server = new MockServer(htu); MasterServices services = new MockMasterServices(htu);
try { try {
MasterServices services = new MockMasterServices(server); CatalogJanitor janitor = new CatalogJanitor(services);
CatalogJanitor janitor = new CatalogJanitor(server, services);
// Create regions. // Create regions.
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("table")); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("table"));
htd.addFamily(new HColumnDescriptor("f")); htd.addFamily(new HColumnDescriptor("f"));
@ -327,7 +253,7 @@ public class TestCatalogJanitor {
assertTrue(fs.delete(p, true)); assertTrue(fs.delete(p, true));
assertTrue(janitor.cleanParent(parent, r)); assertTrue(janitor.cleanParent(parent, r));
} finally { } finally {
server.stop("shutdown"); services.stop("shutdown");
} }
} }
@ -368,9 +294,8 @@ public class TestCatalogJanitor {
throws IOException, InterruptedException { throws IOException, InterruptedException {
HBaseTestingUtility htu = new HBaseTestingUtility(); HBaseTestingUtility htu = new HBaseTestingUtility();
setRootDirAndCleanIt(htu, rootDir); setRootDirAndCleanIt(htu, rootDir);
Server server = new MockServer(htu); MasterServices services = new MockMasterServices(htu);
MasterServices services = new MockMasterServices(server); CatalogJanitor janitor = new CatalogJanitor(services);
CatalogJanitor janitor = new CatalogJanitor(server, services);
final HTableDescriptor htd = createHTableDescriptor(); final HTableDescriptor htd = createHTableDescriptor();
// Create regions: aaa->{lastEndKey}, aaa->ccc, aaa->bbb, bbb->ccc, etc. // Create regions: aaa->{lastEndKey}, aaa->ccc, aaa->bbb, bbb->ccc, etc.
@ -470,8 +395,7 @@ public class TestCatalogJanitor {
public void testScanDoesNotCleanRegionsWithExistingParents() throws Exception { public void testScanDoesNotCleanRegionsWithExistingParents() throws Exception {
HBaseTestingUtility htu = new HBaseTestingUtility(); HBaseTestingUtility htu = new HBaseTestingUtility();
setRootDirAndCleanIt(htu, "testScanDoesNotCleanRegionsWithExistingParents"); setRootDirAndCleanIt(htu, "testScanDoesNotCleanRegionsWithExistingParents");
Server server = new MockServer(htu); MasterServices services = new MockMasterServices(htu);
MasterServices services = new MockMasterServices(server);
final HTableDescriptor htd = createHTableDescriptor(); final HTableDescriptor htd = createHTableDescriptor();
@ -506,7 +430,7 @@ public class TestCatalogJanitor {
splitParents.put(splita, createResult(splita, splitaa,splitab)); splitParents.put(splita, createResult(splita, splitaa,splitab));
final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>(); final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>();
CatalogJanitor janitor = spy(new CatalogJanitor(server, services)); CatalogJanitor janitor = spy(new CatalogJanitor(services));
doReturn(new Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>( doReturn(new Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>(
10, mergedRegions, splitParents)).when(janitor) 10, mergedRegions, splitParents)).when(janitor)
.getMergedRegionsAndSplitParents(); .getMergedRegionsAndSplitParents();
@ -628,11 +552,10 @@ public class TestCatalogJanitor {
String table = "table"; String table = "table";
HBaseTestingUtility htu = new HBaseTestingUtility(); HBaseTestingUtility htu = new HBaseTestingUtility();
setRootDirAndCleanIt(htu, "testCleanParent"); setRootDirAndCleanIt(htu, "testCleanParent");
Server server = new MockServer(htu); MasterServices services = new MockMasterServices(htu);
MasterServices services = new MockMasterServices(server);
// create the janitor // create the janitor
CatalogJanitor janitor = new CatalogJanitor(server, services); CatalogJanitor janitor = new CatalogJanitor(services);
// Create regions. // Create regions.
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
@ -688,7 +611,6 @@ public class TestCatalogJanitor {
// cleanup // cleanup
FSUtils.delete(fs, rootdir, true); FSUtils.delete(fs, rootdir, true);
services.stop("Test finished"); services.stop("Test finished");
server.stop("Test finished");
janitor.cancel(true); janitor.cancel(true);
} }
@ -712,12 +634,11 @@ public class TestCatalogJanitor {
String table = "table"; String table = "table";
HBaseTestingUtility htu = new HBaseTestingUtility(); HBaseTestingUtility htu = new HBaseTestingUtility();
setRootDirAndCleanIt(htu, "testCleanParent"); setRootDirAndCleanIt(htu, "testCleanParent");
Server server = new MockServer(htu); MasterServices services = new MockMasterServices(htu);
MasterServices services = new MockMasterServices(server);
// create the janitor // create the janitor
CatalogJanitor janitor = new CatalogJanitor(server, services); CatalogJanitor janitor = new CatalogJanitor(services);
// Create regions. // Create regions.
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
@ -773,7 +694,6 @@ public class TestCatalogJanitor {
// cleanup // cleanup
services.stop("Test finished"); services.stop("Test finished");
server.stop("shutdown");
janitor.cancel(true); janitor.cancel(true);
} }

View File

@ -46,19 +46,15 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
@ -66,7 +62,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.R
import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener; import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -86,17 +81,15 @@ import org.mockito.Mockito;
@Category({MasterTests.class, MediumTests.class}) @Category({MasterTests.class, MediumTests.class})
public class TestSplitLogManager { public class TestSplitLogManager {
private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class); private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1");
private final ServerManager sm = Mockito.mock(ServerManager.class); private final ServerManager sm = Mockito.mock(ServerManager.class);
private final MasterServices master = Mockito.mock(MasterServices.class);
static { static {
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
} }
private ZooKeeperWatcher zkw; private ZooKeeperWatcher zkw;
private DummyServer ds; private DummyMasterServices master;
private static boolean stopped = false;
private SplitLogManager slm; private SplitLogManager slm;
private Configuration conf; private Configuration conf;
private int to; private int to;
@ -104,90 +97,33 @@ public class TestSplitLogManager {
private static HBaseTestingUtility TEST_UTIL; private static HBaseTestingUtility TEST_UTIL;
class DummyServer implements Server { class DummyMasterServices extends MockNoopMasterServices {
private ZooKeeperWatcher zkw; private ZooKeeperWatcher zkw;
private Configuration conf;
private CoordinatedStateManager cm; private CoordinatedStateManager cm;
public DummyServer(ZooKeeperWatcher zkw, Configuration conf) { public DummyMasterServices(ZooKeeperWatcher zkw, Configuration conf) {
super(conf);
this.zkw = zkw; this.zkw = zkw;
this.conf = conf;
cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
cm.initialize(this); cm.initialize(this);
} }
@Override
public void abort(String why, Throwable e) {
}
@Override
public boolean isAborted() {
return false;
}
@Override
public void stop(String why) {
}
@Override
public boolean isStopped() {
return false;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override @Override
public ZooKeeperWatcher getZooKeeper() { public ZooKeeperWatcher getZooKeeper() {
return zkw; return zkw;
} }
@Override
public ServerName getServerName() {
return null;
}
@Override @Override
public CoordinatedStateManager getCoordinatedStateManager() { public CoordinatedStateManager getCoordinatedStateManager() {
return cm; return cm;
} }
@Override @Override
public ClusterConnection getConnection() { public ServerManager getServerManager() {
return null; return sm;
}
@Override
public MetaTableLocator getMetaTableLocator() {
return null;
}
@Override
public ChoreService getChoreService() {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
} }
} }
static Stoppable stopper = new Stoppable() {
@Override
public void stop(String why) {
stopped = true;
}
@Override
public boolean isStopped() {
return stopped;
}
};
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
TEST_UTIL = new HBaseTestingUtility(); TEST_UTIL = new HBaseTestingUtility();
@ -196,7 +132,7 @@ public class TestSplitLogManager {
// Use a different ZK wrapper instance for each tests. // Use a different ZK wrapper instance for each tests.
zkw = zkw =
new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null); new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
ds = new DummyServer(zkw, conf); master = new DummyMasterServices(zkw, conf);
ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
ZKUtil.createAndFailSilent(zkw, zkw.baseZNode); ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
@ -206,13 +142,11 @@ public class TestSplitLogManager {
assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1); assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
LOG.debug(zkw.splitLogZNode + " created"); LOG.debug(zkw.splitLogZNode + " created");
stopped = false;
resetCounters(); resetCounters();
// By default, we let the test manage the error as before, so the server // By default, we let the test manage the error as before, so the server
// does not appear as dead from the master point of view, only from the split log pov. // does not appear as dead from the master point of view, only from the split log pov.
Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true); Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
Mockito.when(master.getServerManager()).thenReturn(sm);
to = 12000; to = 12000;
conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to); conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
@ -228,7 +162,7 @@ public class TestSplitLogManager {
@After @After
public void teardown() throws IOException, KeeperException { public void teardown() throws IOException, KeeperException {
stopper.stop(""); master.stop("");
if (slm != null) slm.stop(); if (slm != null) slm.stop();
TEST_UTIL.shutdownMiniZKCluster(); TEST_UTIL.shutdownMiniZKCluster();
} }
@ -288,7 +222,7 @@ public class TestSplitLogManager {
public void testTaskCreation() throws Exception { public void testTaskCreation() throws Exception {
LOG.info("TestTaskCreation - test the creation of a task in zk"); LOG.info("TestTaskCreation - test the creation of a task in zk");
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -296,7 +230,7 @@ public class TestSplitLogManager {
byte[] data = ZKUtil.getData(zkw, tasknode); byte[] data = ZKUtil.getData(zkw, tasknode);
SplitLogTask slt = SplitLogTask.parseFrom(data); SplitLogTask slt = SplitLogTask.parseFrom(data);
LOG.info("Task node created " + slt.toString()); LOG.info("Task node created " + slt.toString());
assertTrue(slt.isUnassigned(DUMMY_MASTER)); assertTrue(slt.isUnassigned(master.getServerName()));
} }
@Test (timeout=180000) @Test (timeout=180000)
@ -304,11 +238,11 @@ public class TestSplitLogManager {
LOG.info("TestOrphanTaskAcquisition"); LOG.info("TestOrphanTaskAcquisition");
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode); SplitLogTask slt = new SplitLogTask.Owned(master.getServerName(), this.mode);
zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
Task task = slm.findOrCreateOrphanTask(tasknode); Task task = slm.findOrCreateOrphanTask(tasknode);
assertTrue(task.isOrphan()); assertTrue(task.isOrphan());
@ -329,12 +263,12 @@ public class TestSplitLogManager {
" startup"); " startup");
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
//create an unassigned orphan task //create an unassigned orphan task
SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode); SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName(), this.mode);
zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
int version = ZKUtil.checkExists(zkw, tasknode); int version = ZKUtil.checkExists(zkw, tasknode);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
Task task = slm.findOrCreateOrphanTask(tasknode); Task task = slm.findOrCreateOrphanTask(tasknode);
assertTrue(task.isOrphan()); assertTrue(task.isOrphan());
@ -356,7 +290,7 @@ public class TestSplitLogManager {
public void testMultipleResubmits() throws Exception { public void testMultipleResubmits() throws Exception {
LOG.info("TestMultipleResbmits - no indefinite resubmissions"); LOG.info("TestMultipleResbmits - no indefinite resubmissions");
conf.setInt("hbase.splitlog.max.resubmit", 2); conf.setInt("hbase.splitlog.max.resubmit", 2);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -388,7 +322,7 @@ public class TestSplitLogManager {
public void testRescanCleanup() throws Exception { public void testRescanCleanup() throws Exception {
LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -408,7 +342,7 @@ public class TestSplitLogManager {
assertTrue(version1 > version); assertTrue(version1 > version);
byte[] taskstate = ZKUtil.getData(zkw, tasknode); byte[] taskstate = ZKUtil.getData(zkw, tasknode);
slt = SplitLogTask.parseFrom(taskstate); slt = SplitLogTask.parseFrom(taskstate);
assertTrue(slt.isUnassigned(DUMMY_MASTER)); assertTrue(slt.isUnassigned(master.getServerName()));
waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2); waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
} }
@ -417,7 +351,7 @@ public class TestSplitLogManager {
public void testTaskDone() throws Exception { public void testTaskDone() throws Exception {
LOG.info("TestTaskDone - cleanup task node once in DONE state"); LOG.info("TestTaskDone - cleanup task node once in DONE state");
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
final ServerName worker1 = ServerName.valueOf("worker1,1,1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1");
@ -437,7 +371,7 @@ public class TestSplitLogManager {
LOG.info("TestTaskErr - cleanup task node once in ERR state"); LOG.info("TestTaskErr - cleanup task node once in ERR state");
conf.setInt("hbase.splitlog.max.resubmit", 0); conf.setInt("hbase.splitlog.max.resubmit", 0);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -459,7 +393,7 @@ public class TestSplitLogManager {
public void testTaskResigned() throws Exception { public void testTaskResigned() throws Exception {
LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
assertEquals(tot_mgr_resubmit.get(), 0); assertEquals(tot_mgr_resubmit.get(), 0);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
assertEquals(tot_mgr_resubmit.get(), 0); assertEquals(tot_mgr_resubmit.get(), 0);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -478,7 +412,7 @@ public class TestSplitLogManager {
byte[] taskstate = ZKUtil.getData(zkw, tasknode); byte[] taskstate = ZKUtil.getData(zkw, tasknode);
slt = SplitLogTask.parseFrom(taskstate); slt = SplitLogTask.parseFrom(taskstate);
assertTrue(slt.isUnassigned(DUMMY_MASTER)); assertTrue(slt.isUnassigned(master.getServerName()));
} }
@Test (timeout=180000) @Test (timeout=180000)
@ -493,7 +427,7 @@ public class TestSplitLogManager {
zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
// submit another task which will stay in unassigned mode // submit another task which will stay in unassigned mode
@ -522,7 +456,7 @@ public class TestSplitLogManager {
LOG.info("testDeadWorker"); LOG.info("testDeadWorker");
conf.setLong("hbase.splitlog.max.resubmit", 0); conf.setLong("hbase.splitlog.max.resubmit", 0);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -541,13 +475,13 @@ public class TestSplitLogManager {
assertTrue(version1 > version); assertTrue(version1 > version);
byte[] taskstate = ZKUtil.getData(zkw, tasknode); byte[] taskstate = ZKUtil.getData(zkw, tasknode);
slt = SplitLogTask.parseFrom(taskstate); slt = SplitLogTask.parseFrom(taskstate);
assertTrue(slt.isUnassigned(DUMMY_MASTER)); assertTrue(slt.isUnassigned(master.getServerName()));
return; return;
} }
@Test (timeout=180000) @Test (timeout=180000)
public void testWorkerCrash() throws Exception { public void testWorkerCrash() throws Exception {
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
@ -572,7 +506,7 @@ public class TestSplitLogManager {
@Test (timeout=180000) @Test (timeout=180000)
public void testEmptyLogDir() throws Exception { public void testEmptyLogDir() throws Exception {
LOG.info("testEmptyLogDir"); LOG.info("testEmptyLogDir");
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
FileSystem fs = TEST_UTIL.getTestFileSystem(); FileSystem fs = TEST_UTIL.getTestFileSystem();
Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
UUID.randomUUID().toString()); UUID.randomUUID().toString());
@ -584,7 +518,7 @@ public class TestSplitLogManager {
@Test (timeout = 60000) @Test (timeout = 60000)
public void testLogFilesAreArchived() throws Exception { public void testLogFilesAreArchived() throws Exception {
LOG.info("testLogFilesAreArchived"); LOG.info("testLogFilesAreArchived");
final SplitLogManager slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
FileSystem fs = TEST_UTIL.getTestFileSystem(); FileSystem fs = TEST_UTIL.getTestFileSystem();
Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived"); Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
conf.set(HConstants.HBASE_DIR, dir.toString()); conf.set(HConstants.HBASE_DIR, dir.toString());
@ -637,7 +571,7 @@ public class TestSplitLogManager {
HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L)); ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L));
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
slm.removeStaleRecoveringRegions(null); slm.removeStaleRecoveringRegions(null);
List<String> recoveringRegions = List<String> recoveringRegions =
@ -659,7 +593,7 @@ public class TestSplitLogManager {
ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(), ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); slm = new SplitLogManager(master, conf);
LOG.info("Mode1=" + slm.getRecoveryMode()); LOG.info("Mode1=" + slm.getRecoveryMode());
assertTrue(slm.isLogSplitting()); assertTrue(slm.isLogSplitting());
zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1); zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);