diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 1ab51e528fa..e785f96cabf 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -241,11 +241,12 @@ enum SplitTableRegionState { SPLIT_TABLE_REGION_PRE_OPERATION = 2; SPLIT_TABLE_REGION_CLOSE_PARENT_REGION = 3; SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS = 4; - SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META = 5; - SPLIT_TABLE_REGION_UPDATE_META = 6; - SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META = 7; - SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS = 8; - SPLIT_TABLE_REGION_POST_OPERATION = 9; + SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE = 5; + SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META = 6; + SPLIT_TABLE_REGION_UPDATE_META = 7; + SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META = 8; + SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS = 9; + SPLIT_TABLE_REGION_POST_OPERATION = 10; } message SplitTableRegionStateData { @@ -260,11 +261,12 @@ enum MergeTableRegionsState { MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION = 3; MERGE_TABLE_REGIONS_CLOSE_REGIONS = 4; MERGE_TABLE_REGIONS_CREATE_MERGED_REGION = 5; - MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION = 6; - MERGE_TABLE_REGIONS_UPDATE_META = 7; - MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION = 8; - MERGE_TABLE_REGIONS_OPEN_MERGED_REGION = 9; - MERGE_TABLE_REGIONS_POST_OPERATION = 10; + MERGE_TABLE_REGIONS_WRITE_MAX_SEQUENCE_ID_FILE = 6; + MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION = 7; + MERGE_TABLE_REGIONS_UPDATE_META = 8; + MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION = 9; + MERGE_TABLE_REGIONS_OPEN_MERGED_REGION = 10; + MERGE_TABLE_REGIONS_POST_OPERATION = 11; } message MergeTableRegionsStateData { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java index 1c448dc5acc..7c041e7d3c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -204,55 +205,55 @@ public class MergeTableRegionsProcedure } @Override - protected Flow executeFromState( - final MasterProcedureEnv env, - final MergeTableRegionsState state) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { - if (LOG.isDebugEnabled()) { - LOG.debug(this + " execute state=" + state); - } + protected Flow executeFromState(final MasterProcedureEnv env, final MergeTableRegionsState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + LOG.trace("{} execute state={}", this, state); try { switch (state) { - case MERGE_TABLE_REGIONS_PREPARE: - if (!prepareMergeRegion(env)) { - assert isFailed() : "Merge region should have an exception here"; + case MERGE_TABLE_REGIONS_PREPARE: + if (!prepareMergeRegion(env)) { + assert isFailed() : "Merge region should have an exception here"; + return Flow.NO_MORE_STATE; + } + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION); + break; + case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION: + preMergeRegions(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS); + break; + case MERGE_TABLE_REGIONS_CLOSE_REGIONS: + addChildProcedure(createUnassignProcedures(env, getRegionReplication(env))); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION); + break; + case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION: + createMergedRegion(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_WRITE_MAX_SEQUENCE_ID_FILE); + break; + case MERGE_TABLE_REGIONS_WRITE_MAX_SEQUENCE_ID_FILE: + writeMaxSequenceIdFile(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION); + break; + case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION: + preMergeRegionsCommit(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_UPDATE_META); + break; + case MERGE_TABLE_REGIONS_UPDATE_META: + updateMetaForMergedRegions(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION); + break; + case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: + postMergeRegionsCommit(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_OPEN_MERGED_REGION); + break; + case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: + addChildProcedure(createAssignProcedures(env, getRegionReplication(env))); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION); + break; + case MERGE_TABLE_REGIONS_POST_OPERATION: + postCompletedMergeRegions(env); return Flow.NO_MORE_STATE; - } - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION); - break; - case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION: - preMergeRegions(env); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS); - break; - case MERGE_TABLE_REGIONS_CLOSE_REGIONS: - addChildProcedure(createUnassignProcedures(env, getRegionReplication(env))); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION); - break; - case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION: - createMergedRegion(env); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION); - break; - case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION: - preMergeRegionsCommit(env); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_UPDATE_META); - break; - case MERGE_TABLE_REGIONS_UPDATE_META: - updateMetaForMergedRegions(env); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION); - break; - case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: - postMergeRegionsCommit(env); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_OPEN_MERGED_REGION); - break; - case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: - addChildProcedure(createAssignProcedures(env, getRegionReplication(env))); - setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION); - break; - case MERGE_TABLE_REGIONS_POST_OPERATION: - postCompletedMergeRegions(env); - return Flow.NO_MORE_STATE; - default: - throw new UnsupportedOperationException(this + " unhandled state=" + state); + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); } } catch (IOException e) { String msg = "Error trying to merge regions " + @@ -285,31 +286,32 @@ public class MergeTableRegionsProcedure try { switch (state) { - case MERGE_TABLE_REGIONS_POST_OPERATION: - case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: - case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: - case MERGE_TABLE_REGIONS_UPDATE_META: - String msg = this + " We are in the " + state + " state." - + " It is complicated to rollback the merge operation that region server is working on." - + " Rollback is not supported and we should let the merge operation to complete"; - LOG.warn(msg); - // PONR - throw new UnsupportedOperationException(this + " unhandled state=" + state); - case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION: - break; - case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION: - cleanupMergedRegion(env); - break; - case MERGE_TABLE_REGIONS_CLOSE_REGIONS: - rollbackCloseRegionsForMerge(env); - break; - case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION: - postRollBackMergeRegions(env); - break; - case MERGE_TABLE_REGIONS_PREPARE: - break; - default: - throw new UnsupportedOperationException(this + " unhandled state=" + state); + case MERGE_TABLE_REGIONS_POST_OPERATION: + case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: + case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: + case MERGE_TABLE_REGIONS_UPDATE_META: + String msg = this + " We are in the " + state + " state." + + " It is complicated to rollback the merge operation that region server is working on." + + " Rollback is not supported and we should let the merge operation to complete"; + LOG.warn(msg); + // PONR + throw new UnsupportedOperationException(this + " unhandled state=" + state); + case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION: + break; + case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION: + case MERGE_TABLE_REGIONS_WRITE_MAX_SEQUENCE_ID_FILE: + cleanupMergedRegion(env); + break; + case MERGE_TABLE_REGIONS_CLOSE_REGIONS: + rollbackCloseRegionsForMerge(env); + break; + case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION: + postRollBackMergeRegions(env); + break; + case MERGE_TABLE_REGIONS_PREPARE: + break; + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); } } catch (Exception e) { // This will be retried. Unless there is a bug in the code, @@ -326,10 +328,10 @@ public class MergeTableRegionsProcedure @Override protected boolean isRollbackSupported(final MergeTableRegionsState state) { switch (state) { - case MERGE_TABLE_REGIONS_POST_OPERATION: - case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: - case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: - case MERGE_TABLE_REGIONS_UPDATE_META: + case MERGE_TABLE_REGIONS_POST_OPERATION: + case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: + case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: + case MERGE_TABLE_REGIONS_UPDATE_META: // It is not safe to rollback if we reach to these states. return false; default: @@ -562,7 +564,6 @@ public class MergeTableRegionsProcedure /** * Set the region states to MERGING state * @param env MasterProcedureEnv - * @throws IOException */ public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException { // Set State.MERGING to regions to be merged @@ -571,23 +572,9 @@ public class MergeTableRegionsProcedure regionStates.getRegionStateNode(regionsToMerge[1]).setState(State.MERGING); } - /** - * Rollback the region state change - * Not used for now, since rollbackCloseRegionsForMerge() will mark regions as OPEN - * @param env MasterProcedureEnv - * @throws IOException - */ - private void setRegionStateBackToOpen(final MasterProcedureEnv env) throws IOException { - // revert region state to Open - RegionStates regionStates = env.getAssignmentManager().getRegionStates(); - regionStates.getRegionStateNode(regionsToMerge[0]).setState(State.OPEN); - regionStates.getRegionStateNode(regionsToMerge[1]).setState(State.OPEN); - } - /** * Create merged region * @param env MasterProcedureEnv - * @throws IOException */ private void createMergedRegion(final MasterProcedureEnv env) throws IOException { final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); @@ -614,7 +601,6 @@ public class MergeTableRegionsProcedure * @param env MasterProcedureEnv * @param regionFs region file system * @param mergedDir the temp directory of merged region - * @throws IOException */ private void mergeStoreFiles( final MasterProcedureEnv env, final HRegionFileSystem regionFs, final Path mergedDir) @@ -642,7 +628,6 @@ public class MergeTableRegionsProcedure /** * Clean up merged region * @param env MasterProcedureEnv - * @throws IOException */ private void cleanupMergedRegion(final MasterProcedureEnv env) throws IOException { final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); @@ -777,6 +762,18 @@ public class MergeTableRegionsProcedure return regionLocation; } + private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException { + FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem(); + long maxSequenceId = -1L; + for (RegionInfo region : regionsToMerge) { + maxSequenceId = + Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, region))); + } + if (maxSequenceId > 0) { + WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, mergedRegion), maxSequenceId); + } + } + /** * The procedure could be restarted from a different machine. If the variable is null, we need to * retrieve it. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java index cabccbc3205..70ddbe5b3f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -214,52 +215,54 @@ public class SplitTableRegionProcedure @Override protected Flow executeFromState(final MasterProcedureEnv env, final SplitTableRegionState state) throws InterruptedException { - if (isTraceEnabled()) { - LOG.trace(this + " execute state=" + state); - } + LOG.trace("{} execute state={}", this, state); try { switch (state) { - case SPLIT_TABLE_REGION_PREPARE: - if (prepareSplitRegion(env)) { - setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION); + case SPLIT_TABLE_REGION_PREPARE: + if (prepareSplitRegion(env)) { + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION); + break; + } else { + return Flow.NO_MORE_STATE; + } + case SPLIT_TABLE_REGION_PRE_OPERATION: + preSplitRegion(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CLOSE_PARENT_REGION); break; - } else { + case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION: + addChildProcedure(createUnassignProcedures(env, getRegionReplication(env))); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS); + break; + case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS: + createDaughterRegions(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE); + break; + case SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE: + writeMaxSequenceIdFile(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META); + break; + case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META: + preSplitRegionBeforeMETA(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META); + break; + case SPLIT_TABLE_REGION_UPDATE_META: + updateMetaForDaughterRegions(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META); + break; + case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META: + preSplitRegionAfterMETA(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS); + break; + case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS: + addChildProcedure(createAssignProcedures(env, getRegionReplication(env))); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_POST_OPERATION); + break; + case SPLIT_TABLE_REGION_POST_OPERATION: + postSplitRegion(env); return Flow.NO_MORE_STATE; - } - case SPLIT_TABLE_REGION_PRE_OPERATION: - preSplitRegion(env); - setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CLOSE_PARENT_REGION); - break; - case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION: - addChildProcedure(createUnassignProcedures(env, getRegionReplication(env))); - setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS); - break; - case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS: - createDaughterRegions(env); - setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META); - break; - case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META: - preSplitRegionBeforeMETA(env); - setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META); - break; - case SPLIT_TABLE_REGION_UPDATE_META: - updateMetaForDaughterRegions(env); - setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META); - break; - case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META: - preSplitRegionAfterMETA(env); - setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS); - break; - case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS: - addChildProcedure(createAssignProcedures(env, getRegionReplication(env))); - setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_POST_OPERATION); - break; - case SPLIT_TABLE_REGION_POST_OPERATION: - postSplitRegion(env); - return Flow.NO_MORE_STATE; - default: - throw new UnsupportedOperationException(this + " unhandled state=" + state); + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); } } catch (IOException e) { String msg = "Error trying to split region " + getParentRegion().getEncodedName() + @@ -291,27 +294,28 @@ public class SplitTableRegionProcedure try { switch (state) { - case SPLIT_TABLE_REGION_POST_OPERATION: - case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS: - case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META: - case SPLIT_TABLE_REGION_UPDATE_META: - // PONR - throw new UnsupportedOperationException(this + " unhandled state=" + state); - case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META: - break; - case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS: - // Doing nothing, as re-open parent region would clean up daughter region directories. - break; - case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION: - openParentRegion(env); - break; - case SPLIT_TABLE_REGION_PRE_OPERATION: - postRollBackSplitRegion(env); - break; - case SPLIT_TABLE_REGION_PREPARE: - break; // nothing to do - default: - throw new UnsupportedOperationException(this + " unhandled state=" + state); + case SPLIT_TABLE_REGION_POST_OPERATION: + case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS: + case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META: + case SPLIT_TABLE_REGION_UPDATE_META: + // PONR + throw new UnsupportedOperationException(this + " unhandled state=" + state); + case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META: + break; + case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS: + case SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE: + // Doing nothing, as re-open parent region would clean up daughter region directories. + break; + case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION: + openParentRegion(env); + break; + case SPLIT_TABLE_REGION_PRE_OPERATION: + postRollBackSplitRegion(env); + break; + case SPLIT_TABLE_REGION_PREPARE: + break; // nothing to do + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); } } catch (IOException e) { // This will be retried. Unless there is a bug in the code, @@ -415,11 +419,11 @@ public class SplitTableRegionProcedure return daughter_2_RI.getStartKey(); } - private static State [] EXPECTED_SPLIT_STATES = new State [] {State.OPEN, State.CLOSED}; + private static final State[] EXPECTED_SPLIT_STATES = new State[] { State.OPEN, State.CLOSED }; + /** * Prepare to Split region. * @param env MasterProcedureEnv - * @throws IOException */ @VisibleForTesting public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException { @@ -475,8 +479,6 @@ public class SplitTableRegionProcedure /** * Action before splitting region in a table. * @param env MasterProcedureEnv - * @throws IOException - * @throws InterruptedException */ private void preSplitRegion(final MasterProcedureEnv env) throws IOException, InterruptedException { @@ -499,7 +501,6 @@ public class SplitTableRegionProcedure /** * Action after rollback a split table region action. * @param env MasterProcedureEnv - * @throws IOException */ private void postRollBackSplitRegion(final MasterProcedureEnv env) throws IOException { final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); @@ -511,7 +512,7 @@ public class SplitTableRegionProcedure /** * Rollback close parent region * @param env MasterProcedureEnv - **/ + */ private void openParentRegion(final MasterProcedureEnv env) throws IOException { // Check whether the region is closed; if so, open it in the same server final int regionReplication = getRegionReplication(env); @@ -528,7 +529,6 @@ public class SplitTableRegionProcedure /** * Create daughter regions * @param env MasterProcedureEnv - * @throws IOException */ @VisibleForTesting public void createDaughterRegions(final MasterProcedureEnv env) throws IOException { @@ -558,7 +558,6 @@ public class SplitTableRegionProcedure /** * Create Split directory * @param env MasterProcedureEnv - * @throws IOException */ private Pair splitStoreFiles(final MasterProcedureEnv env, final HRegionFileSystem regionFs) throws IOException { @@ -756,7 +755,6 @@ public class SplitTableRegionProcedure /** * Add daughter regions to META * @param env MasterProcedureEnv - * @throws IOException */ private void updateMetaForDaughterRegions(final MasterProcedureEnv env) throws IOException { env.getAssignmentManager().markRegionAsSplit(getParentRegion(), getParentRegionServerName(env), @@ -823,6 +821,16 @@ public class SplitTableRegionProcedure return htd.getRegionReplication(); } + private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException { + FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem(); + long maxSequenceId = + WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, getParentRegion())); + if (maxSequenceId > 0) { + WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_1_RI), maxSequenceId); + WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_2_RI), maxSequenceId); + } + } + /** * The procedure could be restarted from a different machine. If the variable is null, we need to * retrieve it. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java index d67d9f9a3a2..833b659a5bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java @@ -19,13 +19,17 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; - +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.yetus.audience.InterfaceAudience; /** * Base class for all the Table procedures that want to use a StateMachineProcedure. @@ -114,4 +118,10 @@ public abstract class AbstractStateMachineTableProcedure throw new TableNotFoundException(getTableName()); } } + + protected final Path getRegionDir(MasterProcedureEnv env, RegionInfo region) throws IOException { + MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), getTableName()); + return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a64d6f10974..9f3d9bd58f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -947,15 +947,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Use maximum of log sequenceid or that which was found in stores // (particularly if no recovered edits, seqid will be -1). - long nextSeqid = maxSeqId; - if (this.writestate.writesEnabled) { - nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), - this.fs.getRegionDir(), nextSeqid, 1); - } else { - nextSeqid++; + long maxSeqIdFromFile = + WALSplitter.getMaxRegionSequenceId(fs.getFileSystem(), fs.getRegionDir()); + long nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1; + if (writestate.writesEnabled) { + WALSplitter.writeRegionSequenceIdFile(fs.getFileSystem(), fs.getRegionDir(), nextSeqId); } - LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqid); + LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqId); // A region can be reopened if failed a split; reset flags this.closing.set(false); @@ -967,7 +966,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } status.markComplete("Region opened successfully"); - return nextSeqid; + return nextSeqId; } /** @@ -1103,7 +1102,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // table is still online if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) { WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(), - mvcc.getReadPoint(), 0); + mvcc.getReadPoint()); } } @@ -7014,7 +7013,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Open HRegion. * Calls initialize and sets sequenceId. * @return Returns this - * @throws IOException */ protected HRegion openHRegion(final CancelableProgressable reporter) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 3f64d75d4a8..6c77c4ce951 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -47,7 +46,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.ArrayUtils; @@ -89,13 +87,14 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; @@ -623,70 +622,70 @@ public class WALSplitter { || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX); } - /** - * Create a file with name as region open sequence id - * @param fs - * @param regiondir - * @param newSeqId - * @param safetyBumper - * @return long new sequence Id value - * @throws IOException - */ - public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir, - long newSeqId, long safetyBumper) throws IOException { + private static FileStatus[] getSequenceIdFiles(FileSystem fs, Path regionDir) throws IOException { // TODO: Why are we using a method in here as part of our normal region open where // there is no splitting involved? Fix. St.Ack 01/20/2017. - Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); - long maxSeqId = 0; - FileStatus[] files = null; - if (fs.exists(editsdir)) { - files = FSUtils.listStatus(fs, editsdir, new PathFilter() { - @Override - public boolean accept(Path p) { - return isSequenceIdFile(p); - } - }); - if (files != null) { - for (FileStatus status : files) { - String fileName = status.getPath().getName(); - try { - long tmpSeqId = - Long.parseLong(fileName.substring(0, fileName.length() - - SEQUENCE_ID_FILE_SUFFIX_LENGTH)); - maxSeqId = Math.max(tmpSeqId, maxSeqId); - } catch (NumberFormatException ex) { - LOG.warn("Invalid SeqId File Name={}", fileName); - } - } + Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir); + try { + FileStatus[] files = fs.listStatus(editsDir, WALSplitter::isSequenceIdFile); + return files != null ? files : new FileStatus[0]; + } catch (FileNotFoundException e) { + return new FileStatus[0]; + } + } + + private static long getMaxSequenceId(FileStatus[] files) { + long maxSeqId = -1L; + for (FileStatus file : files) { + String fileName = file.getPath().getName(); + try { + maxSeqId = Math.max(maxSeqId, Long + .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH))); + } catch (NumberFormatException ex) { + LOG.warn("Invalid SeqId File Name={}", fileName); } } - if (maxSeqId > newSeqId) { - newSeqId = maxSeqId; - } - newSeqId += safetyBumper; // bump up SeqId + return maxSeqId; + } + /** + * Get the max sequence id which is stored in the region directory. -1 if none. + */ + public static long getMaxRegionSequenceId(FileSystem fs, Path regionDir) throws IOException { + return getMaxSequenceId(getSequenceIdFiles(fs, regionDir)); + } + + /** + * Create a file with name as region's max sequence id + */ + public static void writeRegionSequenceIdFile(FileSystem fs, Path regionDir, long newMaxSeqId) + throws IOException { + FileStatus[] files = getSequenceIdFiles(fs, regionDir); + long maxSeqId = getMaxSequenceId(files); + if (maxSeqId > newMaxSeqId) { + throw new IOException("The new max sequence id " + newMaxSeqId + + " is less than the old max sequence id " + maxSeqId); + } // write a new seqId file - Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX); - if (newSeqId != maxSeqId) { + Path newSeqIdFile = new Path(WALSplitter.getRegionDirRecoveredEditsDir(regionDir), + newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX); + if (newMaxSeqId != maxSeqId) { try { if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) { throw new IOException("Failed to create SeqId file:" + newSeqIdFile); } - LOG.debug("Wrote file={}, newSeqId={}, maxSeqId={}", newSeqIdFile, - newSeqId, maxSeqId); + LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId, + maxSeqId); } catch (FileAlreadyExistsException ignored) { // latest hdfs throws this exception. it's all right if newSeqIdFile already exists } } // remove old ones - if (files != null) { - for (FileStatus status : files) { - if (!newSeqIdFile.equals(status.getPath())) { - fs.delete(status.getPath(), false); - } + for (FileStatus status : files) { + if (!newSeqIdFile.equals(status.getPath())) { + fs.delete(status.getPath(), false); } } - return newSeqId; } /** @@ -1817,9 +1816,7 @@ public class WALSplitter { * @throws IOException */ public static List getMutationsFromWALEntry(WALEntry entry, CellScanner cells, - Pair logEntry, Durability durability) - throws IOException { - + Pair logEntry, Durability durability) throws IOException { if (entry == null) { // return an empty array return Collections.emptyList(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java new file mode 100644 index 00000000000..e657d9c74a3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Testcase for HBASE-20066 + */ +@Category({ RegionServerTests.class, LargeTests.class }) +public class TestSequenceIdMonotonicallyIncreasing { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSequenceIdMonotonicallyIncreasing.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName NAME = TableName.valueOf("test"); + + private static final byte[] CF = Bytes.toBytes("cf"); + + private static final byte[] CQ = Bytes.toBytes("cq"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniCluster(2); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @After + public void tearDown() throws IOException { + Admin admin = UTIL.getAdmin(); + if (admin.tableExists(NAME)) { + admin.disableTable(NAME); + admin.deleteTable(NAME); + } + } + + private Table createTable(boolean multiRegions) throws IOException { + if (multiRegions) { + return UTIL.createTable(NAME, CF, new byte[][] { Bytes.toBytes(1) }); + } else { + return UTIL.createTable(NAME, CF); + } + } + + private long getMaxSeqId(HRegionServer rs, RegionInfo region) throws IOException { + Path walFile = ((AbstractFSWAL) rs.getWAL(null)).getCurrentFileName(); + long maxSeqId = -1L; + try (WAL.Reader reader = + WALFactory.createReader(UTIL.getTestFileSystem(), walFile, UTIL.getConfiguration())) { + for (;;) { + WAL.Entry entry = reader.next(); + if (entry == null) { + break; + } + if (Bytes.equals(region.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName())) { + maxSeqId = Math.max(maxSeqId, entry.getKey().getSequenceId()); + } + } + } + return maxSeqId; + } + + @Test + public void testSplit() + throws IOException, InterruptedException, ExecutionException, TimeoutException { + try (Table table = createTable(false)) { + table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0))); + table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(0))); + } + UTIL.flush(NAME); + HRegionServer rs = UTIL.getRSForFirstRegionInTable(NAME); + RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo(); + UTIL.getAdmin().splitRegionAsync(region.getRegionName(), Bytes.toBytes(1)).get(1, + TimeUnit.MINUTES); + long maxSeqId = getMaxSeqId(rs, region); + RegionLocator locator = UTIL.getConnection().getRegionLocator(NAME); + HRegionLocation locA = locator.getRegionLocation(Bytes.toBytes(0), true); + HRegionLocation locB = locator.getRegionLocation(Bytes.toBytes(1), true); + assertEquals(maxSeqId + 1, locA.getSeqNum()); + assertEquals(maxSeqId + 1, locB.getSeqNum()); + } + + @Test + public void testMerge() + throws IOException, InterruptedException, ExecutionException, TimeoutException { + try (Table table = createTable(true)) { + table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0))); + table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(0))); + table.put(new Put(Bytes.toBytes(2)).addColumn(CF, CQ, Bytes.toBytes(0))); + } + UTIL.flush(NAME); + MiniHBaseCluster cluster = UTIL.getMiniHBaseCluster(); + List regions = cluster.getRegions(NAME); + HRegion regionA = regions.get(0); + HRegion regionB = regions.get(1); + HRegionServer rsA = + cluster.getRegionServer(cluster.getServerWith(regionA.getRegionInfo().getRegionName())); + HRegionServer rsB = + cluster.getRegionServer(cluster.getServerWith(regionB.getRegionInfo().getRegionName())); + UTIL.getAdmin().mergeRegionsAsync(regionA.getRegionInfo().getRegionName(), + regionB.getRegionInfo().getRegionName(), false).get(1, TimeUnit.MINUTES); + long maxSeqIdA = getMaxSeqId(rsA, regionA.getRegionInfo()); + long maxSeqIdB = getMaxSeqId(rsB, regionB.getRegionInfo()); + HRegionLocation loc = + UTIL.getConnection().getRegionLocator(NAME).getRegionLocation(Bytes.toBytes(0), true); + assertEquals(Math.max(maxSeqIdA, maxSeqIdB) + 1, loc.getSeqNum()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java index 1f9fc5d53f0..3e2f1139416 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java @@ -478,42 +478,6 @@ public abstract class AbstractTestDLS { } } - @Test - public void testReadWriteSeqIdFiles() throws Exception { - LOG.info("testReadWriteSeqIdFiles"); - startCluster(2); - final ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); - Table ht = installTable(zkw, 10); - try { - FileSystem fs = master.getMasterFileSystem().getFileSystem(); - Path tableDir = - FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf(name.getMethodName())); - List regionDirs = FSUtils.getRegionDirs(fs, tableDir); - long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L); - WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L); - assertEquals(newSeqId + 2000, - WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L)); - - Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0)); - FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { - @Override - public boolean accept(Path p) { - return WALSplitter.isSequenceIdFile(p); - } - }); - // only one seqid file should exist - assertEquals(1, files.length); - - // verify all seqId files aren't treated as recovered.edits files - NavigableSet recoveredEdits = - WALSplitter.getSplitEditFilesSorted(fs, regionDirs.get(0)); - assertEquals(0, recoveredEdits.size()); - } finally { - if (ht != null) ht.close(); - if (zkw != null) zkw.close(); - } - } - private Table installTable(ZKWatcher zkw, int nrs) throws Exception { return installTable(zkw, nrs, 0); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java new file mode 100644 index 00000000000..6e3aa105f70 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.NavigableSet; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestReadWriteSeqIdFiles { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReadWriteSeqIdFiles.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestReadWriteSeqIdFiles.class); + + private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); + + private static FileSystem FS; + + private static Path REGION_DIR; + + @BeforeClass + public static void setUp() throws IOException { + FS = FileSystem.getLocal(UTIL.getConfiguration()); + REGION_DIR = UTIL.getDataTestDir(); + } + + @AfterClass + public static void tearDown() throws IOException { + UTIL.cleanupTestDir(); + } + + @Test + public void test() throws IOException { + WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1000L); + assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR)); + WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 2000L); + assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR)); + // can not write a sequence id which is smaller + try { + WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1500L); + } catch (IOException e) { + // expected + LOG.info("Expected error", e); + } + + Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(REGION_DIR); + FileStatus[] files = FSUtils.listStatus(FS, editsdir, new PathFilter() { + @Override + public boolean accept(Path p) { + return WALSplitter.isSequenceIdFile(p); + } + }); + // only one seqid file should exist + assertEquals(1, files.length); + + // verify all seqId files aren't treated as recovered.edits files + NavigableSet recoveredEdits = WALSplitter.getSplitEditFilesSorted(FS, REGION_DIR); + assertEquals(0, recoveredEdits.size()); + } +}