HBASE-23282 HBCKServerCrashProcedure for 'Unknown Servers'

Have the existing scheduleRecoveries launch a new HBCKSCP
instead of SCP. It gets regions to recover from Master
in-memory context AND from a scan of hbase:meta. This
new HBCKSCP is for processing 'Unknown Servers': servers that
are 'dead' and purged but still have references in
hbase:meta. A rare occurrence, but one that needs tooling to
address. Later, have CatalogJanitor take care of these
deviations between Master in-memory and hbase:meta content
(usually caused by an overdriven cluster with failed RPCs to
hbase:meta, etc.).
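
In outline, the new procedure unions the two views. A minimal
sketch only, not the committed code verbatim ('am', 'crashedServer'
and 'connection' are stand-ins for the procedure's
MasterProcedureEnv context, and IOException handling is elided;
see the HBCKServerCrashProcedure diff below for the real thing):

    // Union the Master's in-memory list with what hbase:meta says.
    List<RegionInfo> regions =
        new ArrayList<>(am.getRegionsOnServer(crashedServer)); // Master memory
    for (Pair<RegionInfo, ServerName> p: MetaTableAccessor
        .getTableRegionsAndLocations(connection, null, false)) { // meta scan
      if (crashedServer.equals(p.getSecond())) {
        regions.add(p.getFirst()); // reference left by an 'Unknown Server'
      }
    }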

Changed expireServer in ServerManager so callers can pass in
a custom reaction to the expired server. This is how we
run our custom HBCKSCP while keeping all other aspects
of expiring servers (rather than trying to replicate it
externally).
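
For example, a (package-internal) caller can now do something
like the following sketch; 'serverManager', 'procExec' and
'carryingMeta' are assumed in scope -- see the MasterRpcServices
diff below for the committed version:

    // Pass a Function<ServerName, Long> that submits the custom procedure;
    // expireServer still does the dead-server bookkeeping around it.
    long pid = serverManager.expireServer(serverName,
        sn -> procExec.submitProcedure(
            new HBCKServerCrashProcedure(procExec.getEnvironment(), sn,
                true /* shouldSplitWal */, carryingMeta)));
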
stack 2019-11-13 22:36:26 -08:00
parent 8bfdfe1b85
commit f8f9a26cb3
10 changed files with 389 additions and 60 deletions

View File

@@ -248,6 +248,10 @@ public class RegionState {
return state == State.CLOSED;
}
public boolean isClosedOrAbnormallyClosed() {
return isClosed() || this.state == State.ABNORMALLY_CLOSED;
}
public boolean isOpening() {
return state == State.OPENING;
}

View File

@@ -1,5 +1,4 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -80,6 +79,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting
// TODO: Only works with single hbase:meta region currently. Fix.
// TODO: Should it start over every time? Could it continue if runs into problem? Only if
// problem does not mess up 'results'.
// TODO: Do more by way of 'repair'; see note on unknownServers below.
@InterfaceAudience.Private
public class CatalogJanitor extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(CatalogJanitor.class.getName());
@@ -169,17 +169,16 @@ public class CatalogJanitor extends ScheduledChore {
LOG.debug("CatalogJanitor already running");
return gcs;
}
Report report = scanForReport();
this.lastReport = report;
if (!report.isEmpty()) {
LOG.warn(report.toString());
this.lastReport = scanForReport();
if (!this.lastReport.isEmpty()) {
LOG.warn(this.lastReport.toString());
}
if (isRIT(this.services.getAssignmentManager())) {
LOG.warn("Playing-it-safe skipping merge/split gc'ing of regions from hbase:meta while " +
"regions-in-transition (RIT)");
}
Map<RegionInfo, Result> mergedRegions = report.mergedRegions;
Map<RegionInfo, Result> mergedRegions = this.lastReport.mergedRegions;
for (Map.Entry<RegionInfo, Result> e : mergedRegions.entrySet()) {
if (this.services.isInMaintenanceMode()) {
// Stop cleaning if the master is in maintenance mode
@@ -192,7 +191,7 @@ }
}
}
// Clean split parents
Map<RegionInfo, Result> splitParents = report.splitParents;
Map<RegionInfo, Result> splitParents = this.lastReport.splitParents;
// Now work on our list of found parents. See if any we can clean up.
HashSet<String> parentNotCleaned = new HashSet<>();
@@ -443,7 +442,14 @@ private final List<Pair<RegionInfo, RegionInfo>> holes = new ArrayList<>();
private final List<Pair<RegionInfo, RegionInfo>> holes = new ArrayList<>();
private final List<Pair<RegionInfo, RegionInfo>> overlaps = new ArrayList<>();
/**
* TODO: If CatalogJanitor finds an 'Unknown Server', it should 'fix' it by queuing
* a {@link org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure} for
* the found server so it can clean up hbase:meta.
*/
private final List<Pair<RegionInfo, ServerName>> unknownServers = new ArrayList<>();
private final List<byte []> emptyRegionInfo = new ArrayList<>();
@VisibleForTesting
@@ -703,7 +709,9 @@ if (locations.getRegionLocations() == null) {
if (locations.getRegionLocations() == null) {
return;
}
// Check referenced servers are known/online.
// Check referenced servers are known/online. Here we are looking
// at both the default replica -- the main replica -- and the other
// replica locations too.
for (HRegionLocation location: locations.getRegionLocations()) {
if (location == null) {
continue;
@@ -717,19 +725,25 @@ // This should never happen but if it does, will mess up below.
// This should never happen but if it does, will mess up below.
continue;
}
RegionInfo ri = location.getRegion();
// Skip split parent region
if (location.getRegion().isSplitParent()) {
if (ri.isSplitParent()) {
continue;
}
// skip the offline regions which belong to disabled table.
if (isTableDisabled(location.getRegion())) {
if (isTableDisabled(ri)) {
continue;
}
RegionState rs = this.services.getAssignmentManager().getRegionStates().getRegionState(ri);
if (rs.isClosedOrAbnormallyClosed()) {
// If closed against an 'Unknown Server', that should be fine.
continue;
}
ServerManager.ServerLiveState state = this.services.getServerManager().
isServerKnownAndOnline(sn);
switch (state) {
case UNKNOWN:
this.report.unknownServers.add(new Pair<>(location.getRegion(), sn));
this.report.unknownServers.add(new Pair<>(ri, sn));
break;
default:

View File

@@ -61,8 +61,8 @@ public class DeadServer {
private final Set<ServerName> processingServers = new HashSet<ServerName>();
/**
* A dead server that comes back alive has a different start code. The new start code should be
* greater than the old one, but we don't take this into account in this method.
* Handles restart of a server. The new server instance has a different start code.
* The new start code should be greater than the old one. We don't check that here.
*
* @param newServerName Servername as either <code>host:port</code> or
* <code>host,port,startcode</code>.
@@ -78,7 +78,8 @@ // remove from processingServers
// remove from processingServers
boolean removed = processingServers.remove(sn);
if (removed) {
LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
LOG.debug("Removed {}, processing={}, numProcessing={}", sn, removed,
processingServers.size());
}
return true;
}
@@ -122,7 +123,6 @@ /**
/**
* Adds the server to the dead server list if it's not there already.
* @param sn the server name
*/
public synchronized void add(ServerName sn) {
if (!deadServers.containsKey(sn)){

View File

@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -70,6 +71,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
@@ -2588,17 +2590,26 @@ public class MasterRpcServices extends RSRpcServices
List<HBaseProtos.ServerName> serverNames = request.getServerNameList();
List<Long> pids = new ArrayList<>();
try {
for (HBaseProtos.ServerName serverName : serverNames) {
ServerName server = ProtobufUtil.toServerName(serverName);
for (HBaseProtos.ServerName sn: serverNames) {
ServerName serverName = ProtobufUtil.toServerName(sn);
LOG.info("{} schedule ServerCrashProcedure for {}",
master.getClientIdAuditPrefix(), server);
if (shouldSubmitSCP(server)) {
master.getServerManager().moveFromOnlineToDeadServers(server);
ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
pids.add(procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
server, true, containMetaWals(server))));
this.master.getClientIdAuditPrefix(), serverName);
if (shouldSubmitSCP(serverName)) {
final boolean containsMetaWALs = containMetaWals(serverName);
long pid = this.master.getServerManager().expireServer(serverName,
new Function<ServerName, Long>() {
@Override
public Long apply(ServerName serverName) {
ProcedureExecutor<MasterProcedureEnv> procExec =
master.getMasterProcedureExecutor();
return procExec.submitProcedure(
new HBCKServerCrashProcedure(procExec.getEnvironment(),
serverName, true, containsMetaWALs));
}
});
pids.add(pid);
} else {
pids.add(-1L);
pids.add(Procedure.NO_PROC_ID);
}
}
return MasterProtos.ScheduleServerCrashProcedureResponse.newBuilder().addAllPid(pids).build();

View File

@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -54,6 +55,8 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
@@ -564,17 +567,37 @@ public class ServerManager {
* have queued an SCP for this server or SCP processing is currently disabled because we
* are in startup phase).
*/
public synchronized boolean expireServer(final ServerName serverName) {
public boolean expireServer(final ServerName serverName) {
return expireServer(serverName, new Function<ServerName, Long>() {
@Override
public Long apply(ServerName serverName) {
return master.getAssignmentManager().submitServerCrash(serverName, true);
}
}) != Procedure.NO_PROC_ID;
}
/**
* Expire the passed server. Add it to the list of dead servers and queue shutdown processing.
* Used when expireServer is externally invoked by hbck2.
* @param function Takes a ServerName and returns a pid. See the default implementation, which
* queues an SCP via the AssignmentManager.
* @return Pid of the queued ServerCrashProcedure, else Procedure.NO_PROC_ID if we did not
* queue one (could happen for many reasons including the fact that it's this server that
* is going down, or we already queued an SCP for this server, or SCP processing is
* currently disabled because we are in startup phase).
*/
synchronized long expireServer(final ServerName serverName,
Function<ServerName, Long> function) {
// THIS server is going down... can't handle our own expiration.
if (serverName.equals(master.getServerName())) {
if (!(master.isAborted() || master.isStopped())) {
master.stop("We lost our znode?");
}
return false;
return Procedure.NO_PROC_ID;
}
if (this.deadservers.isDeadServer(serverName)) {
LOG.warn("Expiration called on {} but crash processing already in progress", serverName);
return false;
LOG.warn("Expiration called on {} but already in DeadServer", serverName);
return Procedure.NO_PROC_ID;
}
moveFromOnlineToDeadServers(serverName);
@@ -586,40 +609,42 @@ if (this.onlineServers.isEmpty()) {
if (this.onlineServers.isEmpty()) {
master.stop("Cluster shutdown set; onlineServer=0");
}
return false;
return Procedure.NO_PROC_ID;
}
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
long pid = master.getAssignmentManager().submitServerCrash(serverName, true);
if(pid <= 0) {
return false;
} else {
// Tell our listeners that a server was removed
if (!this.listeners.isEmpty()) {
for (ServerListener listener : this.listeners) {
listener.serverRemoved(serverName);
}
}
// trigger a persist of flushedSeqId
if (flushedSeqIdFlusher != null) {
flushedSeqIdFlusher.triggerNow();
}
return true;
long pid = function.apply(serverName);
if (pid <= 0) {
return Procedure.NO_PROC_ID;
}
// Tell our listeners that a server was removed
if (!this.listeners.isEmpty()) {
this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
}
// trigger a persist of flushedSeqId
if (flushedSeqIdFlusher != null) {
flushedSeqIdFlusher.triggerNow();
}
return pid;
}
// Note: this is currently invoked from RPC, not just tests. Locking in this class needs cleanup.
@VisibleForTesting
public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
synchronized (onlineServers) {
if (!this.onlineServers.containsKey(sn)) {
synchronized (this.onlineServers) {
boolean online = this.onlineServers.containsKey(sn);
if (online) {
// Remove the server from the known servers lists and update load info BUT
// add to deadservers first; do this so it'll show in dead servers list if
// not in online servers list.
this.deadservers.add(sn);
this.onlineServers.remove(sn);
onlineServers.notifyAll();
} else {
// If not online, that is odd but may happen with 'Unknown Servers' -- where
// hbase:meta has references to servers neither online nor in the dead servers
// list. If an 'Unknown Server', don't add to DeadServers else it will be there forever.
LOG.trace("Expiration of {} but server not online", sn);
}
// Remove the server from the known servers lists and update load info BUT
// add to deadservers first; do this so it'll show in dead servers list if
// not in online servers list.
this.deadservers.add(sn);
this.onlineServers.remove(sn);
onlineServers.notifyAll();
}
}

View File

@@ -1494,7 +1494,7 @@ public class AssignmentManager {
boolean carryingMeta;
long pid;
ServerStateNode serverNode = regionStates.getServerNode(serverName);
if(serverNode == null){
if (serverNode == null) {
LOG.info("Skip to add SCP for {} since this server should be OFFLINE already", serverName);
return -1;
}
@@ -1504,7 +1504,7 @@ rsReports.remove(serverName);
rsReports.remove(serverName);
}
// we hold the write lock here for fencing on reportRegionStateTransition. Once we set the
// We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
// server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
// this server. This is used to simplify the implementation for TRSP and SCP, where we can make
// sure that, the region list fetched by SCP will not be changed any more.

View File

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An SCP that differs from the default only in how it gets the list of
* Regions hosted on the crashed server: it also reads hbase:meta directly rather
* than relying solely on Master memory for the list of Regions that were on the
* crashed server. This version of SCP is for external invocation as part of fix-up
* (e.g. HBCK2's scheduleRecoveries). It is for the case where meta has references to
* 'Unknown Servers', servers that are in hbase:meta but not in the live-server or
* dead-server lists; i.e. Master and hbase:meta content have deviated. This should
* never happen in a normally running cluster, but if we do drop accounting of servers,
* we need a means of fix-up. Eventually, as part of its normal work, rather than just
* identifying these 'Unknown Servers', CatalogJanitor would make the repair itself,
* queuing something like this HBCKSCP to do cleanup, reassigning regions so Master
* and hbase:meta are aligned again.
*
* <p>NOTE that this SCP is costly to run; it does a full scan of hbase:meta.</p>
*/
@InterfaceAudience.Private
public class HBCKServerCrashProcedure extends ServerCrashProcedure {
private static final Logger LOG = LoggerFactory.getLogger(HBCKServerCrashProcedure.class);
/**
* @param serverName Name of the crashed server.
* @param shouldSplitWal True if we should split WALs as part of crashed server processing.
* @param carryingMeta True if carrying hbase:meta table region.
*/
public HBCKServerCrashProcedure(final MasterProcedureEnv env, final ServerName serverName,
final boolean shouldSplitWal, final boolean carryingMeta) {
super(env, serverName, shouldSplitWal, carryingMeta);
}
/**
* Used when deserializing from a procedure store; we'll construct one of these then call
* #deserializeStateData(InputStream). Do not use directly.
*/
public HBCKServerCrashProcedure() {}
/**
* Adds to the Regions found by the super method any found by scanning hbase:meta.
*/
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
justification="FindBugs seems confused on ps in below.")
List<RegionInfo> getRegionsOnCrashedServer(MasterProcedureEnv env) {
// Super can return immutable emptyList.
List<RegionInfo> ris = super.getRegionsOnCrashedServer(env);
List<Pair<RegionInfo, ServerName>> ps = null;
try {
ps = MetaTableAccessor.getTableRegionsAndLocations(env.getMasterServices().getConnection(),
null, false);
} catch (IOException ioe) {
LOG.warn("Failed get of all regions; continuing", ioe);
}
if (ps == null || ps.isEmpty()) {
return ris;
}
List<RegionInfo> aggregate = ris == null || ris.isEmpty()?
new ArrayList<>(): new ArrayList<>(ris);
ps.stream().filter(p -> p.getSecond() != null && p.getSecond().equals(getServerName())).
forEach(p -> aggregate.add(p.getFirst()));
return aggregate;
}
}

View File

@@ -120,8 +120,9 @@ public class ServerCrashProcedure
final MasterServices services = env.getMasterServices();
final AssignmentManager am = env.getAssignmentManager();
updateProgress(true);
// HBASE-14802
// If we have not yet notified that we are processing a dead server, we should do now.
// HBASE-14802 If we have not yet notified that we are processing a dead server, do so now.
// This adds server to the DeadServer processing list but not to the DeadServers list.
// Server gets removed from processing list below on procedure successful finish.
if (!notifiedDeadServer) {
services.getServerManager().getDeadServers().notifyServer(serverName);
notifiedDeadServer = true;
@@ -175,12 +176,14 @@ public class ServerCrashProcedure
setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
break;
case SERVER_CRASH_GET_REGIONS:
this.regionsOnCrashedServer =
services.getAssignmentManager().getRegionsOnServer(serverName);
this.regionsOnCrashedServer = getRegionsOnCrashedServer(env);
// Where to go next? Depends on whether we should split logs at all or
// if we should do distributed log splitting.
if (regionsOnCrashedServer != null) {
LOG.info("{} had {} regions", serverName, regionsOnCrashedServer.size());
if (LOG.isTraceEnabled()) {
this.regionsOnCrashedServer.stream().forEach(ri -> LOG.trace(ri.getShortNameToLog()));
}
}
if (!this.shouldSplitWal) {
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
@@ -242,6 +245,13 @@ public class ServerCrashProcedure
return Flow.HAS_MORE_STATE;
}
/**
* @return List of Regions on crashed server.
*/
List<RegionInfo> getRegionsOnCrashedServer(MasterProcedureEnv env) {
return env.getMasterServices().getAssignmentManager().getRegionsOnServer(serverName);
}
private void cleanupSplitDir(MasterProcedureEnv env) {
SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
try {

View File

@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test of the HBCK-version of SCP.
* The HBCKSCP is an SCP, only it reads hbase:meta for the list of Regions that were
* on the server-to-process rather than consulting Master in-memory state.
*/
@Category({ MasterTests.class, LargeTests.class })
public class TestHBCKSCP extends TestSCPBase {
private static final Logger LOG = LoggerFactory.getLogger(TestHBCKSCP.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHBCKSCP.class);
@Rule
public TestName name = new TestName();
@Test
public void test() throws Exception {
// we are about to do one for it?
MiniHBaseCluster cluster = this.util.getHBaseCluster();
// Assert that we have three RegionServers. Test depends on there being multiple.
assertEquals(RS_COUNT, cluster.getLiveRegionServerThreads().size());
int count;
try (Table table = createTable(TableName.valueOf(this.name.getMethodName()))) {
// Load the table with a bit of data so some logs to split and some edits in each region.
this.util.loadTable(table, HBaseTestingUtility.COLUMNS[0]);
count = util.countRows(table);
}
assertTrue("expected some rows", count > 0);
// Make the test easier by not working on server hosting meta...
// Find another RS. Purge it from Master memory w/o running SCP (if
// SCP runs, it will clear entries from hbase:meta which frustrates
// our attempt at manufacturing 'Unknown Servers' condition).
int metaIndex = this.util.getMiniHBaseCluster().getServerWithMeta();
int rsIndex = (metaIndex + 1) % RS_COUNT;
ServerName rsServerName = cluster.getRegionServer(rsIndex).getServerName();
HMaster master = cluster.getMaster();
// Get a Region that is on the server.
RegionInfo rsRI = master.getAssignmentManager().getRegionsOnServer(rsServerName).get(0);
Result r = MetaTableAccessor.getRegionResult(master.getConnection(), rsRI.getRegionName());
// Assert region is OPEN.
assertEquals(RegionState.State.OPEN.toString(),
Bytes.toString(r.getValue(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER)));
ServerName serverName = MetaTableAccessor.getServerName(r, 0);
assertTrue(rsServerName.equals(serverName));
// moveFromOnlineToDeadServers adds the server to the dead servers and processing
// lists, only we will not be processing this server 'normally'. Remove it from
// processing by calling 'finish' and then remove it from dead servers so rsServerName
// becomes an 'Unknown Server' even though it is still around.
master.getServerManager().moveFromOnlineToDeadServers(rsServerName);
master.getServerManager().getDeadServers().finish(rsServerName);
master.getServerManager().getDeadServers().removeDeadServer(rsServerName);
// Kill the server. Nothing should happen since it is an 'Unknown Server' as far
// as the Master is concerned; i.e. no SCP.
LOG.info("Killing {}", rsServerName);
HRegionServer hrs = cluster.getRegionServer(rsServerName);
hrs.abort("KILLED");
while (!hrs.isStopped()) {
Threads.sleep(10);
}
LOG.info("Dead {}", rsServerName);
// Now assert still references in hbase:meta to the 'dead' server -- they haven't been
// cleaned up by an SCP or by anything else.
assertTrue(searchMeta(master, rsServerName));
// Assert region is OPEN on dead server still.
r = MetaTableAccessor.getRegionResult(master.getConnection(), rsRI.getRegionName());
assertEquals(RegionState.State.OPEN.toString(),
Bytes.toString(r.getValue(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER)));
serverName = MetaTableAccessor.getServerName(r, 0);
assertNotNull(cluster.getRegionServer(serverName));
assertEquals(rsServerName, serverName);
// I now have 'Unknown Server' references in hbase:meta; i.e. Server references
// with no corresponding SCP. Queue one.
MasterProtos.ScheduleServerCrashProcedureResponse response =
master.getMasterRpcServices().scheduleServerCrashProcedure(null,
MasterProtos.ScheduleServerCrashProcedureRequest.newBuilder().
addServerName(ProtobufUtil.toServerName(rsServerName)).build());
assertEquals(1, response.getPidCount());
long pid = response.getPid(0);
assertNotEquals(Procedure.NO_PROC_ID, pid);
while (master.getMasterProcedureExecutor().getActiveProcIds().contains(pid)) {
Threads.sleep(10);
}
// After SCP, assert region is OPEN on new server.
r = MetaTableAccessor.getRegionResult(master.getConnection(), rsRI.getRegionName());
assertEquals(RegionState.State.OPEN.toString(),
Bytes.toString(r.getValue(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER)));
serverName = MetaTableAccessor.getServerName(r, 0);
assertNotNull(cluster.getRegionServer(serverName));
assertNotEquals(rsServerName, serverName);
// Make sure no mention of old server post SCP.
assertFalse(searchMeta(master, rsServerName));
assertFalse(master.getServerManager().getDeadServers().isProcessingServer(rsServerName));
assertFalse(master.getServerManager().getDeadServers().isDeadServer(rsServerName));
}
/**
* @return True if we find a reference to <code>sn</code> in the meta table.
*/
boolean searchMeta(HMaster master, ServerName sn) throws IOException {
List<Pair<RegionInfo, ServerName>> ps =
MetaTableAccessor.getTableRegionsAndLocations(master.getConnection(), null);
for (Pair<RegionInfo, ServerName> p: ps) {
if (p.getSecond().equals(sn)) {
return true;
}
}
return false;
}
}

View File

@@ -46,15 +46,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestSCPBase {
private static final Logger LOG = LoggerFactory.getLogger(TestSCPBase.class);
static final int RS_COUNT = 3;
protected HBaseTestingUtility util;
protected void setupConf(Configuration conf) {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
conf.set("hbase.balancer.tablesOnMaster", "none");
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RS_COUNT);
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 3);
conf.setBoolean("hbase.split.writer.creation.bounded", true);
conf.setInt("hbase.regionserver.hlog.splitlog.writer.threads", 8);