HBASE-18005 read replica: handle the case that region server hosting both primary replica and meta region is down (huaxiang sun)
parent 549171465d
commit ef46debde8
@@ -826,7 +826,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       } else {
         // If we are not supposed to be using the cache, delete any existing cached location
         // so it won't interfere.
-        metaCache.clearCache(tableName, row);
+        // We are only supposed to clean the cache for the specific replicaId
+        metaCache.clearCache(tableName, row, replicaId);
       }

       // Query the meta region
@@ -312,6 +312,40 @@ public class MetaCache {
     }
   }

+  /**
+   * Delete a cached location with specific replicaId.
+   * @param tableName tableName
+   * @param row row key
+   * @param replicaId region replica id
+   */
+  public void clearCache(final TableName tableName, final byte [] row, int replicaId) {
+    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
+
+    RegionLocations regionLocations = getCachedLocation(tableName, row);
+    if (regionLocations != null) {
+      HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
+      if (toBeRemoved != null) {
+        RegionLocations updatedLocations = regionLocations.remove(replicaId);
+        byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+        boolean removed;
+        if (updatedLocations.isEmpty()) {
+          removed = tableLocations.remove(startKey, regionLocations);
+        } else {
+          removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
+        }
+
+        if (removed) {
+          if (metrics != null) {
+            metrics.incrMetaCacheNumClearRegion();
+          }
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Removed " + toBeRemoved + " from cache");
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Delete a cached location for a table, row and server
    */
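The eviction above relies on ConcurrentMap's conditional remove/replace so that a concurrent cache refresh is never overwritten, and only the one replica's slot is dropped. Below is a minimal, self-contained sketch of that compare-and-swap eviction pattern; the Locations type and the server strings are hypothetical stand-ins for RegionLocations/HRegionLocation, not HBase classes.

import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for RegionLocations: one slot per replica id, null == unknown.
final class Locations {
  final String[] perReplica;
  Locations(String... perReplica) { this.perReplica = perReplica; }

  // Return a copy with one replica's slot cleared, leaving the others intact.
  Locations remove(int replicaId) {
    String[] copy = Arrays.copyOf(perReplica, perReplica.length);
    copy[replicaId] = null;
    return new Locations(copy);
  }

  boolean isEmpty() {
    return Arrays.stream(perReplica).allMatch(l -> l == null);
  }
}

public class PerReplicaEvictionSketch {
  public static void main(String[] args) {
    ConcurrentMap<String, Locations> cache = new ConcurrentHashMap<>();
    cache.put("startKey", new Locations("rs1:16020", "rs2:16020"));

    // Evict only replica 0, keeping the secondary location usable,
    // mirroring what clearCache(tableName, row, replicaId) does above.
    Locations current = cache.get("startKey");
    if (current != null && current.perReplica[0] != null) {
      Locations updated = current.remove(0);
      boolean removed = updated.isEmpty()
          ? cache.remove("startKey", current)             // nothing left: drop the whole entry
          : cache.replace("startKey", current, updated);  // otherwise swap atomically
      System.out.println("evicted replica 0: " + removed);
    }

    // Replica 1 is still cached and can still serve a timeline-consistent read.
    System.out.println("replica 1 still cached: " + (cache.get("startKey").perReplica[1] != null));
  }
}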
@@ -170,9 +170,36 @@ public class RpcRetryingCallerWithReadReplicas {
       throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
     boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);

-    RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId()
-        : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
-    final ResultBoundedCompletionService<Result> cs =
+    RegionLocations rl = null;
+    boolean skipPrimary = false;
+    try {
+      rl = getRegionLocations(true,
+          (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID),
+          cConnection, tableName, get.getRow());
+    } catch (RetriesExhaustedException | DoNotRetryIOException e) {
+      // When no specific replica id is specified, it just needs to load all replicas.
+      if (isTargetReplicaSpecified) {
+        throw e;
+      } else {
+        // We cannot get the primary replica location, it is possible that the region
+        // server hosting meta is down, it needs to proceed to try cached replicas.
+        if (cConnection instanceof ConnectionImplementation) {
+          rl = ((ConnectionImplementation)cConnection).getCachedLocation(tableName, get.getRow());
+          if (rl == null) {
+            // No cached locations
+            throw e;
+          }
+
+          // Primary replica location is not known, skip primary replica
+          skipPrimary = true;
+        } else {
+          // For completeness
+          throw e;
+        }
+      }
+    }
+
+    final ResultBoundedCompletionService<Result> cs =
         new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size());
     int startIndex = 0;
     int endIndex = rl.size();
@@ -181,25 +208,30 @@ public class RpcRetryingCallerWithReadReplicas {
       addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
       endIndex = 1;
     } else {
-      addCallsForReplica(cs, rl, 0, 0);
-      try {
-        // wait for the timeout to see whether the primary responds back
-        Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
-        if (f != null) {
-          return f.get(); //great we got a response
-        }
-      } catch (ExecutionException e) {
-        // We ignore the ExecutionException and continue with the secondary replicas
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Primary replica returns " + e.getCause());
-        }
+      if (!skipPrimary) {
+        addCallsForReplica(cs, rl, 0, 0);
+        try {
+          // wait for the timeout to see whether the primary responds back
+          Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
+          if (f != null) {
+            return f.get(); //great we got a response
+          }
+        } catch (ExecutionException e) {
+          // We ignore the ExecutionException and continue with the secondary replicas
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Primary replica returns " + e.getCause());
+          }

-        // Skip the result from the primary as we know that there is something wrong
-        startIndex = 1;
-      } catch (CancellationException e) {
-        throw new InterruptedIOException();
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException();
+          // Skip the result from the primary as we know that there is something wrong
+          startIndex = 1;
+        } catch (CancellationException e) {
+          throw new InterruptedIOException();
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException();
+        }
+      } else {
+        // Since primary replica is skipped, the endIndex needs to be adjusted accordingly
+        endIndex --;
       }

       // submit call for all of the secondaries at once
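Taken together with the previous hunk, the get path now follows a simple shape: try the authoritative meta lookup, fall back to the client-side cache when that lookup fails, and skip the primary replica whose location could not be confirmed. A hedged, self-contained sketch of that control flow follows; lookupFromMeta and cachedLocations are illustrative helpers, not HBase APIs.

import java.io.IOException;
import java.util.List;
import java.util.Optional;

public class FallbackLookupSketch {

  // Hypothetical lookup: index 0 is the primary replica, the rest are secondaries.
  static List<String> lookupFromMeta() throws IOException {
    throw new IOException("meta region server is down");  // simulate the failure handled above
  }

  // Hypothetical cache: pretend an earlier read left both replica locations cached.
  static Optional<List<String>> cachedLocations() {
    return Optional.of(List.of("rs1:16020", "rs2:16020"));
  }

  public static void main(String[] args) throws IOException {
    List<String> locations;
    boolean skipPrimary = false;
    try {
      locations = lookupFromMeta();
    } catch (IOException e) {
      // Same idea as the patched doGet(): fall back to whatever is cached and
      // remember that the primary replica's location is only a guess.
      locations = cachedLocations().orElseThrow(() -> e);
      skipPrimary = true;
    }

    // When the primary is skipped, only the secondary replicas are asked.
    int startIndex = skipPrimary ? 1 : 0;
    for (int i = startIndex; i < locations.size(); i++) {
      System.out.println("would send replica call to " + locations.get(i));
    }
  }
}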
@@ -288,10 +320,10 @@ public class RpcRetryingCallerWithReadReplicas {
     } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e) {
       throw e;
     } catch (IOException e) {
-      throw new RetriesExhaustedException("Can't get the location", e);
+      throw new RetriesExhaustedException("Can't get the location for replica " + replicaId, e);
     }
     if (rl == null) {
-      throw new RetriesExhaustedException("Can't get the locations");
+      throw new RetriesExhaustedException("Can't get the location for replica " + replicaId);
     }

     return rl;
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
@@ -142,10 +143,25 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     //2. We should close the "losing" scanners (scanners other than the ones we hear back
     //   from first)
     //
-    RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
-        RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
-        currentScannerCallable.getRow());
-
+    RegionLocations rl = null;
+    try {
+      rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
+          RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
+          currentScannerCallable.getRow());
+    } catch (RetriesExhaustedException | DoNotRetryIOException e) {
+      // We cannot get the primary replica region location, it is possible that the region server
+      // hosting meta table is down, it needs to proceed to try cached replicas directly.
+      if (cConnection instanceof ConnectionImplementation) {
+        rl = ((ConnectionImplementation) cConnection)
+            .getCachedLocation(tableName, currentScannerCallable.getRow());
+        if (rl == null) {
+          throw e;
+        }
+      } else {
+        // For completeness
+        throw e;
+      }
+    }
     // allocate a boundedcompletion pool of some multiple of number of replicas.
     // We want to accommodate some RPCs for redundant replica scans (but are still in progress)
     ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
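For reference, the scan path patched above is what serves timeline-consistency scans issued through the public client API. A minimal usage sketch follows, assuming a running cluster, a table named "testtable", and region replication already configured; the table name is an illustrative assumption.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class TimelineScanExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("testtable"))) {  // illustrative table name
      Scan scan = new Scan();
      scan.setConsistency(Consistency.TIMELINE);  // allow secondary replicas to answer
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result r : scanner) {
          // isStale() reports whether a (possibly lagging) secondary replica answered.
          System.out.println(r + (r.isStale() ? " (stale)" : ""));
        }
      }
    }
  }
}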
@@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;

 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
@@ -162,8 +164,28 @@ public class TestReplicaWithCluster {
   /**
    * This copro is used to slow down the primary meta region scan a bit
    */
-  public static class RegionServerHostingPrimayMetaRegionSlowCopro implements RegionObserver {
+  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro implements RegionObserver {
     static boolean slowDownPrimaryMetaScan = false;
+    static boolean throwException = false;
+
+    @Override
+    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Get get, final List<Cell> results) throws IOException {
+
+      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
+
+      // Fail for the primary replica, but not for meta
+      if (throwException) {
+        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
+          LOG.info("Get, throw Region Server Stopped Exception for region " + e.getEnvironment()
+              .getRegion().getRegionInfo());
+          throw new RegionServerStoppedException("Server " +
+              e.getEnvironment().getRegionServerServices().getServerName() + " not running");
+        }
+      } else {
+        LOG.info("Get, We're replica region " + replicaId);
+      }
+    }

     @Override
     public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
@@ -172,21 +194,34 @@ public class TestReplicaWithCluster {
       int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

       // Slow down with the primary meta region scan
-      if (slowDownPrimaryMetaScan && (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion()
-          && (replicaId == 0))) {
-        LOG.info("Scan with primary meta region, slow down a bit");
-        try {
-          Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
-        } catch (InterruptedException ie) {
-          // Ignore
-        }
+      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
+        if (slowDownPrimaryMetaScan) {
+          LOG.info("Scan with primary meta region, slow down a bit");
+          try {
+            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
+          } catch (InterruptedException ie) {
+            // Ignore
+          }
+        }
+
+        // Fail for the primary replica
+        if (throwException) {
+          LOG.info("Scan, throw Region Server Stopped Exception for replica " + e.getEnvironment()
+              .getRegion().getRegionInfo());
+
+          throw new RegionServerStoppedException("Server " +
+              e.getEnvironment().getRegionServerServices().getServerName() + " not running");
+        } else {
+          LOG.info("Scan, We're replica region " + replicaId);
+        }
       } else {
         LOG.info("Scan, We're replica region " + replicaId);
       }

       return null;
     }
   }


   @BeforeClass
   public static void beforeClass() throws Exception {
     // enable store file refreshing
@@ -216,7 +251,7 @@ public class TestReplicaWithCluster {

     // Set system coprocessor so it can be applied to meta regions
     HTU.getConfiguration().set("hbase.coprocessor.region.classes",
-        RegionServerHostingPrimayMetaRegionSlowCopro.class.getName());
+        RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());

     HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_MEAT_REPLICA_SCAN_TIMEOUT,
         META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
@@ -630,18 +665,117 @@ public class TestReplicaWithCluster {

       HTU.createTable(hdt, new byte[][] { f }, null);

-      RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = true;
+      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;

       // Get user table location, always get it from the primary meta replica
       RegionLocations url = ((ClusterConnection) HTU.getConnection())
           .locateRegion(hdt.getTableName(), row, false, false);

     } finally {
-      RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = false;
+      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
       ((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
       HTU.getAdmin().setBalancerRunning(true, true);
       HTU.getAdmin().disableTable(hdt.getTableName());
       HTU.deleteTable(hdt.getTableName());
     }
   }

+  // This test is to simulate the case that the meta region and the primary user region
+  // are down; the hbase client is still able to access user replica regions and return stale data.
+  // Meta replica is enabled to show the case that the meta replica region could be out of sync
+  // with the primary meta region.
+  @Test
+  public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
+    HTU.getAdmin().setBalancerRunning(false, true);
+
+    ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(true);
+
+    // Create table then get the single region for our new table.
+    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown");
+    hdt.setRegionReplication(2);
+    try {
+
+      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
+
+      // Get Meta location
+      RegionLocations mrl = ((ClusterConnection) HTU.getConnection())
+          .locateRegion(TableName.META_TABLE_NAME,
+              HConstants.EMPTY_START_ROW, false, false);
+
+      // Get user table location
+      RegionLocations url = ((ClusterConnection) HTU.getConnection())
+          .locateRegion(hdt.getTableName(), row, false, false);
+
+      // Make sure that user primary region is co-hosted with the meta region
+      if (!url.getDefaultRegionLocation().getServerName().equals(
+          mrl.getDefaultRegionLocation().getServerName())) {
+        HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
+            mrl.getDefaultRegionLocation().getServerName());
+      }
+
+      // Make sure that the user replica region is not hosted by the same region server as the
+      // primary
+      if (url.getRegionLocation(1).getServerName().equals(mrl.getDefaultRegionLocation()
+          .getServerName())) {
+        HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
+            url.getDefaultRegionLocation().getServerName());
+      }
+
+      // Wait until the meta table is updated with new location info
+      while (true) {
+        mrl = ((ClusterConnection) HTU.getConnection())
+            .locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false);
+
+        // Get user table location
+        url = ((ClusterConnection) HTU.getConnection())
+            .locateRegion(hdt.getTableName(), row, false, true);
+
+        LOG.info("meta locations " + mrl);
+        LOG.info("table locations " + url);
+        ServerName a = url.getDefaultRegionLocation().getServerName();
+        ServerName b = mrl.getDefaultRegionLocation().getServerName();
+        if (a.equals(b)) {
+          break;
+        } else {
+          LOG.info("Waiting for new region info to be updated in meta table");
+          Thread.sleep(100);
+        }
+      }
+
+      Put p = new Put(row);
+      p.addColumn(f, row, row);
+      table.put(p);
+
+      // Flush so it can be picked up by the replica refresher thread
+      HTU.flush(table.getName());
+
+      // Sleep for some time until data is picked up by replicas
+      try {
+        Thread.sleep(2 * REFRESH_PERIOD);
+      } catch (InterruptedException e1) {
+        LOG.error(e1);
+      }
+
+      // Simulating the RS down
+      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;
+
+      // The first Get is supposed to succeed
+      Get g = new Get(row);
+      g.setConsistency(Consistency.TIMELINE);
+      Result r = table.get(g);
+      Assert.assertTrue(r.isStale());
+
+      // The second Get will succeed as well
+      r = table.get(g);
+      Assert.assertTrue(r.isStale());
+
+    } finally {
+      ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
+      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
+      HTU.getAdmin().setBalancerRunning(true, true);
+      HTU.getAdmin().disableTable(hdt.getTableName());
+      HTU.deleteTable(hdt.getTableName());
+    }
+  }
 }
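The client-visible behavior the new test asserts is simply this: with region replication enabled on the table (hdt.setRegionReplication(2) above), a timeline-consistency Get can still be answered by a secondary replica even when the primary user region and the meta-hosting region server are unavailable, and Result.isStale() reports that. A minimal usage sketch against the standard client API, with an illustrative table name and row key:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class TimelineGetExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("testtable"))) {  // illustrative table name
      Get get = new Get(Bytes.toBytes("testrow"));                       // illustrative row key
      get.setConsistency(Consistency.TIMELINE);  // let a secondary replica answer if needed
      Result result = table.get(get);
      if (result.isStale()) {
        // A secondary replica answered; its data may lag behind the primary.
        System.out.println("served by a secondary replica");
      }
    }
  }
}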