HBASE-3694 high multiput latency due to checking global mem store size in a synchronized function
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1088703 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
44309ae90d
commit
ed64718369
|
@ -127,6 +127,8 @@ Release 0.91.0 - Unreleased
|
|||
(Ted Yu via Stack)
|
||||
HBASE-3704 Show per region request count in table.jsp
|
||||
(Ted Yu via Stack)
|
||||
HBASE-3694 high multiput latency due to checking global mem store size
|
||||
in a synchronized function (Liyin Tang via Stack)
|
||||
|
||||
TASK
|
||||
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
||||
|
|
|
@ -424,6 +424,28 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public AtomicLong getMemstoreSize() {
|
||||
return memstoreSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increase the size of mem store in this region and the size of global mem
|
||||
* store
|
||||
* @param memStoreSize
|
||||
* @return the size of memstore in this region
|
||||
*/
|
||||
public long addAndGetGlobalMemstoreSize(long memStoreSize) {
|
||||
if (this.rsServices != null) {
|
||||
RegionServerAccounting rsAccounting =
|
||||
this.rsServices.getRegionServerAccounting();
|
||||
|
||||
if (rsAccounting != null) {
|
||||
rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
|
||||
}
|
||||
}
|
||||
return this.memstoreSize.getAndAdd(memStoreSize);
|
||||
}
|
||||
|
||||
/*
|
||||
* Write out an info file under the region directory. Useful recovering
|
||||
|
@ -1044,7 +1066,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
storeFlushers.clear();
|
||||
|
||||
// Set down the memstore size by amount of flush.
|
||||
this.memstoreSize.addAndGet(-currentMemStoreSize);
|
||||
this.addAndGetGlobalMemstoreSize(-currentMemStoreSize);
|
||||
} catch (Throwable t) {
|
||||
// An exception here means that the snapshot was not persisted.
|
||||
// The hlog needs to be replayed so its content is restored to memstore.
|
||||
|
@ -1346,7 +1368,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
// Now make changes to the memstore.
|
||||
long addedSize = applyFamilyMapToMemstore(familyMap);
|
||||
flush = isFlushSize(memstoreSize.addAndGet(addedSize));
|
||||
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
|
||||
|
||||
if (coprocessorHost != null) {
|
||||
coprocessorHost.postDelete(familyMap, writeToWAL);
|
||||
|
@ -1478,7 +1500,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
this.writeRequestsCount.increment();
|
||||
try {
|
||||
long addedSize = doMiniBatchPut(batchOp);
|
||||
newSize = memstoreSize.addAndGet(addedSize);
|
||||
newSize = this.addAndGetGlobalMemstoreSize(addedSize);
|
||||
} finally {
|
||||
closeRegionOperation();
|
||||
}
|
||||
|
@ -1858,7 +1880,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
long addedSize = applyFamilyMapToMemstore(familyMap);
|
||||
flush = isFlushSize(memstoreSize.addAndGet(addedSize));
|
||||
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
|
||||
} finally {
|
||||
this.updatesLock.readLock().unlock();
|
||||
}
|
||||
|
@ -2172,7 +2194,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @return True if we should flush.
|
||||
*/
|
||||
protected boolean restoreEdit(final Store s, final KeyValue kv) {
|
||||
return isFlushSize(this.memstoreSize.addAndGet(s.add(kv)));
|
||||
return isFlushSize(this.addAndGetGlobalMemstoreSize(s.add(kv)));
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -3281,7 +3303,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
walEdits, now);
|
||||
}
|
||||
|
||||
size = this.memstoreSize.addAndGet(size);
|
||||
size = this.addAndGetGlobalMemstoreSize(size);
|
||||
flush = isFlushSize(size);
|
||||
} finally {
|
||||
this.updatesLock.readLock().unlock();
|
||||
|
@ -3357,7 +3379,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// returns the change in the size of the memstore from operation
|
||||
long size = store.updateColumnValue(row, family, qualifier, result);
|
||||
|
||||
size = this.memstoreSize.addAndGet(size);
|
||||
size = this.addAndGetGlobalMemstoreSize(size);
|
||||
flush = isFlushSize(size);
|
||||
} finally {
|
||||
this.updatesLock.readLock().unlock();
|
||||
|
|
|
@ -273,6 +273,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
// Replication services. If no replication, this handler will be null.
|
||||
private Replication replicationHandler;
|
||||
|
||||
private final RegionServerAccounting regionServerAccounting;
|
||||
|
||||
/**
|
||||
* Starts a HRegionServer at the default location
|
||||
*
|
||||
|
@ -351,6 +353,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
// login the server principal (if using secure Hadoop)
|
||||
User.login(conf, "hbase.regionserver.keytab.file",
|
||||
"hbase.regionserver.kerberos.principal", serverInfo.getHostname());
|
||||
|
||||
regionServerAccounting = new RegionServerAccounting();
|
||||
}
|
||||
|
||||
private static final int NORMAL_QOS = 0;
|
||||
|
@ -883,6 +887,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
}
|
||||
}
|
||||
|
||||
public RegionServerAccounting getRegionServerAccounting() {
|
||||
return regionServerAccounting;
|
||||
}
|
||||
|
||||
/*
|
||||
* @param r Region to get RegionLoad for.
|
||||
*
|
||||
|
@ -2523,19 +2531,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
return this.outboundMsgs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the total size of all memstores in every region.
|
||||
*
|
||||
* @return memstore size in bytes
|
||||
*/
|
||||
public long getGlobalMemStoreSize() {
|
||||
long total = 0;
|
||||
for (HRegion region : onlineRegions.values()) {
|
||||
total += region.memstoreSize.get();
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Return the leases.
|
||||
*/
|
||||
|
|
|
@ -290,14 +290,16 @@ class MemStoreFlusher extends Thread implements FlushRequester {
|
|||
* Return true if global memory usage is above the high watermark
|
||||
*/
|
||||
private boolean isAboveHighWaterMark() {
|
||||
return server.getGlobalMemStoreSize() >= globalMemStoreLimit;
|
||||
return server.getRegionServerAccounting().
|
||||
getGlobalMemstoreSize() >= globalMemStoreLimit;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if we're above the high watermark
|
||||
*/
|
||||
private boolean isAboveLowWaterMark() {
|
||||
return server.getGlobalMemStoreSize() >= globalMemStoreLimitLowMark;
|
||||
return server.getRegionServerAccounting().
|
||||
getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
|
||||
}
|
||||
|
||||
public void requestFlush(HRegion r) {
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
/**
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* RegionServerAccounting keeps record of some basic real time information about
|
||||
* the Region Server. Currently, it only keeps record the global memstore size.
|
||||
*/
|
||||
public class RegionServerAccounting {
|
||||
|
||||
private final AtomicLong atomicGlobalMemstoreSize = new AtomicLong(0);
|
||||
|
||||
/**
|
||||
* @return the global Memstore size in the RegionServer
|
||||
*/
|
||||
public long getGlobalMemstoreSize() {
|
||||
return atomicGlobalMemstoreSize.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param memStoreSize the Memstore size will be added to
|
||||
* the global Memstore size
|
||||
* @return the global Memstore size in the RegionServer
|
||||
*/
|
||||
public long addAndGetGlobalMemstoreSize(long memStoreSize) {
|
||||
return atomicGlobalMemstoreSize.addAndGet(memStoreSize);
|
||||
}
|
||||
|
||||
}
|
|
@ -65,6 +65,11 @@ public interface RegionServerServices extends OnlineRegions {
|
|||
* @return The HServerInfo for this RegionServer.
|
||||
*/
|
||||
public HServerInfo getServerInfo();
|
||||
|
||||
/**
|
||||
* @return the RegionServerAccounting for this Region Server
|
||||
*/
|
||||
public RegionServerAccounting getRegionServerAccounting();
|
||||
|
||||
/**
|
||||
* Tasks to perform after region open to complete deploy of region on
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
/**
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional infomation
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test HBASE-3694 whether the GlobalMemStoreSize is the same as the summary
|
||||
* of all the online region's MemStoreSize
|
||||
*/
|
||||
public class TestGlobalMemStoreSize {
|
||||
private final Log LOG = LogFactory.getLog(this.getClass().getName());
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static int regionServerNum =4;
|
||||
private static int regionNum = 16;
|
||||
// total region num = region num + root and meta regions
|
||||
private static int totalRegionNum = regionNum+2;
|
||||
|
||||
private HBaseTestingUtility TEST_UTIL;
|
||||
private MiniHBaseCluster cluster;
|
||||
|
||||
/**
|
||||
* Test the global mem store size in the region server is equal to sum of each
|
||||
* region's mem store size
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testGlobalMemStore() throws Exception {
|
||||
// Start the cluster
|
||||
LOG.info("Starting cluster");
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 5000);
|
||||
TEST_UTIL = new HBaseTestingUtility(conf);
|
||||
TEST_UTIL.startMiniCluster(1, regionServerNum);
|
||||
cluster = TEST_UTIL.getHBaseCluster();
|
||||
LOG.info("Waiting for active/ready master");
|
||||
cluster.waitForActiveAndReadyMaster();
|
||||
|
||||
// Create a table with regions
|
||||
byte [] table = Bytes.toBytes("TestGlobalMemStoreSize");
|
||||
byte [] family = Bytes.toBytes("family");
|
||||
LOG.info("Creating table with " + regionNum + " regions");
|
||||
HTable ht = TEST_UTIL.createTable(table, family);
|
||||
int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family,
|
||||
regionNum);
|
||||
assertEquals(regionNum,numRegions);
|
||||
waitForAllRegionsAssigned();
|
||||
|
||||
for (HRegionServer server : getOnlineRegionServers()) {
|
||||
long globalMemStoreSize = 0;
|
||||
for(HRegionInfo regionInfo : server.getOnlineRegions()) {
|
||||
globalMemStoreSize +=
|
||||
server.getFromOnlineRegions(regionInfo.getEncodedName()).
|
||||
getMemstoreSize().get();
|
||||
}
|
||||
assertEquals(server.getRegionServerAccounting().getGlobalMemstoreSize(),
|
||||
globalMemStoreSize);
|
||||
}
|
||||
|
||||
// check the global memstore size after flush
|
||||
for (HRegionServer server : getOnlineRegionServers()) {
|
||||
for(HRegionInfo regionInfo : server.getOnlineRegions()) {
|
||||
HRegion region=
|
||||
server.getFromOnlineRegions(regionInfo.getEncodedName());
|
||||
region.flushcache();
|
||||
}
|
||||
assertEquals(server.getRegionServerAccounting().getGlobalMemstoreSize(),
|
||||
0);
|
||||
}
|
||||
}
|
||||
|
||||
/** figure out how many regions are currently being served. */
|
||||
private int getRegionCount() {
|
||||
int total = 0;
|
||||
for (HRegionServer server : getOnlineRegionServers()) {
|
||||
total += server.getOnlineRegions().size();
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
private List<HRegionServer> getOnlineRegionServers() {
|
||||
List<HRegionServer> list = new ArrayList<HRegionServer>();
|
||||
for (JVMClusterUtil.RegionServerThread rst :
|
||||
cluster.getRegionServerThreads()) {
|
||||
if (rst.getRegionServer().isOnline()) {
|
||||
list.add(rst.getRegionServer());
|
||||
}
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until all the regions are assigned.
|
||||
*/
|
||||
private void waitForAllRegionsAssigned() {
|
||||
while (getRegionCount() < totalRegionNum) {
|
||||
LOG.debug("Waiting for there to be "+totalRegionNum+" regions, but there are " + getRegionCount() + " right now.");
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
|
|||
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushRequester;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
|
@ -181,6 +182,10 @@ public class TestOpenRegionHandler {
|
|||
public ZooKeeperWatcher getZooKeeperWatcher() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public RegionServerAccounting getRegionServerAccounting() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue