HBASE-20727 Persist FlushedSequenceId to speed up WAL split after cluster restart

Allan Yang 2018-06-19 09:45:47 +08:00 committed by Allan Yang
parent 78da0e3669
commit b336da925a
4 changed files with 268 additions and 1 deletion
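For context, the patch introduces two master-side settings, defined in ServerManager below. A minimal sketch of how a deployment might set them programmatically, using the defaults the patch ships with; the class name here is a hypothetical example and the Configuration calls are standard Hadoop/HBase API:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class FlushedSeqIdConfigExample { // hypothetical example class, not part of the patch
  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // Enable/disable persisting flushedSequenceIdByRegion to HDFS (patch default: true).
    conf.setBoolean("hbase.master.persist.flushedsequenceid.enabled", true);
    // Interval of the FlushedSequenceIdFlusher chore (patch default: 3 hours).
    conf.setInt("hbase.master.flushedsequenceid.flusher.interval", 3 * 60 * 60 * 1000);
    return conf;
  }
}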

View File

@@ -252,4 +252,19 @@ message CacheEvictionStats {
optional int64 bytes_evicted = 2;
optional int64 max_cache_size = 3;
repeated RegionExceptionMessage exception = 4;
}
message FlushedStoreSequenceId {
required bytes family = 1;
required uint64 seqId = 2;
}
message FlushedRegionSequenceId {
required bytes regionEncodedName = 1;
required uint64 seqId = 2;
repeated FlushedStoreSequenceId stores = 3;
}
message FlushedSequenceId {
repeated FlushedRegionSequenceId regionSequenceId = 1;
}
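The new messages nest per region and per store: one FlushedSequenceId holds a FlushedRegionSequenceId per region, each carrying its store-level FlushedStoreSequenceIds. A minimal sketch of building and writing one record in the length-delimited form the master uses below, assuming the shaded generated classes referenced later in this patch; the region name, family, and sequence ids are made up, and 'out' stands in for an FSDataOutputStream:

// Sketch only: values are illustrative.
FlushedStoreSequenceId store = FlushedStoreSequenceId.newBuilder()
    .setFamily(ByteString.copyFromUtf8("cf"))
    .setSeqId(42L)
    .build();
FlushedRegionSequenceId region = FlushedRegionSequenceId.newBuilder()
    .setRegionEncodedName(ByteString.copyFromUtf8("0123456789abcdef0123456789abcdef"))
    .setSeqId(42L)
    .addStores(store)
    .build();
FlushedSequenceId.newBuilder()
    .addRegionSequenceId(region)
    .build()
    .writeDelimitedTo(out); // read back with FlushedSequenceId.parseDelimitedFrom(in)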

View File

@@ -862,6 +862,13 @@ public class HMaster extends HRegionServer implements MasterServices {
status.setStatus("Initializing ZK system trackers");
initializeZKBasedSystemTrackers();
status.setStatus("Loading last flushed sequence id of regions");
try {
this.serverManager.loadLastFlushedSequenceIds();
} catch (IOException e) {
LOG.debug("Failed to load last flushed sequence id of regions"
+ " from file system", e);
}
// Set ourselves as active Master now our claim has succeeded up in zk.
this.activeMaster = true;
@@ -946,6 +953,7 @@ public class HMaster extends HRegionServer implements MasterServices {
getChoreService().scheduleChore(normalizerChore);
this.catalogJanitorChore = new CatalogJanitor(this);
getChoreService().scheduleChore(catalogJanitorChore);
this.serverManager.startChore();
status.setStatus("Starting cluster schema service");
initClusterSchemaService();

View File

@@ -38,10 +38,15 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
@@ -51,9 +56,11 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -62,12 +69,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedStoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
/**
@@ -106,6 +117,22 @@ public class ServerManager {
public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
"hbase.master.wait.on.regionservers.interval";
/**
* See HBASE-20727.
* If set to true, flushedSequenceIdByRegion and storeFlushedSequenceIdsByRegion
* will be persisted to HDFS and loaded when the master restarts, to speed up log split.
*/
public static final String PERSIST_FLUSHEDSEQUENCEID =
"hbase.master.persist.flushedsequenceid.enabled";
public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true;
public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL =
"hbase.master.flushedsequenceid.flusher.interval";
public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT =
3 * 60 * 60 * 1000; // 3 hours
private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);
// Set if we are to shutdown the cluster.
@@ -117,6 +144,13 @@ public class ServerManager {
private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
private boolean persistFlushedSequenceId = true;
private volatile boolean isFlushSeqIdPersistInProgress = false;
/** File on HDFS that stores the last flushed sequence ids of regions */
private static final String LAST_FLUSHED_SEQ_ID_FILE = ".lastflushedseqids";
private FlushedSequenceIdFlusher flushedSeqIdFlusher;
/**
* The last flushed sequence id for a store in a region.
*/
@@ -194,6 +228,8 @@ public class ServerManager {
warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
this.connection = master.getClusterConnection();
this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory();
persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID,
PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
}
/**
@@ -424,6 +460,11 @@ public class ServerManager {
this.rsAdmins.remove(serverName);
}
@VisibleForTesting
public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() {
return flushedSequenceIdByRegion;
}
public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
@@ -601,6 +642,10 @@ public class ServerManager {
listener.serverRemoved(serverName);
}
}
// trigger a persist of flushedSeqId
if (flushedSeqIdFlusher != null) {
flushedSeqIdFlusher.triggerNow();
}
return true;
}
@@ -967,11 +1012,37 @@ public class ServerManager {
return this.clusterShutdown.get();
}
/**
* Start the flushedSeqIdFlusher chore in ServerManager, if flushed sequence id persistence is enabled.
*/
public void startChore() {
Configuration c = master.getConfiguration();
if (persistFlushedSequenceId) {
// RegionStates should already be loaded by this point; first, remove deleted regions from the loaded ids
removeDeletedRegionFromLoadedFlushedSequenceIds();
int flushPeriod = c.getInt(FLUSHEDSEQUENCEID_FLUSHER_INTERVAL,
FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT);
flushedSeqIdFlusher = new FlushedSequenceIdFlusher(
"FlushedSequenceIdFlusher", flushPeriod);
master.getChoreService().scheduleChore(flushedSeqIdFlusher);
}
}
/**
* Stop the ServerManager.
*/
public void stop() {
// Nothing to do.
if (flushedSeqIdFlusher != null) {
flushedSeqIdFlusher.cancel();
}
if (persistFlushedSequenceId) {
try {
persistRegionLastFlushedSequenceIds();
} catch (IOException e) {
LOG.warn("Failed to persist last flushed sequence id of regions"
+ " to file system", e);
}
}
}
/**
@@ -1065,4 +1136,144 @@ public class ServerManager {
ServerMetrics serverMetrics = onlineServers.get(serverName);
return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
}
/**
* Persist the last flushed sequence id of each region to HDFS.
* @throws IOException if persisting to HDFS fails
*/
private void persistRegionLastFlushedSequenceIds() throws IOException {
if (isFlushSeqIdPersistInProgress) {
return;
}
isFlushSeqIdPersistInProgress = true;
try {
Configuration conf = master.getConfiguration();
Path rootDir = FSUtils.getRootDir(conf);
Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(lastFlushedSeqIdPath)) {
LOG.info("Rewriting .lastflushedseqids file at: "
+ lastFlushedSeqIdPath);
if (!fs.delete(lastFlushedSeqIdPath, false)) {
throw new IOException("Unable to remove existing "
+ lastFlushedSeqIdPath);
}
} else {
LOG.info("Writing .lastflushedseqids file at: " + lastFlushedSeqIdPath);
}
FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
FlushedSequenceId.Builder flushedSequenceIdBuilder =
FlushedSequenceId.newBuilder();
try {
for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
FlushedRegionSequenceId.newBuilder();
flushedRegionSequenceIdBuilder.setRegionEncodedName(
ByteString.copyFrom(entry.getKey()));
flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
ConcurrentNavigableMap<byte[], Long> storeSeqIds =
storeFlushedSequenceIdsByRegion.get(entry.getKey());
if (storeSeqIds != null) {
for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
FlushedStoreSequenceId.newBuilder();
flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
}
}
flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
}
flushedSequenceIdBuilder.build().writeDelimitedTo(out);
} finally {
if (out != null) {
out.close();
}
}
} finally {
isFlushSeqIdPersistInProgress = false;
}
}
/**
* Load the last flushed sequence id of each region from HDFS, if persisted.
*/
public void loadLastFlushedSequenceIds() throws IOException {
if (!persistFlushedSequenceId) {
return;
}
Configuration conf = master.getConfiguration();
Path rootDir = FSUtils.getRootDir(conf);
Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
FileSystem fs = FileSystem.get(conf);
if (!fs.exists(lastFlushedSeqIdPath)) {
LOG.info("No .lastflushedseqids found at" + lastFlushedSeqIdPath
+ " will record last flushed sequence id"
+ " for regions by regionserver report all over again");
return;
} else {
LOG.info("begin to load .lastflushedseqids at " + lastFlushedSeqIdPath);
}
FSDataInputStream in = fs.open(lastFlushedSeqIdPath);
try {
FlushedSequenceId flushedSequenceId =
FlushedSequenceId.parseDelimitedFrom(in);
for (FlushedRegionSequenceId flushedRegionSequenceId : flushedSequenceId
.getRegionSequenceIdList()) {
byte[] encodedRegionName = flushedRegionSequenceId
.getRegionEncodedName().toByteArray();
flushedSequenceIdByRegion
.putIfAbsent(encodedRegionName, flushedRegionSequenceId.getSeqId());
if (flushedRegionSequenceId.getStoresList() != null
&& flushedRegionSequenceId.getStoresList().size() != 0) {
ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
() -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
for (FlushedStoreSequenceId flushedStoreSequenceId : flushedRegionSequenceId
.getStoresList()) {
storeFlushedSequenceId
.put(flushedStoreSequenceId.getFamily().toByteArray(),
flushedStoreSequenceId.getSeqId());
}
}
}
} finally {
in.close();
}
}
/**
* Regions may have been removed between the latest persist of FlushedSequenceIds
* and a master abort. So after loading FlushedSequenceIds from file, and after
* meta is loaded, we need to remove deleted regions according to RegionStates.
*/
public void removeDeletedRegionFromLoadedFlushedSequenceIds() {
RegionStates regionStates = master.getAssignmentManager().getRegionStates();
Iterator<byte[]> it = flushedSequenceIdByRegion.keySet().iterator();
while (it.hasNext()) {
byte[] regionEncodedName = it.next();
if (regionStates.getRegionState(Bytes.toStringBinary(regionEncodedName)) == null) {
it.remove();
storeFlushedSequenceIdsByRegion.remove(regionEncodedName);
}
}
}
private class FlushedSequenceIdFlusher extends ScheduledChore {
public FlushedSequenceIdFlusher(String name, int p) {
super(name, master, p, 60 * 1000); // delay one minute before the first execution
}
@Override
protected void chore() {
try {
persistRegionLastFlushedSequenceIds();
} catch (IOException e) {
LOG.debug("Failed to persist last flushed sequence id of regions"
+ " to file system", e);
}
}
}
}
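For reference, the persisted file lives directly under the HBase root directory as ".lastflushedseqids". A minimal sketch of reading it back outside the master, reusing the same FSUtils and shaded protobuf calls as the code above; the class name is hypothetical and not part of the patch:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;

public class DumpLastFlushedSeqIds { // hypothetical helper, not part of the patch
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Path seqIdPath = new Path(FSUtils.getRootDir(conf), ".lastflushedseqids");
    FileSystem fs = FileSystem.get(conf);
    if (!fs.exists(seqIdPath)) {
      System.out.println("No " + seqIdPath + " found");
      return;
    }
    try (FSDataInputStream in = fs.open(seqIdPath)) {
      FlushedSequenceId ids = FlushedSequenceId.parseDelimitedFrom(in);
      if (ids == null) {
        return; // empty file
      }
      for (FlushedRegionSequenceId region : ids.getRegionSequenceIdList()) {
        // Print the encoded region name and its last flushed sequence id.
        System.out.println(Bytes.toStringBinary(region.getRegionEncodedName().toByteArray())
            + " => " + region.getSeqId());
      }
    }
  }
}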

View File

@@ -24,6 +24,8 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -192,5 +195,35 @@ public class TestMaster {
TEST_UTIL.deleteTable(tableName);
}
}
@Test
public void testFlushedSequenceIdPersistLoad() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
int msgInterval = conf.getInt("hbase.regionserver.msginterval", 100);
// insert some data into META
TableName tableName = TableName.valueOf("testFlushSeqId");
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(Bytes.toBytes("cf")));
Table table = TEST_UTIL.createTable(desc, null);
// flush META region
TEST_UTIL.flush(TableName.META_TABLE_NAME);
// wait for regionserver report
Threads.sleep(msgInterval * 2);
// record flush seqid before cluster shutdown
Map<byte[], Long> regionMapBefore =
TEST_UTIL.getHBaseCluster().getMaster().getServerManager()
.getFlushedSequenceIdByRegion();
// restart the HBase cluster, which persists the flushed sequence ids and reloads them on startup
TEST_UTIL.getMiniHBaseCluster().shutdown();
TEST_UTIL.restartHBaseCluster(2);
TEST_UTIL.waitUntilNoRegionsInTransition();
// check equality after reloading flushed sequence id map
Map<byte[], Long> regionMapAfter =
TEST_UTIL.getHBaseCluster().getMaster().getServerManager()
.getFlushedSequenceIdByRegion();
assertTrue(regionMapBefore.equals(regionMapAfter));
}
}