HBASE-12198 Fix the bug of not updating location cache
Summary: Clear the location cache for a server when a request against it fails.

Test Plan: Add test case `TestHTableMultiplexerFlushCache` to reproduce the bug.

Differential Revision: https://reviews.facebook.net/D24603

Signed-off-by: Elliott Clark <elliott@fb.com>
Signed-off-by: Elliott Clark <eclark@apache.org>
commit 3c1fbd2ddf (parent 2c07372c2f)
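The change below touches the client write path in two places: AsyncProcess now clears or refreshes the region location cache when a whole multi-action against a server fails, and HTableMultiplexer gives its flush workers their own AsyncProcess with client-level retries disabled, so failed Puts are re-queued against freshly looked-up locations. For orientation, here is a minimal usage sketch of HTableMultiplexer in the style of the new test; the table and column names and the queue size are illustrative and not part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableMultiplexer;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiplexerPutExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Per-region-server buffer queue size; 100000 mirrors the new test below.
    HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 100000);

    // Illustrative table/column names -- not taken from the commit.
    TableName table = TableName.valueOf("example_table");
    Put put = new Put(Bytes.toBytes("row-1"))
        .add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));

    // put() is non-blocking: it returns false if the Put could not be queued,
    // in which case the caller is responsible for retrying it.
    boolean queued = multiplexer.put(table, put);
    System.out.println("queued = " + queued);
  }
}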
@@ -1013,6 +1013,10 @@ class AsyncProcess {
       Retry canRetry = errorsByServer.canRetryMore(numAttempt)
           ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
 
+      if (tableName == null) {
+        // tableName is null when we made a cross-table RPC call.
+        hConnection.clearCaches(server);
+      }
       int failed = 0, stopped = 0;
       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
       for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
@@ -1021,7 +1025,9 @@ class AsyncProcess {
         // Do not use the exception for updating cache because it might be coming from
         // any of the regions in the MultiAction.
         // TODO: depending on type of exception we might not want to update cache at all?
-        hConnection.updateCachedLocations(tableName, regionName, row, null, server);
+        if (tableName != null) {
+          hConnection.updateCachedLocations(tableName, regionName, row, null, server);
+        }
         for (Action<Row> action : e.getValue()) {
           Retry retry = manageError(
               action.getOriginalIndex(), action.getAction(), canRetry, t, server);
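Taken together, the two AsyncProcess hunks above are the heart of the fix: when the failed call was a cross-table RPC, tableName is null and no single table's cache can be refreshed, so every cached location for that server is dropped; otherwise the affected location is updated as before. A condensed sketch of that decision follows, using a stand-in interface for the HConnection cache methods called above (the interface and method below are illustrative, not the real AsyncProcess code).

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;

// Stand-in for the two HConnection cache methods used in the hunks above.
interface LocationCache {
  void clearCaches(ServerName server);
  void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row,
      Object exception, ServerName server);
}

final class LocationCacheOnFailure {
  // Mirrors the branch added in AsyncProcess (sketch only).
  static void handleServerFailure(LocationCache cache, TableName tableName,
      byte[] regionName, byte[] row, ServerName server) {
    if (tableName == null) {
      // tableName is null when the failed request was a cross-table RPC call,
      // so no single table's entries can be refreshed precisely.
      cache.clearCaches(server);
    } else {
      cache.updateCachedLocations(tableName, regionName, row, null, server);
    }
  }
}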
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -79,7 +81,7 @@ public class HTableMultiplexer {
   private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
       new ConcurrentHashMap<>();
 
-  private final Configuration conf;
+  private final Configuration workerConf;
   private final ClusterConnection conn;
   private final ExecutorService pool;
   private final int retryNum;
@@ -95,10 +97,9 @@ public class HTableMultiplexer {
    */
   public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
       throws IOException {
-    this.conf = conf;
     this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
     this.pool = HTable.getDefaultExecutor(conf);
-    this.retryNum = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+    this.retryNum = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
     this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
@@ -107,6 +108,11 @@ public class HTableMultiplexer {
     this.executor =
         Executors.newScheduledThreadPool(initThreads,
           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
+
+    this.workerConf = HBaseConfiguration.create(conf);
+    // We do not do the retry because we need to reassign puts to different queues if regions are
+    // moved.
+    this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
   }
 
   /**
@@ -218,7 +224,7 @@ public class HTableMultiplexer {
       worker = serverToFlushWorkerMap.get(addr);
       if (worker == null) {
         // Create the flush worker
-        worker = new FlushWorker(conf, this.conn, addr, this, perRegionServerBufferQueueSize,
+        worker = new FlushWorker(workerConf, this.conn, addr, this, perRegionServerBufferQueueSize,
             pool, executor);
         this.serverToFlushWorkerMap.put(addr, worker);
         executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
@@ -388,32 +394,30 @@ public class HTableMultiplexer {
 
   private static class FlushWorker implements Runnable {
     private final HRegionLocation addr;
-    private final AsyncProcess asyncProc;
     private final LinkedBlockingQueue<PutStatus> queue;
     private final HTableMultiplexer multiplexer;
     private final AtomicLong totalFailedPutCount = new AtomicLong(0);
     private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
     private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
     private final AtomicLong maxLatency = new AtomicLong(0);
-    private final ExecutorService pool;
+    private final AsyncProcess ap;
     private final List<PutStatus> processingList = new ArrayList<>();
     private final ScheduledExecutorService executor;
     private final int maxRetryInQueue;
     private final AtomicInteger retryInQueue = new AtomicInteger(0);
-    private final int rpcTimeOutMs;
 
     public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
         HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
         ExecutorService pool, ScheduledExecutorService executor) {
       this.addr = addr;
-      this.asyncProc = conn.getAsyncProcess();
       this.multiplexer = htableMultiplexer;
       this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
-      this.pool = pool;
+      RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
+      RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
+      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory);
       this.executor = executor;
       this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
-      this.rpcTimeOutMs =
-          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     }
 
     protected LinkedBlockingQueue<PutStatus> getQueue() {
@@ -456,10 +460,11 @@ public class HTableMultiplexer {
         // The currentPut is failed. So get the table name for the currentPut.
         final TableName tableName = ps.regionInfo.getTable();
 
-        // Wait at least RPC timeout time
-        long delayMs = rpcTimeOutMs;
-        delayMs = Math.max(delayMs, (long) (multiplexer.flushPeriod * Math.pow(2,
-            multiplexer.retryNum - retryCount)));
+        long delayMs = ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
+            multiplexer.retryNum - retryCount - 1);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
+        }
 
         executor.schedule(new Runnable() {
           @Override
@@ -513,8 +518,8 @@ public class HTableMultiplexer {
           Collections.singletonMap(server, actions);
       try {
         AsyncRequestFuture arf =
-            asyncProc.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
-              null, actionsByServer, pool);
+            ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
+              null, actionsByServer, null);
         arf.waitUntilDone();
         if (arf.hasError()) {
           // We just log and ignore the exception here since failed Puts will be resubmit again.
@@ -523,20 +528,20 @@ public class HTableMultiplexer {
         }
       } finally {
         for (int i = 0; i < results.length; i++) {
-          if (results[i] == null) {
+          if (results[i] instanceof Result) {
+            failedCount--;
+          } else {
             if (failed == null) {
               failed = new ArrayList<PutStatus>();
             }
             failed.add(processingList.get(i));
-          } else {
-            failedCount--;
           }
         }
       }
 
       if (failed != null) {
         // Resubmit failed puts
-        for (PutStatus putStatus : processingList) {
+        for (PutStatus putStatus : failed) {
           if (resubmitFailedPut(putStatus, this.addr)) {
             failedCount--;
           }
@@ -0,0 +1,115 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class, ClientTests.class })
+public class TestHTableMultiplexerFlushCache {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier_1");
+  private static byte[] QUALIFIER2 = Bytes.toBytes("testQualifier_2");
+  private static byte[] VALUE1 = Bytes.toBytes("testValue1");
+  private static byte[] VALUE2 = Bytes.toBytes("testValue2");
+  private static int SLAVES = 3;
+  private static int PER_REGIONSERVER_QUEUE_SIZE = 100000;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private static void checkExistence(HTable htable, byte[] row, byte[] family, byte[] quality,
+      byte[] value) throws Exception {
+    // verify that the Get returns the correct result
+    Result r;
+    Get get = new Get(row);
+    get.addColumn(family, quality);
+    int nbTry = 0;
+    do {
+      assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry < 50);
+      nbTry++;
+      Thread.sleep(100);
+      r = htable.get(get);
+    } while (r == null || r.getValue(family, quality) == null);
+    assertEquals("value", Bytes.toStringBinary(value),
+      Bytes.toStringBinary(r.getValue(family, quality)));
+  }
+
+  @Test
+  public void testOnRegionChange() throws Exception {
+    TableName TABLE = TableName.valueOf("testOnRegionChange");
+    final int NUM_REGIONS = 10;
+    HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
+      Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
+
+    HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
+      PER_REGIONSERVER_QUEUE_SIZE);
+
+    byte[][] startRows = htable.getStartKeys();
+    byte[] row = startRows[1];
+    assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
+
+    Put put = new Put(row).add(FAMILY, QUALIFIER1, VALUE1);
+    assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
+
+    checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
+
+    // Now let's shutdown the regionserver and let regions moved to other servers.
+    HRegionLocation loc = htable.getRegionLocation(row);
+    MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster();
+    hbaseCluster.stopRegionServer(loc.getServerName());
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
+
+    // put with multiplexer.
+    put = new Put(row).add(FAMILY, QUALIFIER2, VALUE2);
+    assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
+
+    checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
+  }
+}