HBASE-13103 [ergonomics] add region size balancing as a feature of master

This commit is contained in:
Mikhail Antonov 2015-06-22 15:52:07 -07:00
parent d51a184051
commit fd37ccb63c
14 changed files with 1146 additions and 5 deletions

View File

@ -185,6 +185,16 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
private static final Bytes REGION_MEMSTORE_REPLICATION_KEY =
new Bytes(Bytes.toBytes(REGION_MEMSTORE_REPLICATION));
/**
* <em>INTERNAL</em> Used by shell/rest interface to access this metadata
* attribute which denotes whether the table should be treated by the region normalizer.
*
* @see #isNormalizationEnabled()
*/
public static final String NORMALIZATION_ENABLED = "NORMALIZATION_ENABLED";
private static final Bytes NORMALIZATION_ENABLED_KEY =
new Bytes(Bytes.toBytes(NORMALIZATION_ENABLED));
/** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */
private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT;
@ -211,6 +221,11 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
*/
public static final boolean DEFAULT_COMPACTION_ENABLED = true;
/**
* Constant that denotes whether the table is normalized by default.
*/
public static final boolean DEFAULT_NORMALIZATION_ENABLED = false;
/**
* Constant that denotes the maximum default size of the memstore after which
* the contents are flushed to the store files
@ -613,6 +628,26 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
return this;
}
/**
* Check if the normalization enable flag of the table is true. If the flag is
* false, the region normalizer will not attempt to normalize this table.
*
* @return true if region normalization is enabled for this table
*/
public boolean isNormalizationEnabled() {
return isSomething(NORMALIZATION_ENABLED_KEY, DEFAULT_NORMALIZATION_ENABLED);
}
/**
* Set the table normalization enable flag.
*
* @param isEnable True if normalization should be enabled for the table.
*/
public HTableDescriptor setNormalizationEnabled(final boolean isEnable) {
setValue(NORMALIZATION_ENABLED_KEY, isEnable ? TRUE : FALSE);
return this;
}
/**
* Sets the {@link Durability} setting for the table. This defaults to Durability.USE_DEFAULT.
* @param durability enum value
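For context, here is a minimal sketch of how a client could toggle the new per-table flag once this patch is in place. The table name and connection bootstrap below are illustrative only; the descriptor calls are the ones added by this change or already used by the included on-cluster test.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class EnableNormalizationExample {
  public static void main(String[] args) throws Exception {
    TableName table = TableName.valueOf("my_table"); // hypothetical table
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Admin admin = conn.getAdmin()) {
      HTableDescriptor htd = admin.getTableDescriptor(table);
      htd.setNormalizationEnabled(true);   // stores NORMALIZATION_ENABLED = "true" in the descriptor
      admin.modifyTable(table, htd);       // persist the change, as the on-cluster test below does
    }
  }
}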

View File

@ -123,6 +123,14 @@ public final class HConstants {
/** Config for pluggable load balancers */
public static final String HBASE_MASTER_LOADBALANCER_CLASS = "hbase.master.loadbalancer.class";
/** Config for pluggable region normalizer */
public static final String HBASE_MASTER_NORMALIZER_CLASS =
"hbase.master.normalizer.class";
/** Config for enabling/disabling pluggable region normalizer */
public static final String HBASE_NORMALIZER_ENABLED =
"hbase.normalizer.enabled";
/** Cluster is standalone or pseudo-distributed */
public static final boolean CLUSTER_IS_LOCAL = false;

View File

@ -582,6 +582,17 @@ possible configurations would overwhelm and obscure the important.
<value>300000</value>
<description>Period at which the region balancer runs in the Master.</description>
</property>
<property>
<name>hbase.normalizer.enabled</name>
<value>false</value>
<description>If set to true, Master will try to keep region sizes
within each table approximately the same.</description>
</property>
<property>
<name>hbase.normalizer.period</name>
<value>1800000</value>
<description>Period at which the region normalizer runs in the Master.</description>
</property>
<property>
<name>hbase.regions.slop</name>
<value>0.2</value>
@ -1417,6 +1428,15 @@ possible configurations would overwhelm and obscure the important.
as the SimpleLoadBalancer).
</description>
</property>
<property>
<name>hbase.master.normalizer.class</name>
<value>org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer</value>
<description>
Class used to execute the region normalization when the period occurs.
See the class comment for more on how it works:
http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.html
</description>
</property>
<property>
<name>hbase.security.exec.permission.checks</name>
<value>false</value>
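The two new hbase-default.xml entries, together with the hbase.master.normalizer.class key, are ordinary configuration properties; operators would normally set them in hbase-site.xml on the master. As a rough sketch only, the same keys can be set programmatically (the values shown are the shipped defaults except for the enable flag):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class NormalizerConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Turn the feature on cluster-wide; HMaster reads this at startup.
    conf.setBoolean(HConstants.HBASE_NORMALIZER_ENABLED, true);
    // Run the RegionNormalizerChore every 30 minutes (the default period above).
    conf.setInt("hbase.normalizer.period", 30 * 60 * 1000);
    // Keep the default implementation; a custom RegionNormalizer could be named here instead.
    conf.set(HConstants.HBASE_MASTER_NORMALIZER_CLASS,
        "org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer");
  }
}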

View File

@ -101,6 +101,9 @@ import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@ -268,7 +271,10 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
private volatile boolean serverCrashProcessingEnabled = false;
LoadBalancer balancer;
RegionNormalizer normalizer;
private boolean normalizerEnabled = false;
private BalancerChore balancerChore;
private RegionNormalizerChore normalizerChore;
private ClusterStatusChore clusterStatusChore;
private ClusterStatusPublisher clusterStatusPublisherChore = null;
@ -546,6 +552,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
void initializeZKBasedSystemTrackers() throws IOException,
InterruptedException, KeeperException, CoordinatedStateException {
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
this.normalizer.setMasterServices(this);
this.normalizerEnabled = conf.getBoolean(HConstants.HBASE_NORMALIZER_ENABLED, false);
this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
this.loadBalancerTracker.start();
this.assignmentManager = new AssignmentManager(this, serverManager,
@ -742,6 +751,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
getChoreService().scheduleChore(clusterStatusChore);
this.balancerChore = new BalancerChore(this);
getChoreService().scheduleChore(balancerChore);
this.normalizerChore = new RegionNormalizerChore(this);
getChoreService().scheduleChore(normalizerChore);
this.catalogJanitorChore = new CatalogJanitor(this, this);
getChoreService().scheduleChore(catalogJanitorChore);
@ -1119,6 +1130,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
if (this.balancerChore != null) {
this.balancerChore.cancel(true);
}
if (this.normalizerChore != null) {
this.normalizerChore.cancel(true);
}
if (this.clusterStatusChore != null) {
this.clusterStatusChore.cancel(true);
}
@ -1248,6 +1262,47 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
return true;
}
/**
* Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
*
* @return true if normalization step was performed successfully, false otherwise
* (specifically, if HMaster hasn't been initialized properly or normalization
* is globally disabled)
* @throws IOException
*/
public boolean normalizeRegions() throws IOException {
if (!this.initialized) {
LOG.debug("Master has not been initialized, don't run region normalizer.");
return false;
}
if (!this.normalizerEnabled) {
LOG.debug("Region normalization is disabled, don't run region normalizer.");
return false;
}
synchronized (this.normalizer) {
// Don't run the normalizer concurrently
List<TableName> allEnabledTables = new ArrayList<>(
this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
Collections.shuffle(allEnabledTables);
for(TableName table : allEnabledTables) {
if (table.isSystemTable() || !getTableDescriptors().getDescriptor(table).
getHTableDescriptor().isNormalizationEnabled()) {
LOG.debug("Skipping normalization for table: " + table + ", as it's either system"
+ " table or doesn't have auto normalization turned on");
continue;
}
this.normalizer.computePlanForTable(table).execute(clusterConnection.getAdmin());
}
}
// If the normalizer did not generate any plans, the cluster is already balanced.
// Return true indicating success.
return true;
}
/**
* @return Client info for use as prefix on an audit log string; who did an action
*/
@ -1270,7 +1325,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
final HRegionInfo region_b, final boolean forcible) throws IOException {
checkInitialized();
this.service.submit(new DispatchMergingRegionHandler(this,
this.catalogJanitorChore, region_a, region_b, forcible));
}
void move(final byte[] encodedRegionName,
@ -1524,7 +1579,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
// If we're a backup master, stall until a primary to writes his address
if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
LOG.debug("HMaster started in backup mode. "
+ "Stalling until master znode is written.");
// This will only be a minute or so while the cluster starts up, // This will only be a minute or so while the cluster starts up,
@ -1546,12 +1601,12 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
LOG.fatal("Failed to become active master", t); LOG.fatal("Failed to become active master", t);
// HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
if (t instanceof NoClassDefFoundError && if (t instanceof NoClassDefFoundError &&
t.getMessage() t.getMessage()
.contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) { .contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
// improved error message for this special case // improved error message for this special case
abort("HBase is having a problem with its Hadoop jars. You may need to " abort("HBase is having a problem with its Hadoop jars. You may need to "
+ "recompile HBase against Hadoop version " + "recompile HBase against Hadoop version "
+ org.apache.hadoop.util.VersionInfo.getVersion() + org.apache.hadoop.util.VersionInfo.getVersion()
+ " or change your hadoop jars to start properly", t); + " or change your hadoop jars to start properly", t);
} else { } else {
abort("Unhandled exception. Starting shutdown.", t); abort("Unhandled exception. Starting shutdown.", t);

View File

@ -0,0 +1,48 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
/**
* Plan which signifies that no normalization is required,
* or that normalization of this table isn't allowed. This is a singleton.
*/
@InterfaceAudience.Private
public final class EmptyNormalizationPlan implements NormalizationPlan {
private static final EmptyNormalizationPlan instance = new EmptyNormalizationPlan();
private EmptyNormalizationPlan() {
}
/**
* @return singleton instance
*/
public static EmptyNormalizationPlan getInstance(){
return instance;
}
/**
* No-op for empty plan.
*/
@Override
public void execute(Admin admin) {
}
}

View File

@ -0,0 +1,73 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import java.io.IOException;
/**
* Normalization plan to merge regions (smallest region in the table with its smallest neighbor).
*/
@InterfaceAudience.Private
public class MergeNormalizationPlan implements NormalizationPlan {
private static final Log LOG = LogFactory.getLog(MergeNormalizationPlan.class.getName());
private final HRegionInfo firstRegion;
private final HRegionInfo secondRegion;
public MergeNormalizationPlan(HRegionInfo firstRegion, HRegionInfo secondRegion) {
this.firstRegion = firstRegion;
this.secondRegion = secondRegion;
}
HRegionInfo getFirstRegion() {
return firstRegion;
}
HRegionInfo getSecondRegion() {
return secondRegion;
}
@Override
public String toString() {
return "MergeNormalizationPlan{" +
"firstRegion=" + firstRegion +
", secondRegion=" + secondRegion +
'}';
}
/**
* {@inheritDoc}
*/
@Override
public void execute(Admin admin) {
LOG.info("Executing merging normalization plan: " + this);
try {
admin.mergeRegions(firstRegion.getEncodedNameAsBytes(),
secondRegion.getEncodedNameAsBytes(), true);
} catch (IOException ex) {
LOG.error("Error during region merge: ", ex);
}
}
}

View File

@ -0,0 +1,35 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
/**
* Interface for normalization plan.
*/
@InterfaceAudience.Private
public interface NormalizationPlan {
/**
* Executes normalization plan on cluster (does actual splitting/merging work).
* @param admin instance of Admin
*/
void execute(Admin admin);
}

View File

@ -0,0 +1,51 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.MasterServices;
/**
* Performs "normalization" of regions on the cluster, making sure that suboptimal
* choice of split keys doesn't leave the cluster in a situation where some regions are
* substantially larger than others for a considerable amount of time.
*
* Users who want to use this feature can either use the default {@link SimpleRegionNormalizer}
* or plug in their own implementation. Please note that overly aggressive normalization rules
* (attempting to make all regions perfectly equal in size) could potentially lead to
* "split/merge storms".
*/
@InterfaceAudience.Private
public interface RegionNormalizer {
/**
* Set the master service. Must be called before first call to
* {@link #computePlanForTable(TableName)}.
* @param masterServices master services to use
*/
void setMasterServices(MasterServices masterServices);
/**
* Computes next optimal normalization plan.
* @param table table to normalize
* @return Next (perhaps most urgent) normalization action to perform
*/
NormalizationPlan computePlanForTable(TableName table) throws HBaseIOException;
}
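Because the normalizer is resolved through hbase.master.normalizer.class, a deployment can substitute its own policy. Below is a hedged sketch of the plug-in contract: a deliberately do-nothing implementation, placed in the same package for brevity. It is not part of this patch.

package org.apache.hadoop.hbase.master.normalizer;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.MasterServices;

// Hypothetical no-op normalizer illustrating the interface; enable it by setting
// hbase.master.normalizer.class to this class name.
public class NoOpRegionNormalizer implements RegionNormalizer {
  private MasterServices masterServices;

  @Override
  public void setMasterServices(MasterServices masterServices) {
    this.masterServices = masterServices; // injected by HMaster before plans are requested
  }

  @Override
  public NormalizationPlan computePlanForTable(TableName table) throws HBaseIOException {
    // A real implementation would inspect region sizes via masterServices here.
    return EmptyNormalizationPlan.getInstance();
  }
}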

View File

@ -0,0 +1,53 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.HMaster;
import java.io.IOException;
/**
* Chore that will call {@link org.apache.hadoop.hbase.master.HMaster#normalizeRegions()}
* when needed.
*/
@InterfaceAudience.Private
public class RegionNormalizerChore extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(RegionNormalizerChore.class);
private final HMaster master;
public RegionNormalizerChore(HMaster master) {
super(master.getServerName() + "-RegionNormalizerChore", master,
master.getConfiguration().getInt("hbase.normalizer.period", 1800000));
this.master = master;
}
@Override
protected void chore() {
try {
master.normalizeRegions();
} catch (IOException e) {
LOG.error("Failed to normalize regions.", e);
}
}
}

View File

@ -0,0 +1,48 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Factory to create an instance of {@link RegionNormalizer} as configured.
*/
@InterfaceAudience.Private
public final class RegionNormalizerFactory {
private RegionNormalizerFactory() {
}
/**
* Create a region normalizer from the given conf.
* @param conf configuration
* @return {@link RegionNormalizer} implementation
*/
public static RegionNormalizer getRegionNormalizer(Configuration conf) {
// Create the configured RegionNormalizer implementation via reflection.
Class<? extends RegionNormalizer> normalizerKlass =
conf.getClass(HConstants.HBASE_MASTER_NORMALIZER_CLASS, SimpleRegionNormalizer.class,
RegionNormalizer.class);
return ReflectionUtils.newInstance(normalizerKlass, conf);
}
}

View File

@ -0,0 +1,176 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Pair;
import java.util.List;
/**
* Simple implementation of region normalizer.
*
* Logic in use:
*
* - get all regions of a given table
* - compute the average region size S (from the total store file size reported in RegionLoad)
* - if the biggest region is bigger than S * 2, it is kindly requested to split,
* and normalization stops
* - otherwise, the smallest region R1 and its smallest neighbor R2 are kindly requested
* to merge, if R1 + R2 < S, and normalization stops
* - Otherwise, no action is performed
*/
@InterfaceAudience.Private
public class SimpleRegionNormalizer implements RegionNormalizer {
private static final Log LOG = LogFactory.getLog(SimpleRegionNormalizer.class);
private MasterServices masterServices;
/**
* Set the master service.
* @param masterServices inject instance of MasterServices
*/
@Override
public void setMasterServices(MasterServices masterServices) {
this.masterServices = masterServices;
}
/**
* Computes next most "urgent" normalization action on the table.
* Action may be either a split, or a merge, or no action.
*
* @param table table to normalize
* @return normalization plan to execute
*/
@Override
public NormalizationPlan computePlanForTable(TableName table)
throws HBaseIOException {
if (table == null || table.isSystemTable()) {
LOG.debug("Normalization of table " + table + " isn't allowed");
return EmptyNormalizationPlan.getInstance();
}
List<HRegionInfo> tableRegions = masterServices.getAssignmentManager().getRegionStates().
getRegionsOfTable(table);
//TODO: should we make min number of regions a config param?
if (tableRegions == null || tableRegions.size() < 3) {
int nrRegions = tableRegions == null ? 0 : tableRegions.size();
LOG.debug("Table " + table + " has " + nrRegions + " regions, required min number"
+ " of regions for normalizer to run is 3, not running normalizer");
return EmptyNormalizationPlan.getInstance();
}
LOG.debug("Computing normalization plan for table: " + table +
", number of regions: " + tableRegions.size());
long totalSizeMb = 0;
Pair<HRegionInfo, Long> largestRegion = new Pair<>();
// track the smallest region; its smallest neighbor is determined below
Pair<HRegionInfo, Long> smallestRegion = new Pair<>();
Pair<HRegionInfo, Long> smallestNeighborOfSmallestRegion;
int smallestRegionIndex = 0;
for (int i = 0; i < tableRegions.size(); i++) {
HRegionInfo hri = tableRegions.get(i);
long regionSize = getRegionSize(hri);
totalSizeMb += regionSize;
if (largestRegion.getFirst() == null || regionSize > largestRegion.getSecond()) {
largestRegion.setFirst(hri);
largestRegion.setSecond(regionSize);
}
if (smallestRegion.getFirst() == null || regionSize < smallestRegion.getSecond()) {
smallestRegion.setFirst(hri);
smallestRegion.setSecond(regionSize);
smallestRegionIndex = i;
}
}
// now get smallest neighbor of smallest region
long leftNeighborSize = -1;
long rightNeighborSize = -1;
if (smallestRegionIndex > 0) {
leftNeighborSize = getRegionSize(tableRegions.get(smallestRegionIndex - 1));
}
if (smallestRegionIndex < tableRegions.size() - 1) {
rightNeighborSize = getRegionSize(tableRegions.get(smallestRegionIndex + 1));
}
if (leftNeighborSize == -1) {
smallestNeighborOfSmallestRegion =
new Pair<>(tableRegions.get(smallestRegionIndex + 1), rightNeighborSize);
} else if (rightNeighborSize == -1) {
smallestNeighborOfSmallestRegion =
new Pair<>(tableRegions.get(smallestRegionIndex - 1), leftNeighborSize);
} else {
if (leftNeighborSize < rightNeighborSize) {
smallestNeighborOfSmallestRegion =
new Pair<>(tableRegions.get(smallestRegionIndex - 1), leftNeighborSize);
} else {
smallestNeighborOfSmallestRegion =
new Pair<>(tableRegions.get(smallestRegionIndex + 1), rightNeighborSize);
}
}
double avgRegionSize = totalSizeMb / (double) tableRegions.size();
LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb);
LOG.debug("Table " + table + ", average region size: " + avgRegionSize);
// Now, if the largest region is more than 2 times larger than average, we split it; a split
// is a higher-priority normalization action than a merge.
if (largestRegion.getSecond() > 2 * avgRegionSize) {
LOG.debug("Table " + table + ", largest region "
+ largestRegion.getFirst().getRegionNameAsString() + " has size "
+ largestRegion.getSecond() + ", more than 2 times the avg size, splitting");
return new SplitNormalizationPlan(largestRegion.getFirst(), null);
} else {
if ((smallestRegion.getSecond() + smallestNeighborOfSmallestRegion.getSecond()
< avgRegionSize)) {
LOG.debug("Table " + table + ", smallest region size: " + smallestRegion.getSecond()
+ " and its smallest neighbor size: " + smallestNeighborOfSmallestRegion.getSecond()
+ ", less than half the avg size, merging them");
return new MergeNormalizationPlan(smallestRegion.getFirst(),
smallestNeighborOfSmallestRegion.getFirst());
} else {
LOG.debug("No normalization needed, regions look good for table: " + table);
return EmptyNormalizationPlan.getInstance();
}
}
}
private long getRegionSize(HRegionInfo hri) {
ServerName sn = masterServices.getAssignmentManager().getRegionStates().
getRegionServerOfRegion(hri);
RegionLoad regionLoad = masterServices.getServerManager().getLoad(sn).
getRegionsLoad().get(hri.getRegionName());
return regionLoad.getStorefileSizeMB();
}
}
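To make the thresholds concrete, consider an illustrative table (numbers not taken from the patch) with five regions whose store files are 10, 4, 4, 12 and 30 MB. The total is 60 MB, so the average S is 12 MB; the largest region (30 MB) exceeds 2 * S = 24 MB, so computePlanForTable returns a SplitNormalizationPlan for it and stops. If that largest region were only 20 MB (total 50 MB, S = 10 MB), the split condition would not fire; the smallest region (4 MB) and its smaller neighbor (4 MB) sum to 8 MB, which is below S, so a MergeNormalizationPlan for that pair would be returned instead.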

View File

@ -0,0 +1,81 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import java.io.IOException;
import java.util.Arrays;
/**
* Normalization plan to split region.
*/
@InterfaceAudience.Private
public class SplitNormalizationPlan implements NormalizationPlan {
private static final Log LOG = LogFactory.getLog(SplitNormalizationPlan.class.getName());
private HRegionInfo regionInfo;
private byte[] splitPoint;
public SplitNormalizationPlan(HRegionInfo regionInfo, byte[] splitPoint) {
this.regionInfo = regionInfo;
this.splitPoint = splitPoint;
}
public HRegionInfo getRegionInfo() {
return regionInfo;
}
public void setRegionInfo(HRegionInfo regionInfo) {
this.regionInfo = regionInfo;
}
public byte[] getSplitPoint() {
return splitPoint;
}
public void setSplitPoint(byte[] splitPoint) {
this.splitPoint = splitPoint;
}
@Override
public String toString() {
return "SplitNormalizationPlan{" +
"regionInfo=" + regionInfo +
", splitPoint=" + Arrays.toString(splitPoint) +
'}';
}
/**
* {@inheritDoc}
*/
@Override
public void execute(Admin admin) {
LOG.info("Executing splitting normalization plan: " + this);
try {
admin.splitRegion(regionInfo.getRegionName());
} catch (IOException ex) {
LOG.error("Error during region split: ", ex);
}
}
}

View File

@ -0,0 +1,240 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.when;
/**
* Tests logic of {@link SimpleRegionNormalizer}.
*/
@Category({MasterTests.class, SmallTests.class})
public class TestSimpleRegionNormalizer {
private static final Log LOG = LogFactory.getLog(TestSimpleRegionNormalizer.class);
private static RegionNormalizer normalizer;
// mocks
private static MasterServices masterServices;
@BeforeClass
public static void beforeAllTests() throws Exception {
normalizer = new SimpleRegionNormalizer();
}
@Test
public void testNoNormalizationForMetaTable() throws HBaseIOException {
TableName testTable = TableName.META_TABLE_NAME;
List<HRegionInfo> hris = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>();
setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
assertTrue(plan instanceof EmptyNormalizationPlan);
}
@Test
public void testNoNormalizationIfTooFewRegions() throws HBaseIOException {
TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
List<HRegionInfo> hris = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>();
HRegionInfo hri1 = new HRegionInfo(testTable, Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
hris.add(hri1);
regionSizes.put(hri1.getRegionName(), 10);
HRegionInfo hri2 = new HRegionInfo(testTable, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
hris.add(hri2);
regionSizes.put(hri2.getRegionName(), 15);
setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
assertTrue((plan instanceof EmptyNormalizationPlan));
}
@Test
public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException {
TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
List<HRegionInfo> hris = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>();
HRegionInfo hri1 = new HRegionInfo(testTable, Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
hris.add(hri1);
regionSizes.put(hri1.getRegionName(), 10);
HRegionInfo hri2 = new HRegionInfo(testTable, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
hris.add(hri2);
regionSizes.put(hri2.getRegionName(), 15);
HRegionInfo hri3 = new HRegionInfo(testTable, Bytes.toBytes("ccc"), Bytes.toBytes("ddd"));
hris.add(hri3);
regionSizes.put(hri3.getRegionName(), 8);
HRegionInfo hri4 = new HRegionInfo(testTable, Bytes.toBytes("ddd"), Bytes.toBytes("eee"));
hris.add(hri4);
regionSizes.put(hri4.getRegionName(), 10);
setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
assertTrue(plan instanceof EmptyNormalizationPlan);
}
@Test
public void testMergeOfSmallRegions() throws HBaseIOException {
TableName testTable = TableName.valueOf("testMergeOfSmallRegions");
List<HRegionInfo> hris = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>();
HRegionInfo hri1 = new HRegionInfo(testTable, Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
hris.add(hri1);
regionSizes.put(hri1.getRegionName(), 15);
HRegionInfo hri2 = new HRegionInfo(testTable, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
hris.add(hri2);
regionSizes.put(hri2.getRegionName(), 5);
HRegionInfo hri3 = new HRegionInfo(testTable, Bytes.toBytes("ccc"), Bytes.toBytes("ddd"));
hris.add(hri3);
regionSizes.put(hri3.getRegionName(), 5);
HRegionInfo hri4 = new HRegionInfo(testTable, Bytes.toBytes("ddd"), Bytes.toBytes("eee"));
hris.add(hri4);
regionSizes.put(hri4.getRegionName(), 15);
HRegionInfo hri5 = new HRegionInfo(testTable, Bytes.toBytes("eee"), Bytes.toBytes("fff"));
hris.add(hri5);
regionSizes.put(hri5.getRegionName(), 16);
setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
assertTrue(plan instanceof MergeNormalizationPlan);
assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
}
@Test
public void testMergeOfSmallNonAdjacentRegions() throws HBaseIOException {
TableName testTable = TableName.valueOf("testMergeOfSmallRegions");
List<HRegionInfo> hris = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>();
HRegionInfo hri1 = new HRegionInfo(testTable, Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
hris.add(hri1);
regionSizes.put(hri1.getRegionName(), 15);
HRegionInfo hri2 = new HRegionInfo(testTable, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
hris.add(hri2);
regionSizes.put(hri2.getRegionName(), 5);
HRegionInfo hri3 = new HRegionInfo(testTable, Bytes.toBytes("ccc"), Bytes.toBytes("ddd"));
hris.add(hri3);
regionSizes.put(hri3.getRegionName(), 16);
HRegionInfo hri4 = new HRegionInfo(testTable, Bytes.toBytes("ddd"), Bytes.toBytes("eee"));
hris.add(hri4);
regionSizes.put(hri4.getRegionName(), 15);
HRegionInfo hri5 = new HRegionInfo(testTable, Bytes.toBytes("eee"), Bytes.toBytes("fff"));
hris.add(hri5);
regionSizes.put(hri5.getRegionName(), 5);
setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
assertTrue(plan instanceof EmptyNormalizationPlan);
}
@Test
public void testSplitOfLargeRegion() throws HBaseIOException {
TableName testTable = TableName.valueOf("testSplitOfLargeRegion");
List<HRegionInfo> hris = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>();
HRegionInfo hri1 = new HRegionInfo(testTable, Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
hris.add(hri1);
regionSizes.put(hri1.getRegionName(), 8);
HRegionInfo hri2 = new HRegionInfo(testTable, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"));
hris.add(hri2);
regionSizes.put(hri2.getRegionName(), 6);
HRegionInfo hri3 = new HRegionInfo(testTable, Bytes.toBytes("ccc"), Bytes.toBytes("ddd"));
hris.add(hri3);
regionSizes.put(hri3.getRegionName(), 10);
HRegionInfo hri4 = new HRegionInfo(testTable, Bytes.toBytes("ddd"), Bytes.toBytes("eee"));
hris.add(hri4);
regionSizes.put(hri4.getRegionName(), 30);
setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
assertTrue(plan instanceof SplitNormalizationPlan);
assertEquals(hri4, ((SplitNormalizationPlan) plan).getRegionInfo());
}
protected void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
List<HRegionInfo> hris) {
masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
// for simplicity all regions are assumed to be on one server; doesn't matter to us
ServerName sn = ServerName.valueOf("localhost", -1, 1L);
when(masterServices.getAssignmentManager().getRegionStates().
getRegionsOfTable(any(TableName.class))).thenReturn(hris);
when(masterServices.getAssignmentManager().getRegionStates().
getRegionServerOfRegion(any(HRegionInfo.class))).thenReturn(sn);
for (Map.Entry<byte[], Integer> region : regionSizes.entrySet()) {
RegionLoad regionLoad = Mockito.mock(RegionLoad.class);
when(regionLoad.getName()).thenReturn(region.getKey());
when(regionLoad.getStorefileSizeMB()).thenReturn(region.getValue());
when(masterServices.getServerManager().getLoad(sn).
getRegionsLoad().get(region.getKey())).thenReturn(regionLoad);
}
normalizer.setMasterServices(masterServices);
}
}

View File

@ -0,0 +1,218 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import static org.junit.Assert.assertEquals;
/**
* Testing {@link SimpleRegionNormalizer} on minicluster.
*/
@Category({MasterTests.class, MediumTests.class})
public class TestSimpleRegionNormalizerOnCluster {
private static final Log LOG = LogFactory.getLog(TestSimpleRegionNormalizerOnCluster.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] FAMILYNAME = Bytes.toBytes("fam");
private static Admin admin;
@BeforeClass
public static void beforeAllTests() throws Exception {
// we will retry operations when PleaseHoldException is thrown
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
TEST_UTIL.getConfiguration().setBoolean(HConstants.HBASE_NORMALIZER_ENABLED, true);
// Start a single-node cluster.
TEST_UTIL.startMiniCluster(1);
admin = TEST_UTIL.getHBaseAdmin();
}
@AfterClass
public static void afterAllTests() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test(timeout = 60000)
@SuppressWarnings("deprecation")
public void testRegionNormalizationSplitOnCluster() throws Exception {
final TableName TABLENAME =
TableName.valueOf("testRegionNormalizationSplitOnCluster");
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
HMaster m = cluster.getMaster();
try (HTable ht = TEST_UTIL.createMultiRegionTable(TABLENAME, FAMILYNAME, 5)) {
// Need to get sorted list of regions here
List<HRegion> generatedRegions = TEST_UTIL.getHBaseCluster().getRegions(TABLENAME);
Collections.sort(generatedRegions, new Comparator<HRegion>() {
@Override
public int compare(HRegion o1, HRegion o2) {
return o1.getRegionInfo().compareTo(o2.getRegionInfo());
}
});
HRegion region = generatedRegions.get(0);
generateTestData(region, 1);
region.flush(true);
region = generatedRegions.get(1);
generateTestData(region, 1);
region.flush(true);
region = generatedRegions.get(2);
generateTestData(region, 2);
region.flush(true);
region = generatedRegions.get(3);
generateTestData(region, 2);
region.flush(true);
region = generatedRegions.get(4);
generateTestData(region, 5);
region.flush(true);
}
HTableDescriptor htd = admin.getTableDescriptor(TABLENAME);
htd.setNormalizationEnabled(true);
admin.modifyTable(TABLENAME, htd);
admin.flush(TABLENAME);
LOG.debug(admin.getTableDescriptor(TABLENAME));
assertEquals(5, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME));
// Now trigger normalization and wait for the resulting split to complete
Thread.sleep(5000); // to let region load reports update
m.normalizeRegions();
while (MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME) < 6) {
LOG.info("Waiting for normalization split to complete");
Thread.sleep(100);
}
assertEquals(6, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME));
admin.disableTable(TABLENAME);
admin.deleteTable(TABLENAME);
}
@Test(timeout = 60000)
@SuppressWarnings("deprecation")
public void testRegionNormalizationMergeOnCluster() throws Exception {
final TableName TABLENAME =
TableName.valueOf("testRegionNormalizationMergeOnCluster");
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
HMaster m = cluster.getMaster();
// create 5 regions with sizes to trigger merge of small regions
try (HTable ht = TEST_UTIL.createMultiRegionTable(TABLENAME, FAMILYNAME, 5)) {
// Need to get sorted list of regions here
List<HRegion> generatedRegions = TEST_UTIL.getHBaseCluster().getRegions(TABLENAME);
Collections.sort(generatedRegions, new Comparator<HRegion>() {
@Override
public int compare(HRegion o1, HRegion o2) {
return o1.getRegionInfo().compareTo(o2.getRegionInfo());
}
});
HRegion region = generatedRegions.get(0);
generateTestData(region, 1);
region.flush(true);
region = generatedRegions.get(1);
generateTestData(region, 1);
region.flush(true);
region = generatedRegions.get(2);
generateTestData(region, 3);
region.flush(true);
region = generatedRegions.get(3);
generateTestData(region, 3);
region.flush(true);
region = generatedRegions.get(4);
generateTestData(region, 5);
region.flush(true);
}
HTableDescriptor htd = admin.getTableDescriptor(TABLENAME);
htd.setNormalizationEnabled(true);
admin.modifyTable(TABLENAME, htd);
admin.flush(TABLENAME);
assertEquals(5, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME));
// Now trigger normalization and wait for the resulting merge to complete
Thread.sleep(5000); // to let region load reports update
m.normalizeRegions();
while (MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME) > 4) {
LOG.info("Waiting for normalization merge to complete");
Thread.sleep(100);
}
assertEquals(4, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME));
admin.disableTable(TABLENAME);
admin.deleteTable(TABLENAME);
}
private void generateTestData(Region region, int numRows) throws IOException {
// generating 1Mb values
LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(1024 * 1024, 1024 * 1024);
for (int i = 0; i < numRows; ++i) {
byte[] key = Bytes.add(region.getRegionInfo().getStartKey(), Bytes.toBytes(i));
for (int j = 0; j < 1; ++j) {
Put put = new Put(key);
byte[] col = Bytes.toBytes(String.valueOf(j));
byte[] value = dataGenerator.generateRandomSizeValue(key, col);
put.add(FAMILYNAME, col, value);
region.put(put);
}
}
}
}