HBASE-22774 [WAL] RegionGroupingStrategy loses its function after split

Signed-off-by: Peter Somogyi <psomogyi@apache.org>
This commit is contained in:
Reid Chan 2019-08-14 10:27:55 +08:00 committed by GitHub
parent 9821fd8ffb
commit f887207322
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 306 additions and 18 deletions

View File

@ -7180,9 +7180,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Move the files from the temporary .splits to the final /table/region directory
fs.commitDaughterRegion(hri);
// rsServices can be null in UT
WAL daughterWAL = rsServices == null ? getWAL() :rsServices.getWAL(hri);
// Create the daughter HRegion instance
HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
this.getBaseConf(), hri, this.getTableDesc(), rsServices);
HRegion r = HRegion.newHRegion(this.fs.getTableDir(), daughterWAL,
fs.getFileSystem(), this.getBaseConf(), hri, this.getTableDesc(), rsServices);
r.readRequestsCount.set(this.getReadRequestsCount() / 2);
r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
return r;

View File

@ -72,7 +72,7 @@ public class BoundedGroupingStrategy implements RegionGroupingStrategy{
int regionGroupNumber = config.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
groupNames = new String[regionGroupNumber];
for (int i = 0; i < regionGroupNumber; i++) {
groupNames[i] = providerId + GROUP_NAME_DELIMITER + "regiongroup-" + i;
groupNames[i] = "regiongroup-" + i;
}
}

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy
*/
@InterfaceAudience.Private
public class NamespaceGroupingStrategy implements RegionGroupingStrategy {
private String providerId;
@Override
public String group(byte[] identifier, byte[] namespace) {
@ -41,12 +40,10 @@ public class NamespaceGroupingStrategy implements RegionGroupingStrategy {
} else {
namespaceString = Bytes.toString(namespace);
}
return providerId + GROUP_NAME_DELIMITER + namespaceString;
return namespaceString;
}
@Override
public void init(Configuration config, String providerId) {
this.providerId = providerId;
}
public void init(Configuration config, String providerId) {}
}

View File

