From c33ceb23d3e1b7d1b9b6d957342b5fa3aa70b29f Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Mon, 25 Feb 2019 16:01:38 +0800 Subject: [PATCH] HBASE-21943 The usage of RegionLocations.mergeRegionLocations is wrong for async client Signed-off-by: Guanghao Zhang --- .../client/AsyncNonMetaRegionLocator.java | 95 +++++++++------- .../client/AsyncRegionLocatorHelper.java | 14 --- ...AsyncTableLocateRegionForDeletedTable.java | 105 ++++++++++++++++++ 3 files changed, 159 insertions(+), 55 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocateRegionForDeletedTable.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index 1f23a1d985b..9246adbe986 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; -import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.mergeRegionLocations; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; @@ -218,7 +217,7 @@ class AsyncNonMetaRegionLocator { if (loc1.getSeqNum() != loc2.getSeqNum()) { return false; } - if (Objects.equal(loc1.getServerName(), loc2.getServerName())) { + if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) { return false; } } @@ -226,25 +225,42 @@ class AsyncNonMetaRegionLocator { return true; } - // return whether we add this loc to cache - private boolean addToCache(TableCache tableCache, RegionLocations locs) { + // if we successfully add the locations to cache, return the locations, otherwise return the one + // which prevents us being added. The upper layer can use this value to complete pending requests. + private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) { LOG.trace("Try adding {} to cache", locs); byte[] startKey = locs.getDefaultRegionLocation().getRegion().getStartKey(); for (;;) { RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs); if (oldLocs == null) { - return true; + return locs; } - RegionLocations mergedLocs = mergeRegionLocations(locs, oldLocs); - if (isEqual(mergedLocs, oldLocs)) { - // the merged one is the same with the old one, give up - LOG.trace("Will not add {} to cache because the old value {} " + - " is newer than us or has the same server name." + - " Maybe it is updated before we replace it", locs, oldLocs); - return false; - } - if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) { - return true; + // check whether the regions are the same, this usually happens when table is split/merged, or + // deleted and recreated again. + RegionInfo region = locs.getRegionLocation().getRegion(); + RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion(); + if (region.getEncodedName().equals(oldRegion.getEncodedName())) { + RegionLocations mergedLocs = oldLocs.mergeLocations(locs); + if (isEqual(mergedLocs, oldLocs)) { + // the merged one is the same with the old one, give up + LOG.trace("Will not add {} to cache because the old value {} " + + " is newer than us or has the same server name." + + " Maybe it is updated before we replace it", locs, oldLocs); + return oldLocs; + } + if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) { + return mergedLocs; + } + } else { + // the region is different, here we trust the one we fetched. This maybe wrong but finally + // the upper layer can detect this and trigger removal of the wrong locations + if (LOG.isDebugEnabled()) { + LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," + + " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey)); + } + if (tableCache.cache.replace(startKey, oldLocs, locs)) { + return locs; + } } } } @@ -258,37 +274,35 @@ class AsyncNonMetaRegionLocator { Optional toSend = Optional.empty(); TableCache tableCache = getTableCache(tableName); if (locs != null) { - if (!addToCache(tableCache, locs)) { - // someone is ahead of us. - synchronized (tableCache) { - tableCache.pendingRequests.remove(req); - tableCache.clearCompletedRequests(Optional.empty()); - // Remove a complete locate request in a synchronized block, so the table cache must have - // quota to send a candidate request. - toSend = tableCache.getCandidate(); - toSend.ifPresent(r -> tableCache.send(r)); - } - toSend.ifPresent(r -> locateInMeta(tableName, r)); - return; + RegionLocations addedLocs = addToCache(tableCache, locs); + synchronized (tableCache) { + tableCache.pendingRequests.remove(req); + tableCache.clearCompletedRequests(Optional.of(addedLocs)); + // Remove a complete locate request in a synchronized block, so the table cache must have + // quota to send a candidate request. + toSend = tableCache.getCandidate(); + toSend.ifPresent(r -> tableCache.send(r)); } - } - synchronized (tableCache) { - tableCache.pendingRequests.remove(req); - if (error != null) { + toSend.ifPresent(r -> locateInMeta(tableName, r)); + } else { + // we meet an error + assert error != null; + synchronized (tableCache) { + tableCache.pendingRequests.remove(req); // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have // already retried several times CompletableFuture future = tableCache.allRequests.remove(req); if (future != null) { future.completeExceptionally(error); } + tableCache.clearCompletedRequests(Optional.empty()); + // Remove a complete locate request in a synchronized block, so the table cache must have + // quota to send a candidate request. + toSend = tableCache.getCandidate(); + toSend.ifPresent(r -> tableCache.send(r)); } - tableCache.clearCompletedRequests(Optional.ofNullable(locs)); - // Remove a complete locate request in a synchronized block, so the table cache must have - // quota to send a candidate request. - toSend = tableCache.getCandidate(); - toSend.ifPresent(r -> tableCache.send(r)); + toSend.ifPresent(r -> locateInMeta(tableName, r)); } - toSend.ifPresent(r -> locateInMeta(tableName, r)); } // return whether we should stop the scan @@ -443,10 +457,9 @@ class AsyncNonMetaRegionLocator { if (info == null || info.isOffline() || info.isSplitParent()) { continue; } - if (addToCache(tableCache, locs)) { - synchronized (tableCache) { - tableCache.clearCompletedRequests(Optional.of(locs)); - } + RegionLocations addedLocs = addToCache(tableCache, locs); + synchronized (tableCache) { + tableCache.clearCompletedRequests(Optional.of(addedLocs)); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java index dd516ec42e2..4dde1bbc89c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java @@ -129,20 +129,6 @@ final class AsyncRegionLocatorHelper { } } - /** - * Create a new {@link RegionLocations} which is the merging result for the given two - * {@link RegionLocations}. - *

