HBASE-13316 Reduce the downtime on planned moves of regions

Summary:
The current behavior of a region move shuts down a region and then starts is up in another regionserver. This causes increased latency and possibly timeouts till the new region's cache is fully warmed up. We can make a region move less disruptive by warming the cache in the destination region server before shutting dow the old region.

See https://issues.apache.org/jira/browse/HBASE-13316

Test Plan:
1. Unit Tests
2. Added test for concurrent moves and warmups
3. Manually tested reads/writes happening with concurrent moves

Subscribers: tedyu

Differential Revision: https://reviews.facebook.net/D35967

Signed-off-by: Elliott Clark <eclark@apache.org>
This commit is contained in:
Sameet Agarwal 2015-03-12 10:56:58 -07:00 committed by Elliott Clark
parent 242f62ffcd
commit 2fd6f2a829
11 changed files with 1476 additions and 148 deletions

View File

@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsReques
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
import org.apache.hadoop.hbase.protobuf.generated.CellProtos; import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@ -1730,6 +1731,26 @@ public final class ProtobufUtil {
} }
} }
/**
* A helper to warmup a region given a region name
* using admin protocol
*
* @param admin
* @param regionInfo
*
*/
public static void warmupRegion(final AdminService.BlockingInterface admin,
final HRegionInfo regionInfo) throws IOException {
try {
WarmupRegionRequest warmupRegionRequest =
RequestConverter.buildWarmupRegionRequest(regionInfo);
admin.warmupRegion(null, warmupRegionRequest);
} catch (ServiceException e) {
throw getRemoteException(e);
}
}
/** /**
* A helper to open a region using admin protocol. * A helper to open a region using admin protocol.
@ -1748,6 +1769,7 @@ public final class ProtobufUtil {
} }
} }
/** /**
* A helper to get the all the online regions on a region * A helper to get the all the online regions on a region
* server using admin protocol. * server using admin protocol.

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@ -866,6 +867,16 @@ public final class RequestConverter {
return builder.build(); return builder.build();
} }
/**
* Create a WarmupRegionRequest for a given region name
*
* @param regionInfo Region we are warming up
*/
public static WarmupRegionRequest buildWarmupRegionRequest(final HRegionInfo regionInfo) {
WarmupRegionRequest.Builder builder = WarmupRegionRequest.newBuilder();
builder.setRegionInfo(HRegionInfo.convert(regionInfo));
return builder.build();
}
/** /**
* Create a CloseRegionRequest for a given encoded region name * Create a CloseRegionRequest for a given encoded region name
* *

View File

@ -90,6 +90,14 @@ message OpenRegionResponse {
} }
} }
message WarmupRegionRequest {
required RegionInfo regionInfo = 1;
}
message WarmupRegionResponse {
}
/** /**
* Closes the specified region and will use or not use ZK during the close * Closes the specified region and will use or not use ZK during the close
* according to the specified flag. * according to the specified flag.
@ -253,6 +261,9 @@ service AdminService {
rpc OpenRegion(OpenRegionRequest) rpc OpenRegion(OpenRegionRequest)
returns(OpenRegionResponse); returns(OpenRegionResponse);
rpc WarmupRegion(WarmupRegionRequest)
returns(WarmupRegionResponse);
rpc CloseRegion(CloseRegionRequest) rpc CloseRegion(CloseRegionRequest)
returns(CloseRegionResponse); returns(CloseRegionResponse);

View File

@ -3394,6 +3394,7 @@ public class AssignmentManager extends ZooKeeperListener {
* @param plan Plan to execute. * @param plan Plan to execute.
*/ */
public void balance(final RegionPlan plan) { public void balance(final RegionPlan plan) {
HRegionInfo hri = plan.getRegionInfo(); HRegionInfo hri = plan.getRegionInfo();
TableName tableName = hri.getTable(); TableName tableName = hri.getTable();
if (tableStateManager.isTableState(tableName, if (tableStateManager.isTableState(tableName,

View File

@ -1283,6 +1283,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
return; return;
} }
} }
// warmup the region on the destination before initiating the move. this call
// is synchronous and takes some time. doing it before the source region gets
// closed
serverManager.sendRegionWarmup(rp.getDestination(), hri);
LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer"); LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
this.assignmentManager.balance(rp); this.assignmentManager.balance(rp);
if (this.cpHost != null) { if (this.cpHost != null) {

View File

@ -815,6 +815,27 @@ public class ServerManager {
return sendRegionClose(server, region, versionOfClosingNode, null, true); return sendRegionClose(server, region, versionOfClosingNode, null, true);
} }
/**
* Sends a WARMUP RPC to the specified server to warmup the specified region.
* <p>
* A region server could reject the close request because it either does not
* have the specified region or the region is being split.
* @param server server to warmup a region
* @param region region to warmup
*/
public void sendRegionWarmup(ServerName server,
HRegionInfo region) {
if (server == null) return;
try {
AdminService.BlockingInterface admin = getRsAdmin(server);
ProtobufUtil.warmupRegion(admin, region);
} catch (IOException e) {
LOG.error("Received exception in RPC for warmup server:" +
server + "region: " + region +
"exception: " + e);
}
}
/** /**
* Contacts a region server and waits up to timeout ms * Contacts a region server and waits up to timeout ms
* to close the region. This bypasses the active hmaster. * to close the region. This bypasses the active hmaster.

View File

@ -849,11 +849,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
status.setStatus("Writing region info on filesystem"); status.setStatus("Writing region info on filesystem");
fs.checkRegionInfoOnFilesystem(); fs.checkRegionInfoOnFilesystem();
// Initialize all the HStores // Initialize all the HStores
status.setStatus("Initializing all the Stores"); status.setStatus("Initializing all the Stores");
long maxSeqId = initializeRegionStores(reporter, status); long maxSeqId = initializeRegionStores(reporter, status, false);
this.lastReplayedOpenRegionSeqId = maxSeqId; this.lastReplayedOpenRegionSeqId = maxSeqId;
this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this)); this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
@ -916,8 +914,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return nextSeqid; return nextSeqid;
} }
private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status) private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status,
boolean warmupOnly)
throws IOException { throws IOException {
// Load in all the HStores. // Load in all the HStores.
long maxSeqId = -1; long maxSeqId = -1;
@ -979,7 +979,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
} }
} }
} }
if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) { if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this) && !warmupOnly) {
// Recover any edits if available. // Recover any edits if available.
maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
@ -989,6 +989,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return maxSeqId; return maxSeqId;
} }
private void initializeWarmup(final CancelableProgressable reporter) throws IOException {
MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
// Initialize all the HStores
status.setStatus("Warming up all the Stores");
initializeRegionStores(reporter, status, true);
}
private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException { private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
Map<byte[], List<Path>> storeFiles Map<byte[], List<Path>> storeFiles
= new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR); = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
@ -6333,6 +6341,35 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return this; return this;
} }
public static void warmupHRegion(final HRegionInfo info,
final HTableDescriptor htd, final WAL wal, final Configuration conf,
final RegionServerServices rsServices,
final CancelableProgressable reporter)
throws IOException {
if (info == null) throw new NullPointerException("Passed region info is null");
if (LOG.isDebugEnabled()) {
LOG.debug("HRegion.Warming up region: " + info);
}
Path rootDir = FSUtils.getRootDir(conf);
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
FileSystem fs = null;
if (rsServices != null) {
fs = rsServices.getFileSystem();
}
if (fs == null) {
fs = FileSystem.get(conf);
}
HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
r.initializeWarmup(reporter);
r.close();
}
private void checkCompressionCodecs() throws IOException { private void checkCompressionCodecs() throws IOException {
for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) { for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
CompressionTest.testCompression(fam.getCompression()); CompressionTest.testCompression(fam.getCompression());

View File

@ -108,6 +108,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
@ -1452,6 +1454,57 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return builder.build(); return builder.build();
} }
/**
* Wamrmup a region on this server.
*
* This method should only be called by Master. It synchrnously opens the region and
* closes the region bringing the most important pages in cache.
* <p>
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
public WarmupRegionResponse warmupRegion(final RpcController controller,
final WarmupRegionRequest request) throws ServiceException {
RegionInfo regionInfo = request.getRegionInfo();
final HRegionInfo region = HRegionInfo.convert(regionInfo);
HTableDescriptor htd;
WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
try {
String encodedName = region.getEncodedName();
byte[] encodedNameBytes = region.getEncodedNameAsBytes();
final HRegion onlineRegion = regionServer.getFromOnlineRegions(encodedName);
if (onlineRegion != null) {
LOG.info("Region already online. Skipping warming up " + region);
return response;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Warming up Region " + region.getRegionNameAsString());
}
htd = regionServer.tableDescriptors.get(region.getTable());
if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
LOG.info("Region is in transition. Skipping warmup " + region);
return response;
}
HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
regionServer.getConfiguration(), regionServer, null);
} catch (IOException ie) {
LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
throw new ServiceException(ie);
}
return response;
}
/** /**
* Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
* that the given mutations will be durable on the receiving RS if this method returns without any * that the given mutations will be durable on the receiving RS if this method returns without any

View File

@ -65,6 +65,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsReques
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
@ -452,6 +454,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return null; return null;
} }
@Override
public WarmupRegionResponse warmupRegion(RpcController controller,
WarmupRegionRequest request) throws ServiceException {
//TODO Auto-generated method stub
return null;
}
@Override @Override
public CloseRegionResponse closeRegion(RpcController controller, public CloseRegionResponse closeRegion(RpcController controller,
CloseRegionRequest request) throws ServiceException { CloseRegionRequest request) throws ServiceException {

View File

@ -0,0 +1,163 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.regionserver.HRegion.warmupHRegion;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.experimental.categories.Category;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
/**
* Run tests that use the HBase clients; {@link HTable}.
* Sets up the HBase mini cluster once at start and runs through all client tests.
* Each creates a table named for the method and does its stuff against that.
*/
@Category(LargeTests.class)
@SuppressWarnings ("deprecation")
public class TestWarmupRegion {
final Log LOG = LogFactory.getLog(getClass());
protected TableName TABLENAME = TableName.valueOf("testPurgeFutureDeletes");
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue");
private static byte[] COLUMN = Bytes.toBytes("column");
private static int numRows = 10000;
protected static int SLAVES = 3;
private static MiniHBaseCluster myCluster;
private static Table table;
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniCluster(SLAVES);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
table = TEST_UTIL.createTable(TABLENAME, FAMILY);
// future timestamp
for (int i = 0; i < numRows; i++) {
long ts = System.currentTimeMillis() * 2;
Put put = new Put(ROW, ts);
put.add(FAMILY, COLUMN, VALUE);
table.put(put);
}
// major compaction, purged future deletes
TEST_UTIL.getHBaseAdmin().flush(TABLENAME);
TEST_UTIL.getHBaseAdmin().majorCompact(TABLENAME);
// waiting for the major compaction to complete
TEST_UTIL.waitFor(6000, new Waiter.Predicate<IOException>() {
@Override
public boolean evaluate() throws IOException {
return TEST_UTIL.getHBaseAdmin().getCompactionState(TABLENAME) ==
AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
}
});
table.close();
}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown() throws Exception {
// Nothing to do.
}
protected void runwarmup() throws InterruptedException{
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
HRegion region = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLENAME).get(0);
HRegionInfo info = region.getRegionInfo();
try {
HTableDescriptor htd = table.getTableDescriptor();
for (int i = 0; i < 10; i++) {
warmupHRegion(info, htd, rs.getWAL(info), rs.getConfiguration(), rs, null);
}
} catch (IOException ie) {
LOG.error("Failed warming up region " + info.getRegionNameAsString(), ie);
}
}
});
thread.start();
thread.join();
}
/**
* Basic client side validation of HBASE-4536
*/
@Test
public void testWarmup() throws Exception {
int serverid = 0;
HRegion region = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLENAME).get(0);
HRegionInfo info = region.getRegionInfo();
runwarmup();
for (int i = 0; i < 10; i++) {
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(serverid);
byte [] destName = Bytes.toBytes(rs.getServerName().toString());
TEST_UTIL.getMiniHBaseCluster().getMaster().move(info.getEncodedNameAsBytes(), destName);
serverid = (serverid + 1) % 2;
}
}
}