HBASE-15232 Handle region location cache mgmt in AsyncProcess for multi()'s
Further investigation after HBASE-15221 led to some findings that AsyncProcess should have been managing the contents of the region location cache, appropriately clearing it when necessary (e.g. an RPC to a server fails because the server doesn't host that region). For multi() RPCs, the tableName argument is null since there is no single table that the updates are destined to. This inadvertently caused the existing region location cache updates to fail on 1.x branches. AsyncProcess needs to handle when tableName is null and perform the necessary cache evictions. As such, much of the new retry logic in HTableMultiplexer is unnecessary and is removed with this commit. Getters which were added as a part of testing were left since they are mostly harmless and should have no negative impact. Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
5e501123ca
commit
a8073c4a98
|
@ -1360,8 +1360,15 @@ class AsyncProcess {
|
|||
errorsByServer.reportServerError(server);
|
||||
canRetry = errorsByServer.canTryMore(numAttempt);
|
||||
}
|
||||
if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) {
|
||||
// For multi-actions, we don't have a table name, but we want to make sure to clear the
|
||||
// cache in case there were location-related exceptions. We don't want to clear the cache
|
||||
// for every possible exception that comes through, however.
|
||||
connection.clearCaches(server);
|
||||
} else {
|
||||
connection.updateCachedLocations(
|
||||
tableName, region, actions.get(0).getAction().getRow(), throwable, server);
|
||||
}
|
||||
failureCount += actions.size();
|
||||
|
||||
for (Action<Row> action : actions) {
|
||||
|
|
|
@ -194,19 +194,6 @@ public class HTableMultiplexer {
|
|||
* @return true if the request can be accepted by its corresponding buffer queue.
|
||||
*/
|
||||
public boolean put(final TableName tableName, final Put put, int maxAttempts) {
|
||||
return _put(tableName, put, maxAttempts, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal "put" which exposes a boolean flag to control whether or not the region location
|
||||
* cache should be reloaded when trying to queue the {@link Put}.
|
||||
* @param tableName Destination table for the Put
|
||||
* @param put The Put to send
|
||||
* @param maxAttempts Number of attempts to retry the {@code put}
|
||||
* @param reloadCache Should the region location cache be reloaded
|
||||
* @return true if the request was accepted in the queue, otherwise false
|
||||
*/
|
||||
boolean _put(final TableName tableName, final Put put, int maxAttempts, boolean reloadCache) {
|
||||
if (maxAttempts <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
@ -215,7 +202,9 @@ public class HTableMultiplexer {
|
|||
HTable.validatePut(put, maxKeyValueSize);
|
||||
// Allow mocking to get at the connection, but don't expose the connection to users.
|
||||
ClusterConnection conn = (ClusterConnection) getConnection();
|
||||
HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), reloadCache);
|
||||
// AsyncProcess in the FlushWorker should take care of refreshing the location cache
|
||||
// as necessary. We shouldn't have to do that here.
|
||||
HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
|
||||
if (loc != null) {
|
||||
// Add the put pair into its corresponding queue.
|
||||
LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
|
||||
|
@ -512,12 +501,16 @@ public class HTableMultiplexer {
|
|||
LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
|
||||
}
|
||||
|
||||
// HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
|
||||
// the region location cache when the Put originally failed with some exception. If we keep
|
||||
// re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff
|
||||
// that we expect it to.
|
||||
getExecutor().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
boolean succ = false;
|
||||
try {
|
||||
succ = FlushWorker.this.getMultiplexer()._put(tableName, failedPut, retryCount, true);
|
||||
succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
|
||||
} finally {
|
||||
FlushWorker.this.getRetryInQueue().decrementAndGet();
|
||||
if (!succ) {
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.exceptions;
|
|||
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
||||
import org.apache.hadoop.hbase.RetryImmediatelyException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
@ -40,14 +41,15 @@ public final class ClientExceptionsUtil {
|
|||
if (cur == null) {
|
||||
return true;
|
||||
}
|
||||
return !isSpecialException(cur) || (cur instanceof RegionMovedException);
|
||||
return !isSpecialException(cur) || (cur instanceof RegionMovedException)
|
||||
|| cur instanceof NotServingRegionException;
|
||||
}
|
||||
|
||||
public static boolean isSpecialException(Throwable cur) {
|
||||
return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|
||||
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
|
||||
|| cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
|
||||
|| cur instanceof CallQueueTooBigException);
|
||||
|| cur instanceof CallQueueTooBigException || cur instanceof NotServingRegionException);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -16,34 +16,17 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.HTableMultiplexer.FlushWorker;
|
||||
import org.apache.hadoop.hbase.client.HTableMultiplexer.PutStatus;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Mockito.doCallRealMethod;
|
||||
import static org.mockito.Mockito.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
@ -52,112 +35,19 @@ import static org.mockito.Mockito.when;
|
|||
@Category(SmallTests.class)
|
||||
public class TestHTableMultiplexerViaMocks {
|
||||
|
||||
private static final int NUM_RETRIES = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
|
||||
private HTableMultiplexer mockMultiplexer;
|
||||
private ClusterConnection mockConnection;
|
||||
private HRegionLocation mockRegionLocation;
|
||||
private HRegionInfo mockRegionInfo;
|
||||
|
||||
private TableName tableName;
|
||||
private Put put;
|
||||
|
||||
@Before
|
||||
public void setupTest() {
|
||||
mockMultiplexer = mock(HTableMultiplexer.class);
|
||||
mockConnection = mock(ClusterConnection.class);
|
||||
mockRegionLocation = mock(HRegionLocation.class);
|
||||
mockRegionInfo = mock(HRegionInfo.class);
|
||||
|
||||
tableName = TableName.valueOf("my_table");
|
||||
put = new Put(getBytes("row1"));
|
||||
put.addColumn(getBytes("f1"), getBytes("q1"), getBytes("v11"));
|
||||
put.addColumn(getBytes("f1"), getBytes("q2"), getBytes("v12"));
|
||||
put.addColumn(getBytes("f2"), getBytes("q1"), getBytes("v21"));
|
||||
|
||||
// Call the real put(TableName, Put, int) method
|
||||
when(mockMultiplexer.put(any(TableName.class), any(Put.class), anyInt())).thenCallRealMethod();
|
||||
|
||||
// Return the mocked ClusterConnection
|
||||
when(mockMultiplexer.getConnection()).thenReturn(mockConnection);
|
||||
|
||||
// Return the regionInfo from the region location
|
||||
when(mockRegionLocation.getRegionInfo()).thenReturn(mockRegionInfo);
|
||||
|
||||
// Make sure this RegionInfo points to our table
|
||||
when(mockRegionInfo.getTable()).thenReturn(tableName);
|
||||
}
|
||||
|
||||
@Test public void useCacheOnInitialPut() throws Exception {
|
||||
mockMultiplexer.put(tableName, put, NUM_RETRIES);
|
||||
|
||||
verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES, false);
|
||||
}
|
||||
|
||||
@Test public void nonNullLocationQueuesPut() throws Exception {
|
||||
final LinkedBlockingQueue<PutStatus> queue = new LinkedBlockingQueue<>();
|
||||
|
||||
// Call the real method for _put(TableName, Put, int, boolean)
|
||||
when(mockMultiplexer._put(any(TableName.class), any(Put.class), anyInt(), anyBoolean())).thenCallRealMethod();
|
||||
|
||||
// Return a region location
|
||||
when(mockConnection.getRegionLocation(tableName, put.getRow(), false)).thenReturn(mockRegionLocation);
|
||||
when(mockMultiplexer.getQueue(mockRegionLocation)).thenReturn(queue);
|
||||
|
||||
assertTrue("Put should have been queued", mockMultiplexer.put(tableName, put, NUM_RETRIES));
|
||||
|
||||
assertEquals(1, queue.size());
|
||||
final PutStatus ps = queue.take();
|
||||
assertEquals(put, ps.put);
|
||||
assertEquals(mockRegionInfo, ps.regionInfo);
|
||||
}
|
||||
|
||||
@Test public void ignoreCacheOnRetriedPut() throws Exception {
|
||||
FlushWorker mockFlushWorker = mock(FlushWorker.class);
|
||||
ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class);
|
||||
final AtomicInteger retryInQueue = new AtomicInteger(0);
|
||||
final AtomicLong totalFailedPuts = new AtomicLong(0L);
|
||||
final int maxRetryInQueue = 20;
|
||||
final long delay = 100L;
|
||||
|
||||
final PutStatus ps = new PutStatus(mockRegionInfo, put, NUM_RETRIES);
|
||||
|
||||
// Call the real resubmitFailedPut(PutStatus, HRegionLocation) method
|
||||
when(mockFlushWorker.resubmitFailedPut(any(PutStatus.class), any(HRegionLocation.class))).thenCallRealMethod();
|
||||
// Succeed on the re-submit without caching
|
||||
when(mockMultiplexer._put(tableName, put, NUM_RETRIES - 1, true)).thenReturn(true);
|
||||
|
||||
// Stub out the getters for resubmitFailedPut(PutStatus, HRegionLocation)
|
||||
when(mockFlushWorker.getExecutor()).thenReturn(mockExecutor);
|
||||
when(mockFlushWorker.getNextDelay(anyInt())).thenReturn(delay);
|
||||
when(mockFlushWorker.getMultiplexer()).thenReturn(mockMultiplexer);
|
||||
when(mockFlushWorker.getRetryInQueue()).thenReturn(retryInQueue);
|
||||
when(mockFlushWorker.getMaxRetryInQueue()).thenReturn(maxRetryInQueue);
|
||||
when(mockFlushWorker.getTotalFailedPutCount()).thenReturn(totalFailedPuts);
|
||||
|
||||
// When a Runnable is scheduled, run that Runnable
|
||||
when(mockExecutor.schedule(any(Runnable.class), eq(delay), eq(TimeUnit.MILLISECONDS))).thenAnswer(
|
||||
new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
// Before we run this, should have one retry in progress.
|
||||
assertEquals(1L, retryInQueue.get());
|
||||
|
||||
Object[] args = invocation.getArguments();
|
||||
assertEquals(3, args.length);
|
||||
assertTrue("Argument should be an instance of Runnable", args[0] instanceof Runnable);
|
||||
Runnable runnable = (Runnable) args[0];
|
||||
runnable.run();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
// The put should be rescheduled
|
||||
assertTrue("Put should have been rescheduled", mockFlushWorker.resubmitFailedPut(ps, mockRegionLocation));
|
||||
|
||||
verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES - 1, true);
|
||||
assertEquals(0L, totalFailedPuts.get());
|
||||
// Net result should be zero (added one before rerunning, subtracted one after running).
|
||||
assertEquals(0L, retryInQueue.get());
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
|
@ -183,11 +73,4 @@ public class TestHTableMultiplexerViaMocks {
|
|||
// We should not close it again
|
||||
verify(mockConnection, times(0)).close();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return UTF-8 byte representation for {@code str}
|
||||
*/
|
||||
private static byte[] getBytes(String str) {
|
||||
return str.getBytes(UTF_8);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,16 +24,21 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@Category({ LargeTests.class, ClientTests.class })
|
||||
|
@ -115,4 +120,59 @@ public class TestHTableMultiplexerFlushCache {
|
|||
checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnRegionMove() throws Exception {
|
||||
// This test is doing near exactly the same thing that testOnRegionChange but avoiding the
|
||||
// potential to get a ConnectionClosingException. By moving the region, we can be certain that
|
||||
// the connection is still valid and that the implementation is correctly handling an invalid
|
||||
// Region cache (and not just tearing down the entire connection).
|
||||
TableName TABLE = TableName.valueOf("testOnRegionMove");
|
||||
final int NUM_REGIONS = 10;
|
||||
HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
|
||||
Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
|
||||
|
||||
HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
|
||||
PER_REGIONSERVER_QUEUE_SIZE);
|
||||
|
||||
final RegionLocator regionLocator = TEST_UTIL.getConnection().getRegionLocator(TABLE);
|
||||
Pair<byte[][],byte[][]> startEndRows = regionLocator.getStartEndKeys();
|
||||
byte[] row = startEndRows.getFirst()[1];
|
||||
assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
|
||||
|
||||
Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1);
|
||||
assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
|
||||
|
||||
checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
|
||||
|
||||
final HRegionLocation loc = regionLocator.getRegionLocation(row);
|
||||
final MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster();
|
||||
// The current server for the region we're writing to
|
||||
final ServerName originalServer = loc.getServerName();
|
||||
ServerName newServer = null;
|
||||
// Find a new server to move that region to
|
||||
for (int i = 0; i < SLAVES; i++) {
|
||||
HRegionServer rs = hbaseCluster.getRegionServer(0);
|
||||
if (!rs.getServerName().equals(originalServer.getServerName())) {
|
||||
newServer = rs.getServerName();
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertNotNull("Did not find a new RegionServer to use", newServer);
|
||||
|
||||
// Move the region
|
||||
LOG.info("Moving " + loc.getRegionInfo().getEncodedName() + " from " + originalServer
|
||||
+ " to " + newServer);
|
||||
TEST_UTIL.getHBaseAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(),
|
||||
Bytes.toBytes(newServer.getServerName()));
|
||||
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
|
||||
|
||||
// Send a new Put
|
||||
put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2);
|
||||
assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
|
||||
|
||||
// We should see the update make it to the new server eventually
|
||||
checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue