HBASE-18005 read replica: handle the case that region server hosting both primary replica and meta region is down (huaxiang sun)

This commit is contained in:
tedyu 2017-06-06 09:07:17 -07:00
parent 69d3e332f1
commit 39e8e2fb58
5 changed files with 269 additions and 41 deletions

View File

@ -1296,7 +1296,8 @@ class ConnectionManager {
} else {
// If we are not supposed to be using the cache, delete any existing cached location
// so it won't interfere.
metaCache.clearCache(tableName, row);
// We are only supposed to clean the cache for the specific replicaId
metaCache.clearCache(tableName, row, replicaId);
}
// Query the meta region

View File

@ -318,6 +318,40 @@ public class MetaCache {
}
}
/**
* Delete a cached location with specific replicaId.
* @param tableName tableName
* @param row row key
* @param replicaId region replica id
*/
public void clearCache(final TableName tableName, final byte [] row, int replicaId) {
ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
RegionLocations regionLocations = getCachedLocation(tableName, row);
if (regionLocations != null) {
HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
if (toBeRemoved != null) {
RegionLocations updatedLocations = regionLocations.remove(replicaId);
byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
boolean removed;
if (updatedLocations.isEmpty()) {
removed = tableLocations.remove(startKey, regionLocations);
} else {
removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
}
if (removed) {
if (metrics != null) {
metrics.incrMetaCacheNumClearRegion();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Removed " + toBeRemoved + " from cache");
}
}
}
}
}
/**
* Delete a cached location for a table, row and server
*/

View File

