diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java index 67fe96a638f..3b1bbb77772 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java @@ -62,14 +62,12 @@ public interface SplitLogManagerCoordination { final private ConcurrentMap tasks; final private MasterServices master; final private Set failedDeletions; - final private ServerName serverName; public SplitLogManagerDetails(ConcurrentMap tasks, MasterServices master, - Set failedDeletions, ServerName serverName) { + Set failedDeletions) { this.tasks = tasks; this.master = master; this.failedDeletions = failedDeletions; - this.serverName = serverName; } /** @@ -97,7 +95,7 @@ public interface SplitLogManagerCoordination { * @return server name */ public ServerName getServerName() { - return serverName; + return master.getServerName(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java index d0a4d5889f5..cc39e9ff8ec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java @@ -31,7 +31,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,14 +39,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective; import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus; @@ -75,32 +72,25 @@ import org.apache.zookeeper.data.Stat; public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements SplitLogManagerCoordination { - public static class ZkSplitLogManagerDetails extends SplitLogManagerDetails { - - ZkSplitLogManagerDetails(ConcurrentMap tasks, MasterServices master, - Set failedDeletions, ServerName serverName) { - super(tasks, master, failedDeletions, serverName); - } - } - public static final int DEFAULT_TIMEOUT = 120000; public static final int DEFAULT_ZK_RETRIES = 3; public static final int DEFAULT_MAX_RESUBMIT = 3; private static final Log LOG = LogFactory.getLog(SplitLogManagerCoordination.class); - private Server server; + private final TaskFinisher taskFinisher; + private final Configuration conf; + private long zkretries; private long resubmitThreshold; private long timeout; - private TaskFinisher taskFinisher; SplitLogManagerDetails details; // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check // whether to GC stale recovering znodes private volatile long lastRecoveringNodeCreationTime = 0; - private Configuration conf; + public boolean ignoreZKDeleteForTesting = false; private RecoveryMode recoveryMode; @@ -122,8 +112,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements return Status.DONE; } }; - this.server = manager.getServer(); - this.conf = server.getConfiguration(); + this.conf = manager.getServer().getConfiguration(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java index bff85cbec99..d14a9254529 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.ScheduledChore; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -58,18 +57,17 @@ import org.apache.hadoop.hbase.util.Triple; @InterfaceAudience.Private public class CatalogJanitor extends ScheduledChore { private static final Log LOG = LogFactory.getLog(CatalogJanitor.class.getName()); - private final Server server; + + private final AtomicBoolean alreadyRunning = new AtomicBoolean(false); + private final AtomicBoolean enabled = new AtomicBoolean(true); private final MasterServices services; - private AtomicBoolean enabled = new AtomicBoolean(true); - private AtomicBoolean alreadyRunning = new AtomicBoolean(false); private final Connection connection; - CatalogJanitor(final Server server, final MasterServices services) { - super("CatalogJanitor-" + server.getServerName().toShortString(), server, server - .getConfiguration().getInt("hbase.catalogjanitor.interval", 300000)); - this.server = server; + CatalogJanitor(final MasterServices services) { + super("CatalogJanitor-" + services.getServerName().toShortString(), services, + services.getConfiguration().getInt("hbase.catalogjanitor.interval", 300000)); this.services = services; - this.connection = server.getConnection(); + this.connection = services.getConnection(); } @Override @@ -214,8 +212,7 @@ public class CatalogJanitor extends ScheduledChore { + " from fs because merged region no longer holds references"); HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, regionA); HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, regionB); - MetaTableAccessor.deleteMergeQualifiers(server.getConnection(), - mergedRegion); + MetaTableAccessor.deleteMergeQualifiers(services.getConnection(), mergedRegion); return true; } return false; @@ -414,7 +411,7 @@ public class CatalogJanitor extends ScheduledChore { try { regionFs = HRegionFileSystem.openRegionFromFileSystem( this.services.getConfiguration(), fs, tabledir, daughter, true); - + for (HColumnDescriptor family: parentDescriptor.getFamilies()) { if ((references = regionFs.hasReferences(family.getNameAsString()))) { break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 2145909fd36..810f95be4d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -788,7 +788,7 @@ public class HMaster extends HRegionServer implements MasterServices { getChoreService().scheduleChore(balancerChore); this.normalizerChore = new RegionNormalizerChore(this); getChoreService().scheduleChore(normalizerChore); - this.catalogJanitorChore = new CatalogJanitor(this, this); + this.catalogJanitorChore = new CatalogJanitor(this); getChoreService().scheduleChore(catalogJanitorChore); // Do Metrics periodically diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java index ab0dd4f1b36..4d19e9e33f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java @@ -100,8 +100,7 @@ public class MasterWalManager { this.conf = conf; this.rootDir = rootDir; this.services = services; - this.splitLogManager = new SplitLogManager(services, conf, - services, services, services.getServerName()); + this.splitLogManager = new SplitLogManager(services, conf); this.distributedLogReplay = this.splitLogManager.isLogReplaying(); this.oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index fa5816f37ab..c3323572c52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -48,10 +48,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.ChoreService; -import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ScheduledChore; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.Stoppable; @@ -100,9 +98,8 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; public class SplitLogManager { private static final Log LOG = LogFactory.getLog(SplitLogManager.class); - private Server server; + private final MasterServices server; - private final Stoppable stopper; private final Configuration conf; private final ChoreService choreService; @@ -129,26 +126,19 @@ public class SplitLogManager { /** * Its OK to construct this object even when region-servers are not online. It does lookup the * orphan tasks in coordination engine but it doesn't block waiting for them to be done. - * @param server the server instance - * @param conf the HBase configuration - * @param stopper the stoppable in case anything is wrong * @param master the master services - * @param serverName the master server name + * @param conf the HBase configuration * @throws IOException */ - public SplitLogManager(Server server, Configuration conf, Stoppable stopper, - MasterServices master, ServerName serverName) throws IOException { - this.server = server; + public SplitLogManager(MasterServices master, Configuration conf) + throws IOException { + this.server = master; this.conf = conf; - this.stopper = stopper; - this.choreService = new ChoreService(serverName.toString() + "_splitLogManager_"); + this.choreService = new ChoreService(master.getServerName() + "_splitLogManager_"); if (server.getCoordinatedStateManager() != null) { - SplitLogManagerCoordination coordination = - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination(); + SplitLogManagerCoordination coordination = getSplitLogManagerCoordination(); Set failedDeletions = Collections.synchronizedSet(new HashSet()); - SplitLogManagerDetails details = - new SplitLogManagerDetails(tasks, master, failedDeletions, serverName); + SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions); coordination.setDetails(details); coordination.init(); // Determine recovery mode @@ -157,10 +147,15 @@ public class SplitLogManager { conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); this.timeoutMonitor = new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), - stopper); + master); choreService.scheduleChore(timeoutMonitor); } + private SplitLogManagerCoordination getSplitLogManagerCoordination() { + return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination(); + } + private FileStatus[] getFileList(List logDirs, PathFilter filter) throws IOException { return getFileList(conf, logDirs, filter); } @@ -325,14 +320,11 @@ public class SplitLogManager { */ boolean enqueueSplitTask(String taskname, TaskBatch batch) { lastTaskCreateTime = EnvironmentEdgeManager.currentTime(); - String task = - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().prepareTask(taskname); + String task = getSplitLogManagerCoordination().prepareTask(taskname); Task oldtask = createTaskIfAbsent(task, batch); if (oldtask == null) { // publish the task in the coordination engine - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().submitTask(task); + getSplitLogManagerCoordination().submitTask(task); return true; } return false; @@ -349,9 +341,7 @@ public class SplitLogManager { if (remaining != actual) { LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual); } - int remainingTasks = - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().remainingTasksInCoordination(); + int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination(); if (remainingTasks >= 0 && actual > remainingTasks) { LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are " + remainingTasks); @@ -365,7 +355,7 @@ public class SplitLogManager { } } batch.wait(100); - if (stopper.isStopped()) { + if (server.isStopped()) { LOG.warn("Stopped while waiting for log splits to be completed"); return; } @@ -414,9 +404,8 @@ public class SplitLogManager { this.recoveringRegionLock.lock(); try { - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet, - isMetaRecovery); + getSplitLogManagerCoordination().removeRecoveringRegions( + recoveredServerNameSet, isMetaRecovery); } catch (IOException e) { LOG.warn("removeRecoveringRegions got exception. Will retry", e); if (serverNames != null && !serverNames.isEmpty()) { @@ -445,8 +434,7 @@ public class SplitLogManager { this.recoveringRegionLock.lock(); try { - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers); + getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers); } finally { this.recoveringRegionLock.unlock(); } @@ -566,9 +554,7 @@ public class SplitLogManager { * @throws IOException throws if it's impossible to set recovery mode */ public void setRecoveryMode(boolean isForInitialization) throws IOException { - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().setRecoveryMode(isForInitialization); - + getSplitLogManagerCoordination().setRecoveryMode(isForInitialization); } public void markRegionsRecovering(ServerName server, Set userRegions) @@ -579,8 +565,7 @@ public class SplitLogManager { try { this.recoveringRegionLock.lock(); // mark that we're creating recovering regions - ((BaseCoordinatedStateManager) this.server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions); + getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions); } finally { this.recoveringRegionLock.unlock(); } @@ -591,9 +576,8 @@ public class SplitLogManager { * @return whether log is replaying */ public boolean isLogReplaying() { - CoordinatedStateManager m = server.getCoordinatedStateManager(); - if (m == null) return false; - return ((BaseCoordinatedStateManager)m).getSplitLogManagerCoordination().isReplaying(); + if (server.getCoordinatedStateManager() == null) return false; + return getSplitLogManagerCoordination().isReplaying(); } /** @@ -601,16 +585,14 @@ public class SplitLogManager { */ public boolean isLogSplitting() { if (server.getCoordinatedStateManager() == null) return false; - return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().isSplitting(); + return getSplitLogManagerCoordination().isSplitting(); } /** * @return the current log recovery mode */ public RecoveryMode getRecoveryMode() { - return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().getRecoveryMode(); + return getSplitLogManagerCoordination().getRecoveryMode(); } /** @@ -695,6 +677,8 @@ public class SplitLogManager { @Override protected void chore() { + if (server.getCoordinatedStateManager() == null) return; + int resubmitted = 0; int unassigned = 0; int tot = 0; @@ -723,16 +707,14 @@ public class SplitLogManager { found_assigned_task = true; if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet(); - if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) { + if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) { resubmitted++; } else { handleDeadWorker(cur_worker); LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker + ", will retry."); } - } else if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) { + } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) { resubmitted++; } } @@ -767,25 +749,21 @@ public class SplitLogManager { // called unnecessarily for a taskpath if (task.isUnassigned() && (task.status != FAILURE)) { // We just touch the znode to make sure its still there - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().checkTaskStillAvailable(key); + getSplitLogManagerCoordination().checkTaskStillAvailable(key); } } - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().checkTasks(); + getSplitLogManagerCoordination().checkTasks(); SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet(); LOG.debug("resubmitting unassigned task(s) after timeout"); } Set failedDeletions = - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().getDetails().getFailedDeletions(); + getSplitLogManagerCoordination().getDetails().getFailedDeletions(); // Retry previously failed deletes if (failedDeletions.size() > 0) { List tmpPaths = new ArrayList(failedDeletions); for (String tmpPath : tmpPaths) { // deleteNode is an async call - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().deleteTask(tmpPath); + getSplitLogManagerCoordination().deleteTask(tmpPath); } failedDeletions.removeAll(tmpPaths); } @@ -793,8 +771,7 @@ public class SplitLogManager { // Garbage collect left-over long timeInterval = EnvironmentEdgeManager.currentTime() - - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) - .getSplitLogManagerCoordination().getLastRecoveryTime(); + - getSplitLogManagerCoordination().getLastRecoveryTime(); if (!failedRecoveringRegionDeletions.isEmpty() || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) { // inside the function there have more checks before GC anything diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index 35a3a79af50..67ea5f6e4ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -32,12 +32,10 @@ import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -45,8 +43,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MetaMockingUtil; -import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; @@ -75,8 +71,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.Triple; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; @@ -91,16 +85,16 @@ public class TestCatalogJanitor { private static final Log LOG = LogFactory.getLog(TestCatalogJanitor.class); /** - * Pseudo server for below tests. - * Be sure to call stop on the way out else could leave some mess around. + * Mock MasterServices for tests below. */ - class MockServer implements Server { + class MockMasterServices extends MockNoopMasterServices { private final ClusterConnection connection; - private final Configuration c; + private final MasterFileSystem mfs; + private final AssignmentManager asm; + + MockMasterServices(final HBaseTestingUtility htu) throws IOException { + super(htu.getConfiguration()); - MockServer(final HBaseTestingUtility htu) - throws NotAllMetaRegionsOnlineException, IOException, InterruptedException { - this.c = htu.getConfiguration(); ClientProtos.ClientService.BlockingInterface ri = Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); MutateResponse.Builder builder = MutateResponse.newBuilder(); @@ -128,93 +122,16 @@ public class TestCatalogJanitor { // ClusterConnection return the HRI. Have the HRI return a few mocked up responses // to make our test work. this.connection = - HConnectionTestingUtility.getMockedConnectionAndDecorate(this.c, + HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(), Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri, ServerName.valueOf("example.org,12345,6789"), HRegionInfo.FIRST_META_REGIONINFO); // Set hbase.rootdir into test dir. - FileSystem.get(this.c); - Path rootdir = FSUtils.getRootDir(this.c); - FSUtils.setRootDir(this.c, rootdir); + FileSystem.get(getConfiguration()); + Path rootdir = FSUtils.getRootDir(getConfiguration()); + FSUtils.setRootDir(getConfiguration(), rootdir); Mockito.mock(AdminProtos.AdminService.BlockingInterface.class); - } - @Override - public ClusterConnection getConnection() { - return this.connection; - } - - @Override - public MetaTableLocator getMetaTableLocator() { - return null; - } - - @Override - public Configuration getConfiguration() { - return this.c; - } - - @Override - public ServerName getServerName() { - return ServerName.valueOf("mockserver.example.org", 1234, -1L); - } - - @Override - public ZooKeeperWatcher getZooKeeper() { - return null; - } - - @Override - public CoordinatedStateManager getCoordinatedStateManager() { - BaseCoordinatedStateManager m = Mockito.mock(BaseCoordinatedStateManager.class); - SplitLogManagerCoordination c = Mockito.mock(SplitLogManagerCoordination.class); - Mockito.when(m.getSplitLogManagerCoordination()).thenReturn(c); - SplitLogManagerDetails d = Mockito.mock(SplitLogManagerDetails.class); - Mockito.when(c.getDetails()).thenReturn(d); - return m; - } - - @Override - public void abort(String why, Throwable e) { - //no-op - } - - @Override - public boolean isAborted() { - return false; - } - - @Override - public boolean isStopped() { - return false; - } - - @Override - public void stop(String why) { - } - - @Override - public ChoreService getChoreService() { - return null; - } - - @Override - public ClusterConnection getClusterConnection() { - // TODO Auto-generated method stub - return null; - } - } - - /** - * Mock MasterServices for tests below. - */ - class MockMasterServices extends MockNoopMasterServices { - private final MasterFileSystem mfs; - private final AssignmentManager asm; - private final Server server; - - MockMasterServices(final Server server) throws IOException { - this.server = server; this.mfs = new MasterFileSystem(this); this.asm = Mockito.mock(AssignmentManager.class); } @@ -230,13 +147,23 @@ public class TestCatalogJanitor { } @Override - public Configuration getConfiguration() { - return server.getConfiguration(); + public ClusterConnection getConnection() { + return this.connection; } @Override public ServerName getServerName() { - return server.getServerName(); + return ServerName.valueOf("mockserver.example.org", 1234, -1L); + } + + @Override + public CoordinatedStateManager getCoordinatedStateManager() { + BaseCoordinatedStateManager m = Mockito.mock(BaseCoordinatedStateManager.class); + SplitLogManagerCoordination c = Mockito.mock(SplitLogManagerCoordination.class); + Mockito.when(m.getSplitLogManagerCoordination()).thenReturn(c); + SplitLogManagerDetails d = Mockito.mock(SplitLogManagerDetails.class); + Mockito.when(c.getDetails()).thenReturn(d); + return m; } @Override @@ -290,10 +217,9 @@ public class TestCatalogJanitor { public void testCleanParent() throws IOException, InterruptedException { HBaseTestingUtility htu = new HBaseTestingUtility(); setRootDirAndCleanIt(htu, "testCleanParent"); - Server server = new MockServer(htu); + MasterServices services = new MockMasterServices(htu); try { - MasterServices services = new MockMasterServices(server); - CatalogJanitor janitor = new CatalogJanitor(server, services); + CatalogJanitor janitor = new CatalogJanitor(services); // Create regions. HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("table")); htd.addFamily(new HColumnDescriptor("f")); @@ -327,7 +253,7 @@ public class TestCatalogJanitor { assertTrue(fs.delete(p, true)); assertTrue(janitor.cleanParent(parent, r)); } finally { - server.stop("shutdown"); + services.stop("shutdown"); } } @@ -368,9 +294,8 @@ public class TestCatalogJanitor { throws IOException, InterruptedException { HBaseTestingUtility htu = new HBaseTestingUtility(); setRootDirAndCleanIt(htu, rootDir); - Server server = new MockServer(htu); - MasterServices services = new MockMasterServices(server); - CatalogJanitor janitor = new CatalogJanitor(server, services); + MasterServices services = new MockMasterServices(htu); + CatalogJanitor janitor = new CatalogJanitor(services); final HTableDescriptor htd = createHTableDescriptor(); // Create regions: aaa->{lastEndKey}, aaa->ccc, aaa->bbb, bbb->ccc, etc. @@ -470,8 +395,7 @@ public class TestCatalogJanitor { public void testScanDoesNotCleanRegionsWithExistingParents() throws Exception { HBaseTestingUtility htu = new HBaseTestingUtility(); setRootDirAndCleanIt(htu, "testScanDoesNotCleanRegionsWithExistingParents"); - Server server = new MockServer(htu); - MasterServices services = new MockMasterServices(server); + MasterServices services = new MockMasterServices(htu); final HTableDescriptor htd = createHTableDescriptor(); @@ -506,7 +430,7 @@ public class TestCatalogJanitor { splitParents.put(splita, createResult(splita, splitaa,splitab)); final Map mergedRegions = new TreeMap(); - CatalogJanitor janitor = spy(new CatalogJanitor(server, services)); + CatalogJanitor janitor = spy(new CatalogJanitor(services)); doReturn(new Triple, Map>( 10, mergedRegions, splitParents)).when(janitor) .getMergedRegionsAndSplitParents(); @@ -628,11 +552,10 @@ public class TestCatalogJanitor { String table = "table"; HBaseTestingUtility htu = new HBaseTestingUtility(); setRootDirAndCleanIt(htu, "testCleanParent"); - Server server = new MockServer(htu); - MasterServices services = new MockMasterServices(server); + MasterServices services = new MockMasterServices(htu); // create the janitor - CatalogJanitor janitor = new CatalogJanitor(server, services); + CatalogJanitor janitor = new CatalogJanitor(services); // Create regions. HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); @@ -688,7 +611,6 @@ public class TestCatalogJanitor { // cleanup FSUtils.delete(fs, rootdir, true); services.stop("Test finished"); - server.stop("Test finished"); janitor.cancel(true); } @@ -712,12 +634,11 @@ public class TestCatalogJanitor { String table = "table"; HBaseTestingUtility htu = new HBaseTestingUtility(); setRootDirAndCleanIt(htu, "testCleanParent"); - Server server = new MockServer(htu); - MasterServices services = new MockMasterServices(server); + MasterServices services = new MockMasterServices(htu); // create the janitor - CatalogJanitor janitor = new CatalogJanitor(server, services); + CatalogJanitor janitor = new CatalogJanitor(services); // Create regions. HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); @@ -773,7 +694,6 @@ public class TestCatalogJanitor { // cleanup services.stop("Test finished"); - server.stop("shutdown"); janitor.cancel(true); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index d0dbb0db5da..d928d1c3324 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -46,19 +46,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; -import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; @@ -66,7 +62,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.R import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -86,17 +81,15 @@ import org.mockito.Mockito; @Category({MasterTests.class, MediumTests.class}) public class TestSplitLogManager { private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class); - private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1"); + private final ServerManager sm = Mockito.mock(ServerManager.class); - private final MasterServices master = Mockito.mock(MasterServices.class); static { Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); } private ZooKeeperWatcher zkw; - private DummyServer ds; - private static boolean stopped = false; + private DummyMasterServices master; private SplitLogManager slm; private Configuration conf; private int to; @@ -104,90 +97,33 @@ public class TestSplitLogManager { private static HBaseTestingUtility TEST_UTIL; - class DummyServer implements Server { + class DummyMasterServices extends MockNoopMasterServices { private ZooKeeperWatcher zkw; - private Configuration conf; private CoordinatedStateManager cm; - public DummyServer(ZooKeeperWatcher zkw, Configuration conf) { + public DummyMasterServices(ZooKeeperWatcher zkw, Configuration conf) { + super(conf); this.zkw = zkw; - this.conf = conf; cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); cm.initialize(this); } - @Override - public void abort(String why, Throwable e) { - } - - @Override - public boolean isAborted() { - return false; - } - - @Override - public void stop(String why) { - } - - @Override - public boolean isStopped() { - return false; - } - - @Override - public Configuration getConfiguration() { - return conf; - } - @Override public ZooKeeperWatcher getZooKeeper() { return zkw; } - @Override - public ServerName getServerName() { - return null; - } - @Override public CoordinatedStateManager getCoordinatedStateManager() { return cm; } @Override - public ClusterConnection getConnection() { - return null; - } - - @Override - public MetaTableLocator getMetaTableLocator() { - return null; - } - - @Override - public ChoreService getChoreService() { - return null; - } - - @Override - public ClusterConnection getClusterConnection() { - // TODO Auto-generated method stub - return null; + public ServerManager getServerManager() { + return sm; } } - static Stoppable stopper = new Stoppable() { - @Override - public void stop(String why) { - stopped = true; - } - - @Override - public boolean isStopped() { - return stopped; - } - }; - @Before public void setup() throws Exception { TEST_UTIL = new HBaseTestingUtility(); @@ -196,7 +132,7 @@ public class TestSplitLogManager { // Use a different ZK wrapper instance for each tests. zkw = new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null); - ds = new DummyServer(zkw, conf); + master = new DummyMasterServices(zkw, conf); ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); ZKUtil.createAndFailSilent(zkw, zkw.baseZNode); @@ -206,13 +142,11 @@ public class TestSplitLogManager { assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1); LOG.debug(zkw.splitLogZNode + " created"); - stopped = false; resetCounters(); // By default, we let the test manage the error as before, so the server // does not appear as dead from the master point of view, only from the split log pov. Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true); - Mockito.when(master.getServerManager()).thenReturn(sm); to = 12000; conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to); @@ -228,7 +162,7 @@ public class TestSplitLogManager { @After public void teardown() throws IOException, KeeperException { - stopper.stop(""); + master.stop(""); if (slm != null) slm.stop(); TEST_UTIL.shutdownMiniZKCluster(); } @@ -288,7 +222,7 @@ public class TestSplitLogManager { public void testTaskCreation() throws Exception { LOG.info("TestTaskCreation - test the creation of a task in zk"); - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -296,7 +230,7 @@ public class TestSplitLogManager { byte[] data = ZKUtil.getData(zkw, tasknode); SplitLogTask slt = SplitLogTask.parseFrom(data); LOG.info("Task node created " + slt.toString()); - assertTrue(slt.isUnassigned(DUMMY_MASTER)); + assertTrue(slt.isUnassigned(master.getServerName())); } @Test (timeout=180000) @@ -304,11 +238,11 @@ public class TestSplitLogManager { LOG.info("TestOrphanTaskAcquisition"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); - SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode); + SplitLogTask slt = new SplitLogTask.Owned(master.getServerName(), this.mode); zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); @@ -329,12 +263,12 @@ public class TestSplitLogManager { " startup"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); //create an unassigned orphan task - SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode); + SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName(), this.mode); zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); @@ -356,7 +290,7 @@ public class TestSplitLogManager { public void testMultipleResubmits() throws Exception { LOG.info("TestMultipleResbmits - no indefinite resubmissions"); conf.setInt("hbase.splitlog.max.resubmit", 2); - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -388,7 +322,7 @@ public class TestSplitLogManager { public void testRescanCleanup() throws Exception { LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -408,7 +342,7 @@ public class TestSplitLogManager { assertTrue(version1 > version); byte[] taskstate = ZKUtil.getData(zkw, tasknode); slt = SplitLogTask.parseFrom(taskstate); - assertTrue(slt.isUnassigned(DUMMY_MASTER)); + assertTrue(slt.isUnassigned(master.getServerName())); waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2); } @@ -417,7 +351,7 @@ public class TestSplitLogManager { public void testTaskDone() throws Exception { LOG.info("TestTaskDone - cleanup task node once in DONE state"); - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); @@ -437,7 +371,7 @@ public class TestSplitLogManager { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -459,7 +393,7 @@ public class TestSplitLogManager { public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); assertEquals(tot_mgr_resubmit.get(), 0); - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); assertEquals(tot_mgr_resubmit.get(), 0); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -478,7 +412,7 @@ public class TestSplitLogManager { byte[] taskstate = ZKUtil.getData(zkw, tasknode); slt = SplitLogTask.parseFrom(taskstate); - assertTrue(slt.isUnassigned(DUMMY_MASTER)); + assertTrue(slt.isUnassigned(master.getServerName())); } @Test (timeout=180000) @@ -493,7 +427,7 @@ public class TestSplitLogManager { zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); // submit another task which will stay in unassigned mode @@ -522,7 +456,7 @@ public class TestSplitLogManager { LOG.info("testDeadWorker"); conf.setLong("hbase.splitlog.max.resubmit", 0); - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -541,13 +475,13 @@ public class TestSplitLogManager { assertTrue(version1 > version); byte[] taskstate = ZKUtil.getData(zkw, tasknode); slt = SplitLogTask.parseFrom(taskstate); - assertTrue(slt.isUnassigned(DUMMY_MASTER)); + assertTrue(slt.isUnassigned(master.getServerName())); return; } @Test (timeout=180000) public void testWorkerCrash() throws Exception { - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -572,7 +506,7 @@ public class TestSplitLogManager { @Test (timeout=180000) public void testEmptyLogDir() throws Exception { LOG.info("testEmptyLogDir"); - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); FileSystem fs = TEST_UTIL.getTestFileSystem(); Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString()); @@ -584,7 +518,7 @@ public class TestSplitLogManager { @Test (timeout = 60000) public void testLogFilesAreArchived() throws Exception { LOG.info("testLogFilesAreArchived"); - final SplitLogManager slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); FileSystem fs = TEST_UTIL.getTestFileSystem(); Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived"); conf.set(HConstants.HBASE_DIR, dir.toString()); @@ -637,7 +571,7 @@ public class TestSplitLogManager { HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L)); - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); slm.removeStaleRecoveringRegions(null); List recoveringRegions = @@ -659,7 +593,7 @@ public class TestSplitLogManager { ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(master, conf); LOG.info("Mode1=" + slm.getRecoveryMode()); assertTrue(slm.isLogSplitting()); zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);