HBASE-3006 Reading compressed HFile blocks causes way too many DFS RPC calls severely impacting performance--Revert mistaken over commit
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@997974 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cac06d4573
commit
96f8cbe9e2
|
@ -523,9 +523,6 @@ Release 0.21.0 - Unreleased
|
|||
HBASE-2986 multi writable can npe causing client hang
|
||||
HBASE-2979 Fix failing TestMultParrallel in hudson build
|
||||
HBASE-2899 hfile.min.blocksize.size ignored/documentation wrong
|
||||
HBASE-3006 Reading compressed HFile blocks causes way too many DFS RPC
|
||||
calls severely impacting performance
|
||||
(Kannan Muthukkaruppan via Stack)
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1760 Cleanup TODOs in HTable
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.Closeable;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
|
@ -1052,15 +1051,10 @@ public class HFile {
|
|||
// decompressor reading into next block -- IIRC, it just grabs a
|
||||
// bunch of data w/o regard to whether decompressor is coming to end of a
|
||||
// decompression.
|
||||
|
||||
// We use a buffer of DEFAULT_BLOCKSIZE size. This might be extreme.
|
||||
// Could maybe do with less. Study and figure it: TODO
|
||||
InputStream is = this.compressAlgo.createDecompressionStream(
|
||||
new BufferedInputStream(
|
||||
new BoundedRangeFileInputStream(this.istream, offset, compressedSize,
|
||||
pread),
|
||||
Math.min(DEFAULT_BLOCKSIZE, compressedSize)),
|
||||
decompressor, 0);
|
||||
new BoundedRangeFileInputStream(this.istream, offset, compressedSize,
|
||||
pread),
|
||||
decompressor, 0);
|
||||
buf = ByteBuffer.allocate(decompressedSize);
|
||||
IOUtils.readFully(is, buf.array(), 0, buf.capacity());
|
||||
is.close();
|
||||
|
|
|
@ -95,7 +95,7 @@ class ActiveMasterManager extends ZooKeeperListener {
|
|||
clusterHasActiveMaster.set(true);
|
||||
} else {
|
||||
// Node is no longer there, cluster does not have an active master
|
||||
LOG.debug("No master available. Notifying waiting threads");
|
||||
LOG.debug("No master available. notifying waiting threads");
|
||||
clusterHasActiveMaster.set(false);
|
||||
// Notify any thread waiting to become the active master
|
||||
clusterHasActiveMaster.notifyAll();
|
||||
|
@ -114,56 +114,46 @@ class ActiveMasterManager extends ZooKeeperListener {
|
|||
*
|
||||
* This also makes sure that we are watching the master znode so will be
|
||||
* notified if another master dies.
|
||||
* @return True if no issue becoming active master else false if another
|
||||
* master was running or if some other problem (zookeeper, stop flag has been
|
||||
* set on this Master)
|
||||
* @return False if we did not start up this cluster, another
|
||||
* master did, or if a problem (zookeeper, stop flag has been set on this
|
||||
* Master)
|
||||
*/
|
||||
boolean blockUntilBecomingActiveMaster() {
|
||||
boolean cleanSetOfActiveMaster = true;
|
||||
boolean thisMasterStartedCluster = true;
|
||||
// Try to become the active master, watch if there is another master
|
||||
try {
|
||||
if (ZKUtil.setAddressAndWatch(this.watcher,
|
||||
this.watcher.masterAddressZNode, this.address)) {
|
||||
if(ZKUtil.setAddressAndWatch(watcher, watcher.masterAddressZNode,
|
||||
address)) {
|
||||
// We are the master, return
|
||||
this.clusterHasActiveMaster.set(true);
|
||||
return cleanSetOfActiveMaster;
|
||||
}
|
||||
|
||||
// There is another active master running elsewhere or this is a restart
|
||||
// and the master ephemeral node has not expired yet.
|
||||
this.clusterHasActiveMaster.set(true);
|
||||
cleanSetOfActiveMaster = false;
|
||||
HServerAddress currentMaster =
|
||||
ZKUtil.getDataAsAddress(this.watcher, this.watcher.masterAddressZNode);
|
||||
if (currentMaster != null && currentMaster.equals(this.address)) {
|
||||
LOG.info("Current master has this master's address, " + currentMaster +
|
||||
"; master was restarted? Waiting on znode to expire...");
|
||||
// Hurry along the expiration of the znode.
|
||||
ZKUtil.deleteNode(this.watcher, this.watcher.masterAddressZNode);
|
||||
} else {
|
||||
LOG.info("Another master is the active master, " + currentMaster +
|
||||
"; waiting to become the next active master");
|
||||
clusterHasActiveMaster.set(true);
|
||||
return thisMasterStartedCluster;
|
||||
}
|
||||
} catch (KeeperException ke) {
|
||||
master.abort("Received an unexpected KeeperException, aborting", ke);
|
||||
return false;
|
||||
}
|
||||
synchronized (this.clusterHasActiveMaster) {
|
||||
while (this.clusterHasActiveMaster.get() && !this.master.isStopped()) {
|
||||
// There is another active master, this is not a cluster startup
|
||||
// and we must wait until the active master dies
|
||||
LOG.info("Another master is already the active master, waiting to become " +
|
||||
"the next active master");
|
||||
clusterHasActiveMaster.set(true);
|
||||
thisMasterStartedCluster = false;
|
||||
synchronized(clusterHasActiveMaster) {
|
||||
while(clusterHasActiveMaster.get() && !master.isStopped()) {
|
||||
try {
|
||||
this.clusterHasActiveMaster.wait();
|
||||
clusterHasActiveMaster.wait();
|
||||
} catch (InterruptedException e) {
|
||||
// We expect to be interrupted when a master dies, will fall out if so
|
||||
LOG.debug("Interrupted waiting for master to die", e);
|
||||
}
|
||||
}
|
||||
if (this.master.isStopped()) {
|
||||
return cleanSetOfActiveMaster;
|
||||
if(master.isStopped()) {
|
||||
return thisMasterStartedCluster;
|
||||
}
|
||||
// Try to become active master again now that there is no active master
|
||||
blockUntilBecomingActiveMaster();
|
||||
}
|
||||
return cleanSetOfActiveMaster;
|
||||
return thisMasterStartedCluster;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -145,14 +145,12 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
// Cluster status zk tracker and local setter
|
||||
private ClusterStatusTracker clusterStatusTracker;
|
||||
|
||||
// True if this a cluster startup as opposed to a master joining an already
|
||||
// running cluster
|
||||
boolean freshClusterStart;
|
||||
// True if this is the master that started the cluster.
|
||||
boolean clusterStarter;
|
||||
|
||||
// This flag is for stopping this Master instance. Its set when we are
|
||||
// stopping or aborting
|
||||
private volatile boolean stopped = false;
|
||||
// Set on abort -- usually failure of our zk session.
|
||||
// This flag is for stopping this Master instance.
|
||||
private boolean stopped = false;
|
||||
// Set on abort -- usually failure of our zk session
|
||||
private volatile boolean abort = false;
|
||||
|
||||
// Instance of the hbase executor service.
|
||||
|
@ -185,12 +183,12 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
HServerAddress a = new HServerAddress(getMyAddress(this.conf));
|
||||
int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10);
|
||||
this.rpcServer = HBaseRPC.getServer(this,
|
||||
new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class},
|
||||
a.getBindAddress(), a.getPort(),
|
||||
numHandlers,
|
||||
0, // we dont use high priority handlers in master
|
||||
false, conf,
|
||||
0); // this is a DNC w/o high priority handlers
|
||||
new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class},
|
||||
a.getBindAddress(), a.getPort(),
|
||||
numHandlers,
|
||||
0, // we dont use high priority handlers in master
|
||||
false, conf,
|
||||
0); // this is a DNC w/o high priority handlers
|
||||
this.address = new HServerAddress(rpcServer.getListenerAddress());
|
||||
|
||||
// set the thread name now we have an address
|
||||
|
@ -216,9 +214,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
this.zooKeeper =
|
||||
new ZooKeeperWatcher(conf, MASTER + "-" + getMasterAddress(), this);
|
||||
|
||||
// Are there regionservers running already?
|
||||
boolean regionservers =
|
||||
0 == ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode);
|
||||
this.clusterStarter = 0 ==
|
||||
ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode);
|
||||
|
||||
/*
|
||||
* 3. Block on becoming the active master.
|
||||
|
@ -232,10 +229,26 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
this.activeMasterManager = new ActiveMasterManager(zooKeeper, address, this);
|
||||
this.zooKeeper.registerListener(activeMasterManager);
|
||||
|
||||
stallIfBackupMaster(this.conf, this.activeMasterManager);
|
||||
|
||||
// If we're a backup master, stall until a primary writes his address
|
||||
if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
|
||||
HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
|
||||
// This will only be a minute or so while the cluster starts up,
|
||||
// so don't worry about setting watches on the parent znode
|
||||
while (!this.activeMasterManager.isActiveMaster()) {
|
||||
try {
|
||||
LOG.debug("Waiting for master address ZNode to be written " +
|
||||
"(Also watching cluster state node)");
|
||||
Thread.sleep(conf.getInt("zookeeper.session.timeout", 60 * 1000));
|
||||
} catch (InterruptedException e) {
|
||||
// interrupted = user wants to kill us. Don't continue
|
||||
throw new IOException("Interrupted waiting for master address");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait here until we are the active master
|
||||
activeMasterManager.blockUntilBecomingActiveMaster();
|
||||
clusterStarter = activeMasterManager.blockUntilBecomingActiveMaster();
|
||||
|
||||
/**
|
||||
* 4. We are active master now... go initialize components we need to run.
|
||||
|
@ -259,41 +272,16 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
this.serverManager);
|
||||
regionServerTracker.start();
|
||||
|
||||
// Set the cluster as up. If new RSs, they'll be waiting on this before
|
||||
// going ahead with their startup.
|
||||
// Set the cluster as up.
|
||||
this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
|
||||
|
||||
this.freshClusterStart =
|
||||
!this.clusterStatusTracker.isClusterUp() && !regionservers;
|
||||
this.clusterStatusTracker.setClusterUp();
|
||||
this.clusterStatusTracker.start();
|
||||
|
||||
LOG.info("Server active/primary master; " + this.address +
|
||||
"; freshClusterStart=" + this.freshClusterStart + ", sessionid=0x" +
|
||||
"; clusterStarter=" + this.clusterStarter + ", sessionid=0x" +
|
||||
Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Stall startup if we are designated a backup master.
|
||||
* @param c
|
||||
* @param amm
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
private static void stallIfBackupMaster(final Configuration c,
|
||||
final ActiveMasterManager amm)
|
||||
throws InterruptedException {
|
||||
// If we're a backup master, stall until a primary writes his address
|
||||
if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP,
|
||||
HConstants.DEFAULT_MASTER_TYPE_BACKUP)) return;
|
||||
// This will only be a minute or so while the cluster starts up,
|
||||
// so don't worry about setting watches on the parent znode
|
||||
while (!amm.isActiveMaster()) {
|
||||
LOG.debug("Waiting for master address ZNode to be written " +
|
||||
"(Also watching cluster state node)");
|
||||
Thread.sleep(c.getInt("zookeeper.session.timeout", 60 * 1000));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Main processing loop for the HMaster.
|
||||
* 1. Handle both fresh cluster start as well as failed over initialization of
|
||||
|
@ -309,22 +297,20 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
startServiceThreads();
|
||||
// wait for minimum number of region servers to be up
|
||||
this.serverManager.waitForMinServers();
|
||||
|
||||
// Start assignment of user regions, startup or failure
|
||||
if (!this.stopped) {
|
||||
if (this.freshClusterStart) {
|
||||
clusterStarterInitializations(this.fileSystemManager,
|
||||
// start assignment of user regions, startup or failure
|
||||
if (this.clusterStarter) {
|
||||
clusterStarterInitializations(this.fileSystemManager,
|
||||
this.serverManager, this.catalogTracker, this.assignmentManager);
|
||||
} else {
|
||||
// Process existing unassigned nodes in ZK, read all regions from META,
|
||||
// rebuild in-memory state.
|
||||
this.assignmentManager.processFailover();
|
||||
}
|
||||
} else {
|
||||
// Process existing unassigned nodes in ZK, read all regions from META,
|
||||
// rebuild in-memory state.
|
||||
this.assignmentManager.processFailover();
|
||||
}
|
||||
|
||||
// Check if we should stop every second.
|
||||
Sleeper sleeper = new Sleeper(1000, this);
|
||||
while (!this.stopped) sleeper.sleep();
|
||||
while (!this.stopped && !this.abort) {
|
||||
sleeper.sleep();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
abort("Unhandled exception. Starting shutdown.", t);
|
||||
}
|
||||
|
@ -809,7 +795,6 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
if (t != null) LOG.fatal(msg, t);
|
||||
else LOG.fatal(msg);
|
||||
this.abort = true;
|
||||
stop("Aborting");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -127,7 +127,6 @@ public class ServerManager {
|
|||
this.services = services;
|
||||
Configuration c = master.getConfiguration();
|
||||
int monitorInterval = c.getInt("hbase.master.monitor.interval", 60 * 1000);
|
||||
// TODO: Fix.
|
||||
this.minimumServerCount = c.getInt("hbase.regions.server.count.min", 1);
|
||||
this.metrics = new MasterMetrics(master.getServerName());
|
||||
this.serverMonitorThread = new ServerMonitor(monitorInterval, master);
|
||||
|
@ -221,8 +220,8 @@ public class ServerManager {
|
|||
info.setLoad(load);
|
||||
// TODO: Why did we update the RS location ourself? Shouldn't RS do this?
|
||||
// masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher);
|
||||
this.onlineServers.put(serverName, info);
|
||||
if (hri == null) {
|
||||
onlineServers.put(serverName, info);
|
||||
if(hri == null) {
|
||||
serverConnections.remove(serverName);
|
||||
} else {
|
||||
serverConnections.put(serverName, hri);
|
||||
|
@ -550,9 +549,10 @@ public class ServerManager {
|
|||
* Waits for the minimum number of servers to be running.
|
||||
*/
|
||||
public void waitForMinServers() {
|
||||
while (numServers() < minimumServerCount && !this.master.isStopped()) {
|
||||
while(numServers() < minimumServerCount) {
|
||||
// !masterStatus.getShutdownRequested().get()) {
|
||||
LOG.info("Waiting for enough servers to check in. Currently have " +
|
||||
numServers() + " but need at least " + minimumServerCount);
|
||||
numServers() + " but need at least " + minimumServerCount);
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -554,7 +554,7 @@ public class ZKUtil {
|
|||
String znode)
|
||||
throws KeeperException {
|
||||
byte [] data = getDataAndWatch(zkw, znode);
|
||||
if (data == null) {
|
||||
if(data == null) {
|
||||
return null;
|
||||
}
|
||||
String addrString = Bytes.toString(data);
|
||||
|
|
|
@ -23,7 +23,6 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -58,39 +57,6 @@ public class TestActiveMasterManager {
|
|||
TEST_UTIL.shutdownMiniZKCluster();
|
||||
}
|
||||
|
||||
@Test public void testRestartMaster() throws IOException, KeeperException {
|
||||
ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
|
||||
"testActiveMasterManagerFromZK", null);
|
||||
ZKUtil.createAndFailSilent(zk, zk.baseZNode);
|
||||
try {
|
||||
ZKUtil.deleteNode(zk, zk.masterAddressZNode);
|
||||
} catch(KeeperException.NoNodeException nne) {}
|
||||
|
||||
// Create the master node with a dummy address
|
||||
HServerAddress master = new HServerAddress("localhost", 1);
|
||||
// Should not have a master yet
|
||||
DummyMaster dummyMaster = new DummyMaster();
|
||||
ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk,
|
||||
master, dummyMaster);
|
||||
zk.registerListener(activeMasterManager);
|
||||
assertFalse(activeMasterManager.clusterHasActiveMaster.get());
|
||||
|
||||
// First test becoming the active master uninterrupted
|
||||
activeMasterManager.blockUntilBecomingActiveMaster();
|
||||
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertMaster(zk, master);
|
||||
|
||||
// Now pretend master restart
|
||||
DummyMaster secondDummyMaster = new DummyMaster();
|
||||
ActiveMasterManager secondActiveMasterManager = new ActiveMasterManager(zk,
|
||||
master, secondDummyMaster);
|
||||
zk.registerListener(secondActiveMasterManager);
|
||||
assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get());
|
||||
activeMasterManager.blockUntilBecomingActiveMaster();
|
||||
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertMaster(zk, master);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unit tests that uses ZooKeeper but does not use the master-side methods
|
||||
* but rather acts directly on ZK.
|
||||
|
@ -98,21 +64,22 @@ public class TestActiveMasterManager {
|
|||
*/
|
||||
@Test
|
||||
public void testActiveMasterManagerFromZK() throws Exception {
|
||||
|
||||
ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
|
||||
"testActiveMasterManagerFromZK", null);
|
||||
"testActiveMasterManagerFromZK", null);
|
||||
ZKUtil.createAndFailSilent(zk, zk.baseZNode);
|
||||
try {
|
||||
ZKUtil.deleteNode(zk, zk.masterAddressZNode);
|
||||
} catch(KeeperException.NoNodeException nne) {}
|
||||
|
||||
// Create the master node with a dummy address
|
||||
HServerAddress firstMasterAddress = new HServerAddress("localhost", 1);
|
||||
HServerAddress secondMasterAddress = new HServerAddress("localhost", 2);
|
||||
HServerAddress firstMasterAddress = new HServerAddress("firstMaster", 1234);
|
||||
HServerAddress secondMasterAddress = new HServerAddress("secondMaster", 1234);
|
||||
|
||||
// Should not have a master yet
|
||||
DummyMaster ms1 = new DummyMaster();
|
||||
ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk,
|
||||
firstMasterAddress, ms1);
|
||||
firstMasterAddress, ms1);
|
||||
zk.registerListener(activeMasterManager);
|
||||
assertFalse(activeMasterManager.clusterHasActiveMaster.get());
|
||||
|
||||
|
@ -165,9 +132,6 @@ public class TestActiveMasterManager {
|
|||
|
||||
assertTrue(t.manager.clusterHasActiveMaster.get());
|
||||
assertTrue(t.isActiveMaster);
|
||||
|
||||
LOG.info("Deleting master node");
|
||||
ZKUtil.deleteNode(zk, zk.masterAddressZNode);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -61,7 +61,7 @@ public class TestCompaction extends HBaseTestCase {
|
|||
|
||||
// Set cache flush size to 1MB
|
||||
conf.setInt("hbase.hregion.memstore.flush.size", 1024*1024);
|
||||
conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
|
||||
conf.setInt("hbase.hregion.memstore.block.multiplier", 10);
|
||||
this.cluster = null;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue