HBASE-22861 [WAL] Merged region should get its WAL according to WALProvider

This commit is contained in:
Reid Chan 2019-08-22 11:15:21 +08:00 committed by GitHub
parent 24a821c38d
commit 105008e748
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 337 additions and 264 deletions

View File

@ -7198,7 +7198,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
final HRegion region_b) throws IOException {
HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
WAL mergedRegionWAL = rsServices == null ? getWAL() : rsServices.getWAL(mergedRegionInfo);
HRegion r = HRegion.newHRegion(this.fs.getTableDir(), mergedRegionWAL,
fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
this.getTableDesc(), this.rsServices);
r.readRequestsCount.set(this.getReadRequestsCount()

View File

@ -0,0 +1,296 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import com.google.protobuf.Service;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* Mock region server services with WALProvider, it can be used for testing wal related tests,
* like split or merge regions.
*/
public class MockRegionServerServicesWithWALs implements RegionServerServices {
WALProvider provider;
RegionServerServices rss;
public MockRegionServerServicesWithWALs(RegionServerServices rss, WALProvider provider) {
this.rss = rss;
this.provider = provider;
}
@Override
public boolean isStopping() {
return rss.isStopping();
}
@Override
public WAL getWAL(HRegionInfo hri) throws IOException {
return provider.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
}
@Override
public CompactionRequestor getCompactionRequester() {
return rss.getCompactionRequester();
}
@Override
public FlushRequester getFlushRequester() {
return rss.getFlushRequester();
}
@Override
public RegionServerAccounting getRegionServerAccounting() {
return rss.getRegionServerAccounting();
}
@Override
public TableLockManager getTableLockManager() {
return rss.getTableLockManager();
}
@Override
public RegionServerQuotaManager getRegionServerQuotaManager() {
return rss.getRegionServerQuotaManager();
}
@Override
public void postOpenDeployTasks(PostOpenDeployContext context)
throws KeeperException, IOException {
rss.postOpenDeployTasks(context);
}
@Override
public void postOpenDeployTasks(Region r) throws KeeperException, IOException {
rss.postOpenDeployTasks(r);
}
@Override
public boolean reportRegionStateTransition(RegionStateTransitionContext context) {
return rss.reportRegionStateTransition(context);
}
@Override
public boolean reportRegionStateTransition(
RegionServerStatusProtos.RegionStateTransition.TransitionCode code, long openSeqNum,
HRegionInfo... hris) {
return rss.reportRegionStateTransition(code, openSeqNum, hris);
}
@Override
public boolean reportRegionStateTransition(
RegionServerStatusProtos.RegionStateTransition.TransitionCode code, HRegionInfo... hris) {
return rss.reportRegionStateTransition(code, hris);
}
@Override
public RpcServerInterface getRpcServer() {
return rss.getRpcServer();
}
@Override
public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
return rss.getRegionsInTransitionInRS();
}
@Override
public FileSystem getFileSystem() {
return rss.getFileSystem();
}
@Override
public Leases getLeases() {
return rss.getLeases();
}
@Override
public ExecutorService getExecutorService() {
return rss.getExecutorService();
}
@Override
public Map<String, Region> getRecoveringRegions() {
return rss.getRecoveringRegions();
}
@Override
public ServerNonceManager getNonceManager() {
return rss.getNonceManager();
}
@Override
public boolean registerService(Service service) {
return rss.registerService(service);
}
@Override
public HeapMemoryManager getHeapMemoryManager() {
return rss.getHeapMemoryManager();
}
@Override
public double getCompactionPressure() {
return rss.getCompactionPressure();
}
@Override
public Set<TableName> getOnlineTables() {
return rss.getOnlineTables();
}
@Override
public ThroughputController getFlushThroughputController() {
return rss.getFlushThroughputController();
}
@Override
public double getFlushPressure() {
return rss.getFlushPressure();
}
@Override
public MetricsRegionServer getMetrics() {
return rss.getMetrics();
}
@Override
public void unassign(byte[] regionName) throws IOException {
rss.unassign(regionName);
}
@Override
public void addToOnlineRegions(Region r) {
rss.addToOnlineRegions(r);
}
@Override
public boolean removeFromOnlineRegions(Region r, ServerName destination) {
return rss.removeFromOnlineRegions(r, destination);
}
@Override
public Region getFromOnlineRegions(String encodedRegionName) {
return rss.getFromOnlineRegions(encodedRegionName);
}
@Override
public List<Region> getOnlineRegions(TableName tableName) throws IOException {
return rss.getOnlineRegions(tableName);
}
@Override
public List<Region> getOnlineRegions() {
return rss.getOnlineRegions();
}
@Override
public Configuration getConfiguration() {
return rss.getConfiguration();
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return rss.getZooKeeper();
}
@Override
public ClusterConnection getConnection() {
return rss.getConnection();
}
@Override
public MetaTableLocator getMetaTableLocator() {
return rss.getMetaTableLocator();
}
@Override
public ServerName getServerName() {
return rss.getServerName();
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return rss.getCoordinatedStateManager();
}
@Override
public ChoreService getChoreService() {
return rss.getChoreService();
}
@Override
public void abort(String why, Throwable e) {
rss.abort(why, e);
}
@Override
public boolean isAborted() {
return rss.isAborted();
}
@Override
public void stop(String why) {
rss.stop(why);
}
@Override
public boolean isStopped() {
return rss.isStopped();
}
@Override
public void updateRegionFavoredNodesMapping(String encodedRegionName,
List<HBaseProtos.ServerName> favoredNodes) {
rss.updateRegionFavoredNodesMapping(encodedRegionName, favoredNodes);
}
@Override
public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
return rss.getFavoredNodesForRegion(encodedRegionName);
}
}

