hbase-9098: During recovery use ZK as the source of truth for region state
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1510101 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b8bece0228
commit
d70d599161
|
@ -192,9 +192,6 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
|
|||
private byte[] tableName = null;
|
||||
private String tableNameAsString = null;
|
||||
|
||||
// when a region is in recovering state, it can only accept writes not reads
|
||||
private volatile boolean recovering = false;
|
||||
|
||||
/** HRegionInfo for root region */
|
||||
public static final HRegionInfo ROOT_REGIONINFO =
|
||||
new HRegionInfo(0L, Bytes.toBytes("-ROOT-"));
|
||||
|
@ -303,7 +300,6 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
|
|||
this.startKey = startKey == null?
|
||||
HConstants.EMPTY_START_ROW: startKey.clone();
|
||||
this.tableName = tableName.clone();
|
||||
this.recovering = false;
|
||||
setHashCode();
|
||||
}
|
||||
|
||||
|
@ -324,7 +320,6 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
|
|||
this.hashCode = other.hashCode();
|
||||
this.encodedName = other.getEncodedName();
|
||||
this.tableName = other.tableName;
|
||||
this.recovering = other.isRecovering();
|
||||
}
|
||||
|
||||
|
||||
|
@ -608,20 +603,6 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
|
|||
this.split = split;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if current region is in recovering
|
||||
*/
|
||||
public boolean isRecovering() {
|
||||
return this.recovering;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param newState set recovering state
|
||||
*/
|
||||
public void setRecovering(boolean newState) {
|
||||
this.recovering = newState;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if this region is offline.
|
||||
*/
|
||||
|
@ -858,7 +839,6 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
|
|||
}
|
||||
builder.setOffline(info.isOffline());
|
||||
builder.setSplit(info.isSplit());
|
||||
builder.setRecovering(info.isRecovering());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
@ -891,9 +871,6 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
|
|||
if (proto.hasOffline()) {
|
||||
hri.setOffline(proto.getOffline());
|
||||
}
|
||||
if (proto.hasRecovering()) {
|
||||
hri.setRecovering(proto.getRecovering());
|
||||
}
|
||||
return hri;
|
||||
}
|
||||
|
||||
|
|
|
@ -2329,10 +2329,6 @@ public final class HBaseProtos {
|
|||
// optional bool split = 6;
|
||||
boolean hasSplit();
|
||||
boolean getSplit();
|
||||
|
||||
// optional bool recovering = 7;
|
||||
boolean hasRecovering();
|
||||
boolean getRecovering();
|
||||
}
|
||||
public static final class RegionInfo extends
|
||||
com.google.protobuf.GeneratedMessage
|
||||
|
@ -2423,16 +2419,6 @@ public final class HBaseProtos {
|
|||
return split_;
|
||||
}
|
||||
|
||||
// optional bool recovering = 7;
|
||||
public static final int RECOVERING_FIELD_NUMBER = 7;
|
||||
private boolean recovering_;
|
||||
public boolean hasRecovering() {
|
||||
return ((bitField0_ & 0x00000040) == 0x00000040);
|
||||
}
|
||||
public boolean getRecovering() {
|
||||
return recovering_;
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
regionId_ = 0L;
|
||||
tableName_ = com.google.protobuf.ByteString.EMPTY;
|
||||
|
@ -2440,7 +2426,6 @@ public final class HBaseProtos {
|
|||
endKey_ = com.google.protobuf.ByteString.EMPTY;
|
||||
offline_ = false;
|
||||
split_ = false;
|
||||
recovering_ = false;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
@ -2480,9 +2465,6 @@ public final class HBaseProtos {
|
|||
if (((bitField0_ & 0x00000020) == 0x00000020)) {
|
||||
output.writeBool(6, split_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000040) == 0x00000040)) {
|
||||
output.writeBool(7, recovering_);
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -2516,10 +2498,6 @@ public final class HBaseProtos {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeBoolSize(6, split_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000040) == 0x00000040)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeBoolSize(7, recovering_);
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
@ -2573,11 +2551,6 @@ public final class HBaseProtos {
|
|||
result = result && (getSplit()
|
||||
== other.getSplit());
|
||||
}
|
||||
result = result && (hasRecovering() == other.hasRecovering());
|
||||
if (hasRecovering()) {
|
||||
result = result && (getRecovering()
|
||||
== other.getRecovering());
|
||||
}
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
|
@ -2611,10 +2584,6 @@ public final class HBaseProtos {
|
|||
hash = (37 * hash) + SPLIT_FIELD_NUMBER;
|
||||
hash = (53 * hash) + hashBoolean(getSplit());
|
||||
}
|
||||
if (hasRecovering()) {
|
||||
hash = (37 * hash) + RECOVERING_FIELD_NUMBER;
|
||||
hash = (53 * hash) + hashBoolean(getRecovering());
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
return hash;
|
||||
}
|
||||
|
@ -2743,8 +2712,6 @@ public final class HBaseProtos {
|
|||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
split_ = false;
|
||||
bitField0_ = (bitField0_ & ~0x00000020);
|
||||
recovering_ = false;
|
||||
bitField0_ = (bitField0_ & ~0x00000040);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -2807,10 +2774,6 @@ public final class HBaseProtos {
|
|||
to_bitField0_ |= 0x00000020;
|
||||
}
|
||||
result.split_ = split_;
|
||||
if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
|
||||
to_bitField0_ |= 0x00000040;
|
||||
}
|
||||
result.recovering_ = recovering_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
@ -2845,9 +2808,6 @@ public final class HBaseProtos {
|
|||
if (other.hasSplit()) {
|
||||
setSplit(other.getSplit());
|
||||
}
|
||||
if (other.hasRecovering()) {
|
||||
setRecovering(other.getRecovering());
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
@ -2917,11 +2877,6 @@ public final class HBaseProtos {
|
|||
split_ = input.readBool();
|
||||
break;
|
||||
}
|
||||
case 56: {
|
||||
bitField0_ |= 0x00000040;
|
||||
recovering_ = input.readBool();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3063,27 +3018,6 @@ public final class HBaseProtos {
|
|||
return this;
|
||||
}
|
||||
|
||||
// optional bool recovering = 7;
|
||||
private boolean recovering_ ;
|
||||
public boolean hasRecovering() {
|
||||
return ((bitField0_ & 0x00000040) == 0x00000040);
|
||||
}
|
||||
public boolean getRecovering() {
|
||||
return recovering_;
|
||||
}
|
||||
public Builder setRecovering(boolean value) {
|
||||
bitField0_ |= 0x00000040;
|
||||
recovering_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
public Builder clearRecovering() {
|
||||
bitField0_ = (bitField0_ & ~0x00000040);
|
||||
recovering_ = false;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:RegionInfo)
|
||||
}
|
||||
|
||||
|
@ -9978,36 +9912,35 @@ public final class HBaseProtos {
|
|||
"\0132\017.NameStringPair\"o\n\022ColumnFamilySchema" +
|
||||
"\022\014\n\004name\030\001 \002(\014\022#\n\nattributes\030\002 \003(\0132\017.Byt" +
|
||||
"esBytesPair\022&\n\rconfiguration\030\003 \003(\0132\017.Nam" +
|
||||
"eStringPair\"\213\001\n\nRegionInfo\022\021\n\tregion_id\030" +
|
||||
"\001 \002(\004\022\022\n\ntable_name\030\002 \002(\014\022\021\n\tstart_key\030\003" +
|
||||
" \001(\014\022\017\n\007end_key\030\004 \001(\014\022\017\n\007offline\030\005 \001(\010\022\r",
|
||||
"\n\005split\030\006 \001(\010\022\022\n\nrecovering\030\007 \001(\010\"1\n\014Fav" +
|
||||
"oredNodes\022!\n\014favored_node\030\001 \003(\0132\013.Server" +
|
||||
"Name\"\225\001\n\017RegionSpecifier\0222\n\004type\030\001 \002(\0162$" +
|
||||
".RegionSpecifier.RegionSpecifierType\022\r\n\005" +
|
||||
"value\030\002 \002(\014\"?\n\023RegionSpecifierType\022\017\n\013RE" +
|
||||
"GION_NAME\020\001\022\027\n\023ENCODED_REGION_NAME\020\002\"%\n\t" +
|
||||
"TimeRange\022\014\n\004from\030\001 \001(\004\022\n\n\002to\030\002 \001(\004\"A\n\nS" +
|
||||
"erverName\022\021\n\thost_name\030\001 \002(\t\022\014\n\004port\030\002 \001" +
|
||||
"(\r\022\022\n\nstart_code\030\003 \001(\004\"\033\n\013Coprocessor\022\014\n" +
|
||||
"\004name\030\001 \002(\t\"-\n\016NameStringPair\022\014\n\004name\030\001 ",
|
||||
"\002(\t\022\r\n\005value\030\002 \002(\t\",\n\rNameBytesPair\022\014\n\004n" +
|
||||
"ame\030\001 \002(\t\022\r\n\005value\030\002 \001(\014\"/\n\016BytesBytesPa" +
|
||||
"ir\022\r\n\005first\030\001 \002(\014\022\016\n\006second\030\002 \002(\014\",\n\rNam" +
|
||||
"eInt64Pair\022\014\n\004name\030\001 \001(\t\022\r\n\005value\030\002 \001(\003\"" +
|
||||
"\256\001\n\023SnapshotDescription\022\014\n\004name\030\001 \002(\t\022\r\n" +
|
||||
"\005table\030\002 \001(\t\022\030\n\rcreation_time\030\003 \001(\003:\0010\022." +
|
||||
"\n\004type\030\004 \001(\0162\031.SnapshotDescription.Type:" +
|
||||
"\005FLUSH\022\017\n\007version\030\005 \001(\005\"\037\n\004Type\022\014\n\010DISAB" +
|
||||
"LED\020\000\022\t\n\005FLUSH\020\001\"\n\n\010EmptyMsg\"\033\n\007LongMsg\022" +
|
||||
"\020\n\010long_msg\030\001 \002(\003\"\'\n\rBigDecimalMsg\022\026\n\016bi",
|
||||
"gdecimal_msg\030\001 \002(\014\"5\n\004UUID\022\026\n\016least_sig_" +
|
||||
"bits\030\001 \002(\004\022\025\n\rmost_sig_bits\030\002 \002(\004*r\n\013Com" +
|
||||
"pareType\022\010\n\004LESS\020\000\022\021\n\rLESS_OR_EQUAL\020\001\022\t\n" +
|
||||
"\005EQUAL\020\002\022\r\n\tNOT_EQUAL\020\003\022\024\n\020GREATER_OR_EQ" +
|
||||
"UAL\020\004\022\013\n\007GREATER\020\005\022\t\n\005NO_OP\020\006B>\n*org.apa" +
|
||||
"che.hadoop.hbase.protobuf.generatedB\013HBa" +
|
||||
"seProtosH\001\240\001\001"
|
||||
"eStringPair\"w\n\nRegionInfo\022\021\n\tregion_id\030\001" +
|
||||
" \002(\004\022\022\n\ntable_name\030\002 \002(\014\022\021\n\tstart_key\030\003 " +
|
||||
"\001(\014\022\017\n\007end_key\030\004 \001(\014\022\017\n\007offline\030\005 \001(\010\022\r\n",
|
||||
"\005split\030\006 \001(\010\"1\n\014FavoredNodes\022!\n\014favored_" +
|
||||
"node\030\001 \003(\0132\013.ServerName\"\225\001\n\017RegionSpecif" +
|
||||
"ier\0222\n\004type\030\001 \002(\0162$.RegionSpecifier.Regi" +
|
||||
"onSpecifierType\022\r\n\005value\030\002 \002(\014\"?\n\023Region" +
|
||||
"SpecifierType\022\017\n\013REGION_NAME\020\001\022\027\n\023ENCODE" +
|
||||
"D_REGION_NAME\020\002\"%\n\tTimeRange\022\014\n\004from\030\001 \001" +
|
||||
"(\004\022\n\n\002to\030\002 \001(\004\"A\n\nServerName\022\021\n\thost_nam" +
|
||||
"e\030\001 \002(\t\022\014\n\004port\030\002 \001(\r\022\022\n\nstart_code\030\003 \001(" +
|
||||
"\004\"\033\n\013Coprocessor\022\014\n\004name\030\001 \002(\t\"-\n\016NameSt" +
|
||||
"ringPair\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\t\",\n",
|
||||
"\rNameBytesPair\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 " +
|
||||
"\001(\014\"/\n\016BytesBytesPair\022\r\n\005first\030\001 \002(\014\022\016\n\006" +
|
||||
"second\030\002 \002(\014\",\n\rNameInt64Pair\022\014\n\004name\030\001 " +
|
||||
"\001(\t\022\r\n\005value\030\002 \001(\003\"\256\001\n\023SnapshotDescripti" +
|
||||
"on\022\014\n\004name\030\001 \002(\t\022\r\n\005table\030\002 \001(\t\022\030\n\rcreat" +
|
||||
"ion_time\030\003 \001(\003:\0010\022.\n\004type\030\004 \001(\0162\031.Snapsh" +
|
||||
"otDescription.Type:\005FLUSH\022\017\n\007version\030\005 \001" +
|
||||
"(\005\"\037\n\004Type\022\014\n\010DISABLED\020\000\022\t\n\005FLUSH\020\001\"\n\n\010E" +
|
||||
"mptyMsg\"\033\n\007LongMsg\022\020\n\010long_msg\030\001 \002(\003\"\'\n\r" +
|
||||
"BigDecimalMsg\022\026\n\016bigdecimal_msg\030\001 \002(\014\"5\n",
|
||||
"\004UUID\022\026\n\016least_sig_bits\030\001 \002(\004\022\025\n\rmost_si" +
|
||||
"g_bits\030\002 \002(\004*r\n\013CompareType\022\010\n\004LESS\020\000\022\021\n" +
|
||||
"\rLESS_OR_EQUAL\020\001\022\t\n\005EQUAL\020\002\022\r\n\tNOT_EQUAL" +
|
||||
"\020\003\022\024\n\020GREATER_OR_EQUAL\020\004\022\013\n\007GREATER\020\005\022\t\n" +
|
||||
"\005NO_OP\020\006B>\n*org.apache.hadoop.hbase.prot" +
|
||||
"obuf.generatedB\013HBaseProtosH\001\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
@ -10035,7 +9968,7 @@ public final class HBaseProtos {
|
|||
internal_static_RegionInfo_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_RegionInfo_descriptor,
|
||||
new java.lang.String[] { "RegionId", "TableName", "StartKey", "EndKey", "Offline", "Split", "Recovering", },
|
||||
new java.lang.String[] { "RegionId", "TableName", "StartKey", "EndKey", "Offline", "Split", },
|
||||
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo.class,
|
||||
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo.Builder.class);
|
||||
internal_static_FavoredNodes_descriptor =
|
||||
|
|
|
@ -56,7 +56,6 @@ message RegionInfo {
|
|||
optional bytes end_key = 4;
|
||||
optional bool offline = 5;
|
||||
optional bool split = 6;
|
||||
optional bool recovering = 7;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -991,7 +991,7 @@ MasterServices, Server {
|
|||
// In log replay mode, we mark META region as recovering in ZK
|
||||
Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
|
||||
regions.add(HRegionInfo.FIRST_META_REGIONINFO);
|
||||
this.fileSystemManager.prepareMetaLogReplay(currentMetaServer, regions);
|
||||
this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
|
||||
} else {
|
||||
// In recovered.edits mode: create recovered edits file for .META. server
|
||||
this.fileSystemManager.splitMetaLog(currentMetaServer);
|
||||
|
|
|
@ -336,41 +336,18 @@ public class MasterFileSystem {
|
|||
|
||||
/**
|
||||
* Mark regions in recovering state when distributedLogReplay are set true
|
||||
* @param serverNames Set of ServerNames to be replayed wals in order to recover changes contained
|
||||
* in them
|
||||
* @param serverName Failed region server whose wals to be replayed
|
||||
* @param regions Set of regions to be recovered
|
||||
* @throws IOException
|
||||
*/
|
||||
public void prepareLogReplay(Set<ServerName> serverNames) throws IOException {
|
||||
public void prepareLogReplay(ServerName serverName, Set<HRegionInfo> regions) throws IOException {
|
||||
if (!this.distributedLogReplay) {
|
||||
return;
|
||||
}
|
||||
// mark regions in recovering state
|
||||
for (ServerName serverName : serverNames) {
|
||||
NavigableMap<HRegionInfo, Result> regions = this.getServerUserRegions(serverName);
|
||||
if (regions == null) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
this.splitLogManager.markRegionsRecoveringInZK(serverName, regions.keySet());
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark meta regions in recovering state when distributedLogReplay are set true. The function is used
|
||||
* when {@link #getServerUserRegions(ServerName)} can't be used in case meta RS is down.
|
||||
* @param serverName
|
||||
* @param regions
|
||||
* @throws IOException
|
||||
*/
|
||||
public void prepareMetaLogReplay(ServerName serverName, Set<HRegionInfo> regions)
|
||||
throws IOException {
|
||||
if (!this.distributedLogReplay || (regions == null)) {
|
||||
if (regions == null || regions.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
// mark regions in recovering state
|
||||
try {
|
||||
this.splitLogManager.markRegionsRecoveringInZK(serverName, regions);
|
||||
} catch (KeeperException e) {
|
||||
|
|
|
@ -1195,6 +1195,27 @@ public class SplitLogManager extends ZooKeeperListener {
|
|||
return lastRecordedFlushedSequenceId;
|
||||
}
|
||||
|
||||
/**
|
||||
* check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if exists
|
||||
* and set watcher as well.
|
||||
* @param zkw
|
||||
* @param regionEncodedName region encode name
|
||||
* @return true when /hbase/recovering-regions/<current region encoded name> exists
|
||||
* @throws KeeperException
|
||||
*/
|
||||
public static boolean
|
||||
isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
|
||||
throws KeeperException {
|
||||
boolean result = false;
|
||||
String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName);
|
||||
|
||||
byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
|
||||
if (node != null) {
|
||||
result = true;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is used in distributedLogReplay to fetch last flushed sequence id from ZK
|
||||
* @param zkw
|
||||
|
@ -1204,8 +1225,7 @@ public class SplitLogManager extends ZooKeeperListener {
|
|||
* @throws IOException
|
||||
*/
|
||||
public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
|
||||
String serverName,
|
||||
String encodedRegionName) throws IOException {
|
||||
String serverName, String encodedRegionName) throws IOException {
|
||||
// when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
|
||||
// last flushed sequence Id changes when newly assigned RS flushes writes to the region.
|
||||
// If the newly assigned RS fails again(a chained RS failures scenario), the last flushed
|
||||
|
|
|
@ -19,7 +19,9 @@
|
|||
package org.apache.hadoop.hbase.master.handler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -53,15 +55,9 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
|
|||
try {
|
||||
AssignmentManager am = this.services.getAssignmentManager();
|
||||
try {
|
||||
if (this.shouldSplitHlog) {
|
||||
if (this.shouldSplitHlog && !this.distributedLogReplay) {
|
||||
LOG.info("Splitting META logs for " + serverName);
|
||||
if(this.distributedLogReplay) {
|
||||
Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
|
||||
regions.add(HRegionInfo.FIRST_META_REGIONINFO);
|
||||
this.services.getMasterFileSystem().prepareMetaLogReplay(serverName, regions);
|
||||
} else {
|
||||
this.services.getMasterFileSystem().splitMetaLog(serverName);
|
||||
}
|
||||
this.services.getMasterFileSystem().splitMetaLog(serverName);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
this.services.getExecutorService().submit(this);
|
||||
|
@ -157,6 +153,21 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
|
|||
long waitTime = this.server.getConfiguration().getLong(
|
||||
"hbase.catalog.verification.timeout", 1000);
|
||||
|
||||
if (this.shouldSplitHlog && this.distributedLogReplay) {
|
||||
LOG.info("Splitting META logs for " + serverName
|
||||
+ ". Mark META region in recovery before assignment.");
|
||||
Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
|
||||
regions.add(HRegionInfo.FIRST_META_REGIONINFO);
|
||||
try {
|
||||
this.services.getMasterFileSystem().prepareLogReplay(serverName, regions);
|
||||
} catch (IOException ioe) {
|
||||
this.services.getExecutorService().submit(this);
|
||||
this.deadServers.add(serverName);
|
||||
throw new IOException("failed to mark META region in recovery on " + serverName
|
||||
+ ", will retry", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
int iFlag = 0;
|
||||
while (true) {
|
||||
try {
|
||||
|
|
|
@ -181,15 +181,9 @@ public class ServerShutdownHandler extends EventHandler {
|
|||
}
|
||||
|
||||
try {
|
||||
if (this.shouldSplitHlog) {
|
||||
if (this.shouldSplitHlog && !this.distributedLogReplay) {
|
||||
LOG.info("Splitting logs for " + serverName + " before assignment.");
|
||||
if(this.distributedLogReplay){
|
||||
Set<ServerName> serverNames = new HashSet<ServerName>();
|
||||
serverNames.add(serverName);
|
||||
this.services.getMasterFileSystem().prepareLogReplay(serverNames);
|
||||
} else {
|
||||
this.services.getMasterFileSystem().splitLog(serverName);
|
||||
}
|
||||
this.services.getMasterFileSystem().splitLog(serverName);
|
||||
} else {
|
||||
LOG.info("Skipping log splitting for " + serverName);
|
||||
}
|
||||
|
@ -265,6 +259,18 @@ public class ServerShutdownHandler extends EventHandler {
|
|||
}
|
||||
}
|
||||
|
||||
if (this.shouldSplitHlog && this.distributedLogReplay) {
|
||||
try {
|
||||
LOG.info("Splitting logs for " + serverName
|
||||
+ ". Mark regions in recovery before assignment.");
|
||||
Set<HRegionInfo> toAssignRegionSet = new HashSet<HRegionInfo>();
|
||||
toAssignRegionSet.addAll(toAssignRegions);
|
||||
this.services.getMasterFileSystem().prepareLogReplay(serverName, toAssignRegionSet);
|
||||
} catch (IOException ioe) {
|
||||
resubmit(serverName, ioe);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
am.assign(toAssignRegions);
|
||||
} catch (InterruptedException ie) {
|
||||
|
|
|
@ -311,6 +311,9 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
*/
|
||||
private boolean disallowWritesInRecovering = false;
|
||||
|
||||
// when a region is in recovering state, it can only accept writes not reads
|
||||
private volatile boolean isRecovering = false;
|
||||
|
||||
/**
|
||||
* @return The smallest mvcc readPoint across all the scanners in this
|
||||
* region. Writes older than this readPoint, are included in every
|
||||
|
@ -818,14 +821,14 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @param newState
|
||||
*/
|
||||
public void setRecovering(boolean newState) {
|
||||
this.getRegionInfo().setRecovering(newState);
|
||||
this.isRecovering = newState;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if current region is in recovering
|
||||
*/
|
||||
public boolean isRecovering() {
|
||||
return this.getRegionInfo().isRecovering();
|
||||
return this.isRecovering;
|
||||
}
|
||||
|
||||
/** @return true if region is available (not closed and not closing) */
|
||||
|
|
|
@ -3476,7 +3476,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
|
||||
if (previous == null) {
|
||||
// check if the region to be opened is marked in recovering state in ZK
|
||||
if (isRegionMarkedRecoveringInZK(region.getEncodedName())) {
|
||||
if (SplitLogManager.isRegionMarkedRecoveringInZK(this.getZooKeeper(),
|
||||
region.getEncodedName())) {
|
||||
this.recoveringRegions.put(region.getEncodedName(), null);
|
||||
}
|
||||
// If there is no action in progress, we can submit a specific handler.
|
||||
|
@ -4193,25 +4194,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
return this.compactSplitThread;
|
||||
}
|
||||
|
||||
/**
|
||||
* check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if exists
|
||||
* and set watcher as well.
|
||||
* @param regionEncodedName region encode name
|
||||
* @return true when /hbase/recovering-regions/<current region encoded name> exists
|
||||
* @throws KeeperException
|
||||
*/
|
||||
private boolean isRegionMarkedRecoveringInZK(String regionEncodedName) throws KeeperException {
|
||||
boolean result = false;
|
||||
String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, regionEncodedName);
|
||||
|
||||
byte[] node = ZKUtil.getDataAndWatch(this.zooKeeper, nodePath);
|
||||
if (node != null) {
|
||||
result = true;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper function to store the last flushed sequence Id with the previous failed RS for a
|
||||
* recovering region. The Id is used to skip wal edits which are flushed. Since the flushed
|
||||
|
|
|
@ -1523,8 +1523,8 @@ public class HLogSplitter {
|
|||
// fetch location from cache
|
||||
HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
|
||||
if(loc != null) return loc;
|
||||
// fetch location from .META.
|
||||
loc = hconn.getRegionLocation(table, row, false);
|
||||
// fetch location from .META. directly without using cache to avoid hit old dead server
|
||||
loc = hconn.getRegionLocation(table, row, true);
|
||||
if (loc == null) {
|
||||
throw new IOException("Can't locate location for row:" + Bytes.toString(row)
|
||||
+ " of table:" + Bytes.toString(table));
|
||||
|
@ -1560,12 +1560,21 @@ public class HLogSplitter {
|
|||
if (cachedLastFlushedSequenceId == null
|
||||
|| lastFlushedSequenceId > cachedLastFlushedSequenceId) {
|
||||
lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
|
||||
} else if (loc.getRegionInfo().isRecovering() == false) {
|
||||
// region isn't in recovering at all because WAL file may contain a region that has
|
||||
// been moved to somewhere before hosting RS fails
|
||||
lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
|
||||
LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
|
||||
+ " because it's not in recovering.");
|
||||
}
|
||||
|
||||
// check if the region to be recovered is marked as recovering in ZK
|
||||
try {
|
||||
if (SplitLogManager.isRegionMarkedRecoveringInZK(watcher, loc.getRegionInfo()
|
||||
.getEncodedName()) == false) {
|
||||
// region isn't in recovering at all because WAL file may contain a region that has
|
||||
// been moved to somewhere before hosting RS fails
|
||||
lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
|
||||
LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
|
||||
+ " because it's not in recovering.");
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed to retrieve recovering state of region "
|
||||
+ loc.getRegionInfo().getEncodedName(), e);
|
||||
}
|
||||
|
||||
onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
|
||||
|
@ -1619,7 +1628,6 @@ public class HLogSplitter {
|
|||
BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
|
||||
HRegionInfo region = loc.getRegionInfo();
|
||||
if((region =ProtobufUtil.getRegionInfo(remoteSvr, region.getRegionName())) != null) {
|
||||
loc.getRegionInfo().setRecovering(region.isRecovering());
|
||||
return loc;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -1006,10 +1006,10 @@ public class TestDistributedLogSplitting {
|
|||
LOG.info("#regions = " + regions.size());
|
||||
Set<HRegionInfo> tmpRegions = new HashSet<HRegionInfo>();
|
||||
tmpRegions.add(HRegionInfo.FIRST_META_REGIONINFO);
|
||||
master.getMasterFileSystem().prepareMetaLogReplay(hrs.getServerName(), tmpRegions);
|
||||
Set<ServerName> failedServers = new HashSet<ServerName>();
|
||||
failedServers.add(hrs.getServerName());
|
||||
master.getMasterFileSystem().prepareLogReplay(failedServers);
|
||||
master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), tmpRegions);
|
||||
Set<HRegionInfo> userRegionSet = new HashSet<HRegionInfo>();
|
||||
userRegionSet.addAll(regions);
|
||||
master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), userRegionSet);
|
||||
boolean isMetaRegionInRecovery = false;
|
||||
List<String> recoveringRegions =
|
||||
zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
|
||||
|
|
Loading…
Reference in New Issue