Revert "HBASE-25032 Do not assign regions to region server which has not called regionServerReport yet (#3268)"
This reverts commit 2bf86468d8
.
This commit is contained in:
parent
ca3b267130
commit
2fec951303
|
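For context: HBASE-25032 made the master's ServerManager skip region servers that had not yet sent a regionServerReport when building the list of assignment destinations, and this revert restores the plain online-server list. A minimal, self-contained sketch of the two strategies, paraphrased from the ServerManager hunk further down; the stub types and helper names below are illustrative stand-ins, not the real HBase classes:

// Sketch only: paraphrases the createDestinationServersList change shown in the
// ServerManager hunk below. Stub types stand in for ServerName/ServerMetrics.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DestinationServersSketch {

  static final class ServerMetricsStub {
    private final long lastReportTimestamp;

    ServerMetricsStub(long lastReportTimestamp) {
      this.lastReportTimestamp = lastReportTimestamp;
    }

    long getLastReportTimestamp() {
      return lastReportTimestamp;
    }
  }

  // HBASE-25032 behavior (removed by this revert): only servers that have already called
  // regionServerReport (lastReportTimestamp > 0) are considered assignment destinations.
  static List<String> onlyReportedServers(Map<String, ServerMetricsStub> onlineServers) {
    Set<String> destServers = new HashSet<>();
    onlineServers.forEach((sn, sm) -> {
      if (sm.getLastReportTimestamp() > 0) {
        destServers.add(sn);
      }
    });
    return new ArrayList<>(destServers);
  }

  // Behavior restored by this revert: every online server is a candidate, whether or not
  // it has sent its first report.
  static List<String> allOnlineServers(Map<String, ServerMetricsStub> onlineServers) {
    return new ArrayList<>(onlineServers.keySet());
  }
}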
@@ -320,7 +320,7 @@ public final class ServerMetricsBuilder {
     Map<String,List<ReplicationLoadSource>> sourcesMap = new HashMap<>();
     for(ReplicationLoadSource loadSource : sources){
       sourcesMap.computeIfAbsent(loadSource.getPeerID(),
-        peerId -> new ArrayList<>()).add(loadSource);
+        peerId -> new ArrayList()).add(loadSource);
     }
     return sourcesMap;
   }
@@ -74,7 +74,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
   private ClusterMetrics clusterStatus;
   private MasterServices masterServices;
   private volatile RSGroupInfoManager rsGroupInfoManager;
-  private volatile LoadBalancer internalBalancer;
+  private LoadBalancer internalBalancer;
 
   /**
    * Set this key to {@code true} to allow region fallback.
@@ -85,7 +85,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
    */
   public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable";
 
-  private volatile boolean fallbackEnabled = false;
+  private boolean fallbackEnabled = false;
 
   /**
    * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
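The two RSGroupBasedLoadBalancer hunks above drop the volatile modifier from internalBalancer and fallbackEnabled while restoring the pre-HBASE-25032 code. As a general Java reminder rather than anything stated in this commit, volatile is what guarantees that a write made by one thread becomes visible to reads on other threads; a tiny, hypothetical illustration with names invented for the example:

// Hypothetical example, not HBase code: shows what the volatile keyword buys on a flag
// that one thread writes and another thread polls.
final class VolatileFlagExample {

  // With volatile, a reader thread is guaranteed to eventually observe 'true'.
  // Without it, the JVM may keep serving the stale cached 'false'.
  private volatile boolean enabled = false;

  void enable() {
    enabled = true;
  }

  boolean isEnabled() {
    return enabled;
  }
}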
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -40,7 +39,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.ipc.MetaRWQueueRpcExecutor;
 import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -70,11 +68,6 @@ public class TestRSGroupsKillRS extends TestRSGroupsBase {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    // avoid all the handlers blocked when meta is offline, and regionServerReport can not be
-    // processed which causes dead lock.
-    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
-    TEST_UTIL.getConfiguration()
-      .setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
     setUpTestBeforeClass();
   }
 
@@ -268,7 +261,8 @@ public class TestRSGroupsKillRS extends TestRSGroupsBase {
     assertTrue(majorVersion >= 1);
     String lowerVersion = String.valueOf(majorVersion - 1) + originVersion.split("\\.")[1];
     setFinalStatic(Version.class.getField("version"), lowerVersion);
-    TEST_UTIL.getMiniHBaseCluster().startRegionServer(address.getHostName(), address.getPort());
+    TEST_UTIL.getMiniHBaseCluster().startRegionServer(address.getHostname(),
+      address.getPort());
     assertEquals(NUM_SLAVES_BASE,
       TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size());
     assertTrue(VersionInfo.compareVersion(originVersion,
@@ -25,7 +25,6 @@ import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -63,6 +62,7 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -951,19 +951,11 @@ public class ServerManager {
 
   /**
    * Creates a list of possible destinations for a region. It contains the online servers, but not
    * the draining or dying servers.
    * @param serversToExclude can be null if there is no server to exclude
    */
-  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
-    Set<ServerName> destServers = new HashSet<>();
-    onlineServers.forEach((sn, sm) -> {
-      if (sm.getLastReportTimestamp() > 0) {
-        // This means we have already called regionServerReport at leaset once, then let's include
-        // this server for region assignment. This is an optimization to avoid assigning regions to
-        // an uninitialized server. See HBASE-25032 for more details.
-        destServers.add(sn);
-      }
-    });
+  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude){
+    final List<ServerName> destServers = getOnlineServersList();
 
     if (serversToExclude != null) {
       destServers.removeAll(serversToExclude);
@@ -973,7 +965,7 @@ public class ServerManager {
     final List<ServerName> drainingServersCopy = getDrainingServersList();
     destServers.removeAll(drainingServersCopy);
 
-    return new ArrayList<>(destServers);
+    return destServers;
   }
 
   /**
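Taken together, the two ServerManager hunks above leave createDestinationServersList starting from the full online-server list and subtracting only the excluded and draining servers. A simplified sketch of that post-revert flow, with plain string lists standing in for getOnlineServersList(), the serversToExclude argument, and getDrainingServersList():

// Simplified sketch of the post-revert flow; plain strings stand in for ServerName and
// the three parameters stand in for the online, excluded and draining server lists.
import java.util.ArrayList;
import java.util.List;

final class PostRevertDestinationsSketch {

  static List<String> createDestinationServersList(List<String> onlineServers,
      List<String> serversToExclude, List<String> drainingServers) {
    final List<String> destServers = new ArrayList<>(onlineServers);
    if (serversToExclude != null) {
      destServers.removeAll(serversToExclude);
    }
    destServers.removeAll(drainingServers);
    return destServers;
  }
}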
@@ -18,18 +18,13 @@
 package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.HashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompatibilityFactory;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.ServerMetricsBuilder;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
-import org.apache.hadoop.hbase.YouAreDeadException;
-import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -42,14 +37,9 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
 
 @Category({MasterTests.class, MediumTests.class})
 public class TestMasterMetrics {
@@ -76,32 +66,6 @@ public class TestMasterMetrics {
       // do nothing
     }
 
-    @Override
-    protected RSRpcServices createRpcServices() throws IOException {
-      return new MasterRpcServices(this) {
-
-        @Override
-        public RegionServerStartupResponse regionServerStartup(RpcController controller,
-          RegionServerStartupRequest request) throws ServiceException {
-          RegionServerStartupResponse resp = super.regionServerStartup(controller, request);
-          ServerManager serverManager = getServerManager();
-          // to let the region server actual online otherwise we can not assign meta region
-          new HashMap<>(serverManager.getOnlineServers()).forEach((sn, sm) -> {
-            if (sm.getLastReportTimestamp() <= 0) {
-              try {
-                serverManager.regionServerReport(sn,
-                  ServerMetricsBuilder.newBuilder(sn).setVersionNumber(sm.getVersionNumber())
-                    .setVersion(sm.getVersion()).setLastReportTimestamp(System.currentTimeMillis())
-                    .build());
-              } catch (YouAreDeadException e) {
-                throw new UncheckedIOException(e);
-              }
-            }
-          });
-          return resp;
-        }
-      };
-    }
   }
 
   @BeforeClass
@@ -156,8 +156,7 @@ public class MockMasterServices extends MockNoopMasterServices {
     this.assignmentManager.start();
     for (int i = 0; i < numServes; ++i) {
       ServerName sn = ServerName.valueOf("localhost", 100 + i, 1);
-      serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn)
-        .setLastReportTimestamp(System.currentTimeMillis()).build());
+      serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn));
     }
     this.procedureExecutor.getEnvironment().setEventReady(initialized, true);
   }
@@ -183,8 +182,7 @@ public class MockMasterServices extends MockNoopMasterServices {
       return;
     }
     ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode);
-    serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn)
-      .setLastReportTimestamp(System.currentTimeMillis()).build());
+    serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn));
   }
 
   @Override
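The two MockMasterServices hunks above (and the TestAssignmentManagerBase hunk below) switch the mocked report back from ServerMetricsBuilder.newBuilder(sn).setLastReportTimestamp(...).build() to the plain ServerMetricsBuilder.of(sn). The explicit timestamp was only needed while the HBASE-25032 filter existed, because that filter treats lastReportTimestamp == 0 as not yet initialized. A small sketch of the two call shapes, assuming the builder and ServerManager signatures exactly as they appear in this diff rather than verified against a particular HBase release:

// Sketch of the two call shapes used by the mocks; signatures are taken from the hunks
// in this diff.
import java.io.IOException;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.ServerManager;

final class MockReportSketch {

  // Pre-revert shape: fake a report timestamp so the server passes the
  // lastReportTimestamp > 0 check and can receive regions (including meta).
  static void reportWithTimestamp(ServerManager serverManager, ServerName sn) throws IOException {
    serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn)
      .setLastReportTimestamp(System.currentTimeMillis()).build());
  }

  // Post-revert shape: a bare metrics object is enough because the filter is gone.
  static void reportBare(ServerManager serverManager, ServerName sn) throws IOException {
    serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn));
  }
}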
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.assignment;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
-
-/**
- * UT for HBASE-25032.
- */
-@Category({ MasterTests.class, MediumTests.class })
-public class TestAssignRegionToUninitializedRegionServer {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestAssignRegionToUninitializedRegionServer.class);
-
-  private static CountDownLatch ARRIVE;
-
-  private static CountDownLatch RESUME;
-
-  private static AtomicBoolean ASSIGN_CALLED = new AtomicBoolean(false);
-
-  public static final class RSRpcServicesForTest extends RSRpcServices {
-
-    public RSRpcServicesForTest(HRegionServer rs) throws IOException {
-      super(rs);
-    }
-
-    @Override
-    public ExecuteProceduresResponse executeProcedures(RpcController controller,
-      ExecuteProceduresRequest request) throws ServiceException {
-      if (request.getOpenRegionCount() > 0) {
-        ASSIGN_CALLED.set(true);
-      }
-      return super.executeProcedures(controller, request);
-    }
-  }
-
-  public static final class RegionServerForTest extends HRegionServer {
-
-    public RegionServerForTest(Configuration conf) throws IOException {
-      super(conf);
-    }
-
-    @Override
-    protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
-      throws IOException {
-      if (ARRIVE != null) {
-        ARRIVE.countDown();
-        ARRIVE = null;
-        try {
-          RESUME.await();
-        } catch (InterruptedException e) {
-        }
-      }
-      super.tryRegionServerReport(reportStartTime, reportEndTime);
-    }
-
-    @Override
-    protected RSRpcServices createRpcServices() throws IOException {
-      return new RSRpcServicesForTest(this);
-    }
-  }
-
-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
-  private static TableName NAME = TableName.valueOf("test");
-
-  private static byte[] FAMILY = Bytes.toBytes("family");
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    UTIL.startMiniCluster(1);
-    UTIL.createTable(NAME, FAMILY);
-    UTIL.waitTableAvailable(NAME);
-  }
-
-  @AfterClass
-  public static void tearDown() throws IOException {
-    UTIL.shutdownMiniCluster();
-  }
-
-  @Test
-  public void testMove() throws Exception {
-    UTIL.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL,
-      RegionServerForTest.class, HRegionServer.class);
-    CountDownLatch arrive = new CountDownLatch(1);
-    ARRIVE = arrive;
-    RESUME = new CountDownLatch(1);
-    // restart a new region server, and wait until it finish initialization and want to call
-    // regionServerReport, so it will load the peer state to peer cache.
-    Future<HRegionServer> regionServerFuture = ForkJoinPool.commonPool()
-      .submit(() -> UTIL.getMiniHBaseCluster().startRegionServer().getRegionServer());
-    ARRIVE.await();
-    // try move region to the new region server, it will fail, but we need to make sure that we do
-    // not try to assign it to the new server.
-    HRegionServer src = UTIL.getRSForFirstRegionInTable(NAME);
-    HRegionServer dst = UTIL.getOtherRegionServer(src);
-    try {
-      UTIL.getAdmin().move(UTIL.getAdmin().getRegions(NAME).get(0).getEncodedNameAsBytes(),
-        dst.getServerName());
-      // assert the region should still on the original region server, and we didn't call assign to
-      // the new server
-      assertSame(src, UTIL.getRSForFirstRegionInTable(NAME));
-      assertFalse(ASSIGN_CALLED.get());
-    } finally {
-      // let the region server go
-      RESUME.countDown();
-    }
-    // wait the new region server online
-    assertSame(dst, regionServerFuture.get());
-    // try move again
-    UTIL.getAdmin().move(UTIL.getAdmin().getRegions(NAME).get(0).getEncodedNameAsBytes(),
-      dst.getServerName());
-    // this time the region should be on the new region server
-    assertSame(dst, UTIL.getRSForFirstRegionInTable(NAME));
-    assertTrue(ASSIGN_CALLED.get());
-  }
-}
@@ -309,8 +309,7 @@ public abstract class TestAssignmentManagerBase {
     ServerName newSn = ServerName.valueOf("localhost", 10000 + newRsAdded, 1);
     newRsAdded++;
    try {
-      this.master.getServerManager().regionServerReport(newSn, ServerMetricsBuilder
-        .newBuilder(newSn).setLastReportTimestamp(System.currentTimeMillis()).build());
+      this.master.getServerManager().regionServerReport(newSn, ServerMetricsBuilder.of(newSn));
     } catch (YouAreDeadException e) {
       // should not happen
       throw new UncheckedIOException(e);
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.Future;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.replication.ReplicationPeer;
-import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
-import org.apache.hadoop.hbase.replication.TestReplicationBase;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
-
-/**
- * This UT is used to make sure that we will not accidentally change the way to generate online
- * servers. See HBASE-25774 and HBASE-25032 for more details.
- */
-@Category({ MasterTests.class, MediumTests.class })
-public class TestRefreshPeerWhileRegionServerRestarts extends TestReplicationBase {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestRefreshPeerWhileRegionServerRestarts.class);
-
-  private static CountDownLatch ARRIVE;
-
-  private static CountDownLatch RESUME;
-
-  public static final class RegionServerForTest extends HRegionServer {
-
-    public RegionServerForTest(Configuration conf) throws IOException {
-      super(conf);
-    }
-
-    @Override
-    protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
-      throws IOException {
-      if (ARRIVE != null) {
-        ARRIVE.countDown();
-        ARRIVE = null;
-        try {
-          RESUME.await();
-        } catch (InterruptedException e) {
-        }
-      }
-      super.tryRegionServerReport(reportStartTime, reportEndTime);
-    }
-  }
-
-  @Test
-  public void testRestart() throws Exception {
-    UTIL1.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL,
-      RegionServerForTest.class, HRegionServer.class);
-    CountDownLatch arrive = new CountDownLatch(1);
-    ARRIVE = arrive;
-    RESUME = new CountDownLatch(1);
-    // restart a new region server, and wait until it finish initialization and want to call
-    // regionServerReport, so it will load the peer state to peer cache.
-    Future<HRegionServer> regionServerFuture = ForkJoinPool.commonPool()
-      .submit(() -> UTIL1.getMiniHBaseCluster().startRegionServer().getRegionServer());
-    ARRIVE.await();
-    // change the peer state, wait until it reach the last state, where we have already get the
-    // region server list for refreshing
-    Future<Void> future = hbaseAdmin.disableReplicationPeerAsync(PEER_ID2);
-    try {
-      UTIL1.waitFor(30000, () -> {
-        for (Procedure<?> proc : UTIL1.getMiniHBaseCluster().getMaster().getProcedures()) {
-          if (proc instanceof DisablePeerProcedure) {
-            return ((DisablePeerProcedure) proc).getCurrentStateId() ==
-              MasterProcedureProtos.PeerModificationState.POST_PEER_MODIFICATION_VALUE;
-          }
-        }
-        return false;
-      });
-    } finally {
-      // let the new region server go
-      RESUME.countDown();
-    }
-    // wait the disable peer operation to finish
-    future.get();
-    // assert that the peer cache on the new region server has also been refreshed
-    ReplicationPeer peer = regionServerFuture.get().getReplicationSourceService()
-      .getReplicationManager().getReplicationPeers().getPeer(PEER_ID2);
-    assertEquals(PeerState.DISABLED, peer.getPeerState());
-  }
-}