HBASE-15073 Finer grained control over normalization actions for RegionNormalizer

tedyu 2016-01-06 15:38:46 -08:00
parent c73b4f8df6
commit 53983f5b2e
13 changed files with 138 additions and 46 deletions

View File

@ -44,6 +44,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
@ -201,13 +203,14 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
/**
* <em>INTERNAL</em> Used by shell/rest interface to access this metadata
* attribute which denotes if the table should be treated by region normalizer.
* attribute which denotes the allowed types of action (split/merge) when the table is treated
* by region normalizer.
*
* @see #isNormalizationEnabled()
* @see #getDesiredNormalizationTypes()
*/
public static final String NORMALIZATION_ENABLED = "NORMALIZATION_ENABLED";
private static final ImmutableBytesWritable NORMALIZATION_ENABLED_KEY =
new ImmutableBytesWritable(Bytes.toBytes(NORMALIZATION_ENABLED));
public static final String NORMALIZATION_MODE = "NORMALIZATION_MODE";
private static final ImmutableBytesWritable NORMALIZATION_MODE_KEY =
new ImmutableBytesWritable(Bytes.toBytes(NORMALIZATION_MODE));
/** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */
private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT;
@ -235,11 +238,6 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
*/
public static final boolean DEFAULT_COMPACTION_ENABLED = true;
/**
* Constant that denotes whether the table is normalized by default.
*/
public static final boolean DEFAULT_NORMALIZATION_ENABLED = false;
/**
* Constant that denotes the maximum default size of the memstore after which
* the contents are flushed to the store files
@ -264,7 +262,7 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
String.valueOf(DEFAULT_DEFERRED_LOG_FLUSH));
DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABLITY.name()); //use the enum name
DEFAULT_VALUES.put(REGION_REPLICATION, String.valueOf(DEFAULT_REGION_REPLICATION));
DEFAULT_VALUES.put(NORMALIZATION_ENABLED, String.valueOf(DEFAULT_NORMALIZATION_ENABLED));
DEFAULT_VALUES.put(NORMALIZATION_MODE, "");
for (String s : DEFAULT_VALUES.keySet()) {
RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s)));
}
@ -657,22 +655,42 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
}
/**
* Check if normalization enable flag of the table is true. If flag is
* false then no region normalizer won't attempt to normalize this table.
* Check the normalization mode flag of the table. If the flag is
* empty then the region normalizer won't attempt to normalize this table.
*
* @return true if region normalization is enabled for this table
* @return list of PlanType if region normalization is enabled for this table;
* null means region normalization is disabled
*/
public boolean isNormalizationEnabled() {
return isSomething(NORMALIZATION_ENABLED_KEY, DEFAULT_NORMALIZATION_ENABLED);
public List<PlanType> getDesiredNormalizationTypes() {
byte [] value = getValue(NORMALIZATION_MODE_KEY);
if (value == null) {
return null;
}
String strValue = Bytes.toString(value);
if (strValue.isEmpty()) {
return null;
}
List<NormalizationPlan.PlanType> types = new ArrayList<>();
if (strValue.toUpperCase().contains("M")) {
types.add(PlanType.MERGE);
}
if (strValue.toUpperCase().contains("S")) {
types.add(PlanType.SPLIT);
}
return types;
}
/**
* Setting the table normalization enable flag.
* Setting the types of action allowed by the table normalization mode flag.
*
* @param isEnable True if enable normalization.
* @param types String containing desired types of action:
* "M" for region merge
* "S" for region split
* "MS" for region merge / split
*/
public HTableDescriptor setNormalizationEnabled(final boolean isEnable) {
setValue(NORMALIZATION_ENABLED_KEY, isEnable ? TRUE : FALSE);
public HTableDescriptor setNormalizationMode(final String types) {
setValue(NORMALIZATION_MODE_KEY, types == null || types.isEmpty() ? null :
new ImmutableBytesWritable(Bytes.toBytes(types.toUpperCase())));
return this;
}
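As a rough illustration of the new attribute, the sketch below sets and reads the mode on a purely local descriptor; the table name is made up for the example and nothing is submitted to a cluster. It relies only on the accessor pair introduced above.

import java.util.List;

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;

public class NormalizationModeExample {
  public static void main(String[] args) {
    // Hypothetical table, used only to exercise the descriptor API locally.
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("example_table"));

    // "MS" allows both merge and split actions for the region normalizer.
    htd.setNormalizationMode("MS");

    // Parses NORMALIZATION_MODE back into plan types: prints [MERGE, SPLIT].
    List<PlanType> types = htd.getDesiredNormalizationTypes();
    System.out.println(types);

    // An empty (or null) mode disables normalization for the table: prints null.
    htd.setNormalizationMode("");
    System.out.println(htd.getDesiredNormalizationTypes());
  }
}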

View File

@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
package org.apache.hadoop.hbase.normalizer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
@ -26,10 +26,20 @@ import org.apache.hadoop.hbase.client.Admin;
*/
@InterfaceAudience.Private
public interface NormalizationPlan {
enum PlanType {
SPLIT,
MERGE,
NONE
}
/**
* Executes normalization plan on cluster (does actual splitting/merging work).
* @param admin instance of Admin
*/
void execute(Admin admin);
/**
* @return the type of this plan
*/
PlanType getType();
}
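To show how the new getType() hook can be consumed, here is a small hypothetical helper (the class name and method are inventions for this example, not part of the patch) that executes a plan only when its type was requested. In the patch itself, SimpleRegionNormalizer applies this filtering while computing the plan rather than after the fact; the helper only illustrates the enum.

import java.util.List;

import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;

final class PlanTypeFilter {
  private PlanTypeFilter() {
  }

  /**
   * Executes the plan only when its type is one of the desired types.
   * PlanType.NONE (reported by EmptyNormalizationPlan) is always skipped.
   */
  static void executeIfDesired(NormalizationPlan plan, List<PlanType> desired, Admin admin) {
    if (plan.getType() == PlanType.NONE || desired == null) {
      return;
    }
    if (desired.contains(plan.getType())) {
      plan.execute(admin);
    }
  }
}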

View File

@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
@ -1356,13 +1357,20 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
LOG.debug("Skipping normalizing " + table + " since its namespace has quota");
continue;
}
if (table.isSystemTable() || (getTableDescriptors().get(table) != null &&
!getTableDescriptors().get(table).isNormalizationEnabled())) {
LOG.debug("Skipping normalization for table: " + table + ", as it's either system"
+ " table or doesn't have auto normalization turned on");
if (table.isSystemTable()) {
LOG.debug("Skipping normalization for table: " + table + ", as it's system table");
continue;
}
this.normalizer.computePlanForTable(table).execute(clusterConnection.getAdmin());
List<PlanType> types = null;
if (getTableDescriptors().get(table) != null) {
types = getTableDescriptors().get(table).getDesiredNormalizationTypes();
if (types == null) {
LOG.debug("Skipping normalization for table: " + table + ", as it's either system"
+ " table or doesn't have auto normalization turned on");
continue;
}
}
this.normalizer.computePlanForTable(table, types).execute(clusterConnection.getAdmin());
}
}
// If Region did not generate any plans, it means the cluster is already balanced.

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.normalizer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;
/**
* Plan which signifies that no normalization is required,
@ -45,4 +46,9 @@ public final class EmptyNormalizationPlan implements NormalizationPlan {
@Override
public void execute(Admin admin) {
}
@Override
public PlanType getType() {
return PlanType.NONE;
}
}

View File

@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;
import java.io.IOException;
@ -41,6 +42,11 @@ public class MergeNormalizationPlan implements NormalizationPlan {
this.secondRegion = secondRegion;
}
@Override
public PlanType getType() {
return PlanType.MERGE;
}
HRegionInfo getFirstRegion() {
return firstRegion;
}

View File

@ -18,10 +18,14 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
import java.util.List;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;
/**
* Performs "normalization" of regions on the cluster, making sure that suboptimal
@ -45,7 +49,9 @@ public interface RegionNormalizer {
/**
* Computes next optimal normalization plan.
* @param table table to normalize
* @param types desired types of NormalizationPlan
* @return Next (perhaps most urgent) normalization action to perform
*/
NormalizationPlan computePlanForTable(TableName table) throws HBaseIOException;
NormalizationPlan computePlanForTable(TableName table, List<PlanType> types)
throws HBaseIOException;
}

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.util.Triple;
import java.util.ArrayList;
@ -90,10 +92,12 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
* Action may be either a split, or a merge, or no action.
*
* @param table table to normalize
* @param types desired types of NormalizationPlan
* @return normalization plan to execute
*/
@Override
public NormalizationPlan computePlanForTable(TableName table) throws HBaseIOException {
public NormalizationPlan computePlanForTable(TableName table, List<PlanType> types)
throws HBaseIOException {
if (table == null || table.isSystemTable()) {
LOG.debug("Normalization of system table " + table + " isn't allowed");
return EmptyNormalizationPlan.getInstance();
@ -134,7 +138,7 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
// now; if the largest region is more than 2 times larger than average, we split it; split
// is a higher priority normalization action than merge.
if (largestRegion.getSecond() > 2 * avgRegionSize) {
if (types.contains(PlanType.SPLIT) && largestRegion.getSecond() > 2 * avgRegionSize) {
LOG.debug("Table " + table + ", largest region "
+ largestRegion.getFirst().getRegionNameAsString() + " has size "
+ largestRegion.getSecond() + ", more than 2 times than avg size, splitting");
@ -155,7 +159,8 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
}
Triple<HRegionInfo, Long, Integer> candidateRegion = regionsWithSize.get(candidateIdx);
Triple<HRegionInfo, Long, Integer> candidateRegion2 = regionsWithSize.get(candidateIdx+1);
if (candidateRegion.getSecond() + candidateRegion2.getSecond() < avgRegionSize) {
if (types.contains(PlanType.MERGE) &&
candidateRegion.getSecond() + candidateRegion2.getSecond() < avgRegionSize) {
LOG.debug("Table " + table + ", smallest region size: " + candidateRegion.getSecond()
+ " and its smallest neighbor size: " + candidateRegion2.getSecond()
+ ", less than the avg size, merging them");

View File

@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;
import java.io.IOException;
import java.util.Arrays;
@ -42,6 +43,11 @@ public class SplitNormalizationPlan implements NormalizationPlan {
this.splitPoint = splitPoint;
}
@Override
public PlanType getType() {
return PlanType.SPLIT;
}
public HRegionInfo getRegionInfo() {
return regionInfo;
}

View File

@ -26,6 +26,8 @@ import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
@ -52,6 +54,18 @@ public class TestSimpleRegionNormalizer {
private static final Log LOG = LogFactory.getLog(TestSimpleRegionNormalizer.class);
private static RegionNormalizer normalizer;
private static List<PlanType> bothTypes;
static {
bothTypes = new ArrayList<>();
bothTypes.add(PlanType.SPLIT);
bothTypes.add(PlanType.MERGE);
}
private static List<PlanType> splitType;
static {
splitType = new ArrayList<>();
splitType.add(PlanType.SPLIT);
}
// mocks
private static MasterServices masterServices;
@ -68,7 +82,7 @@ public class TestSimpleRegionNormalizer {
Map<byte[], Integer> regionSizes = new HashMap<>();
setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
assertTrue(plan instanceof EmptyNormalizationPlan);
}
@ -87,7 +101,7 @@ public class TestSimpleRegionNormalizer {
regionSizes.put(hri2.getRegionName(), 15);
setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
assertTrue((plan instanceof EmptyNormalizationPlan));
}
@ -113,14 +127,18 @@ public class TestSimpleRegionNormalizer {
hris.add(hri4);
regionSizes.put(hri4.getRegionName(), 10);
setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
assertTrue(plan instanceof EmptyNormalizationPlan);
}
@Test
public void testMergeOfSmallRegions() throws HBaseIOException {
testMergeOfSmallRegions(true);
testMergeOfSmallRegions(false);
}
public void testMergeOfSmallRegions(boolean mergeDesired) throws HBaseIOException {
TableName testTable = TableName.valueOf("testMergeOfSmallRegions");
List<HRegionInfo> hris = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>();
@ -146,11 +164,16 @@ public class TestSimpleRegionNormalizer {
regionSizes.put(hri5.getRegionName(), 16);
setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = normalizer.computePlanForTable(testTable,
mergeDesired ? bothTypes : splitType);
assertTrue(plan instanceof MergeNormalizationPlan);
assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
if (mergeDesired) {
assertTrue(plan instanceof MergeNormalizationPlan);
assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
} else {
assertTrue(plan instanceof EmptyNormalizationPlan);
}
}
// Test for situation illustrated in HBASE-14867
@ -185,7 +208,7 @@ public class TestSimpleRegionNormalizer {
regionSizes.put(hri6.getRegionName(), 2700);
setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
assertTrue(plan instanceof MergeNormalizationPlan);
assertEquals(hri5, ((MergeNormalizationPlan) plan).getFirstRegion());
@ -219,7 +242,7 @@ public class TestSimpleRegionNormalizer {
regionSizes.put(hri5.getRegionName(), 5);
setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
assertTrue(plan instanceof EmptyNormalizationPlan);
}
@ -247,7 +270,7 @@ public class TestSimpleRegionNormalizer {
regionSizes.put(hri4.getRegionName(), 30);
setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
assertTrue(plan instanceof SplitNormalizationPlan);
assertEquals(hri4, ((SplitNormalizationPlan) plan).getRegionInfo());

View File

@ -113,7 +113,7 @@ public class TestSimpleRegionNormalizerOnCluster {
}
HTableDescriptor htd = admin.getTableDescriptor(TABLENAME);
htd.setNormalizationEnabled(true);
htd.setNormalizationMode("MS");
admin.modifyTable(TABLENAME, htd);
admin.flush(TABLENAME);
@ -178,7 +178,7 @@ public class TestSimpleRegionNormalizerOnCluster {
}
HTableDescriptor htd = admin.getTableDescriptor(TABLENAME);
htd.setNormalizationEnabled(true);
htd.setNormalizationMode("MS");
admin.modifyTable(TABLENAME, htd);
admin.flush(TABLENAME);

View File

@ -246,7 +246,10 @@ module Hbase
#----------------------------------------------------------------------------------------------
# Parse arguments and update HTableDescriptor accordingly
def parse_htd_args(htd, arg)
htd.setNormalizationEnabled(JBoolean.valueOf(arg.delete(NORMALIZATION_ENABLED))) if arg[NORMALIZATION_ENABLED]
if arg.has_key?(NORMALIZATION_MODE)
mode = arg.delete(NORMALIZATION_MODE)
htd.setValue(NORMALIZATION_MODE, mode)
end
end
#----------------------------------------------------------------------------------------------

View File

@ -22,7 +22,7 @@ module Shell
class Normalize < Command
def help
return <<-EOF
Trigger region normalizer for all tables which have NORMALIZATION_ENABLED flag set. Returns true
Trigger region normalizer for all tables which have NORMALIZATION_MODE flag set. Returns true
if normalizer ran successfully, false otherwise. Note that this command has no effect
if region normalizer is disabled (make sure it's turned on using 'normalizer_switch' command).

View File

@ -23,7 +23,8 @@ module Shell
def help
return <<-EOF
Enable/Disable region normalizer. Returns previous normalizer state.
When normalizer is enabled, it handles all tables with 'NORMALIZATION_ENABLED' => true.
When normalizer is enabled, it handles all tables with the 'NORMALIZATION_MODE' flag set to
the desired types of normalization actions.
Examples:
hbase> normalizer_switch true