From a8073c4a9819953a2dd13a26bb4dd9405ac8750c Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Mon, 8 Feb 2016 14:25:37 -0500 Subject: [PATCH] HBASE-15232 Handle region location cache mgmt in AsyncProcess for multi()'s Further investigation after HBASE-15221 lead to some findings that AsyncProcess should have been managing the contents of the region location cache, appropriately clearing it when necessary (e.g. an RPC to a server fails because the server doesn't host that region) For multi() RPCs, the tableName argument is null since there is no single table that the updates are destined to. This inadvertently caused the existing region location cache updates to fail on 1.x branches. AsyncProcess needs to handle when tableName is null and perform the necessary cache evictions. As such, much of the new retry logic in HTableMultiplexer is unnecessary and is removed with this commit. Getters which were added as a part of testing were left since that are mostly harmless and should contain no negative impact. Signed-off-by: stack --- .../hadoop/hbase/client/AsyncProcess.java | 11 +- .../hbase/client/HTableMultiplexer.java | 23 ++-- .../exceptions/ClientExceptionsUtil.java | 6 +- .../client/TestHTableMultiplexerViaMocks.java | 117 ------------------ .../TestHTableMultiplexerFlushCache.java | 60 +++++++++ 5 files changed, 81 insertions(+), 136 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 4ceb89aa5a0..65c15ce90ed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -1360,8 +1360,15 @@ class AsyncProcess { errorsByServer.reportServerError(server); canRetry = errorsByServer.canTryMore(numAttempt); } - connection.updateCachedLocations( - tableName, region, actions.get(0).getAction().getRow(), throwable, server); + if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) { + // For multi-actions, we don't have a table name, but we want to make sure to clear the + // cache in case there were location-related exceptions. We don't to clear the cache + // for every possible exception that comes through, however. + connection.clearCaches(server); + } else { + connection.updateCachedLocations( + tableName, region, actions.get(0).getAction().getRow(), throwable, server); + } failureCount += actions.size(); for (Action action : actions) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 13e9b85274e..f1bbcb37e6f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -194,19 +194,6 @@ public class HTableMultiplexer { * @return true if the request can be accepted by its corresponding buffer queue. */ public boolean put(final TableName tableName, final Put put, int maxAttempts) { - return _put(tableName, put, maxAttempts, false); - } - - /** - * Internal "put" which exposes a boolean flag to control whether or not the region location - * cache should be reloaded when trying to queue the {@link Put}. - * @param tableName Destination table for the Put - * @param put The Put to send - * @param maxAttempts Number of attempts to retry the {@code put} - * @param reloadCache Should the region location cache be reloaded - * @return true if the request was accepted in the queue, otherwise false - */ - boolean _put(final TableName tableName, final Put put, int maxAttempts, boolean reloadCache) { if (maxAttempts <= 0) { return false; } @@ -215,7 +202,9 @@ public class HTableMultiplexer { HTable.validatePut(put, maxKeyValueSize); // Allow mocking to get at the connection, but don't expose the connection to users. ClusterConnection conn = (ClusterConnection) getConnection(); - HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), reloadCache); + // AsyncProcess in the FlushWorker should take care of refreshing the location cache + // as necessary. We shouldn't have to do that here. + HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false); if (loc != null) { // Add the put pair into its corresponding queue. LinkedBlockingQueue queue = getQueue(loc); @@ -512,12 +501,16 @@ public class HTableMultiplexer { LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount); } + // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating + // the region location cache when the Put original failed with some exception. If we keep + // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff + // that we expect it to. getExecutor().schedule(new Runnable() { @Override public void run() { boolean succ = false; try { - succ = FlushWorker.this.getMultiplexer()._put(tableName, failedPut, retryCount, true); + succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount); } finally { FlushWorker.this.getRetryInQueue().decrementAndGet(); if (!succ) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java index ebf14994960..1d6f5d674b1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.exceptions; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.MultiActionResultTooLarge; +import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.RetryImmediatelyException; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -40,14 +41,15 @@ public final class ClientExceptionsUtil { if (cur == null) { return true; } - return !isSpecialException(cur) || (cur instanceof RegionMovedException); + return !isSpecialException(cur) || (cur instanceof RegionMovedException) + || cur instanceof NotServingRegionException; } public static boolean isSpecialException(Throwable cur) { return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException || cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException - || cur instanceof CallQueueTooBigException); + || cur instanceof CallQueueTooBigException || cur instanceof NotServingRegionException); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java index 38ddeb92cf1..7e68c2147e3 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java @@ -16,34 +16,17 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.*; - -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTableMultiplexer.FlushWorker; -import org.apache.hadoop.hbase.client.HTableMultiplexer.PutStatus; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.io.IOException; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -52,112 +35,19 @@ import static org.mockito.Mockito.when; @Category(SmallTests.class) public class TestHTableMultiplexerViaMocks { - private static final int NUM_RETRIES = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; private HTableMultiplexer mockMultiplexer; private ClusterConnection mockConnection; - private HRegionLocation mockRegionLocation; - private HRegionInfo mockRegionInfo; - - private TableName tableName; - private Put put; @Before public void setupTest() { mockMultiplexer = mock(HTableMultiplexer.class); mockConnection = mock(ClusterConnection.class); - mockRegionLocation = mock(HRegionLocation.class); - mockRegionInfo = mock(HRegionInfo.class); - - tableName = TableName.valueOf("my_table"); - put = new Put(getBytes("row1")); - put.addColumn(getBytes("f1"), getBytes("q1"), getBytes("v11")); - put.addColumn(getBytes("f1"), getBytes("q2"), getBytes("v12")); - put.addColumn(getBytes("f2"), getBytes("q1"), getBytes("v21")); // Call the real put(TableName, Put, int) method when(mockMultiplexer.put(any(TableName.class), any(Put.class), anyInt())).thenCallRealMethod(); // Return the mocked ClusterConnection when(mockMultiplexer.getConnection()).thenReturn(mockConnection); - - // Return the regionInfo from the region location - when(mockRegionLocation.getRegionInfo()).thenReturn(mockRegionInfo); - - // Make sure this RegionInfo points to our table - when(mockRegionInfo.getTable()).thenReturn(tableName); - } - - @Test public void useCacheOnInitialPut() throws Exception { - mockMultiplexer.put(tableName, put, NUM_RETRIES); - - verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES, false); - } - - @Test public void nonNullLocationQueuesPut() throws Exception { - final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); - - // Call the real method for _put(TableName, Put, int, boolean) - when(mockMultiplexer._put(any(TableName.class), any(Put.class), anyInt(), anyBoolean())).thenCallRealMethod(); - - // Return a region location - when(mockConnection.getRegionLocation(tableName, put.getRow(), false)).thenReturn(mockRegionLocation); - when(mockMultiplexer.getQueue(mockRegionLocation)).thenReturn(queue); - - assertTrue("Put should have been queued", mockMultiplexer.put(tableName, put, NUM_RETRIES)); - - assertEquals(1, queue.size()); - final PutStatus ps = queue.take(); - assertEquals(put, ps.put); - assertEquals(mockRegionInfo, ps.regionInfo); - } - - @Test public void ignoreCacheOnRetriedPut() throws Exception { - FlushWorker mockFlushWorker = mock(FlushWorker.class); - ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class); - final AtomicInteger retryInQueue = new AtomicInteger(0); - final AtomicLong totalFailedPuts = new AtomicLong(0L); - final int maxRetryInQueue = 20; - final long delay = 100L; - - final PutStatus ps = new PutStatus(mockRegionInfo, put, NUM_RETRIES); - - // Call the real resubmitFailedPut(PutStatus, HRegionLocation) method - when(mockFlushWorker.resubmitFailedPut(any(PutStatus.class), any(HRegionLocation.class))).thenCallRealMethod(); - // Succeed on the re-submit without caching - when(mockMultiplexer._put(tableName, put, NUM_RETRIES - 1, true)).thenReturn(true); - - // Stub out the getters for resubmitFailedPut(PutStatus, HRegionLocation) - when(mockFlushWorker.getExecutor()).thenReturn(mockExecutor); - when(mockFlushWorker.getNextDelay(anyInt())).thenReturn(delay); - when(mockFlushWorker.getMultiplexer()).thenReturn(mockMultiplexer); - when(mockFlushWorker.getRetryInQueue()).thenReturn(retryInQueue); - when(mockFlushWorker.getMaxRetryInQueue()).thenReturn(maxRetryInQueue); - when(mockFlushWorker.getTotalFailedPutCount()).thenReturn(totalFailedPuts); - - // When a Runnable is scheduled, run that Runnable - when(mockExecutor.schedule(any(Runnable.class), eq(delay), eq(TimeUnit.MILLISECONDS))).thenAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - // Before we run this, should have one retry in progress. - assertEquals(1L, retryInQueue.get()); - - Object[] args = invocation.getArguments(); - assertEquals(3, args.length); - assertTrue("Argument should be an instance of Runnable", args[0] instanceof Runnable); - Runnable runnable = (Runnable) args[0]; - runnable.run(); - return null; - } - }); - - // The put should be rescheduled - assertTrue("Put should have been rescheduled", mockFlushWorker.resubmitFailedPut(ps, mockRegionLocation)); - - verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES - 1, true); - assertEquals(0L, totalFailedPuts.get()); - // Net result should be zero (added one before rerunning, subtracted one after running). - assertEquals(0L, retryInQueue.get()); } @SuppressWarnings("deprecation") @@ -183,11 +73,4 @@ public class TestHTableMultiplexerViaMocks { // We should not close it again verify(mockConnection, times(0)).close(); } - - /** - * @return UTF-8 byte representation for {@code str} - */ - private static byte[] getBytes(String str) { - return str.getBytes(UTF_8); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java index 063e376280f..05c9caa71b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java @@ -24,16 +24,21 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @Category({ LargeTests.class, ClientTests.class }) @@ -115,4 +120,59 @@ public class TestHTableMultiplexerFlushCache { checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2); } } + + @Test + public void testOnRegionMove() throws Exception { + // This test is doing near exactly the same thing that testOnRegionChange but avoiding the + // potential to get a ConnectionClosingException. By moving the region, we can be certain that + // the connection is still valid and that the implementation is correctly handling an invalid + // Region cache (and not just tearing down the entire connection). + TableName TABLE = TableName.valueOf("testOnRegionMove"); + final int NUM_REGIONS = 10; + HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3, + Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS); + + HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), + PER_REGIONSERVER_QUEUE_SIZE); + + final RegionLocator regionLocator = TEST_UTIL.getConnection().getRegionLocator(TABLE); + Pair startEndRows = regionLocator.getStartEndKeys(); + byte[] row = startEndRows.getFirst()[1]; + assertTrue("2nd region should not start with empty row", row != null && row.length > 0); + + Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1); + assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put)); + + checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1); + + final HRegionLocation loc = regionLocator.getRegionLocation(row); + final MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster(); + // The current server for the region we're writing to + final ServerName originalServer = loc.getServerName(); + ServerName newServer = null; + // Find a new server to move that region to + for (int i = 0; i < SLAVES; i++) { + HRegionServer rs = hbaseCluster.getRegionServer(0); + if (!rs.getServerName().equals(originalServer.getServerName())) { + newServer = rs.getServerName(); + break; + } + } + assertNotNull("Did not find a new RegionServer to use", newServer); + + // Move the region + LOG.info("Moving " + loc.getRegionInfo().getEncodedName() + " from " + originalServer + + " to " + newServer); + TEST_UTIL.getHBaseAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), + Bytes.toBytes(newServer.getServerName())); + + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE); + + // Send a new Put + put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2); + assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put)); + + // We should see the update make it to the new server eventually + checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2); + } }