- * All the {@link RegionLocations} in async locator related class are immutable because we want to - * access them concurrently, so here we need to create a new one, instead of calling - * {@link RegionLocations#mergeLocations(RegionLocations)} directly. - */ - static RegionLocations mergeRegionLocations(RegionLocations newLocs, RegionLocations oldLocs) { - RegionLocations locs = new RegionLocations(newLocs.getRegionLocations()); - locs.mergeLocations(oldLocs); - return locs; - } - static boolean isGood(RegionLocations locs, int replicaId) { if (locs == null) { return false; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocateRegionForDeletedTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocateRegionForDeletedTable.java new file mode 100644 index 00000000000..6ccd9bc46f1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocateRegionForDeletedTable.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Fix an infinite loop in {@link AsyncNonMetaRegionLocator}, see the comments on HBASE-21943 for + * more details. + */ +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableLocateRegionForDeletedTable { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableLocateRegionForDeletedTable.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] QUALIFIER = Bytes.toBytes("cq"); + + private static byte[] VALUE = Bytes.toBytes("value"); + + private static AsyncConnection ASYNC_CONN; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(3); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + TEST_UTIL.getAdmin().balancerSwitch(false, true); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + assertFalse(ASYNC_CONN.isClosed()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + IOUtils.closeQuietly(ASYNC_CONN); + assertTrue(ASYNC_CONN.isClosed()); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws IOException, InterruptedException { + try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, VALUE)); + } + } + TEST_UTIL.getAdmin().split(TABLE_NAME, Bytes.toBytes(50)); + TEST_UTIL.waitFor(60000, + () -> TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).size() == 2); + // make sure we can access the split regions + try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { + for (int i = 0; i < 100; i++) { + assertFalse(table.get(new Get(Bytes.toBytes(i))).isEmpty()); + } + } + // let's cache the two old locations + AsyncTableRegionLocator locator = ASYNC_CONN.getRegionLocator(TABLE_NAME); + locator.getRegionLocation(Bytes.toBytes(0)).join(); + locator.getRegionLocation(Bytes.toBytes(99)).join(); + // recreate the table + TEST_UTIL.getAdmin().disableTable(TABLE_NAME); + TEST_UTIL.getAdmin().deleteTable(TABLE_NAME); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + // confirm that we can still get the correct location + assertFalse(ASYNC_CONN.getTable(TABLE_NAME).exists(new Get(Bytes.toBytes(99))).join()); + } +}