@ -195,10 +195,39 @@ public class RpcRetryingCallerWithReadReplicas {
throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId()
: RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
ResultBoundedCompletionService<Result> cs =
new ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, rl.size());
RegionLocations rl = null;
boolean skipPrimary = false;
try {
rl = getRegionLocations(true,
(isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID),
cConnection, tableName, get.getRow());
} catch (RetriesExhaustedException | DoNotRetryIOException e) {
// When there is no specific replica id specified. It just needs to load all replicas.
if (isTargetReplicaSpecified) {
throw e;
} else {
// We cannot get the primary replica location, it is possible that the region
// server hosting meta is down, it needs to proceed to try cached replicas.
if (cConnection instanceof ConnectionManager.HConnectionImplementation) {
rl = ((ConnectionManager.HConnectionImplementation)cConnection).getCachedLocation(
tableName, get.getRow());
if (rl == null) {
// No cached locations
throw e;
}
// Primary replica location is not known, skip primary replica
skipPrimary = true;
} else {
// For completeness
throw e;
}
}
}
final ResultBoundedCompletionService<Result> cs =
new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size());
int startIndex = 0;
int endIndex = rl.size();
@ -206,25 +235,30 @@ public class RpcRetryingCallerWithReadReplicas {
addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
endIndex = 1;
} else {
addCallsForReplica(cs, rl, 0, 0);
try {
// wait for the timeout to see whether the primary responds back
Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
if (f != null) {
return f.get(); //great we got a response
}
} catch (ExecutionException e) {
// We ignore the ExecutionException and continue with the secondary replicas
if (LOG.isDebugEnabled()) {
LOG.debug("Primary replica returns " + e.getCause());
}
if (!skipPrimary) {
addCallsForReplica(cs, rl, 0, 0);
try {
// wait for the timeout to see whether the primary responds back
Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
if (f != null) {
return f.get(); //great we got a response
}
} catch (ExecutionException e) {
// We ignore the ExecutionException and continue with the secondary replicas
if (LOG.isDebugEnabled()) {
LOG.debug("Primary replica returns " + e.getCause());
}
// Skip the result from the primary as we know that there is something wrong
startIndex = 1;
} catch (CancellationException e) {
throw new InterruptedIOException();
} catch (InterruptedException e) {
throw new InterruptedIOException();
// Skip the result from the primary as we know that there is something wrong
startIndex = 1;
} catch (CancellationException e) {
throw new InterruptedIOException();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
} else {
// Since primary replica is skipped, the endIndex needs to be adjusted accordingly
endIndex --;
}
// submit call for the all of the secondaries at once
@ -324,10 +358,10 @@ public class RpcRetryingCallerWithReadReplicas {
} catch (InterruptedIOException e) {
throw e;
} catch (IOException e) {
throw new RetriesExhaustedException("Can't get the location", e);
throw new RetriesExhaustedException("Can't get the location for replica " + replicaId, e);
}
if (rl == null) {
throw new RetriesExhaustedException("Can't get the locations");
throw new RetriesExhaustedException("Can't get the location for replica " + replicaId);
}
return rl;

View File

@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
@ -142,10 +143,25 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
//2. We should close the "losing" scanners (scanners other than the ones we hear back
// from first)
//
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
currentScannerCallable.getRow());
RegionLocations rl = null;
try {
rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
currentScannerCallable.getRow());
} catch (RetriesExhaustedException | DoNotRetryIOException e) {
// We cannot get the primary replica region location, it is possible that the region server
// hosting meta table is down, it needs to proceed to try cached replicas directly.
if (cConnection instanceof ConnectionManager.HConnectionImplementation) {
rl = ((ConnectionManager.HConnectionImplementation) cConnection)
.getCachedLocation(tableName, currentScannerCallable.getRow());
if (rl == null) {
throw e;
}
} else {
// For completeness
throw e;
}
}
// allocate a boundedcompletion pool of some multiple of number of replicas.
// We want to accomodate some RPCs for redundant replica scans (but are still in progress)
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
@ -157,13 +159,40 @@ public class TestReplicaWithCluster {
return null;
}
@Override
public void postGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
final byte [] row, final byte [] family, final Result result)
throws IOException {
}
}
/**
* This copro is used to slow down the primary meta region scan a bit
*/
public static class RegionServerHostingPrimayMetaRegionSlowCopro extends BaseRegionObserver {
public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro extends BaseRegionObserver {
static boolean slowDownPrimaryMetaScan = false;
static boolean throwException = false;
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
// Fail for the primary replica, but not for meta
if (throwException) {
if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
LOG.info("Get, throw Region Server Stopped Exceptoin for region " + e.getEnvironment()
.getRegion().getRegionInfo());
throw new RegionServerStoppedException("Server " +
e.getEnvironment().getRegionServerServices().getServerName() + " not running");
}
} else {
LOG.info("Get, We're replica region " + replicaId);
}
}
@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
@ -172,21 +201,34 @@ public class TestReplicaWithCluster {
int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
// Slow down with the primary meta region scan
if (slowDownPrimaryMetaScan && (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion()
&& (replicaId == 0))) {
LOG.info("Scan with primary meta region, slow down a bit");
try {
Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
} catch (InterruptedException ie) {
// Ingore
if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
if (slowDownPrimaryMetaScan) {
LOG.info("Scan with primary meta region, slow down a bit");
try {
Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
} catch (InterruptedException ie) {
// Ingore
}
}
// Fail for the primary replica
if (throwException) {
LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + e.getEnvironment()
.getRegion().getRegionInfo());
throw new RegionServerStoppedException("Server " +
e.getEnvironment().getRegionServerServices().getServerName() + " not running");
} else {
LOG.info("Scan, We're replica region " + replicaId);
}
} else {
LOG.info("Scan, We're replica region " + replicaId);
}
return null;
}
}
@BeforeClass
public static void beforeClass() throws Exception {
// enable store file refreshing
@ -213,7 +255,7 @@ public class TestReplicaWithCluster {
// Set system coprocessor so it can be applied to meta regions
HTU.getConfiguration().set("hbase.coprocessor.region.classes",
RegionServerHostingPrimayMetaRegionSlowCopro.class.getName());
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());
HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_MEAT_REPLICA_SCAN_TIMEOUT,
META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
@ -632,14 +674,14 @@ public class TestReplicaWithCluster {
HTU.createTable(hdt, new byte[][] { f }, null);
RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = true;
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;
// Get user table location, always get it from the primary meta replica
RegionLocations url = ((ClusterConnection) HTU.getConnection())
.locateRegion(hdt.getTableName(), row, false, false);
} finally {
RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = false;
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
((ConnectionManager.HConnectionImplementation) HTU.getHBaseAdmin().getConnection()).
setUseMetaReplicas(false);
HTU.getHBaseAdmin().setBalancerRunning(true, true);
@ -647,4 +689,105 @@ public class TestReplicaWithCluster {
HTU.deleteTable(hdt.getTableName());
}
}
// This test is to simulate the case that the meta region and the primary user region
// are down, hbase client is able to access user replica regions and return stale data.
// Meta replica is enabled to show the case that the meta replica region could be out of sync
// with the primary meta region.
@Test
public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
HTU.getHBaseAdmin().setBalancerRunning(false, true);
((ConnectionManager.HConnectionImplementation)HTU.getHBaseAdmin().getConnection()).
setUseMetaReplicas(true);
// Create table then get the single region for our new table.
HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown");
hdt.setRegionReplication(2);
try {
Table table = HTU.createTable(hdt, new byte[][] { f }, null);
// Get Meta location
RegionLocations mrl = ((ClusterConnection) HTU.getConnection())
.locateRegion(TableName.META_TABLE_NAME,
HConstants.EMPTY_START_ROW, false, false);
// Get user table location
RegionLocations url = ((ClusterConnection) HTU.getConnection())
.locateRegion(hdt.getTableName(), row, false, false);
// Make sure that user primary region is co-hosted with the meta region
if (!url.getDefaultRegionLocation().getServerName().equals(
mrl.getDefaultRegionLocation().getServerName())) {
HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
mrl.getDefaultRegionLocation().getServerName());
}
// Make sure that the user replica region is not hosted by the same region server with
// primary
if (url.getRegionLocation(1).getServerName().equals(mrl.getDefaultRegionLocation()
.getServerName())) {
HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
url.getDefaultRegionLocation().getServerName());
}
// Wait until the meta table is updated with new location info
while (true) {
mrl = ((ClusterConnection) HTU.getConnection())
.locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false);
// Get user table location
url = ((ClusterConnection) HTU.getConnection())
.locateRegion(hdt.getTableName(), row, false, true);
LOG.info("meta locations " + mrl);
LOG.info("table locations " + url);
ServerName a = url.getDefaultRegionLocation().getServerName();
ServerName b = mrl.getDefaultRegionLocation().getServerName();
if(a.equals(b)) {
break;
} else {
LOG.info("Waiting for new region info to be updated in meta table");
Thread.sleep(100);
}
}
Put p = new Put(row);
p.addColumn(f, row, row);
table.put(p);
// Flush so it can be picked by the replica refresher thread
HTU.flush(table.getName());
// Sleep for some time until data is picked up by replicas
try {
Thread.sleep(2 * REFRESH_PERIOD);
} catch (InterruptedException e1) {
LOG.error(e1);
}
// Simulating the RS down
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;
// The first Get is supposed to succeed
Get g = new Get(row);
g.setConsistency(Consistency.TIMELINE);
Result r = table.get(g);
Assert.assertTrue(r.isStale());
// The second Get will succeed as well
r = table.get(g);
Assert.assertTrue(r.isStale());
} finally {
((ConnectionManager.HConnectionImplementation)HTU.getHBaseAdmin().getConnection()).
setUseMetaReplicas(false);
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
HTU.getHBaseAdmin().setBalancerRunning(true, true);
HTU.getHBaseAdmin().disableTable(hdt.getTableName());
HTU.deleteTable(hdt.getTableName());
}
}
}