HBASE-13316 Reduce the downtime on planned moves of regions

Summary:
The current behavior of a region move shuts down a region and then starts it up on another regionserver. This causes increased latency, and possibly timeouts, until the new region's cache is fully warmed up. We can make a region move less disruptive by warming the cache in the destination region server before shutting down the old region.

See https://issues.apache.org/jira/browse/HBASE-13316
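
For readers who want to exercise the improved move path, here is a minimal client-side sketch; the table name and destination choice are illustrative and not part of this patch. The Admin.move() call itself is unchanged, but with this change the master sends a WarmupRegion RPC to the destination before the source region is closed.

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class MoveRegionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      TableName table = TableName.valueOf("my_table");   // hypothetical table name
      List<HRegionInfo> regions = admin.getTableRegions(table);
      HRegionInfo hri = regions.get(0);
      // Pick any live server as the destination; in practice, choose one that
      // does not currently host the region.
      ServerName dest = admin.getClusterStatus().getServers().iterator().next();
      // Unchanged client API: the master now warms up the region on "dest"
      // before closing it on the source region server.
      admin.move(hri.getEncodedNameAsBytes(), Bytes.toBytes(dest.getServerName()));
    }
  }
}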

Test Plan:
1. Unit Tests
2. Added test for concurrent moves and warmups
3. Manually tested reads/writes happening with concurrent moves
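
As a rough illustration of item 3 (this is not the patch's test code; the table name, row key, and timings are assumptions), reads concurrent with moves can be exercised along these lines:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ConcurrentMoveReadCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName name = TableName.valueOf("my_table");    // hypothetical, pre-loaded table
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin();
         Table table = conn.getTable(name)) {
      final AtomicBoolean stop = new AtomicBoolean(false);
      // Reader thread: keep issuing gets while the region is being moved.
      Thread reader = new Thread(() -> {
        try {
          while (!stop.get()) {
            table.get(new Get(Bytes.toBytes("testRow")));
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      });
      reader.start();
      // Move the first region of the table across the live servers a few times.
      HRegionInfo hri = admin.getTableRegions(name).get(0);
      List<ServerName> servers = new ArrayList<>(admin.getClusterStatus().getServers());
      for (int i = 0; i < 5; i++) {
        ServerName dest = servers.get(i % servers.size());
        admin.move(hri.getEncodedNameAsBytes(), Bytes.toBytes(dest.getServerName()));
        Thread.sleep(2000);   // give the move time to complete
      }
      stop.set(true);
      reader.join();
    }
  }
}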

Subscribers: tedyu

Differential Revision: https://reviews.facebook.net/D35967

Signed-off-by: Elliott Clark <eclark@apache.org>
Sameet Agarwal 2015-03-12 10:56:58 -07:00 committed by Elliott Clark
parent 5d2c33158c
commit ba6345f7d1
11 changed files with 1477 additions and 148 deletions

View File

@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsReques
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@ -1728,6 +1729,26 @@ public final class ProtobufUtil {
}
}
/**
* A helper to warm up a region using the admin protocol, given its region info.
*
* @param admin the region server's admin service
* @param regionInfo the region to warm up
*/
public static void warmupRegion(final AdminService.BlockingInterface admin,
final HRegionInfo regionInfo) throws IOException {
try {
WarmupRegionRequest warmupRegionRequest =
RequestConverter.buildWarmupRegionRequest(regionInfo);
admin.warmupRegion(null, warmupRegionRequest);
} catch (ServiceException e) {
throw getRemoteException(e);
}
}
/**
* A helper to open a region using admin protocol.
@ -1746,6 +1767,7 @@ public final class ProtobufUtil {
}
}
/**
* A helper to get all the online regions on a region
* server using admin protocol.

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@ -849,6 +850,16 @@ public final class RequestConverter {
return builder.build();
}
/**
* Create a WarmupRegionRequest for a given region
*
* @param regionInfo Region we are warming up
*/
public static WarmupRegionRequest buildWarmupRegionRequest(final HRegionInfo regionInfo) {
WarmupRegionRequest.Builder builder = WarmupRegionRequest.newBuilder();
builder.setRegionInfo(HRegionInfo.convert(regionInfo));
return builder.build();
}
/**
* Create a CloseRegionRequest for a given encoded region name
*

View File

@ -90,6 +90,14 @@ message OpenRegionResponse {
}
}
message WarmupRegionRequest {
required RegionInfo regionInfo = 1;
}
message WarmupRegionResponse {
}
/**
* Closes the specified region and will use or not use ZK during the close
* according to the specified flag.
@ -253,6 +261,9 @@ service AdminService {
rpc OpenRegion(OpenRegionRequest)
returns(OpenRegionResponse);
rpc WarmupRegion(WarmupRegionRequest)
returns(WarmupRegionResponse);
rpc CloseRegion(CloseRegionRequest)
returns(CloseRegionResponse);

View File

@ -2046,6 +2046,7 @@ public class AssignmentManager {
* @param plan Plan to execute.
*/
public void balance(final RegionPlan plan) {
HRegionInfo hri = plan.getRegionInfo();
TableName tableName = hri.getTable();
if (tableStateManager.isTableState(tableName,

View File

@ -1264,6 +1264,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
return;
}
}
// Warm up the region on the destination before initiating the move. This call
// is synchronous and takes some time, so do it before the source region gets
// closed.
serverManager.sendRegionWarmup(rp.getDestination(), hri);
LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
this.assignmentManager.balance(rp);
if (this.cpHost != null) {

View File

@ -805,6 +805,27 @@ public class ServerManager {
return sendRegionClose(server, region, null);
}
/**
* Sends a WARMUP RPC to the specified server to warm up the specified region.
* <p>
* A region server could reject the warmup request because it either does not
* have the specified region or the region is being split.
* @param server server on which to warm up the region
* @param region region to warm up
*/
public void sendRegionWarmup(ServerName server,
HRegionInfo region) {
if (server == null) return;
try {
AdminService.BlockingInterface admin = getRsAdmin(server);
ProtobufUtil.warmupRegion(admin, region);
} catch (IOException e) {
LOG.error("Received exception in RPC for warmup server:" +
server + "region: " + region +
"exception: " + e);
}
}
/**
* Contacts a region server and waits up to timeout ms
* to close the region. This bypasses the active hmaster.

View File

@ -844,11 +844,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
status.setStatus("Writing region info on filesystem");
fs.checkRegionInfoOnFilesystem();
// Initialize all the HStores
status.setStatus("Initializing all the Stores");
long maxSeqId = initializeRegionStores(reporter, status);
long maxSeqId = initializeRegionStores(reporter, status, false);
this.lastReplayedOpenRegionSeqId = maxSeqId;
this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
@ -911,8 +909,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return nextSeqid;
}
private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status,
boolean warmupOnly)
throws IOException {
// Load in all the HStores.
long maxSeqId = -1;
@ -974,7 +974,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
}
if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this) && !warmupOnly) {
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
@ -984,6 +984,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return maxSeqId;
}
private void initializeWarmup(final CancelableProgressable reporter) throws IOException {
MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
// Initialize all the HStores
status.setStatus("Warming up all the Stores");
initializeRegionStores(reporter, status, true);
}
private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
Map<byte[], List<Path>> storeFiles
= new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
@ -6229,6 +6237,35 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return this;
}
public static void warmupHRegion(final HRegionInfo info,
final HTableDescriptor htd, final WAL wal, final Configuration conf,
final RegionServerServices rsServices,
final CancelableProgressable reporter)
throws IOException {
if (info == null) throw new NullPointerException("Passed region info is null");
if (LOG.isDebugEnabled()) {
LOG.debug("HRegion.Warming up region: " + info);
}
Path rootDir = FSUtils.getRootDir(conf);
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
FileSystem fs = null;
if (rsServices != null) {
fs = rsServices.getFileSystem();
}
if (fs == null) {
fs = FileSystem.get(conf);
}
HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
r.initializeWarmup(reporter);
r.close();
}
private void checkCompressionCodecs() throws IOException {
for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
CompressionTest.testCompression(fam.getCompression());

View File

@ -105,6 +105,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
@ -1432,6 +1434,57 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return builder.build();
}
/**
* Warm up a region on this server.
* <p>
* This method should only be called by the Master. It synchronously opens the region
* and then closes it, bringing the most important pages into the cache.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
public WarmupRegionResponse warmupRegion(final RpcController controller,
final WarmupRegionRequest request) throws ServiceException {
RegionInfo regionInfo = request.getRegionInfo();
final HRegionInfo region = HRegionInfo.convert(regionInfo);
HTableDescriptor htd;
WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
try {
String encodedName = region.getEncodedName();
byte[] encodedNameBytes = region.getEncodedNameAsBytes();
final HRegion onlineRegion = regionServer.getFromOnlineRegions(encodedName);
if (onlineRegion != null) {
LOG.info("Region already online. Skipping warming up " + region);
return response;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Warming up Region " + region.getRegionNameAsString());
}
htd = regionServer.tableDescriptors.get(region.getTable());
if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
LOG.info("Region is in transition. Skipping warmup " + region);
return response;
}
HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
regionServer.getConfiguration(), regionServer, null);
} catch (IOException ie) {
LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
throw new ServiceException(ie);
}
return response;
}
/**
* Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
* that the given mutations will be durable on the receiving RS if this method returns without any

View File

@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsReques
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
@ -459,6 +461,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return null;
}
@Override
public WarmupRegionResponse warmupRegion(RpcController controller,
WarmupRegionRequest request) throws ServiceException {
//TODO Auto-generated method stub
return null;
}
@Override
public CloseRegionResponse closeRegion(RpcController controller,
CloseRegionRequest request) throws ServiceException {

View File

@ -0,0 +1,164 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.regionserver.HRegion.warmupHRegion;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.experimental.categories.Category;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
/**
* Tests region warmup (HBASE-13316).
* Sets up the HBase mini cluster once at start, loads a test table, and then
* repeatedly warms up and moves its region to verify the two can run concurrently.
*/
@Category({MasterTests.class, LargeTests.class})
@SuppressWarnings ("deprecation")
public class TestWarmupRegion {
final Log LOG = LogFactory.getLog(getClass());
protected TableName TABLENAME = TableName.valueOf("testPurgeFutureDeletes");
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue");
private static byte[] COLUMN = Bytes.toBytes("column");
private static int numRows = 10000;
protected static int SLAVES = 3;
private static MiniHBaseCluster myCluster;
private static Table table;
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniCluster(SLAVES);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
table = TEST_UTIL.createTable(TABLENAME, FAMILY);
// future timestamp
for (int i = 0; i < numRows; i++) {
long ts = System.currentTimeMillis() * 2;
Put put = new Put(ROW, ts);
put.add(FAMILY, COLUMN, VALUE);
table.put(put);
}
// flush and major compact so the data ends up in store files on disk
TEST_UTIL.getHBaseAdmin().flush(TABLENAME);
TEST_UTIL.getHBaseAdmin().majorCompact(TABLENAME);
// waiting for the major compaction to complete
TEST_UTIL.waitFor(6000, new Waiter.Predicate<IOException>() {
@Override
public boolean evaluate() throws IOException {
return TEST_UTIL.getHBaseAdmin().getCompactionState(TABLENAME) ==
AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
}
});
table.close();
}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown() throws Exception {
// Nothing to do.
}
protected void runwarmup() throws InterruptedException{
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
HRegion region = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLENAME).get(0);
HRegionInfo info = region.getRegionInfo();
try {
HTableDescriptor htd = table.getTableDescriptor();
for (int i = 0; i < 10; i++) {
warmupHRegion(info, htd, rs.getWAL(info), rs.getConfiguration(), rs, null);
}
} catch (IOException ie) {
LOG.error("Failed warming up region " + info.getRegionNameAsString(), ie);
}
}
});
thread.start();
thread.join();
}
/**
* Basic client-side validation of HBASE-13316: warm up a region repeatedly while moving it between servers.
*/
@Test
public void testWarmup() throws Exception {
int serverid = 0;
HRegion region = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLENAME).get(0);
HRegionInfo info = region.getRegionInfo();
runwarmup();
for (int i = 0; i < 10; i++) {
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(serverid);
byte [] destName = Bytes.toBytes(rs.getServerName().toString());
TEST_UTIL.getMiniHBaseCluster().getMaster().move(info.getEncodedNameAsBytes(), destName);
serverid = (serverid + 1) % 2;
}
}
}