View File

@ -25,6 +25,7 @@ import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@ -38,7 +39,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MockRegionServerServicesWithWALs;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
@ -46,12 +49,15 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import com.google.common.collect.ImmutableList;
@ -60,6 +66,7 @@ import com.google.common.collect.ImmutableList;
* Test the {@link RegionMergeTransactionImpl} class against two HRegions (as
* opposed to running cluster).
*/
@RunWith(Parameterized.class)
@Category(SmallTests.class)
public class TestRegionMergeTransaction {
private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@ -77,13 +84,40 @@ public class TestRegionMergeTransaction {
private static final byte[] ENDROW = new byte[] { '{', '{', '{' };
private static final byte[] CF = HConstants.CATALOG_FAMILY;
private MockRegionServerServicesWithWALs rsw;
private String walProvider;
private String strategy;
@Parameterized.Parameters
public static final Collection<Object[]> parameters() {
List<Object[]> params = new ArrayList<>(4);
params.add(new Object[] { "filesystem", "" });
params.add(new Object[] { "multiwal", "identity" });
params.add(new Object[] { "multiwal", "bounded" });
params.add(new Object[] { "multiwal", "namespace" });
return params;
}
public TestRegionMergeTransaction(String walProvider, String strategy) {
this.walProvider = walProvider;
this.strategy = strategy;
}
@Before
public void setup() throws IOException {
this.fs = FileSystem.get(TEST_UTIL.getConfiguration());
this.fs.delete(this.testdir, true);
TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider);
if (!strategy.isEmpty()) {
TEST_UTIL.getConfiguration().set(RegionGroupingProvider.REGION_GROUPING_STRATEGY, strategy);
}
final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
FSUtils.setRootDir(walConf, this.testdir);
this.wals = new WALFactory(walConf, null, TestRegionMergeTransaction.class.getName());
ServerName sn = ServerName.valueOf("testRegionMergeTransaction", 10, 66);
final RegionServerServices rss = TEST_UTIL.createMockRegionServerService(sn);
this.rsw = new MockRegionServerServicesWithWALs(rss, wals.getWALProvider());
this.region_a = createRegion(this.testdir, this.wals, STARTROW_A, STARTROW_B);
this.region_b = createRegion(this.testdir, this.wals, STARTROW_B, STARTROW_C);
this.region_c = createRegion(this.testdir, this.wals, STARTROW_C, ENDROW);
@ -275,7 +309,7 @@ public class TestRegionMergeTransaction {
assertEquals((rowCountOfRegionA + rowCountOfRegionB),
mergedRegionRowCount);
} finally {
HRegion.closeHRegion(mergedRegion);
mergedRegion.close();
}
// Assert the write lock is no longer held on region_a and region_b
assertTrue(!this.region_a.lock.writeLock().isHeldByCurrentThread());
@ -335,7 +369,7 @@ public class TestRegionMergeTransaction {
assertEquals((rowCountOfRegionA + rowCountOfRegionB),
mergedRegionRowCount);
} finally {
HRegion.closeHRegion(mergedRegion);
mergedRegion.close();
}
// Assert the write lock is no longer held on region_a and region_b
assertTrue(!this.region_a.lock.writeLock().isHeldByCurrentThread());
@ -444,7 +478,7 @@ public class TestRegionMergeTransaction {
HRegion.closeHRegion(a);
return HRegion.openHRegion(testdir, hri, htd,
wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()),
TEST_UTIL.getConfiguration());
TEST_UTIL.getConfiguration(), rsw, null);
}
private int countRows(final HRegion r) throws IOException {

View File

@ -25,36 +25,18 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;
import com.google.protobuf.Service;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.MockRegionServerServicesWithWALs;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -112,7 +94,7 @@ public class TestSplitTransaction {
private String strategy;
@Parameterized.Parameters
public static final Collection<Object[]> parameters() throws IOException {
public static final Collection<Object[]> parameters() {
List<Object[]> params = new ArrayList<>(4);
params.add(new Object[] { "filesystem", "" });
params.add(new Object[] { "multiwal", "identity" });
@ -478,244 +460,4 @@ public class TestSplitTransaction {
postRollBackCalled = true;
}
}
static class MockRegionServerServicesWithWALs implements RegionServerServices {
WALProvider provider;
RegionServerServices rss;
MockRegionServerServicesWithWALs(RegionServerServices rss, WALProvider provider) {
this.rss = rss;
this.provider = provider;
}
@Override
public boolean isStopping() {
return rss.isStopping();
}
@Override
public WAL getWAL(HRegionInfo hri) throws IOException {
return provider.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
}
@Override
public CompactionRequestor getCompactionRequester() {
return rss.getCompactionRequester();
}
@Override
public FlushRequester getFlushRequester() {
return rss.getFlushRequester();
}
@Override
public RegionServerAccounting getRegionServerAccounting() {
return rss.getRegionServerAccounting();
}
@Override
public TableLockManager getTableLockManager() {
return rss.getTableLockManager();
}
@Override
public RegionServerQuotaManager getRegionServerQuotaManager() {
return rss.getRegionServerQuotaManager();
}
@Override
public void postOpenDeployTasks(PostOpenDeployContext context)
throws KeeperException, IOException {
rss.postOpenDeployTasks(context);
}
@Override
public void postOpenDeployTasks(Region r) throws KeeperException, IOException {
rss.postOpenDeployTasks(r);
}
@Override
public boolean reportRegionStateTransition(RegionStateTransitionContext context) {
return rss.reportRegionStateTransition(context);
}
@Override
public boolean reportRegionStateTransition(
RegionServerStatusProtos.RegionStateTransition.TransitionCode code, long openSeqNum,
HRegionInfo... hris) {
return rss.reportRegionStateTransition(code, openSeqNum, hris);
}
@Override
public boolean reportRegionStateTransition(
RegionServerStatusProtos.RegionStateTransition.TransitionCode code, HRegionInfo... hris) {
return rss.reportRegionStateTransition(code, hris);
}
@Override
public RpcServerInterface getRpcServer() {
return rss.getRpcServer();
}
@Override
public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
return rss.getRegionsInTransitionInRS();
}
@Override
public FileSystem getFileSystem() {
return rss.getFileSystem();
}
@Override
public Leases getLeases() {
return rss.getLeases();
}
@Override
public ExecutorService getExecutorService() {
return rss.getExecutorService();
}
@Override
public Map<String, Region> getRecoveringRegions() {
return rss.getRecoveringRegions();
}
@Override
public ServerNonceManager getNonceManager() {
return rss.getNonceManager();
}
@Override
public boolean registerService(Service service) {
return rss.registerService(service);
}
@Override
public HeapMemoryManager getHeapMemoryManager() {
return rss.getHeapMemoryManager();
}
@Override
public double getCompactionPressure() {
return rss.getCompactionPressure();
}
@Override
public Set<TableName> getOnlineTables() {
return rss.getOnlineTables();
}
@Override
public ThroughputController getFlushThroughputController() {
return rss.getFlushThroughputController();
}
@Override
public double getFlushPressure() {
return rss.getFlushPressure();
}
@Override
public MetricsRegionServer getMetrics() {
return rss.getMetrics();
}
@Override
public void unassign(byte[] regionName) throws IOException {
rss.unassign(regionName);
}
@Override
public void addToOnlineRegions(Region r) {
rss.addToOnlineRegions(r);
}
@Override
public boolean removeFromOnlineRegions(Region r, ServerName destination) {
return rss.removeFromOnlineRegions(r, destination);
}
@Override
public Region getFromOnlineRegions(String encodedRegionName) {
return rss.getFromOnlineRegions(encodedRegionName);
}
@Override
public List<Region> getOnlineRegions(TableName tableName) throws IOException {
return rss.getOnlineRegions(tableName);
}
@Override
public List<Region> getOnlineRegions() {
return rss.getOnlineRegions();
}
@Override
public Configuration getConfiguration() {
return rss.getConfiguration();
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return rss.getZooKeeper();
}
@Override
public ClusterConnection getConnection() {
return rss.getConnection();
}
@Override
public MetaTableLocator getMetaTableLocator() {
return rss.getMetaTableLocator();
}
@Override
public ServerName getServerName() {
return rss.getServerName();
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return rss.getCoordinatedStateManager();
}
@Override
public ChoreService getChoreService() {
return rss.getChoreService();
}
@Override
public void abort(String why, Throwable e) {
rss.abort(why, e);
}
@Override
public boolean isAborted() {
return rss.isAborted();
}
@Override
public void stop(String why) {
rss.stop(why);
}
@Override
public boolean isStopped() {
return rss.isStopped();
}
@Override
public void updateRegionFavoredNodesMapping(String encodedRegionName,
List<HBaseProtos.ServerName> favoredNodes) {
rss.updateRegionFavoredNodesMapping(encodedRegionName, favoredNodes);
}
@Override
public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
return rss.getFavoredNodesForRegion(encodedRegionName);
}
}
}