diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 4ceb89aa5a0..65c15ce90ed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -1360,8 +1360,15 @@ class AsyncProcess { errorsByServer.reportServerError(server); canRetry = errorsByServer.canTryMore(numAttempt); } - connection.updateCachedLocations( - tableName, region, actions.get(0).getAction().getRow(), throwable, server); + if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) { + // For multi-actions, we don't have a table name, but we want to make sure to clear the + // cache in case there were location-related exceptions. We don't to clear the cache + // for every possible exception that comes through, however. + connection.clearCaches(server); + } else { + connection.updateCachedLocations( + tableName, region, actions.get(0).getAction().getRow(), throwable, server); + } failureCount += actions.size(); for (Action action : actions) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 13e9b85274e..f1bbcb37e6f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -194,19 +194,6 @@ public class HTableMultiplexer { * @return true if the request can be accepted by its corresponding buffer queue. */ public boolean put(final TableName tableName, final Put put, int maxAttempts) { - return _put(tableName, put, maxAttempts, false); - } - - /** - * Internal "put" which exposes a boolean flag to control whether or not the region location - * cache should be reloaded when trying to queue the {@link Put}. - * @param tableName Destination table for the Put - * @param put The Put to send - * @param maxAttempts Number of attempts to retry the {@code put} - * @param reloadCache Should the region location cache be reloaded - * @return true if the request was accepted in the queue, otherwise false - */ - boolean _put(final TableName tableName, final Put put, int maxAttempts, boolean reloadCache) { if (maxAttempts <= 0) { return false; } @@ -215,7 +202,9 @@ public class HTableMultiplexer { HTable.validatePut(put, maxKeyValueSize); // Allow mocking to get at the connection, but don't expose the connection to users. ClusterConnection conn = (ClusterConnection) getConnection(); - HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), reloadCache); + // AsyncProcess in the FlushWorker should take care of refreshing the location cache + // as necessary. We shouldn't have to do that here. + HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false); if (loc != null) { // Add the put pair into its corresponding queue. LinkedBlockingQueue queue = getQueue(loc); @@ -512,12 +501,16 @@ public class HTableMultiplexer { LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount); } + // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating + // the region location cache when the Put original failed with some exception. If we keep + // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff + // that we expect it to. getExecutor().schedule(new Runnable() { @Override public void run() { boolean succ = false; try { - succ = FlushWorker.this.getMultiplexer()._put(tableName, failedPut, retryCount, true); + succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount); } finally { FlushWorker.this.getRetryInQueue().decrementAndGet(); if (!succ) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java index ebf14994960..1d6f5d674b1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.exceptions; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.MultiActionResultTooLarge; +import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.RetryImmediatelyException; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -40,14 +41,15 @@ public final class ClientExceptionsUtil { if (cur == null) { return true; } - return !isSpecialException(cur) || (cur instanceof RegionMovedException); + return !isSpecialException(cur) || (cur instanceof RegionMovedException) + || cur instanceof NotServingRegionException; } public static boolean isSpecialException(Throwable cur) { return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException || cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException - || cur instanceof CallQueueTooBigException); + || cur instanceof CallQueueTooBigException || cur instanceof NotServingRegionException); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java index 38ddeb92cf1..7e68c2147e3 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerViaMocks.java @@ -16,34 +16,17 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.*; - -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTableMultiplexer.FlushWorker; -import org.apache.hadoop.hbase.client.HTableMultiplexer.PutStatus; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.io.IOException; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -52,112 +35,19 @@ import static org.mockito.Mockito.when; @Category(SmallTests.class) public class TestHTableMultiplexerViaMocks { - private static final int NUM_RETRIES = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; private HTableMultiplexer mockMultiplexer; private ClusterConnection mockConnection; - private HRegionLocation mockRegionLocation; - private HRegionInfo mockRegionInfo; - - private TableName tableName; - private Put put; @Before public void setupTest() { mockMultiplexer = mock(HTableMultiplexer.class); mockConnection = mock(ClusterConnection.class); - mockRegionLocation = mock(HRegionLocation.class); - mockRegionInfo = mock(HRegionInfo.class); - - tableName = TableName.valueOf("my_table"); - put = new Put(getBytes("row1")); - put.addColumn(getBytes("f1"), getBytes("q1"), getBytes("v11")); - put.addColumn(getBytes("f1"), getBytes("q2"), getBytes("v12")); - put.addColumn(getBytes("f2"), getBytes("q1"), getBytes("v21")); // Call the real put(TableName, Put, int) method when(mockMultiplexer.put(any(TableName.class), any(Put.class), anyInt())).thenCallRealMethod(); // Return the mocked ClusterConnection when(mockMultiplexer.getConnection()).thenReturn(mockConnection); - - // Return the regionInfo from the region location - when(mockRegionLocation.getRegionInfo()).thenReturn(mockRegionInfo); - - // Make sure this RegionInfo points to our table - when(mockRegionInfo.getTable()).thenReturn(tableName); - } - - @Test public void useCacheOnInitialPut() throws Exception { - mockMultiplexer.put(tableName, put, NUM_RETRIES); - - verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES, false); - } - - @Test public void nonNullLocationQueuesPut() throws Exception { - final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); - - // Call the real method for _put(TableName, Put, int, boolean) - when(mockMultiplexer._put(any(TableName.class), any(Put.class), anyInt(), anyBoolean())).thenCallRealMethod(); - - // Return a region location - when(mockConnection.getRegionLocation(tableName, put.getRow(), false)).thenReturn(mockRegionLocation); - when(mockMultiplexer.getQueue(mockRegionLocation)).thenReturn(queue); - - assertTrue("Put should have been queued", mockMultiplexer.put(tableName, put, NUM_RETRIES)); - - assertEquals(1, queue.size()); - final PutStatus ps = queue.take(); - assertEquals(put, ps.put); - assertEquals(mockRegionInfo, ps.regionInfo); - } - - @Test public void ignoreCacheOnRetriedPut() throws Exception { - FlushWorker mockFlushWorker = mock(FlushWorker.class); - ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class); - final AtomicInteger retryInQueue = new AtomicInteger(0); - final AtomicLong totalFailedPuts = new AtomicLong(0L); - final int maxRetryInQueue = 20; - final long delay = 100L; - - final PutStatus ps = new PutStatus(mockRegionInfo, put, NUM_RETRIES); - - // Call the real resubmitFailedPut(PutStatus, HRegionLocation) method - when(mockFlushWorker.resubmitFailedPut(any(PutStatus.class), any(HRegionLocation.class))).thenCallRealMethod(); - // Succeed on the re-submit without caching - when(mockMultiplexer._put(tableName, put, NUM_RETRIES - 1, true)).thenReturn(true); - - // Stub out the getters for resubmitFailedPut(PutStatus, HRegionLocation) - when(mockFlushWorker.getExecutor()).thenReturn(mockExecutor); - when(mockFlushWorker.getNextDelay(anyInt())).thenReturn(delay); - when(mockFlushWorker.getMultiplexer()).thenReturn(mockMultiplexer); - when(mockFlushWorker.getRetryInQueue()).thenReturn(retryInQueue); - when(mockFlushWorker.getMaxRetryInQueue()).thenReturn(maxRetryInQueue); - when(mockFlushWorker.getTotalFailedPutCount()).thenReturn(totalFailedPuts); - - // When a Runnable is scheduled, run that Runnable - when(mockExecutor.schedule(any(Runnable.class), eq(delay), eq(TimeUnit.MILLISECONDS))).thenAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - // Before we run this, should have one retry in progress. - assertEquals(1L, retryInQueue.get()); - - Object[] args = invocation.getArguments(); - assertEquals(3, args.length); - assertTrue("Argument should be an instance of Runnable", args[0] instanceof Runnable); - Runnable runnable = (Runnable) args[0]; - runnable.run(); - return null; - } - }); - - // The put should be rescheduled - assertTrue("Put should have been rescheduled", mockFlushWorker.resubmitFailedPut(ps, mockRegionLocation)); - - verify(mockMultiplexer)._put(tableName, put, NUM_RETRIES - 1, true); - assertEquals(0L, totalFailedPuts.get()); - // Net result should be zero (added one before rerunning, subtracted one after running). - assertEquals(0L, retryInQueue.get()); } @SuppressWarnings("deprecation") @@ -183,11 +73,4 @@ public class TestHTableMultiplexerViaMocks { // We should not close it again verify(mockConnection, times(0)).close(); } - - /** - * @return UTF-8 byte representation for {@code str} - */ - private static byte[] getBytes(String str) { - return str.getBytes(UTF_8); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java index 063e376280f..05c9caa71b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java @@ -24,16 +24,21 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @Category({ LargeTests.class, ClientTests.class }) @@ -115,4 +120,59 @@ public class TestHTableMultiplexerFlushCache { checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2); } } + + @Test + public void testOnRegionMove() throws Exception { + // This test is doing near exactly the same thing that testOnRegionChange but avoiding the + // potential to get a ConnectionClosingException. By moving the region, we can be certain that + // the connection is still valid and that the implementation is correctly handling an invalid + // Region cache (and not just tearing down the entire connection). + TableName TABLE = TableName.valueOf("testOnRegionMove"); + final int NUM_REGIONS = 10; + HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3, + Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS); + + HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), + PER_REGIONSERVER_QUEUE_SIZE); + + final RegionLocator regionLocator = TEST_UTIL.getConnection().getRegionLocator(TABLE); + Pair startEndRows = regionLocator.getStartEndKeys(); + byte[] row = startEndRows.getFirst()[1]; + assertTrue("2nd region should not start with empty row", row != null && row.length > 0); + + Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1); + assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put)); + + checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1); + + final HRegionLocation loc = regionLocator.getRegionLocation(row); + final MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster(); + // The current server for the region we're writing to + final ServerName originalServer = loc.getServerName(); + ServerName newServer = null; + // Find a new server to move that region to + for (int i = 0; i < SLAVES; i++) { + HRegionServer rs = hbaseCluster.getRegionServer(0); + if (!rs.getServerName().equals(originalServer.getServerName())) { + newServer = rs.getServerName(); + break; + } + } + assertNotNull("Did not find a new RegionServer to use", newServer); + + // Move the region + LOG.info("Moving " + loc.getRegionInfo().getEncodedName() + " from " + originalServer + + " to " + newServer); + TEST_UTIL.getHBaseAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), + Bytes.toBytes(newServer.getServerName())); + + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE); + + // Send a new Put + put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2); + assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put)); + + // We should see the update make it to the new server eventually + checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2); + } }