Revert "HBASE-26783 ScannerCallable doubly clears meta cache on retries (#4168)"

This reverts commit eca7ea3b1c.
Andrew Purtell 2022-03-10 10:22:33 -08:00
parent eca7ea3b1c
commit 70e695ba24
5 changed files with 75 additions and 317 deletions

ReversedScannerCallable.java

@@ -20,19 +20,21 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
/**
@@ -41,8 +43,6 @@ import org.apache.hadoop.hbase.util.Pair;
@InterfaceAudience.Private
public class ReversedScannerCallable extends ScannerCallable {
private byte[] locationSearchKey;
/**
* @param connection
* @param tableName
@@ -70,18 +70,6 @@ public class ReversedScannerCallable extends ScannerCallable {
super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
}
@Override
public void throwable(Throwable t, boolean retrying) {
// for reverse scans, we need to update cache using the search key found for the reverse scan
// range in prepare. Otherwise, we will see weird behavior at the table boundaries,
// when trying to clear cache for an empty row.
if (location != null && locationSearchKey != null) {
getConnection().updateCachedLocations(getTableName(),
location.getRegionInfo().getRegionName(),
locationSearchKey, t, location.getServerName());
}
}
/**
* @param reload force reload of server location
* @throws IOException
@@ -91,37 +79,34 @@ public class ReversedScannerCallable extends ScannerCallable {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
if (reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
&& getConnection().isTableDisabled(getTableName())) {
throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
}
if (!instantiated || reload) {
// we should use range locate if
// 1. we do not want the start row
// 2. the start row is empty which means we need to locate to the last region.
if (scan.includeStartRow() && !isEmptyStartRow(getRow())) {
// Just locate the region with the row
RegionLocations rl = getRegionLocationsForPrepare(getRow());
this.location = getLocationForReplica(rl);
this.locationSearchKey = getRow();
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id,
getConnection(), getTableName(), getRow());
this.location = id < rl.size() ? rl.getRegionLocation(id) : null;
if (location == null || location.getServerName() == null) {
throw new IOException("Failed to find location, tableName="
+ tableName + ", row=" + Bytes.toStringBinary(row) + ", reload="
+ reload);
}
} else {
// The locateStart row is an approximation. So we need to search between
// that and the actual row in order to really find the last region
// Need to locate the regions with the range, and the target location is
// the last one which is the previous region of last region scanner
byte[] locateStartRow = createCloseRowBefore(getRow());
Pair<HRegionLocation, byte[]> lastRegionAndKey = locateLastRegionInRange(
locateStartRow, getRow());
this.location = lastRegionAndKey.getFirst();
this.locationSearchKey = lastRegionAndKey.getSecond();
List<HRegionLocation> locatedRegions = locateRegionsInRange(
locateStartRow, row, reload);
if (locatedRegions.isEmpty()) {
throw new DoNotRetryIOException(
"Does hbase:meta exist hole? Couldn't get regions for the range from "
+ Bytes.toStringBinary(locateStartRow) + " to "
+ Bytes.toStringBinary(row));
}
this.location = locatedRegions.get(locatedRegions.size() - 1);
}
if (location == null || location.getServerName() == null) {
throw new IOException("Failed to find location, tableName="
+ getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload="
+ reload);
}
setStub(getConnection().getClient(getLocation().getServerName()));
checkIfRegionServerIsRemote();
instantiated = true;
@@ -139,14 +124,18 @@ public class ReversedScannerCallable extends ScannerCallable {
}
/**
* Get the last region before the endkey, which will be used to execute the reverse scan
* Get the corresponding regions for an arbitrary range of keys.
* @param startKey Starting row in range, inclusive
* @param endKey Ending row in range, exclusive
* @return The last location, and the rowKey used to find it. May be null,
* if a region could not be found.
* @param reload force reload of server location
* @return A list of HRegionLocation corresponding to the regions that contain
* the specified range
* @throws IOException
*/
private Pair<HRegionLocation, byte[]> locateLastRegionInRange(byte[] startKey, byte[] endKey)
throws IOException {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
justification="I thought I'd fixed it but FB still complains; see below")
private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
byte[] endKey, boolean reload) throws IOException {
final boolean endKeyIsEndOfTable = Bytes.equals(endKey,
HConstants.EMPTY_END_ROW);
if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
@@ -154,17 +143,14 @@ public class ReversedScannerCallable extends ScannerCallable {
+ Bytes.toStringBinary(startKey) + " > "
+ Bytes.toStringBinary(endKey));
}
HRegionLocation lastRegion = null;
byte[] lastFoundKey = null;
List<HRegionLocation> regionList = new ArrayList<HRegionLocation>();
byte[] currentKey = startKey;
do {
RegionLocations rl = getRegionLocationsForPrepare(currentKey);
HRegionLocation regionLocation = getLocationForReplica(rl);
if (regionLocation.getRegionInfo().containsRow(currentKey)) {
lastFoundKey = currentKey;
lastRegion = regionLocation;
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id,
getConnection(), getTableName(), currentKey);
HRegionLocation regionLocation = id < rl.size() ? rl.getRegionLocation(id) : null;
if (regionLocation != null && regionLocation.getRegionInfo().containsRow(currentKey)) {
regionList.add(regionLocation);
} else {
// FindBugs: NP_NULL_ON_SOME_PATH Complaining about regionLocation
throw new DoNotRetryIOException("Does hbase:meta exist hole? Locating row "
@@ -174,8 +160,7 @@ public class ReversedScannerCallable extends ScannerCallable {
currentKey = regionLocation.getRegionInfo().getEndKey();
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
&& (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
return new Pair<>(lastRegion, lastFoundKey);
return regionList;
}
@Override

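Note on the restored prepare() path: when the start row is excluded or empty, locateRegionsInRange walks region boundaries from createCloseRowBefore(row) up to the scan row, and the last region found becomes the starting region of the reverse scan. For context, a minimal client-side sketch of the reversed-scan usage this serves; the table and row names are hypothetical, and conf is assumed to be an org.apache.hadoop.conf.Configuration:

  // Reverse scan starting at "row1" (inclusive) and walking toward the first row
  // of the table; ReversedScannerCallable.prepare() resolves the starting region.
  // (Assumes the enclosing method declares throws IOException.)
  try (Connection conn = ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(TableName.valueOf("MyTable"))) {
    Scan scan = new Scan().withStartRow(Bytes.toBytes("row1"), true).setReversed(true);
    try (ResultScanner scanner = table.getScanner(scan)) {
      for (Result result = scanner.next(); result != null; result = scanner.next()) {
        // Rows are returned in descending row-key order.
      }
    }
  }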
ScannerCallable.java

@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
@@ -148,32 +147,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
return controller;
}
protected final HRegionLocation getLocationForReplica(RegionLocations locs)
throws HBaseIOException {
HRegionLocation loc = id < locs.size() ? locs.getRegionLocation(id) : null;
if (loc == null || loc.getServerName() == null) {
// With this exception, there will be a retry. The location can be null for a replica
// when the table is created or after a split.
throw new HBaseIOException("There is no location for replica id #" + id);
}
return loc;
}
/**
* Fetch region locations for the row. Since this is for prepare, we always useCache.
* This is because we can be sure that RpcRetryingCaller will have cleared the cache
* in error handling if this is a retry.
*
* @param row the row to look up region location for
*/
protected final RegionLocations getRegionLocationsForPrepare(byte[] row)
throws IOException {
// always use cache, because cache will have been cleared if necessary
// in the try/catch before retrying
return RpcRetryingCallerWithReadReplicas.getRegionLocations(true, id, getConnection(),
getTableName(), row);
}
/**
* @param reload force reload of server location
* @throws IOException
@@ -183,14 +156,14 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
if (reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
&& getConnection().isTableDisabled(getTableName())) {
throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
id, getConnection(), getTableName(), getRow());
location = id < rl.size() ? rl.getRegionLocation(id) : null;
if (location == null || location.getServerName() == null) {
// With this exception, there will be a retry. The location can be null for a replica
// when the table is created or after a split.
throw new HBaseIOException("There is no location for replica id #" + id);
}
RegionLocations rl = getRegionLocationsForPrepare(getRow());
location = getLocationForReplica(rl);
ServerName dest = location.getServerName();
setStub(super.getConnection().getClient(dest));
if (!instantiated || reload) {

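With this revert, ScannerCallable.prepare(reload) again resolves the region location through RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, ...), so a retry (reload == true) bypasses the location cache instead of relying on the cache having already been cleared by the retrying caller's error handling. An illustrative sketch of that retry-driven call pattern follows; it is not the actual RpcRetryingCaller implementation, and maxAttempts/callTimeout are hypothetical parameters:

  // Simplified view of how a retrying caller drives a ScannerCallable.
  static Result[] callWithRetriesSketch(ScannerCallable callable, int maxAttempts, int callTimeout)
      throws IOException {
    IOException lastException = null;
    for (int tries = 0; tries < maxAttempts; tries++) {
      try {
        // First attempt: reload == false, the cached location is used.
        // Retries: reload == true, the location is re-fetched from hbase:meta.
        callable.prepare(tries != 0);
        return callable.call(callTimeout);
      } catch (IOException e) {
        lastException = e;
        // Let the callable update/clear the cached region location for this error.
        callable.throwable(e, tries + 1 < maxAttempts);
      }
    }
    throw lastException != null ? lastException : new IOException("no attempt was made");
  }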
TestReversedScannerCallable.java

@@ -17,22 +17,12 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -42,89 +32,64 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
@Category({ ClientTests.class, SmallTests.class })
public class TestReversedScannerCallable {
private static final TableName TABLE_NAME = TableName.valueOf("TestReversedScannerCallable");
private static final String HOSTNAME = "localhost";
private static final ServerName SERVERNAME = ServerName.valueOf(HOSTNAME, 60030, 123);
private static final byte[] ROW = Bytes.toBytes("row1");
private static final Scan DEFAULT_SCAN = new Scan().withStartRow(ROW, true).setReversed(true);
@Mock
private ClusterConnection connection;
@Mock
private Scan scan;
@Mock
private RpcControllerFactory rpcFactory;
@Mock
private RegionLocations regionLocations;
@Mock
private HRegionLocation regionLocation;
private final byte[] ROW = Bytes.toBytes("row1");
@Before
public void setUp() throws Exception {
when(connection.getConfiguration()).thenReturn(new Configuration());
when(regionLocations.size()).thenReturn(1);
when(regionLocations.getRegionLocation(0)).thenReturn(regionLocation);
when(regionLocation.getHostname()).thenReturn(HOSTNAME);
when(regionLocation.getServerName()).thenReturn(SERVERNAME);
byte[] ROW_BEFORE = ConnectionUtils.createCloseRowBefore(ROW);
HRegionLocation regionLocation = Mockito.mock(HRegionLocation.class);
ServerName serverName = Mockito.mock(ServerName.class);
HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
Mockito.when(connection.getConfiguration()).thenReturn(new Configuration());
Mockito.when(regionLocations.size()).thenReturn(1);
Mockito.when(regionLocations.getRegionLocation(0)).thenReturn(regionLocation);
Mockito.when(regionLocation.getHostname()).thenReturn("localhost");
Mockito.when(regionLocation.getRegionInfo()).thenReturn(regionInfo);
Mockito.when(regionLocation.getServerName()).thenReturn(serverName);
Mockito.when(regionInfo.containsRow(ROW_BEFORE)).thenReturn(true);
Mockito.when(scan.includeStartRow()).thenReturn(true);
Mockito.when(scan.getStartRow()).thenReturn(ROW);
}
@Test
public void testPrepareAlwaysUsesCache() throws Exception {
when(connection.locateRegion(TABLE_NAME, ROW, true, true, 0))
.thenReturn(regionLocations);
public void testPrepareDoesNotUseCache() throws Exception {
TableName tableName = TableName.valueOf("MyTable");
Mockito.when(connection.relocateRegion(tableName, ROW, 0)).thenReturn(regionLocations);
ReversedScannerCallable callable =
new ReversedScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0);
callable.prepare(false);
new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory);
callable.prepare(true);
verify(connection, times(2)).locateRegion(TABLE_NAME, ROW, true, true, 0);
Mockito.verify(connection).relocateRegion(tableName, ROW, 0);
}
@Test
public void testHandleDisabledTable() throws IOException {
when(connection.isTableDisabled(TABLE_NAME)).thenReturn(true);
ReversedScannerCallable callable =
new ReversedScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0);
try {
callable.prepare(true);
fail("should have thrown TableNotEnabledException");
} catch (TableNotEnabledException e) {
// pass
}
}
@Test
public void testUpdateSearchKeyCacheLocation() throws IOException {
byte[] regionName = HRegionInfo.createRegionName(TABLE_NAME,
ConnectionUtils.createCloseRowBefore(ConnectionUtils.MAX_BYTE_ARRAY), "123", false);
HRegionInfo mockRegionInfo = mock(HRegionInfo.class);
when(mockRegionInfo.containsRow(ConnectionUtils.MAX_BYTE_ARRAY)).thenReturn(true);
when(mockRegionInfo.getEndKey()).thenReturn(HConstants.EMPTY_END_ROW);
when(mockRegionInfo.getRegionName()).thenReturn(regionName);
when(regionLocation.getRegionInfo()).thenReturn(mockRegionInfo);
IOException testThrowable = new IOException("test throwable");
when(connection.locateRegion(TABLE_NAME, ConnectionUtils.MAX_BYTE_ARRAY, true, true, 0))
public void testPrepareUsesCache() throws Exception {
TableName tableName = TableName.valueOf("MyTable");
Mockito.when(connection.locateRegion(tableName, ROW, true, true, 0))
.thenReturn(regionLocations);
Scan scan = new Scan().setReversed(true);
ReversedScannerCallable callable =
new ReversedScannerCallable(connection, TABLE_NAME, scan, null, rpcFactory, 0);
new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory);
callable.prepare(false);
callable.throwable(testThrowable, true);
verify(connection).updateCachedLocations(TABLE_NAME, regionName,
ConnectionUtils.MAX_BYTE_ARRAY, testThrowable, SERVERNAME);
Mockito.verify(connection).locateRegion(tableName, ROW, true, true, 0);
}
}

TestScannerCallable.java

@@ -1,100 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
@Category({ ClientTests.class, SmallTests.class })
public class TestScannerCallable {
private static final TableName TABLE_NAME = TableName.valueOf("TestScannerCallable");
private static final String HOSTNAME = "localhost";
private static final ServerName SERVERNAME = ServerName.valueOf(HOSTNAME, 60030, 123);
private static final byte[] ROW = Bytes.toBytes("row1");
private static final Scan DEFAULT_SCAN = new Scan().withStartRow(ROW, true);
@Mock
private ClusterConnection connection;
@Mock
private RpcControllerFactory rpcFactory;
@Mock
private RegionLocations regionLocations;
@Mock
private HRegionLocation regionLocation;
@Before
public void setUp() throws Exception {
when(connection.getConfiguration()).thenReturn(new Configuration());
when(regionLocations.size()).thenReturn(1);
when(regionLocations.getRegionLocation(0)).thenReturn(regionLocation);
when(regionLocation.getHostname()).thenReturn(HOSTNAME);
when(regionLocation.getServerName()).thenReturn(SERVERNAME);
}
@Test
public void testPrepareAlwaysUsesCache() throws Exception {
when(connection.locateRegion(TABLE_NAME, ROW, true, true, 0))
.thenReturn(regionLocations);
ScannerCallable callable =
new ScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0);
callable.prepare(false);
callable.prepare(true);
verify(connection, times(2)).locateRegion(TABLE_NAME, ROW, true, true, 0);
}
@Test
public void testHandleDisabledTable() throws IOException {
when(connection.isTableDisabled(TABLE_NAME)).thenReturn(true);
ScannerCallable callable =
new ScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0);
try {
callable.prepare(true);
fail("should have thrown TableNotEnabledException");
} catch (TableNotEnabledException e) {
// pass
}
}
}

TestMetaCache.java

@@ -22,7 +22,6 @@ import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@@ -149,70 +148,6 @@ public class TestMetaCache {
}
}
@Test
public void testClearsCacheOnScanException() throws Exception {
((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(
new RoundRobinExceptionInjector());
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.set("hbase.client.retries.number", "1");
try (HConnectionImplementation conn =
(HConnectionImplementation) ConnectionFactory.createConnection(conf);
Table table = conn.getTable(TABLE_NAME)) {
byte[] row = Bytes.toBytes("row2");
Put put = new Put(row);
put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
Scan scan = new Scan();
scan.withStartRow(row);
scan.setLimit(1);
scan.setCaching(1);
populateCache(table, row);
assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
assertTrue(executeUntilCacheClearingException(table, scan));
assertNull(conn.getCachedLocation(TABLE_NAME, row));
// repopulate cache so we can test with reverse scan too
populateCache(table, row);
assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
// run with reverse scan
scan.setReversed(true);
assertTrue(executeUntilCacheClearingException(table, scan));
assertNull(conn.getCachedLocation(TABLE_NAME, row));
}
}
private void populateCache(Table table, byte[] row) {
for (int i = 0; i < 50; i++) {
try {
table.get(new Get(row));
return;
} catch (Exception e) {
// pass, we just want this to succeed so that region location will be cached
}
}
}
private boolean executeUntilCacheClearingException(Table table, Scan scan) {
for (int i = 0; i < 50; i++) {
try {
try (ResultScanner scanner = table.getScanner(scan)) {
scanner.next();
}
} catch (Exception ex) {
// Only keep track of the last exception that updated the meta cache
if (ClientExceptionsUtil.isMetaClearingException(ex)) {
return true;
}
}
}
return false;
}
@Test
public void testCacheClearingOnCallQueueTooBig() throws Exception {
((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(