HBASE-18490 Modifying a table descriptor to enable replicas does not

create replica regions (Ram)
This commit is contained in:
Ramkrishna 2017-10-03 13:10:44 +05:30
parent 2348b7fe45
commit 3318e8724e
6 changed files with 371 additions and 32 deletions

View File

@ -274,7 +274,7 @@ public class MetaTableAccessor {
* @return An {@link Table} for <code>hbase:meta</code>
* @throws IOException
*/
static Table getMetaHTable(final Connection connection)
public static Table getMetaHTable(final Connection connection)
throws IOException {
// We used to pass whole CatalogTracker in here, now we just pass in Connection
if (connection == null) {

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
@ -157,4 +159,32 @@ public class RegionReplicaUtil {
}
return 0;
}
/**
* Create any replicas for the regions (the default replicas that was already created is passed to
* the method)
* @param tableDescriptor descriptor to use
* @param regions existing regions
* @param oldReplicaCount existing replica count
* @param newReplicaCount updated replica count due to modify table
* @return the combined list of default and non-default replicas
*/
public static List<RegionInfo> addReplicas(final TableDescriptor tableDescriptor,
final List<RegionInfo> regions, int oldReplicaCount, int newReplicaCount) {
if ((newReplicaCount - 1) <= 0) {
return regions;
}
List<RegionInfo> hRegionInfos = new ArrayList<>((newReplicaCount) * regions.size());
for (int i = 0; i < regions.size(); i++) {
if (RegionReplicaUtil.isDefaultReplica(regions.get(i))) {
// region level replica index starts from 0. So if oldReplicaCount was 2 then the max replicaId for
// the existing regions would be 1
for (int j = oldReplicaCount; j < newReplicaCount; j++) {
hRegionInfos.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), j));
}
}
}
hRegionInfos.addAll(regions);
return hRegionInfos;
}
}

View File

@ -340,7 +340,9 @@ public class CreateTableProcedure
ProcedureSyncWait.waitMetaRegions(env);
// Add replicas if needed
List<RegionInfo> newRegions = addReplicas(env, tableDescriptor, regions);
// we need to create regions with replicaIds starting from 1
List<RegionInfo> newRegions = RegionReplicaUtil.addReplicas(tableDescriptor, regions, 1,
tableDescriptor.getRegionReplication());
// Add regions to META
addRegionsToMeta(env, tableDescriptor, newRegions);
@ -352,31 +354,6 @@ public class CreateTableProcedure
return newRegions;
}
/**
* Create any replicas for the regions (the default replicas that was
* already created is passed to the method)
* @param tableDescriptor descriptor to use
* @param regions default replicas
* @return the combined list of default and non-default replicas
*/
private static List<RegionInfo> addReplicas(final MasterProcedureEnv env,
final TableDescriptor tableDescriptor,
final List<RegionInfo> regions) {
int numRegionReplicas = tableDescriptor.getRegionReplication() - 1;
if (numRegionReplicas <= 0) {
return regions;
}
List<RegionInfo> hRegionInfos = new ArrayList<>((numRegionReplicas+1)*regions.size());
for (int i = 0; i < regions.size(); i++) {
for (int j = 1; j <= numRegionReplicas; j++) {
hRegionInfos.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), j));
}
}
hRegionInfos.addAll(regions);
return hRegionInfos;
}
protected static void setEnablingState(final MasterProcedureEnv env, final TableName tableName)
throws IOException {
// Mark the table as Enabling

View File

@ -19,13 +19,25 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.TableStateManager;
@ -72,6 +84,7 @@ public class EnableTableProcedure
this.skipTableStateCheck = skipTableStateCheck;
}
@SuppressWarnings("deprecation")
@Override
protected Flow executeFromState(final MasterProcedureEnv env, final EnableTableState state)
throws InterruptedException {
@ -98,7 +111,72 @@ public class EnableTableProcedure
setNextState(EnableTableState.ENABLE_TABLE_MARK_REGIONS_ONLINE);
break;
case ENABLE_TABLE_MARK_REGIONS_ONLINE:
addChildProcedure(env.getAssignmentManager().createAssignProcedures(tableName));
Connection connection = env.getMasterServices().getConnection();
// we will need to get the tableDescriptor here to see if there is a change in the replica
// count
TableDescriptor hTableDescriptor =
env.getMasterServices().getTableDescriptors().get(tableName);
// Get the replica count
int regionReplicaCount = hTableDescriptor.getRegionReplication();
// Get the regions for the table from the memory
List<RegionInfo> regionsOfTable =
env.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
if (regionReplicaCount > 1) {
int currentMaxReplica = 0;
// Check if the regions in memory have replica regions as marked in META table
for (RegionInfo regionInfo : regionsOfTable) {
if (regionInfo.getReplicaId() > currentMaxReplica) {
// Iterating through all the list to identify the highest replicaID region.
// We can stop after checking with the first set of regions??
currentMaxReplica = regionInfo.getReplicaId();
}
}
// read the META table to know the actual number of replicas for the table - if there
// was a table modification on region replica then this will reflect the new entries also
int replicasFound =
getNumberOfReplicasFromMeta(connection, regionReplicaCount, regionsOfTable);
assert regionReplicaCount - 1 == replicasFound;
LOG.info(replicasFound + " META entries added for the given regionReplicaCount "
+ regionReplicaCount + " for the table " + tableName.getNameAsString());
if (currentMaxReplica == (regionReplicaCount - 1)) {
if (LOG.isDebugEnabled()) {
LOG.debug("There is no change to the number of region replicas."
+ " Assigning the available regions." + " Current and previous"
+ "replica count is " + regionReplicaCount);
}
} else if (currentMaxReplica > (regionReplicaCount - 1)) {
// we have additional regions as the replica count has been decreased. Delete
// those regions because already the table is in the unassigned state
LOG.info("The number of replicas " + (currentMaxReplica + 1)
+ " is more than the region replica count " + regionReplicaCount);
List<RegionInfo> copyOfRegions = new ArrayList<RegionInfo>(regionsOfTable);
for (RegionInfo regionInfo : copyOfRegions) {
if (regionInfo.getReplicaId() > (regionReplicaCount - 1)) {
// delete the region from the regionStates
env.getAssignmentManager().getRegionStates().deleteRegion(regionInfo);
// remove it from the list of regions of the table
LOG.info("The regioninfo being removed is " + regionInfo + " "
+ regionInfo.getReplicaId());
regionsOfTable.remove(regionInfo);
}
}
} else {
// the replicasFound is less than the regionReplication
LOG.info(
"The number of replicas has been changed(increased)."
+ " Lets assign the new region replicas. The previous replica count was "
+ (currentMaxReplica + 1) + ". The current replica count is "
+ regionReplicaCount);
regionsOfTable = RegionReplicaUtil.addReplicas(hTableDescriptor, regionsOfTable,
currentMaxReplica + 1, regionReplicaCount);
}
}
// Assign all the table regions. (including region replicas if added)
addChildProcedure(env.getAssignmentManager().createAssignProcedures(regionsOfTable));
setNextState(EnableTableState.ENABLE_TABLE_SET_ENABLED_TABLE_STATE);
break;
case ENABLE_TABLE_SET_ENABLED_TABLE_STATE:
@ -122,6 +200,31 @@ public class EnableTableProcedure
return Flow.HAS_MORE_STATE;
}
private int getNumberOfReplicasFromMeta(Connection connection, int regionReplicaCount,
List<RegionInfo> regionsOfTable) throws IOException {
Result r = getRegionFromMeta(connection, regionsOfTable);
int replicasFound = 0;
for (int i = 1; i < regionReplicaCount; i++) {
// Since we have already added the entries to the META we will be getting only that here
List<Cell> columnCells =
r.getColumnCells(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(i));
if (!columnCells.isEmpty()) {
replicasFound++;
}
}
return replicasFound;
}
private Result getRegionFromMeta(Connection connection, List<RegionInfo> regionsOfTable)
throws IOException {
byte[] metaKeyForRegion = MetaTableAccessor.getMetaKeyForRegion(regionsOfTable.get(0));
Get get = new Get(metaKeyForRegion);
get.addFamily(HConstants.CATALOG_FAMILY);
Table metaTable = MetaTableAccessor.getMetaHTable(connection);
Result r = metaTable.get(get);
return r;
}
@Override
protected void rollbackState(final MasterProcedureEnv env, final EnableTableState state)
throws IOException {

View File

@ -359,13 +359,27 @@ public class ModifyTableProcedure
connection);
}
}
// Setup replication for region replicas if needed
if (newReplicaCount > 1 && oldReplicaCount <= 1) {
ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration());
if (newReplicaCount > oldReplicaCount) {
Connection connection = env.getMasterServices().getConnection();
// Get the existing table regions
List<RegionInfo> existingTableRegions =
MetaTableAccessor.getTableRegions(connection, getTableName());
// add all the new entries to the meta table
addRegionsToMeta(env, newTableDescriptor, existingTableRegions);
if (oldReplicaCount <= 1) {
// The table has been newly enabled for replica. So check if we need to setup
// region replication
ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration());
}
}
}
private static void addRegionsToMeta(final MasterProcedureEnv env,
final TableDescriptor tableDescriptor, final List<RegionInfo> regionInfos)
throws IOException {
MetaTableAccessor.addRegionsToMeta(env.getMasterServices().getConnection(), regionInfos,
tableDescriptor.getRegionReplication());
}
/**
* Action after modifying table.
* @param env MasterProcedureEnv

View File

@ -0,0 +1,215 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestRegionReplicasWithModifyTable {
private static final Log LOG = LogFactory.getLog(TestRegionReplicasWithModifyTable.class);
private static final int NB_SERVERS = 3;
private static Table table;
private static final byte[] row = "TestRegionReplicasWithModifyTable".getBytes();
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
private static final byte[] f = HConstants.CATALOG_FAMILY;
@Rule
public TestName name = new TestName();
@BeforeClass
public static void before() throws Exception {
HTU.startMiniCluster(NB_SERVERS);
}
private static void enableReplicationByModification(final TableName tableName,
boolean withReplica, int initialReplicaCount, int enableReplicaCount, int splitCount)
throws IOException, InterruptedException {
HTableDescriptor htd = new HTableDescriptor(tableName);
if (withReplica) {
htd.setRegionReplication(initialReplicaCount);
}
if (splitCount > 0) {
byte[][] splits = getSplits(splitCount);
table = HTU.createTable(htd, new byte[][] { f }, splits,
new Configuration(HTU.getConfiguration()));
} else {
table = HTU.createTable(htd, new byte[][] { f }, (byte[][]) null,
new Configuration(HTU.getConfiguration()));
}
HBaseTestingUtility.setReplicas(HTU.getAdmin(), table.getName(), enableReplicaCount);
}
private static byte[][] getSplits(int numRegions) {
RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
split.setFirstRow(Bytes.toBytes(0L));
split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
return split.split(numRegions);
}
@AfterClass
public static void afterClass() throws Exception {
HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
table.close();
HTU.shutdownMiniCluster();
}
private HRegionServer getRS() {
return HTU.getMiniHBaseCluster().getRegionServer(0);
}
private HRegionServer getSecondaryRS() {
return HTU.getMiniHBaseCluster().getRegionServer(1);
}
private HRegionServer getTertiaryRS() {
return HTU.getMiniHBaseCluster().getRegionServer(2);
}
@Test(timeout = 60000)
public void testRegionReplicasUsingEnableTable() throws Exception {
TableName tableName = null;
try {
tableName = TableName.valueOf(name.getMethodName());
enableReplicationByModification(tableName, false, 0, 3, 0);
List<Region> onlineRegions = getRS().getRegions(tableName);
List<Region> onlineRegions2 = getSecondaryRS().getRegions(tableName);
List<Region> onlineRegions3 = getTertiaryRS().getRegions(tableName);
int totalRegions = onlineRegions.size() + onlineRegions2.size() + onlineRegions3.size();
assertEquals("the number of regions should be more than 1", totalRegions, 3);
} finally {
disableAndDeleteTable(tableName);
}
}
private void disableAndDeleteTable(TableName tableName) throws IOException {
HTU.getAdmin().disableTable(tableName);
HTU.getAdmin().deleteTable(tableName);
}
@Test(timeout = 60000)
public void testRegionReplicasUsingEnableTableForMultipleRegions() throws Exception {
TableName tableName = null;
try {
tableName = TableName.valueOf(name.getMethodName());
enableReplicationByModification(tableName, false, 0, 3, 10);
List<Region> onlineRegions = getRS().getRegions(tableName);
List<Region> onlineRegions2 = getSecondaryRS().getRegions(tableName);
List<Region> onlineRegions3 = getTertiaryRS().getRegions(tableName);
int totalRegions = onlineRegions.size() + onlineRegions2.size() + onlineRegions3.size();
assertEquals("the number of regions should be equal to 30", totalRegions, 30);
} finally {
disableAndDeleteTable(tableName);
}
}
@Test(timeout = 60000)
public void testRegionReplicasByEnableTableWhenReplicaCountIsIncreased() throws Exception {
TableName tableName = null;
try {
tableName = TableName.valueOf(name.getMethodName());
enableReplicationByModification(tableName, true, 2, 3, 0);
List<Region> onlineRegions = getRS().getRegions(tableName);
List<Region> onlineRegions2 = getSecondaryRS().getRegions(tableName);
List<Region> onlineRegions3 = getTertiaryRS().getRegions(tableName);
int totalRegions = onlineRegions.size() + onlineRegions2.size() + onlineRegions3.size();
assertEquals("the number of regions should be 3", totalRegions, 3);
} finally {
disableAndDeleteTable(tableName);
}
}
@Test(timeout = 60000)
public void testRegionReplicasByEnableTableWhenReplicaCountIsDecreased() throws Exception {
TableName tableName = null;
try {
tableName = TableName.valueOf(name.getMethodName());
enableReplicationByModification(tableName, true, 3, 2, 0);
List<Region> onlineRegions = getRS().getRegions(tableName);
List<Region> onlineRegions2 = getSecondaryRS().getRegions(tableName);
List<Region> onlineRegions3 = getTertiaryRS().getRegions(tableName);
int totalRegions = onlineRegions.size() + onlineRegions2.size() + onlineRegions3.size();
assertEquals("the number of regions should be reduced to 2", totalRegions, 2);
} finally {
disableAndDeleteTable(tableName);
}
}
@Test(timeout = 60000)
public void testRegionReplicasByEnableTableWhenReplicaCountIsDecreasedWithMultipleRegions()
throws Exception {
TableName tableName = null;
try {
tableName = TableName.valueOf(name.getMethodName());
enableReplicationByModification(tableName, true, 3, 2, 20);
List<Region> onlineRegions = getRS().getRegions(tableName);
List<Region> onlineRegions2 = getSecondaryRS().getRegions(tableName);
List<Region> onlineRegions3 = getTertiaryRS().getRegions(tableName);
int totalRegions = onlineRegions.size() + onlineRegions2.size() + onlineRegions3.size();
assertEquals("the number of regions should be reduced to 40", totalRegions, 40);
} finally {
disableAndDeleteTable(tableName);
}
}
@Test(timeout = 60000)
public void testRegionReplicasByEnableTableWhenReplicaCountIsIncreasedWithmultipleRegions()
throws Exception {
TableName tableName = null;
try {
tableName = TableName.valueOf(name.getMethodName());
enableReplicationByModification(tableName, true, 2, 3, 15);
List<Region> onlineRegions = getRS().getRegions(tableName);
List<Region> onlineRegions2 = getSecondaryRS().getRegions(tableName);
List<Region> onlineRegions3 = getTertiaryRS().getRegions(tableName);
int totalRegions = onlineRegions.size() + onlineRegions2.size() + onlineRegions3.size();
assertEquals("the number of regions should be equal to 45", totalRegions, 3 * 15);
} finally {
disableAndDeleteTable(tableName);
}
}
}