@ -62,7 +62,6 @@ public class RegionGroupingProvider implements WALProvider {
* Map identifiers to a group number.
*/
public static interface RegionGroupingStrategy {
String GROUP_NAME_DELIMITER = ".";
/**
* Given an identifier and a namespace, pick a group.
@ -252,7 +251,7 @@ public class RegionGroupingProvider implements WALProvider {
public void init(Configuration config, String providerId) {}
@Override
public String group(final byte[] identifier, final byte[] namespace) {
return Bytes.toString(identifier);
return "identity-" + Bytes.toString(identifier);
}
}

View File

@ -25,11 +25,36 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;
import com.google.protobuf.Service;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -49,7 +74,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.PairOfSameType;
@ -66,7 +90,8 @@ import com.google.common.collect.ImmutableList;
* Test the {@link SplitTransactionImpl} class against an HRegion (as opposed to
* running cluster).
*/
@Category(SmallTests.class)
@RunWith(Parameterized.class)
@Category(MediumTests.class)
public class TestSplitTransaction {
private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final Path testdir =
@ -82,16 +107,37 @@ public class TestSplitTransaction {
private static boolean preRollBackCalled = false;
private static boolean postRollBackCalled = false;
private String walProvider;
private String strategy;
@Parameterized.Parameters
public static final Collection<Object[]> parameters() throws IOException {
List<Object[]> params = new ArrayList<>(4);
params.add(new Object[] { "filesystem", "" });
params.add(new Object[] { "multiwal", "identity" });
params.add(new Object[] { "multiwal", "bounded" });
params.add(new Object[] { "multiwal", "namespace" });
return params;
}
public TestSplitTransaction(String walProvider, String strategy) {
this.walProvider = walProvider;
this.strategy = strategy;
}
@Before
public void setup() throws IOException {
this.fs = FileSystem.get(TEST_UTIL.getConfiguration());
TEST_UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, CustomObserver.class.getName());
this.fs.delete(this.testdir, true);
TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider);
if (!strategy.isEmpty()) {
TEST_UTIL.getConfiguration().set(RegionGroupingProvider.REGION_GROUPING_STRATEGY, strategy);
}
final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
FSUtils.setRootDir(walConf, this.testdir);
this.wals = new WALFactory(walConf, null, this.getClass().getName());
this.parent = createRegion(this.testdir, this.wals);
RegionCoprocessorHost host = new RegionCoprocessorHost(this.parent, null, TEST_UTIL.getConfiguration());
this.parent.setCoprocessorHost(host);
@ -277,7 +323,7 @@ public class TestSplitTransaction {
assertTrue(count > 0 && count != rowcount);
daughtersRowCount += count;
} finally {
HRegion.closeHRegion((HRegion)openRegion);
((HRegion) openRegion).close();
}
}
assertEquals(rowcount, daughtersRowCount);
@ -363,7 +409,7 @@ public class TestSplitTransaction {
assertTrue(count > 0 && count != rowcount);
daughtersRowCount += count;
} finally {
HRegion.closeHRegion((HRegion)openRegion);
((HRegion) openRegion).close();
}
}
assertEquals(rowcount, daughtersRowCount);
@ -407,11 +453,16 @@ public class TestSplitTransaction {
HColumnDescriptor hcd = new HColumnDescriptor(CF);
htd.addFamily(hcd);
HRegionInfo hri = new HRegionInfo(htd.getTableName(), STARTROW, ENDROW);
HRegion r = HRegion.createHRegion(hri, testdir, TEST_UTIL.getConfiguration(), htd);
Configuration conf = TEST_UTIL.getConfiguration();
HRegion r = HRegion.createHRegion(hri, testdir, conf, htd);
HRegion.closeHRegion(r);
ServerName sn = ServerName.valueOf("testSplitTransaction", 100, 42);
final RegionServerServices rss = TEST_UTIL.createMockRegionServerService(sn);
MockRegionServerServicesWithWALs rsw =
new MockRegionServerServicesWithWALs(rss, wals.getWALProvider());
return HRegion.openHRegion(testdir, hri, htd,
wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()),
TEST_UTIL.getConfiguration());
conf, rsw, null);
}
public static class CustomObserver extends BaseRegionObserver{
@ -428,4 +479,243 @@ public class TestSplitTransaction {
}
}
static class MockRegionServerServicesWithWALs implements RegionServerServices {
WALProvider provider;
RegionServerServices rss;
MockRegionServerServicesWithWALs(RegionServerServices rss, WALProvider provider) {
this.rss = rss;
this.provider = provider;
}
@Override
public boolean isStopping() {
return rss.isStopping();
}
@Override
public WAL getWAL(HRegionInfo hri) throws IOException {
return provider.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
}
@Override
public CompactionRequestor getCompactionRequester() {
return rss.getCompactionRequester();
}
@Override
public FlushRequester getFlushRequester() {
return rss.getFlushRequester();
}
@Override
public RegionServerAccounting getRegionServerAccounting() {
return rss.getRegionServerAccounting();
}
@Override
public TableLockManager getTableLockManager() {
return rss.getTableLockManager();
}
@Override
public RegionServerQuotaManager getRegionServerQuotaManager() {
return rss.getRegionServerQuotaManager();
}
@Override
public void postOpenDeployTasks(PostOpenDeployContext context)
throws KeeperException, IOException {
rss.postOpenDeployTasks(context);
}
@Override
public void postOpenDeployTasks(Region r) throws KeeperException, IOException {
rss.postOpenDeployTasks(r);
}
@Override
public boolean reportRegionStateTransition(RegionStateTransitionContext context) {
return rss.reportRegionStateTransition(context);
}
@Override
public boolean reportRegionStateTransition(
RegionServerStatusProtos.RegionStateTransition.TransitionCode code, long openSeqNum,
HRegionInfo... hris) {
return rss.reportRegionStateTransition(code, openSeqNum, hris);
}
@Override
public boolean reportRegionStateTransition(
RegionServerStatusProtos.RegionStateTransition.TransitionCode code, HRegionInfo... hris) {
return rss.reportRegionStateTransition(code, hris);
}
@Override
public RpcServerInterface getRpcServer() {
return rss.getRpcServer();
}
@Override
public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
return rss.getRegionsInTransitionInRS();
}
@Override
public FileSystem getFileSystem() {
return rss.getFileSystem();
}
@Override
public Leases getLeases() {
return rss.getLeases();
}
@Override
public ExecutorService getExecutorService() {
return rss.getExecutorService();
}
@Override
public Map<String, Region> getRecoveringRegions() {
return rss.getRecoveringRegions();
}
@Override
public ServerNonceManager getNonceManager() {
return rss.getNonceManager();
}
@Override
public boolean registerService(Service service) {
return rss.registerService(service);
}
@Override
public HeapMemoryManager getHeapMemoryManager() {
return rss.getHeapMemoryManager();
}
@Override
public double getCompactionPressure() {
return rss.getCompactionPressure();
}
@Override
public Set<TableName> getOnlineTables() {
return rss.getOnlineTables();
}
@Override
public ThroughputController getFlushThroughputController() {
return rss.getFlushThroughputController();
}
@Override
public double getFlushPressure() {
return rss.getFlushPressure();
}
@Override
public MetricsRegionServer getMetrics() {
return rss.getMetrics();
}
@Override
public void unassign(byte[] regionName) throws IOException {
rss.unassign(regionName);
}
@Override
public void addToOnlineRegions(Region r) {
rss.addToOnlineRegions(r);
}
@Override
public boolean removeFromOnlineRegions(Region r, ServerName destination) {
return rss.removeFromOnlineRegions(r, destination);
}
@Override
public Region getFromOnlineRegions(String encodedRegionName) {
return rss.getFromOnlineRegions(encodedRegionName);
}
@Override
public List<Region> getOnlineRegions(TableName tableName) throws IOException {
return rss.getOnlineRegions(tableName);
}
@Override
public List<Region> getOnlineRegions() {
return rss.getOnlineRegions();
}
@Override
public Configuration getConfiguration() {
return rss.getConfiguration();
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return rss.getZooKeeper();
}
@Override
public ClusterConnection getConnection() {
return rss.getConnection();
}
@Override
public MetaTableLocator getMetaTableLocator() {
return rss.getMetaTableLocator();
}
@Override
public ServerName getServerName() {
return rss.getServerName();
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return rss.getCoordinatedStateManager();
}
@Override
public ChoreService getChoreService() {
return rss.getChoreService();
}
@Override
public void abort(String why, Throwable e) {
rss.abort(why, e);
}
@Override
public boolean isAborted() {
return rss.isAborted();
}
@Override
public void stop(String why) {
rss.stop(why);
}
@Override
public boolean isStopped() {
return rss.isStopped();
}
@Override
public void updateRegionFavoredNodesMapping(String encodedRegionName,
List<HBaseProtos.ServerName> favoredNodes) {
rss.updateRegionFavoredNodesMapping(encodedRegionName, favoredNodes);
}
@Override
public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
return rss.getFavoredNodesForRegion(encodedRegionName);
}
}
}