From cac06d45735d7f68ac1b817e41cdf0a75f03e3b1 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Fri, 17 Sep 2010 04:18:33 +0000 Subject: [PATCH] HBASE-3006 Reading compressed HFile blocks causes way too many DFS RPC calls severly impacting performance git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@997973 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/hbase/io/hfile/HFile.java | 2 +- .../hbase/master/ActiveMasterManager.java | 52 +++++---- .../apache/hadoop/hbase/master/HMaster.java | 101 ++++++++++-------- .../hadoop/hbase/master/ServerManager.java | 10 +- .../apache/hadoop/hbase/zookeeper/ZKUtil.java | 2 +- .../hbase/master/TestActiveMasterManager.java | 46 +++++++- 6 files changed, 137 insertions(+), 76 deletions(-) diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index aa5de242089..8f3e08ca8c0 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -1059,7 +1059,7 @@ public class HFile { new BufferedInputStream( new BoundedRangeFileInputStream(this.istream, offset, compressedSize, pread), - Math.min(DEFAUT_BLOCKSIZE, compressedSize)), + Math.min(DEFAULT_BLOCKSIZE, compressedSize)), decompressor, 0); buf = ByteBuffer.allocate(decompressedSize); IOUtils.readFully(is, buf.array(), 0, buf.capacity()); diff --git a/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java b/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java index 87fe9cd3861..6bf49b7a2ec 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java @@ -95,7 +95,7 @@ class ActiveMasterManager extends ZooKeeperListener { clusterHasActiveMaster.set(true); } else { // Node is no longer there, cluster does not have an active master - LOG.debug("No master available. notifying waiting threads"); + LOG.debug("No master available. Notifying waiting threads"); clusterHasActiveMaster.set(false); // Notify any thread waiting to become the active master clusterHasActiveMaster.notifyAll(); @@ -114,46 +114,56 @@ class ActiveMasterManager extends ZooKeeperListener { * * This also makes sure that we are watching the master znode so will be * notified if another master dies. - * @return False if we did not start up this cluster, another - * master did, or if a problem (zookeeper, stop flag has been set on this - * Master) + * @return True if no issue becoming active master else false if another + * master was running or if some other problem (zookeeper, stop flag has been + * set on this Master) */ boolean blockUntilBecomingActiveMaster() { - boolean thisMasterStartedCluster = true; + boolean cleanSetOfActiveMaster = true; // Try to become the active master, watch if there is another master try { - if(ZKUtil.setAddressAndWatch(watcher, watcher.masterAddressZNode, - address)) { + if (ZKUtil.setAddressAndWatch(this.watcher, + this.watcher.masterAddressZNode, this.address)) { // We are the master, return - clusterHasActiveMaster.set(true); - return thisMasterStartedCluster; + this.clusterHasActiveMaster.set(true); + return cleanSetOfActiveMaster; + } + + // There is another active master running elsewhere or this is a restart + // and the master ephemeral node has not expired yet. + this.clusterHasActiveMaster.set(true); + cleanSetOfActiveMaster = false; + HServerAddress currentMaster = + ZKUtil.getDataAsAddress(this.watcher, this.watcher.masterAddressZNode); + if (currentMaster != null && currentMaster.equals(this.address)) { + LOG.info("Current master has this master's address, " + currentMaster + + "; master was restarted? Waiting on znode to expire..."); + // Hurry along the expiration of the znode. + ZKUtil.deleteNode(this.watcher, this.watcher.masterAddressZNode); + } else { + LOG.info("Another master is the active master, " + currentMaster + + "; waiting to become the next active master"); } } catch (KeeperException ke) { master.abort("Received an unexpected KeeperException, aborting", ke); return false; } - // There is another active master, this is not a cluster startup - // and we must wait until the active master dies - LOG.info("Another master is already the active master, waiting to become " + - "the next active master"); - clusterHasActiveMaster.set(true); - thisMasterStartedCluster = false; - synchronized(clusterHasActiveMaster) { - while(clusterHasActiveMaster.get() && !master.isStopped()) { + synchronized (this.clusterHasActiveMaster) { + while (this.clusterHasActiveMaster.get() && !this.master.isStopped()) { try { - clusterHasActiveMaster.wait(); + this.clusterHasActiveMaster.wait(); } catch (InterruptedException e) { // We expect to be interrupted when a master dies, will fall out if so LOG.debug("Interrupted waiting for master to die", e); } } - if(master.isStopped()) { - return thisMasterStartedCluster; + if (this.master.isStopped()) { + return cleanSetOfActiveMaster; } // Try to become active master again now that there is no active master blockUntilBecomingActiveMaster(); } - return thisMasterStartedCluster; + return cleanSetOfActiveMaster; } /** diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index c1b80eb12b7..fcd2bfce14d 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -145,12 +145,14 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { // Cluster status zk tracker and local setter private ClusterStatusTracker clusterStatusTracker; - // True if this is the master that started the cluster. - boolean clusterStarter; + // True if this a cluster startup as opposed to a master joining an already + // running cluster + boolean freshClusterStart; - // This flag is for stopping this Master instance. - private boolean stopped = false; - // Set on abort -- usually failure of our zk session + // This flag is for stopping this Master instance. Its set when we are + // stopping or aborting + private volatile boolean stopped = false; + // Set on abort -- usually failure of our zk session. private volatile boolean abort = false; // Instance of the hbase executor service. @@ -183,12 +185,12 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { HServerAddress a = new HServerAddress(getMyAddress(this.conf)); int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10); this.rpcServer = HBaseRPC.getServer(this, - new Class[]{HMasterInterface.class, HMasterRegionInterface.class}, - a.getBindAddress(), a.getPort(), - numHandlers, - 0, // we dont use high priority handlers in master - false, conf, - 0); // this is a DNC w/o high priority handlers + new Class[]{HMasterInterface.class, HMasterRegionInterface.class}, + a.getBindAddress(), a.getPort(), + numHandlers, + 0, // we dont use high priority handlers in master + false, conf, + 0); // this is a DNC w/o high priority handlers this.address = new HServerAddress(rpcServer.getListenerAddress()); // set the thread name now we have an address @@ -214,8 +216,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + "-" + getMasterAddress(), this); - this.clusterStarter = 0 == - ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode); + // Are there regionservers running already? + boolean regionservers = + 0 == ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode); /* * 3. Block on becoming the active master. @@ -229,26 +232,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.activeMasterManager = new ActiveMasterManager(zooKeeper, address, this); this.zooKeeper.registerListener(activeMasterManager); - - // If we're a backup master, stall until a primary to writes his address - if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP, - HConstants.DEFAULT_MASTER_TYPE_BACKUP)) { - // This will only be a minute or so while the cluster starts up, - // so don't worry about setting watches on the parent znode - while (!this.activeMasterManager.isActiveMaster()) { - try { - LOG.debug("Waiting for master address ZNode to be written " + - "(Also watching cluster state node)"); - Thread.sleep(conf.getInt("zookeeper.session.timeout", 60 * 1000)); - } catch (InterruptedException e) { - // interrupted = user wants to kill us. Don't continue - throw new IOException("Interrupted waiting for master address"); - } - } - } + stallIfBackupMaster(this.conf, this.activeMasterManager); // Wait here until we are the active master - clusterStarter = activeMasterManager.blockUntilBecomingActiveMaster(); + activeMasterManager.blockUntilBecomingActiveMaster(); /** * 4. We are active master now... go initialize components we need to run. @@ -272,16 +259,41 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.serverManager); regionServerTracker.start(); - // Set the cluster as up. + // Set the cluster as up. If new RSs, they'll be waiting on this before + // going ahead with their startup. this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this); + + this.freshClusterStart = + !this.clusterStatusTracker.isClusterUp() && !regionservers; this.clusterStatusTracker.setClusterUp(); this.clusterStatusTracker.start(); LOG.info("Server active/primary master; " + this.address + - "; clusterStarter=" + this.clusterStarter + ", sessionid=0x" + + "; freshClusterStart=" + this.freshClusterStart + ", sessionid=0x" + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); } + /** + * Stall startup if we are designated a backup master. + * @param c + * @param amm + * @throws InterruptedException + */ + private static void stallIfBackupMaster(final Configuration c, + final ActiveMasterManager amm) + throws InterruptedException { + // If we're a backup master, stall until a primary to writes his address + if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP, + HConstants.DEFAULT_MASTER_TYPE_BACKUP)) return; + // This will only be a minute or so while the cluster starts up, + // so don't worry about setting watches on the parent znode + while (!amm.isActiveMaster()) { + LOG.debug("Waiting for master address ZNode to be written " + + "(Also watching cluster state node)"); + Thread.sleep(c.getInt("zookeeper.session.timeout", 60 * 1000)); + } + } + /** * Main processing loop for the HMaster. * 1. Handle both fresh cluster start as well as failed over initialization of @@ -297,20 +309,22 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { startServiceThreads(); // wait for minimum number of region servers to be up this.serverManager.waitForMinServers(); - // start assignment of user regions, startup or failure - if (this.clusterStarter) { - clusterStarterInitializations(this.fileSystemManager, + + // Start assignment of user regions, startup or failure + if (!this.stopped) { + if (this.freshClusterStart) { + clusterStarterInitializations(this.fileSystemManager, this.serverManager, this.catalogTracker, this.assignmentManager); - } else { - // Process existing unassigned nodes in ZK, read all regions from META, - // rebuild in-memory state. - this.assignmentManager.processFailover(); + } else { + // Process existing unassigned nodes in ZK, read all regions from META, + // rebuild in-memory state. + this.assignmentManager.processFailover(); + } } + // Check if we should stop every second. Sleeper sleeper = new Sleeper(1000, this); - while (!this.stopped && !this.abort) { - sleeper.sleep(); - } + while (!this.stopped) sleeper.sleep(); } catch (Throwable t) { abort("Unhandled exception. Starting shutdown.", t); } @@ -795,6 +809,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { if (t != null) LOG.fatal(msg, t); else LOG.fatal(msg); this.abort = true; + stop("Aborting"); } @Override diff --git a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 528bb9dac0d..fc9b73e661c 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -127,6 +127,7 @@ public class ServerManager { this.services = services; Configuration c = master.getConfiguration(); int monitorInterval = c.getInt("hbase.master.monitor.interval", 60 * 1000); + // TODO: Fix. this.minimumServerCount = c.getInt("hbase.regions.server.count.min", 1); this.metrics = new MasterMetrics(master.getServerName()); this.serverMonitorThread = new ServerMonitor(monitorInterval, master); @@ -220,8 +221,8 @@ public class ServerManager { info.setLoad(load); // TODO: Why did we update the RS location ourself? Shouldn't RS do this? // masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher); - onlineServers.put(serverName, info); - if(hri == null) { + this.onlineServers.put(serverName, info); + if (hri == null) { serverConnections.remove(serverName); } else { serverConnections.put(serverName, hri); @@ -549,10 +550,9 @@ public class ServerManager { * Waits for the minimum number of servers to be running. */ public void waitForMinServers() { - while(numServers() < minimumServerCount) { -// !masterStatus.getShutdownRequested().get()) { + while (numServers() < minimumServerCount && !this.master.isStopped()) { LOG.info("Waiting for enough servers to check in. Currently have " + - numServers() + " but need at least " + minimumServerCount); + numServers() + " but need at least " + minimumServerCount); try { Thread.sleep(1000); } catch (InterruptedException e) { diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 1595d234b97..b1bba6dfde9 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -554,7 +554,7 @@ public class ZKUtil { String znode) throws KeeperException { byte [] data = getDataAndWatch(zkw, znode); - if(data == null) { + if (data == null) { return null; } String addrString = Bytes.toString(data); diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java index 030bc127c4c..1a199411e6b 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.concurrent.Semaphore; import org.apache.commons.logging.Log; @@ -57,6 +58,39 @@ public class TestActiveMasterManager { TEST_UTIL.shutdownMiniZKCluster(); } + @Test public void testRestartMaster() throws IOException, KeeperException { + ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), + "testActiveMasterManagerFromZK", null); + ZKUtil.createAndFailSilent(zk, zk.baseZNode); + try { + ZKUtil.deleteNode(zk, zk.masterAddressZNode); + } catch(KeeperException.NoNodeException nne) {} + + // Create the master node with a dummy address + HServerAddress master = new HServerAddress("localhost", 1); + // Should not have a master yet + DummyMaster dummyMaster = new DummyMaster(); + ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk, + master, dummyMaster); + zk.registerListener(activeMasterManager); + assertFalse(activeMasterManager.clusterHasActiveMaster.get()); + + // First test becoming the active master uninterrupted + activeMasterManager.blockUntilBecomingActiveMaster(); + assertTrue(activeMasterManager.clusterHasActiveMaster.get()); + assertMaster(zk, master); + + // Now pretend master restart + DummyMaster secondDummyMaster = new DummyMaster(); + ActiveMasterManager secondActiveMasterManager = new ActiveMasterManager(zk, + master, secondDummyMaster); + zk.registerListener(secondActiveMasterManager); + assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get()); + activeMasterManager.blockUntilBecomingActiveMaster(); + assertTrue(activeMasterManager.clusterHasActiveMaster.get()); + assertMaster(zk, master); + } + /** * Unit tests that uses ZooKeeper but does not use the master-side methods * but rather acts directly on ZK. @@ -64,22 +98,21 @@ public class TestActiveMasterManager { */ @Test public void testActiveMasterManagerFromZK() throws Exception { - ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), - "testActiveMasterManagerFromZK", null); + "testActiveMasterManagerFromZK", null); ZKUtil.createAndFailSilent(zk, zk.baseZNode); try { ZKUtil.deleteNode(zk, zk.masterAddressZNode); } catch(KeeperException.NoNodeException nne) {} // Create the master node with a dummy address - HServerAddress firstMasterAddress = new HServerAddress("firstMaster", 1234); - HServerAddress secondMasterAddress = new HServerAddress("secondMaster", 1234); + HServerAddress firstMasterAddress = new HServerAddress("localhost", 1); + HServerAddress secondMasterAddress = new HServerAddress("localhost", 2); // Should not have a master yet DummyMaster ms1 = new DummyMaster(); ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk, - firstMasterAddress, ms1); + firstMasterAddress, ms1); zk.registerListener(activeMasterManager); assertFalse(activeMasterManager.clusterHasActiveMaster.get()); @@ -132,6 +165,9 @@ public class TestActiveMasterManager { assertTrue(t.manager.clusterHasActiveMaster.get()); assertTrue(t.isActiveMaster); + + LOG.info("Deleting master node"); + ZKUtil.deleteNode(zk, zk.masterAddressZNode); } /**