HBASE-20066 Region sequence id may go backward after split or merge
This commit is contained in:
parent
d272ac908c
commit
f06a89b531
|
@ -241,11 +241,12 @@ enum SplitTableRegionState {
|
|||
SPLIT_TABLE_REGION_PRE_OPERATION = 2;
|
||||
SPLIT_TABLE_REGION_CLOSE_PARENT_REGION = 3;
|
||||
SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS = 4;
|
||||
SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META = 5;
|
||||
SPLIT_TABLE_REGION_UPDATE_META = 6;
|
||||
SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META = 7;
|
||||
SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS = 8;
|
||||
SPLIT_TABLE_REGION_POST_OPERATION = 9;
|
||||
SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE = 5;
|
||||
SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META = 6;
|
||||
SPLIT_TABLE_REGION_UPDATE_META = 7;
|
||||
SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META = 8;
|
||||
SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS = 9;
|
||||
SPLIT_TABLE_REGION_POST_OPERATION = 10;
|
||||
}
|
||||
|
||||
message SplitTableRegionStateData {
|
||||
|
@ -260,11 +261,12 @@ enum MergeTableRegionsState {
|
|||
MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION = 3;
|
||||
MERGE_TABLE_REGIONS_CLOSE_REGIONS = 4;
|
||||
MERGE_TABLE_REGIONS_CREATE_MERGED_REGION = 5;
|
||||
MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION = 6;
|
||||
MERGE_TABLE_REGIONS_UPDATE_META = 7;
|
||||
MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION = 8;
|
||||
MERGE_TABLE_REGIONS_OPEN_MERGED_REGION = 9;
|
||||
MERGE_TABLE_REGIONS_POST_OPERATION = 10;
|
||||
MERGE_TABLE_REGIONS_WRITE_MAX_SEQUENCE_ID_FILE = 6;
|
||||
MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION = 7;
|
||||
MERGE_TABLE_REGIONS_UPDATE_META = 8;
|
||||
MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION = 9;
|
||||
MERGE_TABLE_REGIONS_OPEN_MERGED_REGION = 10;
|
||||
MERGE_TABLE_REGIONS_POST_OPERATION = 11;
|
||||
}
|
||||
|
||||
message MergeTableRegionsStateData {
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -204,13 +205,9 @@ public class MergeTableRegionsProcedure
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(
|
||||
final MasterProcedureEnv env,
|
||||
final MergeTableRegionsState state)
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final MergeTableRegionsState state)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(this + " execute state=" + state);
|
||||
}
|
||||
LOG.trace("{} execute state={}", this, state);
|
||||
try {
|
||||
switch (state) {
|
||||
case MERGE_TABLE_REGIONS_PREPARE:
|
||||
|
@ -230,6 +227,10 @@ public class MergeTableRegionsProcedure
|
|||
break;
|
||||
case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
|
||||
createMergedRegion(env);
|
||||
setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_WRITE_MAX_SEQUENCE_ID_FILE);
|
||||
break;
|
||||
case MERGE_TABLE_REGIONS_WRITE_MAX_SEQUENCE_ID_FILE:
|
||||
writeMaxSequenceIdFile(env);
|
||||
setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION);
|
||||
break;
|
||||
case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
|
||||
|
@ -289,15 +290,16 @@ public class MergeTableRegionsProcedure
|
|||
case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
|
||||
case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
|
||||
case MERGE_TABLE_REGIONS_UPDATE_META:
|
||||
String msg = this + " We are in the " + state + " state."
|
||||
+ " It is complicated to rollback the merge operation that region server is working on."
|
||||
+ " Rollback is not supported and we should let the merge operation to complete";
|
||||
String msg = this + " We are in the " + state + " state." +
|
||||
" It is complicated to rollback the merge operation that region server is working on." +
|
||||
" Rollback is not supported and we should let the merge operation to complete";
|
||||
LOG.warn(msg);
|
||||
// PONR
|
||||
throw new UnsupportedOperationException(this + " unhandled state=" + state);
|
||||
case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
|
||||
break;
|
||||
case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
|
||||
case MERGE_TABLE_REGIONS_WRITE_MAX_SEQUENCE_ID_FILE:
|
||||
cleanupMergedRegion(env);
|
||||
break;
|
||||
case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
|
||||
|
@ -562,7 +564,6 @@ public class MergeTableRegionsProcedure
|
|||
/**
|
||||
* Set the region states to MERGING state
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException {
|
||||
// Set State.MERGING to regions to be merged
|
||||
|
@ -571,23 +572,9 @@ public class MergeTableRegionsProcedure
|
|||
regionStates.getRegionStateNode(regionsToMerge[1]).setState(State.MERGING);
|
||||
}
|
||||
|
||||
/**
|
||||
* Rollback the region state change
|
||||
* Not used for now, since rollbackCloseRegionsForMerge() will mark regions as OPEN
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
private void setRegionStateBackToOpen(final MasterProcedureEnv env) throws IOException {
|
||||
// revert region state to Open
|
||||
RegionStates regionStates = env.getAssignmentManager().getRegionStates();
|
||||
regionStates.getRegionStateNode(regionsToMerge[0]).setState(State.OPEN);
|
||||
regionStates.getRegionStateNode(regionsToMerge[1]).setState(State.OPEN);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create merged region
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
private void createMergedRegion(final MasterProcedureEnv env) throws IOException {
|
||||
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||
|
@ -614,7 +601,6 @@ public class MergeTableRegionsProcedure
|
|||
* @param env MasterProcedureEnv
|
||||
* @param regionFs region file system
|
||||
* @param mergedDir the temp directory of merged region
|
||||
* @throws IOException
|
||||
*/
|
||||
private void mergeStoreFiles(
|
||||
final MasterProcedureEnv env, final HRegionFileSystem regionFs, final Path mergedDir)
|
||||
|
@ -642,7 +628,6 @@ public class MergeTableRegionsProcedure
|
|||
/**
|
||||
* Clean up merged region
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
private void cleanupMergedRegion(final MasterProcedureEnv env) throws IOException {
|
||||
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||
|
@ -777,6 +762,18 @@ public class MergeTableRegionsProcedure
|
|||
return regionLocation;
|
||||
}
|
||||
|
||||
private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
|
||||
FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem();
|
||||
long maxSequenceId = -1L;
|
||||
for (RegionInfo region : regionsToMerge) {
|
||||
maxSequenceId =
|
||||
Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, region)));
|
||||
}
|
||||
if (maxSequenceId > 0) {
|
||||
WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, mergedRegion), maxSequenceId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The procedure could be restarted from a different machine. If the variable is null, we need to
|
||||
* retrieve it.
|
||||
|
|
|
@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -214,9 +215,7 @@ public class SplitTableRegionProcedure
|
|||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final SplitTableRegionState state)
|
||||
throws InterruptedException {
|
||||
if (isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
LOG.trace("{} execute state={}", this, state);
|
||||
|
||||
try {
|
||||
switch (state) {
|
||||
|
@ -237,6 +236,10 @@ public class SplitTableRegionProcedure
|
|||
break;
|
||||
case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS:
|
||||
createDaughterRegions(env);
|
||||
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE);
|
||||
break;
|
||||
case SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE:
|
||||
writeMaxSequenceIdFile(env);
|
||||
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META);
|
||||
break;
|
||||
case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META:
|
||||
|
@ -300,6 +303,7 @@ public class SplitTableRegionProcedure
|
|||
case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META:
|
||||
break;
|
||||
case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS:
|
||||
case SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE:
|
||||
// Doing nothing, as re-open parent region would clean up daughter region directories.
|
||||
break;
|
||||
case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION:
|
||||
|
@ -415,11 +419,11 @@ public class SplitTableRegionProcedure
|
|||
return daughter_2_RI.getStartKey();
|
||||
}
|
||||
|
||||
private static State [] EXPECTED_SPLIT_STATES = new State [] {State.OPEN, State.CLOSED};
|
||||
private static final State[] EXPECTED_SPLIT_STATES = new State[] { State.OPEN, State.CLOSED };
|
||||
|
||||
/**
|
||||
* Prepare to Split region.
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException {
|
||||
|
@ -475,8 +479,6 @@ public class SplitTableRegionProcedure
|
|||
/**
|
||||
* Action before splitting region in a table.
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
private void preSplitRegion(final MasterProcedureEnv env)
|
||||
throws IOException, InterruptedException {
|
||||
|
@ -499,7 +501,6 @@ public class SplitTableRegionProcedure
|
|||
/**
|
||||
* Action after rollback a split table region action.
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
private void postRollBackSplitRegion(final MasterProcedureEnv env) throws IOException {
|
||||
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
|
@ -511,7 +512,7 @@ public class SplitTableRegionProcedure
|
|||
/**
|
||||
* Rollback close parent region
|
||||
* @param env MasterProcedureEnv
|
||||
**/
|
||||
*/
|
||||
private void openParentRegion(final MasterProcedureEnv env) throws IOException {
|
||||
// Check whether the region is closed; if so, open it in the same server
|
||||
final int regionReplication = getRegionReplication(env);
|
||||
|
@ -528,7 +529,6 @@ public class SplitTableRegionProcedure
|
|||
/**
|
||||
* Create daughter regions
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public void createDaughterRegions(final MasterProcedureEnv env) throws IOException {
|
||||
|
@ -558,7 +558,6 @@ public class SplitTableRegionProcedure
|
|||
/**
|
||||
* Create Split directory
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
|
||||
final HRegionFileSystem regionFs) throws IOException {
|
||||
|
@ -756,7 +755,6 @@ public class SplitTableRegionProcedure
|
|||
/**
|
||||
* Add daughter regions to META
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
private void updateMetaForDaughterRegions(final MasterProcedureEnv env) throws IOException {
|
||||
env.getAssignmentManager().markRegionAsSplit(getParentRegion(), getParentRegionServerName(env),
|
||||
|
@ -823,6 +821,16 @@ public class SplitTableRegionProcedure
|
|||
return htd.getRegionReplication();
|
||||
}
|
||||
|
||||
private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
|
||||
FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem();
|
||||
long maxSequenceId =
|
||||
WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, getParentRegion()));
|
||||
if (maxSequenceId > 0) {
|
||||
WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_1_RI), maxSequenceId);
|
||||
WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_2_RI), maxSequenceId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The procedure could be restarted from a different machine. If the variable is null, we need to
|
||||
* retrieve it.
|
||||
|
|
|
@ -19,13 +19,17 @@
|
|||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Base class for all the Table procedures that want to use a StateMachineProcedure.
|
||||
|
@ -114,4 +118,10 @@ public abstract class AbstractStateMachineTableProcedure<TState>
|
|||
throw new TableNotFoundException(getTableName());
|
||||
}
|
||||
}
|
||||
|
||||
protected final Path getRegionDir(MasterProcedureEnv env, RegionInfo region) throws IOException {
|
||||
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||
Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), getTableName());
|
||||
return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -947,15 +947,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
// Use maximum of log sequenceid or that which was found in stores
|
||||
// (particularly if no recovered edits, seqid will be -1).
|
||||
long nextSeqid = maxSeqId;
|
||||
if (this.writestate.writesEnabled) {
|
||||
nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(),
|
||||
this.fs.getRegionDir(), nextSeqid, 1);
|
||||
} else {
|
||||
nextSeqid++;
|
||||
long maxSeqIdFromFile =
|
||||
WALSplitter.getMaxRegionSequenceId(fs.getFileSystem(), fs.getRegionDir());
|
||||
long nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1;
|
||||
if (writestate.writesEnabled) {
|
||||
WALSplitter.writeRegionSequenceIdFile(fs.getFileSystem(), fs.getRegionDir(), nextSeqId);
|
||||
}
|
||||
|
||||
LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqid);
|
||||
LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqId);
|
||||
|
||||
// A region can be reopened if failed a split; reset flags
|
||||
this.closing.set(false);
|
||||
|
@ -967,7 +966,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
status.markComplete("Region opened successfully");
|
||||
return nextSeqid;
|
||||
return nextSeqId;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1103,7 +1102,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// table is still online
|
||||
if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
|
||||
WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
|
||||
mvcc.getReadPoint(), 0);
|
||||
mvcc.getReadPoint());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7014,7 +7013,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* Open HRegion.
|
||||
* Calls initialize and sets sequenceId.
|
||||
* @return Returns <code>this</code>
|
||||
* @throws IOException
|
||||
*/
|
||||
protected HRegion openHRegion(final CancelableProgressable reporter)
|
||||
throws IOException {
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -47,7 +46,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.collections.MapUtils;
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
|
@ -89,13 +87,14 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
|||
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
|
||||
|
@ -623,71 +622,71 @@ public class WALSplitter {
|
|||
|| file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a file with name as region open sequence id
|
||||
* @param fs
|
||||
* @param regiondir
|
||||
* @param newSeqId
|
||||
* @param safetyBumper
|
||||
* @return long new sequence Id value
|
||||
* @throws IOException
|
||||
*/
|
||||
public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir,
|
||||
long newSeqId, long safetyBumper) throws IOException {
|
||||
private static FileStatus[] getSequenceIdFiles(FileSystem fs, Path regionDir) throws IOException {
|
||||
// TODO: Why are we using a method in here as part of our normal region open where
|
||||
// there is no splitting involved? Fix. St.Ack 01/20/2017.
|
||||
Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
|
||||
long maxSeqId = 0;
|
||||
FileStatus[] files = null;
|
||||
if (fs.exists(editsdir)) {
|
||||
files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path p) {
|
||||
return isSequenceIdFile(p);
|
||||
}
|
||||
});
|
||||
if (files != null) {
|
||||
for (FileStatus status : files) {
|
||||
String fileName = status.getPath().getName();
|
||||
Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
|
||||
try {
|
||||
long tmpSeqId =
|
||||
Long.parseLong(fileName.substring(0, fileName.length()
|
||||
- SEQUENCE_ID_FILE_SUFFIX_LENGTH));
|
||||
maxSeqId = Math.max(tmpSeqId, maxSeqId);
|
||||
FileStatus[] files = fs.listStatus(editsDir, WALSplitter::isSequenceIdFile);
|
||||
return files != null ? files : new FileStatus[0];
|
||||
} catch (FileNotFoundException e) {
|
||||
return new FileStatus[0];
|
||||
}
|
||||
}
|
||||
|
||||
private static long getMaxSequenceId(FileStatus[] files) {
|
||||
long maxSeqId = -1L;
|
||||
for (FileStatus file : files) {
|
||||
String fileName = file.getPath().getName();
|
||||
try {
|
||||
maxSeqId = Math.max(maxSeqId, Long
|
||||
.parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
|
||||
} catch (NumberFormatException ex) {
|
||||
LOG.warn("Invalid SeqId File Name={}", fileName);
|
||||
}
|
||||
}
|
||||
return maxSeqId;
|
||||
}
|
||||
}
|
||||
if (maxSeqId > newSeqId) {
|
||||
newSeqId = maxSeqId;
|
||||
}
|
||||
newSeqId += safetyBumper; // bump up SeqId
|
||||
|
||||
/**
|
||||
* Get the max sequence id which is stored in the region directory. -1 if none.
|
||||
*/
|
||||
public static long getMaxRegionSequenceId(FileSystem fs, Path regionDir) throws IOException {
|
||||
return getMaxSequenceId(getSequenceIdFiles(fs, regionDir));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a file with name as region's max sequence id
|
||||
*/
|
||||
public static void writeRegionSequenceIdFile(FileSystem fs, Path regionDir, long newMaxSeqId)
|
||||
throws IOException {
|
||||
FileStatus[] files = getSequenceIdFiles(fs, regionDir);
|
||||
long maxSeqId = getMaxSequenceId(files);
|
||||
if (maxSeqId > newMaxSeqId) {
|
||||
throw new IOException("The new max sequence id " + newMaxSeqId +
|
||||
" is less than the old max sequence id " + maxSeqId);
|
||||
}
|
||||
// write a new seqId file
|
||||
Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
|
||||
if (newSeqId != maxSeqId) {
|
||||
Path newSeqIdFile = new Path(WALSplitter.getRegionDirRecoveredEditsDir(regionDir),
|
||||
newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
|
||||
if (newMaxSeqId != maxSeqId) {
|
||||
try {
|
||||
if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
|
||||
throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
|
||||
}
|
||||
LOG.debug("Wrote file={}, newSeqId={}, maxSeqId={}", newSeqIdFile,
|
||||
newSeqId, maxSeqId);
|
||||
LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
|
||||
maxSeqId);
|
||||
} catch (FileAlreadyExistsException ignored) {
|
||||
// latest hdfs throws this exception. it's all right if newSeqIdFile already exists
|
||||
}
|
||||
}
|
||||
// remove old ones
|
||||
if (files != null) {
|
||||
for (FileStatus status : files) {
|
||||
if (!newSeqIdFile.equals(status.getPath())) {
|
||||
fs.delete(status.getPath(), false);
|
||||
}
|
||||
}
|
||||
}
|
||||
return newSeqId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link Reader} for reading logs to split.
|
||||
|
@ -1817,9 +1816,7 @@ public class WALSplitter {
|
|||
* @throws IOException
|
||||
*/
|
||||
public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
|
||||
Pair<WALKey, WALEdit> logEntry, Durability durability)
|
||||
throws IOException {
|
||||
|
||||
Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
|
||||
if (entry == null) {
|
||||
// return an empty array
|
||||
return Collections.emptyList();
|
||||
|
|
|
@ -0,0 +1,156 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Testcase for HBASE-20066
|
||||
*/
|
||||
@Category({ RegionServerTests.class, LargeTests.class })
|
||||
public class TestSequenceIdMonotonicallyIncreasing {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSequenceIdMonotonicallyIncreasing.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static final TableName NAME = TableName.valueOf("test");
|
||||
|
||||
private static final byte[] CF = Bytes.toBytes("cf");
|
||||
|
||||
private static final byte[] CQ = Bytes.toBytes("cq");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
UTIL.startMiniCluster(2);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
Admin admin = UTIL.getAdmin();
|
||||
if (admin.tableExists(NAME)) {
|
||||
admin.disableTable(NAME);
|
||||
admin.deleteTable(NAME);
|
||||
}
|
||||
}
|
||||
|
||||
private Table createTable(boolean multiRegions) throws IOException {
|
||||
if (multiRegions) {
|
||||
return UTIL.createTable(NAME, CF, new byte[][] { Bytes.toBytes(1) });
|
||||
} else {
|
||||
return UTIL.createTable(NAME, CF);
|
||||
}
|
||||
}
|
||||
|
||||
private long getMaxSeqId(HRegionServer rs, RegionInfo region) throws IOException {
|
||||
Path walFile = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
|
||||
long maxSeqId = -1L;
|
||||
try (WAL.Reader reader =
|
||||
WALFactory.createReader(UTIL.getTestFileSystem(), walFile, UTIL.getConfiguration())) {
|
||||
for (;;) {
|
||||
WAL.Entry entry = reader.next();
|
||||
if (entry == null) {
|
||||
break;
|
||||
}
|
||||
if (Bytes.equals(region.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName())) {
|
||||
maxSeqId = Math.max(maxSeqId, entry.getKey().getSequenceId());
|
||||
}
|
||||
}
|
||||
}
|
||||
return maxSeqId;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplit()
|
||||
throws IOException, InterruptedException, ExecutionException, TimeoutException {
|
||||
try (Table table = createTable(false)) {
|
||||
table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0)));
|
||||
table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(0)));
|
||||
}
|
||||
UTIL.flush(NAME);
|
||||
HRegionServer rs = UTIL.getRSForFirstRegionInTable(NAME);
|
||||
RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
|
||||
UTIL.getAdmin().splitRegionAsync(region.getRegionName(), Bytes.toBytes(1)).get(1,
|
||||
TimeUnit.MINUTES);
|
||||
long maxSeqId = getMaxSeqId(rs, region);
|
||||
RegionLocator locator = UTIL.getConnection().getRegionLocator(NAME);
|
||||
HRegionLocation locA = locator.getRegionLocation(Bytes.toBytes(0), true);
|
||||
HRegionLocation locB = locator.getRegionLocation(Bytes.toBytes(1), true);
|
||||
assertEquals(maxSeqId + 1, locA.getSeqNum());
|
||||
assertEquals(maxSeqId + 1, locB.getSeqNum());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMerge()
|
||||
throws IOException, InterruptedException, ExecutionException, TimeoutException {
|
||||
try (Table table = createTable(true)) {
|
||||
table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0)));
|
||||
table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(0)));
|
||||
table.put(new Put(Bytes.toBytes(2)).addColumn(CF, CQ, Bytes.toBytes(0)));
|
||||
}
|
||||
UTIL.flush(NAME);
|
||||
MiniHBaseCluster cluster = UTIL.getMiniHBaseCluster();
|
||||
List<HRegion> regions = cluster.getRegions(NAME);
|
||||
HRegion regionA = regions.get(0);
|
||||
HRegion regionB = regions.get(1);
|
||||
HRegionServer rsA =
|
||||
cluster.getRegionServer(cluster.getServerWith(regionA.getRegionInfo().getRegionName()));
|
||||
HRegionServer rsB =
|
||||
cluster.getRegionServer(cluster.getServerWith(regionB.getRegionInfo().getRegionName()));
|
||||
UTIL.getAdmin().mergeRegionsAsync(regionA.getRegionInfo().getRegionName(),
|
||||
regionB.getRegionInfo().getRegionName(), false).get(1, TimeUnit.MINUTES);
|
||||
long maxSeqIdA = getMaxSeqId(rsA, regionA.getRegionInfo());
|
||||
long maxSeqIdB = getMaxSeqId(rsB, regionB.getRegionInfo());
|
||||
HRegionLocation loc =
|
||||
UTIL.getConnection().getRegionLocator(NAME).getRegionLocation(Bytes.toBytes(0), true);
|
||||
assertEquals(Math.max(maxSeqIdA, maxSeqIdB) + 1, loc.getSeqNum());
|
||||
}
|
||||
}
|
|
@ -478,42 +478,6 @@ public abstract class AbstractTestDLS {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadWriteSeqIdFiles() throws Exception {
|
||||
LOG.info("testReadWriteSeqIdFiles");
|
||||
startCluster(2);
|
||||
final ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
|
||||
Table ht = installTable(zkw, 10);
|
||||
try {
|
||||
FileSystem fs = master.getMasterFileSystem().getFileSystem();
|
||||
Path tableDir =
|
||||
FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf(name.getMethodName()));
|
||||
List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
|
||||
long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
|
||||
WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
|
||||
assertEquals(newSeqId + 2000,
|
||||
WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L));
|
||||
|
||||
Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0));
|
||||
FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path p) {
|
||||
return WALSplitter.isSequenceIdFile(p);
|
||||
}
|
||||
});
|
||||
// only one seqid file should exist
|
||||
assertEquals(1, files.length);
|
||||
|
||||
// verify all seqId files aren't treated as recovered.edits files
|
||||
NavigableSet<Path> recoveredEdits =
|
||||
WALSplitter.getSplitEditFilesSorted(fs, regionDirs.get(0));
|
||||
assertEquals(0, recoveredEdits.size());
|
||||
} finally {
|
||||
if (ht != null) ht.close();
|
||||
if (zkw != null) zkw.close();
|
||||
}
|
||||
}
|
||||
|
||||
private Table installTable(ZKWatcher zkw, int nrs) throws Exception {
|
||||
return installTable(zkw, nrs, 0);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.NavigableSet;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({ RegionServerTests.class, SmallTests.class })
|
||||
public class TestReadWriteSeqIdFiles {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestReadWriteSeqIdFiles.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestReadWriteSeqIdFiles.class);
|
||||
|
||||
private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
|
||||
|
||||
private static FileSystem FS;
|
||||
|
||||
private static Path REGION_DIR;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws IOException {
|
||||
FS = FileSystem.getLocal(UTIL.getConfiguration());
|
||||
REGION_DIR = UTIL.getDataTestDir();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws IOException {
|
||||
UTIL.cleanupTestDir();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws IOException {
|
||||
WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1000L);
|
||||
assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR));
|
||||
WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 2000L);
|
||||
assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR));
|
||||
// can not write a sequence id which is smaller
|
||||
try {
|
||||
WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1500L);
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
LOG.info("Expected error", e);
|
||||
}
|
||||
|
||||
Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(REGION_DIR);
|
||||
FileStatus[] files = FSUtils.listStatus(FS, editsdir, new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path p) {
|
||||
return WALSplitter.isSequenceIdFile(p);
|
||||
}
|
||||
});
|
||||
// only one seqid file should exist
|
||||
assertEquals(1, files.length);
|
||||
|
||||
// verify all seqId files aren't treated as recovered.edits files
|
||||
NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(FS, REGION_DIR);
|
||||
assertEquals(0, recoveredEdits.size());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue