HBASE-15232 Handle region location cache mgmt in AsyncProcess for multi()'s

Further investigation after HBASE-15221 lead to some findings that
AsyncProcess should have been managing the contents of the region
location cache, appropriately clearing it when necessary (e.g. an
RPC to a server fails because the server doesn't host that region)

For multi() RPCs, the tableName argument is null since there is no
single table that the updates are destined to. This inadvertently
caused the existing region location cache updates to fail on 1.x
branches. AsyncProcess needs to handle when tableName is null
and perform the necessary cache evictions.

As such, much of the new retry logic in HTableMultiplexer is
unnecessary and is removed with this commit. Getters which were
added as a part of testing were left since that are mostly
harmless and should contain no negative impact.

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Josh Elser 2016-02-08 14:25:37 -05:00 committed by stack
parent 49cb0cfccb
commit d747188f2c
5 changed files with 89 additions and 141 deletions

View File

@ -1360,8 +1360,15 @@ class AsyncProcess {
errorsByServer.reportServerError(server); errorsByServer.reportServerError(server);
canRetry = errorsByServer.canRetryMore(numAttempt); canRetry = errorsByServer.canRetryMore(numAttempt);
} }
connection.updateCachedLocations( if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) {
tableName, region, actions.get(0).getAction().getRow(), throwable, server); // For multi-actions, we don't have a table name, but we want to make sure to clear the
// cache in case there were location-related exceptions. We don't to clear the cache
// for every possible exception that comes through, however.
connection.clearCaches(server);
} else {
connection.updateCachedLocations(
tableName, region, actions.get(0).getAction().getRow(), throwable, server);
}
failureCount += actions.size(); failureCount += actions.size();
for (Action<Row> action : actions) { for (Action<Row> action : actions) {

View File

@ -192,21 +192,8 @@ public class HTableMultiplexer {
* Return false if the queue is already full. * Return false if the queue is already full.
* @return true if the request can be accepted by its corresponding buffer queue. * @return true if the request can be accepted by its corresponding buffer queue.
*/ */
public boolean put(final TableName tableName, final Put put, int retry) { public boolean put(final TableName tableName, final Put put, int maxAttempts) {
return _put(tableName, put, retry, false); if (maxAttempts <= 0) {
}
/**
* Internal "put" which exposes a boolean flag to control whether or not the region location
* cache should be reloaded when trying to queue the {@link Put}.
* @param tableName Destination table for the Put
* @param put The Put to send
* @param retry Number of attempts to retry the {@code put}
* @param reloadCache Should the region location cache be reloaded
* @return true if the request was accepted in the queue, otherwise false
*/
boolean _put(final TableName tableName, final Put put, int retry, boolean reloadCache) {
if (retry <= 0) {
return false; return false;
} }
@ -214,13 +201,15 @@ public class HTableMultiplexer {
HTable.validatePut(put, maxKeyValueSize); HTable.validatePut(put, maxKeyValueSize);
// Allow mocking to get at the connection, but don't expose the connection to users. // Allow mocking to get at the connection, but don't expose the connection to users.
ClusterConnection conn = (ClusterConnection) getConnection(); ClusterConnection conn = (ClusterConnection) getConnection();
HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), reloadCache); // AsyncProcess in the FlushWorker should take care of refreshing the location cache
// as necessary. We shouldn't have to do that here.
HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
if (loc != null) { if (loc != null) {
// Add the put pair into its corresponding queue. // Add the put pair into its corresponding queue.
LinkedBlockingQueue<PutStatus> queue = getQueue(loc); LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
// Generate a MultiPutStatus object and offer it into the queue // Generate a MultiPutStatus object and offer it into the queue
PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry); PutStatus s = new PutStatus(loc.getRegionInfo(), put, maxAttempts);
return queue.offer(s); return queue.offer(s);
} }
@ -511,12 +500,16 @@ public class HTableMultiplexer {
LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount); LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
} }
// HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
// the region location cache when the Put original failed with some exception. If we keep
// re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff
// that we expect it to.
getExecutor().schedule(new Runnable() { getExecutor().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
boolean succ = false; boolean succ = false;
try { try {
succ = FlushWorker.this.getMultiplexer()._put(tableName, failedPut, retryCount, true); succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
} finally { } finally {
FlushWorker.this.getRetryInQueue().decrementAndGet(); FlushWorker.this.getRetryInQueue().decrementAndGet();
if (!succ) { if (!succ) {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.exceptions;
import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.RetryImmediatelyException; import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -40,14 +41,15 @@ public final class ClientExceptionsUtil {
if (cur == null) { if (cur == null) {
return true; return true;
} }
return !isSpecialException(cur) || (cur instanceof RegionMovedException); return !isSpecialException(cur) || (cur instanceof RegionMovedException)
|| cur instanceof NotServingRegionException;
} }
public static boolean isSpecialException(Throwable cur) { public static boolean isSpecialException(Throwable cur) {
return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
|| cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException || cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
|| cur instanceof CallQueueTooBigException); || cur instanceof CallQueueTooBigException || cur instanceof NotServingRegionException);
} }

View File

@ -16,34 +16,17 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.junit.Assert.*;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableMultiplexer.FlushWorker;
import org.apache.hadoop.hbase.client.HTableMultiplexer.PutStatus;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -52,112 +35,19 @@ import static org.mockito.Mockito.when;
@Category(SmallTests.class) @Category(SmallTests.class)
public class TestHTableMultiplexerViaMocks { public class TestHTableMultiplexerViaMocks {
private static final int NUM_RETRIES = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
private HTableMultiplexer mockMultiplexer; private HTableMultiplexer mockMultiplexer;
private ClusterConnection mockConnection; private ClusterConnection mockConnection;
private HRegionLocation mockRegionLocation;
private HRegionInfo mockRegionInfo;
private TableName tableName;
private Put put;
@Before @Before
public void setupTest() { public void setupTest() {
mockMultiplexer = mock(HTableMultiplexer.class); mockMultiplexer = mock(HTableMultiplexer.class);
mockConnection = mock(ClusterConnection.class); mockConnection = mock(ClusterConnection.class);
mockRegionLocation = mock(HRegionLocation.class);
mockRegionInfo = mock(HRegionInfo.class);
tableName = TableName.valueOf("my_table");
put = new Put(getBytes("row1"));
put.addColumn(getBytes("f1"), getBytes("q1"), getBytes("v11"));
put.addColumn(getBytes("f1"), getBytes("q2"), getBytes("v12"));
put.addColumn(getBytes("f2"), getBytes("q1"), getBytes("v21"));
// Call the real put(TableName, Put, int) method // Call the real put(TableName, Put, int) method
when(mockMultiplexer.put(any(TableName.class), any(Put.class), anyInt())).thenCallRealMethod(); when(mockMultiplexer.put(any(TableName.class), any(Put.class), anyInt())).thenCallRealMethod();
// Return the mocked ClusterConnection // Return the mocked ClusterConnection
when(mockMultiplexer.getConnection()).thenReturn(mockConnection); when(mockMultiplexer.getConnection()).thenReturn(mockConnection);
// Return the regionInfo from the region location
when(mockRegionLocation.getRegionInfo()).thenReturn(mockRegionInfo);
// Make sure this RegionInfo points to our table
when(mockRegionInfo.getTable()).thenReturn(tableName);
}
@Test public void useCacheOnInitialPut() throws Exception {
mockMultiplexer.put(tableName, put, NUM_RETRIES);
verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES, false);
}
@Test public void nonNullLocationQueuesPut() throws Exception {
final LinkedBlockingQueue<PutStatus> queue = new LinkedBlockingQueue<>();
// Call the real method for _put(TableName, Put, int, boolean)
when(mockMultiplexer._put(any(TableName.class), any(Put.class), anyInt(), anyBoolean())).thenCallRealMethod();
// Return a region location
when(mockConnection.getRegionLocation(tableName, put.getRow(), false)).thenReturn(mockRegionLocation);
when(mockMultiplexer.getQueue(mockRegionLocation)).thenReturn(queue);
assertTrue("Put should have been queued", mockMultiplexer.put(tableName, put, NUM_RETRIES));
assertEquals(1, queue.size());
final PutStatus ps = queue.take();
assertEquals(put, ps.put);
assertEquals(mockRegionInfo, ps.regionInfo);
}
@Test public void ignoreCacheOnRetriedPut() throws Exception {
FlushWorker mockFlushWorker = mock(FlushWorker.class);
ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class);
final AtomicInteger retryInQueue = new AtomicInteger(0);
final AtomicLong totalFailedPuts = new AtomicLong(0L);
final int maxRetryInQueue = 20;
final long delay = 100L;
final PutStatus ps = new PutStatus(mockRegionInfo, put, NUM_RETRIES);
// Call the real resubmitFailedPut(PutStatus, HRegionLocation) method
when(mockFlushWorker.resubmitFailedPut(any(PutStatus.class), any(HRegionLocation.class))).thenCallRealMethod();
// Succeed on the re-submit without caching
when(mockMultiplexer._put(tableName, put, NUM_RETRIES - 1, true)).thenReturn(true);
// Stub out the getters for resubmitFailedPut(PutStatus, HRegionLocation)
when(mockFlushWorker.getExecutor()).thenReturn(mockExecutor);
when(mockFlushWorker.getNextDelay(anyInt())).thenReturn(delay);
when(mockFlushWorker.getMultiplexer()).thenReturn(mockMultiplexer);
when(mockFlushWorker.getRetryInQueue()).thenReturn(retryInQueue);
when(mockFlushWorker.getMaxRetryInQueue()).thenReturn(maxRetryInQueue);
when(mockFlushWorker.getTotalFailedPutCount()).thenReturn(totalFailedPuts);
// When a Runnable is scheduled, run that Runnable
when(mockExecutor.schedule(any(Runnable.class), eq(delay), eq(TimeUnit.MILLISECONDS))).thenAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
// Before we run this, should have one retry in progress.
assertEquals(1L, retryInQueue.get());
Object[] args = invocation.getArguments();
assertEquals(3, args.length);
assertTrue("Argument should be an instance of Runnable", args[0] instanceof Runnable);
Runnable runnable = (Runnable) args[0];
runnable.run();
return null;
}
});
// The put should be rescheduled
assertTrue("Put should have been rescheduled", mockFlushWorker.resubmitFailedPut(ps, mockRegionLocation));
verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES - 1, true);
assertEquals(0L, totalFailedPuts.get());
// Net result should be zero (added one before rerunning, subtracted one after running).
assertEquals(0L, retryInQueue.get());
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -183,11 +73,4 @@ public class TestHTableMultiplexerViaMocks {
// We should not close it again // We should not close it again
verify(mockConnection, times(0)).close(); verify(mockConnection, times(0)).close();
} }
/**
* @return UTF-8 byte representation for {@code str}
*/
private static byte[] getBytes(String str) {
return str.getBytes(UTF_8);
}
} }

View File

@ -26,16 +26,24 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@Category(LargeTests.class) import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@Category({ LargeTests.class, ClientTests.class })
public class TestHTableMultiplexerFlushCache { public class TestHTableMultiplexerFlushCache {
private static final Log LOG = LogFactory.getLog(TestHTableMultiplexerFlushCache.class); private static final Log LOG = LogFactory.getLog(TestHTableMultiplexerFlushCache.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@ -111,4 +119,59 @@ public class TestHTableMultiplexerFlushCache {
checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2); checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
} }
@Test
public void testOnRegionMove() throws Exception {
// This test is doing near exactly the same thing that testOnRegionChange but avoiding the
// potential to get a ConnectionClosingException. By moving the region, we can be certain that
// the connection is still valid and that the implementation is correctly handling an invalid
// Region cache (and not just tearing down the entire connection).
TableName TABLE = TableName.valueOf("testOnRegionMove");
final int NUM_REGIONS = 10;
HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
PER_REGIONSERVER_QUEUE_SIZE);
final RegionLocator regionLocator = TEST_UTIL.getConnection().getRegionLocator(TABLE);
Pair<byte[][],byte[][]> startEndRows = regionLocator.getStartEndKeys();
byte[] row = startEndRows.getFirst()[1];
assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1);
assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
final HRegionLocation loc = regionLocator.getRegionLocation(row);
final MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster();
// The current server for the region we're writing to
final ServerName originalServer = loc.getServerName();
ServerName newServer = null;
// Find a new server to move that region to
for (int i = 0; i < SLAVES; i++) {
HRegionServer rs = hbaseCluster.getRegionServer(0);
if (!rs.getServerName().equals(originalServer.getServerName())) {
newServer = rs.getServerName();
break;
}
}
assertNotNull("Did not find a new RegionServer to use", newServer);
// Move the region
LOG.info("Moving " + loc.getRegionInfo().getEncodedName() + " from " + originalServer
+ " to " + newServer);
TEST_UTIL.getHBaseAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(),
Bytes.toBytes(newServer.getServerName()));
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
// Send a new Put
put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2);
assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
// We should see the update make it to the new server eventually
checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
}
} }