HubSpot Backport: HBASE-26783 ScannerCallable doubly clears meta cache on retries (#4147)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
35ec27d079
commit
fa033e4ac6
|
@ -24,14 +24,14 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.RegionLocations;
|
import org.apache.hadoop.hbase.RegionLocations;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
|
@ -44,6 +44,8 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ReversedScannerCallable extends ScannerCallable {
|
public class ReversedScannerCallable extends ScannerCallable {
|
||||||
|
|
||||||
|
private byte[] locationSearchKey;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param connection which connection
|
* @param connection which connection
|
||||||
* @param tableName table callable is on
|
* @param tableName table callable is on
|
||||||
|
@ -59,6 +61,18 @@ public class ReversedScannerCallable extends ScannerCallable {
|
||||||
super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
|
super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void throwable(Throwable t, boolean retrying) {
|
||||||
|
// for reverse scans, we need to update cache using the search key found for the reverse scan
|
||||||
|
// range in prepare. Otherwise, we will see weird behavior at the table boundaries,
|
||||||
|
// when trying to clear cache for an empty row.
|
||||||
|
if (location != null && locationSearchKey != null) {
|
||||||
|
getConnection().updateCachedLocations(getTableName(),
|
||||||
|
location.getRegionInfo().getRegionName(),
|
||||||
|
locationSearchKey, t, location.getServerName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param reload force reload of server location
|
* @param reload force reload of server location
|
||||||
*/
|
*/
|
||||||
|
@ -67,33 +81,37 @@ public class ReversedScannerCallable extends ScannerCallable {
|
||||||
if (Thread.interrupted()) {
|
if (Thread.interrupted()) {
|
||||||
throw new InterruptedIOException();
|
throw new InterruptedIOException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
|
||||||
|
&& getConnection().isTableDisabled(getTableName())) {
|
||||||
|
throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
|
||||||
|
}
|
||||||
|
|
||||||
if (!instantiated || reload) {
|
if (!instantiated || reload) {
|
||||||
// we should use range locate if
|
// we should use range locate if
|
||||||
// 1. we do not want the start row
|
// 1. we do not want the start row
|
||||||
// 2. the start row is empty which means we need to locate to the last region.
|
// 2. the start row is empty which means we need to locate to the last region.
|
||||||
if (scan.includeStartRow() && !isEmptyStartRow(getRow())) {
|
if (scan.includeStartRow() && !isEmptyStartRow(getRow())) {
|
||||||
// Just locate the region with the row
|
// Just locate the region with the row
|
||||||
RegionLocations rl = getRegionLocations(reload, getRow());
|
RegionLocations rl = getRegionLocationsForPrepare(getRow());
|
||||||
this.location = getLocationForReplica(rl);
|
this.location = getLocationForReplica(rl);
|
||||||
|
this.locationSearchKey = getRow();
|
||||||
|
} else {
|
||||||
|
// The locateStart row is an approximation. So we need to search between
|
||||||
|
// that and the actual row in order to really find the last region
|
||||||
|
byte[] locateStartRow = createCloseRowBefore(getRow());
|
||||||
|
Pair<HRegionLocation, byte[]> lastRegionAndKey = locateLastRegionInRange(
|
||||||
|
locateStartRow, getRow());
|
||||||
|
this.location = lastRegionAndKey.getFirst();
|
||||||
|
this.locationSearchKey = lastRegionAndKey.getSecond();
|
||||||
|
}
|
||||||
|
|
||||||
if (location == null || location.getServerName() == null) {
|
if (location == null || location.getServerName() == null) {
|
||||||
throw new IOException("Failed to find location, tableName="
|
throw new IOException("Failed to find location, tableName="
|
||||||
+ getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload="
|
+ getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload="
|
||||||
+ reload);
|
+ reload);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
// Need to locate the regions with the range, and the target location is
|
|
||||||
// the last one which is the previous region of last region scanner
|
|
||||||
byte[] locateStartRow = createCloseRowBefore(getRow());
|
|
||||||
List<HRegionLocation> locatedRegions = locateRegionsInRange(
|
|
||||||
locateStartRow, getRow(), reload);
|
|
||||||
if (locatedRegions.isEmpty()) {
|
|
||||||
throw new DoNotRetryIOException(
|
|
||||||
"Does hbase:meta exist hole? Couldn't get regions for the range from "
|
|
||||||
+ Bytes.toStringBinary(locateStartRow) + " to "
|
|
||||||
+ Bytes.toStringBinary(getRow()));
|
|
||||||
}
|
|
||||||
this.location = locatedRegions.get(locatedRegions.size() - 1);
|
|
||||||
}
|
|
||||||
setStub(getConnection().getClient(getLocation().getServerName()));
|
setStub(getConnection().getClient(getLocation().getServerName()));
|
||||||
checkIfRegionServerIsRemote();
|
checkIfRegionServerIsRemote();
|
||||||
instantiated = true;
|
instantiated = true;
|
||||||
|
@ -106,15 +124,14 @@ public class ReversedScannerCallable extends ScannerCallable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the corresponding regions for an arbitrary range of keys.
|
* Get the last region before the endkey, which will be used to execute the reverse scan
|
||||||
* @param startKey Starting row in range, inclusive
|
* @param startKey Starting row in range, inclusive
|
||||||
* @param endKey Ending row in range, exclusive
|
* @param endKey Ending row in range, exclusive
|
||||||
* @param reload force reload of server location
|
* @return The last location, and the rowKey used to find it. May be null,
|
||||||
* @return A list of HRegionLocation corresponding to the regions that contain
|
* if a region could not be found.
|
||||||
* the specified range
|
|
||||||
*/
|
*/
|
||||||
private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
|
private Pair<HRegionLocation, byte[]> locateLastRegionInRange(byte[] startKey, byte[] endKey)
|
||||||
byte[] endKey, boolean reload) throws IOException {
|
throws IOException {
|
||||||
final boolean endKeyIsEndOfTable = Bytes.equals(endKey,
|
final boolean endKeyIsEndOfTable = Bytes.equals(endKey,
|
||||||
HConstants.EMPTY_END_ROW);
|
HConstants.EMPTY_END_ROW);
|
||||||
if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
|
if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
|
||||||
|
@ -122,13 +139,17 @@ public class ReversedScannerCallable extends ScannerCallable {
|
||||||
+ Bytes.toStringBinary(startKey) + " > "
|
+ Bytes.toStringBinary(startKey) + " > "
|
||||||
+ Bytes.toStringBinary(endKey));
|
+ Bytes.toStringBinary(endKey));
|
||||||
}
|
}
|
||||||
List<HRegionLocation> regionList = new ArrayList<>();
|
|
||||||
|
HRegionLocation lastRegion = null;
|
||||||
|
byte[] lastFoundKey = null;
|
||||||
byte[] currentKey = startKey;
|
byte[] currentKey = startKey;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
RegionLocations rl = getRegionLocations(reload, currentKey);
|
RegionLocations rl = getRegionLocationsForPrepare(currentKey);
|
||||||
HRegionLocation regionLocation = getLocationForReplica(rl);
|
HRegionLocation regionLocation = getLocationForReplica(rl);
|
||||||
if (regionLocation.getRegionInfo().containsRow(currentKey)) {
|
if (regionLocation.getRegionInfo().containsRow(currentKey)) {
|
||||||
regionList.add(regionLocation);
|
lastFoundKey = currentKey;
|
||||||
|
lastRegion = regionLocation;
|
||||||
} else {
|
} else {
|
||||||
throw new DoNotRetryIOException(
|
throw new DoNotRetryIOException(
|
||||||
"Does hbase:meta exist hole? Locating row " + Bytes.toStringBinary(currentKey) +
|
"Does hbase:meta exist hole? Locating row " + Bytes.toStringBinary(currentKey) +
|
||||||
|
@ -137,7 +158,8 @@ public class ReversedScannerCallable extends ScannerCallable {
|
||||||
currentKey = regionLocation.getRegionInfo().getEndKey();
|
currentKey = regionLocation.getRegionInfo().getEndKey();
|
||||||
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
|
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
|
||||||
&& (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
|
&& (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
|
||||||
return regionList;
|
|
||||||
|
return new Pair<>(lastRegion, lastFoundKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.RegionLocations;
|
import org.apache.hadoop.hbase.RegionLocations;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||||
import org.apache.hadoop.hbase.UnknownScannerException;
|
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
||||||
|
@ -128,9 +129,18 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
|
||||||
return loc;
|
return loc;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final RegionLocations getRegionLocations(boolean reload, byte[] row)
|
/**
|
||||||
|
* Fetch region locations for the row. Since this is for prepare, we always useCache.
|
||||||
|
* This is because we can be sure that RpcRetryingCaller will have cleared the cache
|
||||||
|
* in error handling if this is a retry.
|
||||||
|
*
|
||||||
|
* @param row the row to look up region location for
|
||||||
|
*/
|
||||||
|
protected final RegionLocations getRegionLocationsForPrepare(byte[] row)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id, getConnection(),
|
// always use cache, because cache will have been cleared if necessary
|
||||||
|
// in the try/catch before retrying
|
||||||
|
return RpcRetryingCallerWithReadReplicas.getRegionLocations(true, id, getConnection(),
|
||||||
getTableName(), row);
|
getTableName(), row);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,7 +152,13 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
|
||||||
if (Thread.interrupted()) {
|
if (Thread.interrupted()) {
|
||||||
throw new InterruptedIOException();
|
throw new InterruptedIOException();
|
||||||
}
|
}
|
||||||
RegionLocations rl = getRegionLocations(reload, getRow());
|
|
||||||
|
if (reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
|
||||||
|
&& getConnection().isTableDisabled(getTableName())) {
|
||||||
|
throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
|
||||||
|
}
|
||||||
|
|
||||||
|
RegionLocations rl = getRegionLocationsForPrepare(getRow());
|
||||||
location = getLocationForReplica(rl);
|
location = getLocationForReplica(rl);
|
||||||
ServerName dest = location.getServerName();
|
ServerName dest = location.getServerName();
|
||||||
setStub(super.getConnection().getClient(dest));
|
setStub(super.getConnection().getClient(dest));
|
||||||
|
|
|
@ -17,12 +17,23 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.RegionLocations;
|
import org.apache.hadoop.hbase.RegionLocations;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
@ -33,7 +44,6 @@ import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.Mockito;
|
|
||||||
import org.mockito.runners.MockitoJUnitRunner;
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
|
@ -44,53 +54,78 @@ public class TestReversedScannerCallable {
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestReversedScannerCallable.class);
|
HBaseClassTestRule.forClass(TestReversedScannerCallable.class);
|
||||||
|
|
||||||
|
private static final TableName TABLE_NAME = TableName.valueOf("TestReversedScannerCallable");
|
||||||
|
|
||||||
|
private static final String HOSTNAME = "localhost";
|
||||||
|
private static final ServerName SERVERNAME = ServerName.valueOf(HOSTNAME, 60030, 123);
|
||||||
|
private static final byte[] ROW = Bytes.toBytes("row1");
|
||||||
|
private static final Scan DEFAULT_SCAN = new Scan().withStartRow(ROW, true).setReversed(true);
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private ClusterConnection connection;
|
private ClusterConnection connection;
|
||||||
@Mock
|
@Mock
|
||||||
private Scan scan;
|
|
||||||
@Mock
|
|
||||||
private RpcControllerFactory rpcFactory;
|
private RpcControllerFactory rpcFactory;
|
||||||
@Mock
|
@Mock
|
||||||
private RegionLocations regionLocations;
|
private RegionLocations regionLocations;
|
||||||
|
@Mock
|
||||||
private final byte[] ROW = Bytes.toBytes("row1");
|
private HRegionLocation regionLocation;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
HRegionLocation regionLocation = Mockito.mock(HRegionLocation.class);
|
when(connection.getConfiguration()).thenReturn(new Configuration());
|
||||||
ServerName serverName = Mockito.mock(ServerName.class);
|
when(regionLocations.size()).thenReturn(1);
|
||||||
|
when(regionLocations.getRegionLocation(0)).thenReturn(regionLocation);
|
||||||
Mockito.when(connection.getConfiguration()).thenReturn(new Configuration());
|
when(regionLocation.getHostname()).thenReturn(HOSTNAME);
|
||||||
Mockito.when(regionLocations.size()).thenReturn(1);
|
when(regionLocation.getServerName()).thenReturn(SERVERNAME);
|
||||||
Mockito.when(regionLocations.getRegionLocation(0)).thenReturn(regionLocation);
|
|
||||||
Mockito.when(regionLocation.getHostname()).thenReturn("localhost");
|
|
||||||
Mockito.when(regionLocation.getServerName()).thenReturn(serverName);
|
|
||||||
Mockito.when(scan.includeStartRow()).thenReturn(true);
|
|
||||||
Mockito.when(scan.getStartRow()).thenReturn(ROW);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPrepareDoesNotUseCache() throws Exception {
|
public void testPrepareAlwaysUsesCache() throws Exception {
|
||||||
TableName tableName = TableName.valueOf("MyTable");
|
when(connection.locateRegion(TABLE_NAME, ROW, true, true, 0))
|
||||||
Mockito.when(connection.relocateRegion(tableName, ROW, 0)).thenReturn(regionLocations);
|
|
||||||
|
|
||||||
ReversedScannerCallable callable =
|
|
||||||
new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory, 0);
|
|
||||||
callable.prepare(true);
|
|
||||||
|
|
||||||
Mockito.verify(connection).relocateRegion(tableName, ROW, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testPrepareUsesCache() throws Exception {
|
|
||||||
TableName tableName = TableName.valueOf("MyTable");
|
|
||||||
Mockito.when(connection.locateRegion(tableName, ROW, true, true, 0))
|
|
||||||
.thenReturn(regionLocations);
|
.thenReturn(regionLocations);
|
||||||
|
|
||||||
ReversedScannerCallable callable =
|
ReversedScannerCallable callable =
|
||||||
new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory, 0);
|
new ReversedScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0);
|
||||||
|
callable.prepare(false);
|
||||||
|
callable.prepare(true);
|
||||||
|
|
||||||
|
verify(connection, times(2)).locateRegion(TABLE_NAME, ROW, true, true, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHandleDisabledTable() throws IOException {
|
||||||
|
when(connection.isTableDisabled(TABLE_NAME)).thenReturn(true);
|
||||||
|
|
||||||
|
ReversedScannerCallable callable =
|
||||||
|
new ReversedScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0);
|
||||||
|
|
||||||
|
assertThrows(TableNotEnabledException.class, () -> callable.prepare(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateSearchKeyCacheLocation() throws IOException {
|
||||||
|
byte[] regionName = RegionInfo.createRegionName(TABLE_NAME,
|
||||||
|
ConnectionUtils.createCloseRowBefore(ConnectionUtils.MAX_BYTE_ARRAY), "123", false);
|
||||||
|
HRegionInfo mockRegionInfo = mock(HRegionInfo.class);
|
||||||
|
when(mockRegionInfo.containsRow(ConnectionUtils.MAX_BYTE_ARRAY)).thenReturn(true);
|
||||||
|
when(mockRegionInfo.getEndKey()).thenReturn(HConstants.EMPTY_END_ROW);
|
||||||
|
when(mockRegionInfo.getRegionName()).thenReturn(regionName);
|
||||||
|
when(regionLocation.getRegionInfo()).thenReturn(mockRegionInfo);
|
||||||
|
|
||||||
|
IOException testThrowable = new IOException("test throwable");
|
||||||
|
|
||||||
|
when(connection.locateRegion(TABLE_NAME, ConnectionUtils.MAX_BYTE_ARRAY, true, true, 0))
|
||||||
|
.thenReturn(regionLocations);
|
||||||
|
|
||||||
|
Scan scan = new Scan().setReversed(true);
|
||||||
|
ReversedScannerCallable callable =
|
||||||
|
new ReversedScannerCallable(connection, TABLE_NAME, scan, null, rpcFactory, 0);
|
||||||
|
|
||||||
callable.prepare(false);
|
callable.prepare(false);
|
||||||
|
|
||||||
Mockito.verify(connection).locateRegion(tableName, ROW, true, true, 0);
|
callable.throwable(testThrowable, true);
|
||||||
|
|
||||||
|
verify(connection).updateCachedLocations(TABLE_NAME, regionName,
|
||||||
|
ConnectionUtils.MAX_BYTE_ARRAY, testThrowable, SERVERNAME);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,100 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
import org.apache.hadoop.hbase.RegionLocations;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
|
||||||
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
|
@Category({ ClientTests.class, SmallTests.class })
|
||||||
|
public class TestScannerCallable {
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestScannerCallable.class);
|
||||||
|
|
||||||
|
private static final TableName TABLE_NAME = TableName.valueOf("TestScannerCallable");
|
||||||
|
|
||||||
|
private static final String HOSTNAME = "localhost";
|
||||||
|
private static final ServerName SERVERNAME = ServerName.valueOf(HOSTNAME, 60030, 123);
|
||||||
|
private static final byte[] ROW = Bytes.toBytes("row1");
|
||||||
|
private static final Scan DEFAULT_SCAN = new Scan().withStartRow(ROW, true);
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private ClusterConnection connection;
|
||||||
|
@Mock
|
||||||
|
private RpcControllerFactory rpcFactory;
|
||||||
|
@Mock
|
||||||
|
private RegionLocations regionLocations;
|
||||||
|
@Mock
|
||||||
|
private HRegionLocation regionLocation;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
when(connection.getConfiguration()).thenReturn(new Configuration());
|
||||||
|
when(regionLocations.size()).thenReturn(1);
|
||||||
|
when(regionLocations.getRegionLocation(0)).thenReturn(regionLocation);
|
||||||
|
when(regionLocation.getHostname()).thenReturn(HOSTNAME);
|
||||||
|
when(regionLocation.getServerName()).thenReturn(SERVERNAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPrepareAlwaysUsesCache() throws Exception {
|
||||||
|
when(connection.locateRegion(TABLE_NAME, ROW, true, true, 0))
|
||||||
|
.thenReturn(regionLocations);
|
||||||
|
|
||||||
|
ScannerCallable callable =
|
||||||
|
new ScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0);
|
||||||
|
callable.prepare(false);
|
||||||
|
callable.prepare(true);
|
||||||
|
|
||||||
|
verify(connection, times(2)).locateRegion(TABLE_NAME, ROW, true, true, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHandleDisabledTable() throws IOException {
|
||||||
|
when(connection.isTableDisabled(TABLE_NAME)).thenReturn(true);
|
||||||
|
|
||||||
|
ScannerCallable callable =
|
||||||
|
new ScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0);
|
||||||
|
|
||||||
|
assertThrows(TableNotEnabledException.class, () -> callable.prepare(true));
|
||||||
|
}
|
||||||
|
}
|
|
@ -281,7 +281,7 @@ class FromClientSideBase {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
regions = locator.getAllRegionLocations();
|
regions = locator.getAllRegionLocations();
|
||||||
if (regions.size() > originalCount) {
|
if (regions.size() > originalCount && allRegionsHaveHostnames(regions)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -289,6 +289,18 @@ class FromClientSideBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We need to check for null serverNames due to https://issues.apache.org/jira/browse/HBASE-26790,
|
||||||
|
// because the null serverNames cause the ScannerCallable to fail.
|
||||||
|
// we can remove this check once that is resolved
|
||||||
|
private boolean allRegionsHaveHostnames(List<HRegionLocation> regions) {
|
||||||
|
for (HRegionLocation region : regions) {
|
||||||
|
if (region.getServerName() == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
protected Result getSingleScanResult(Table ht, Scan scan) throws IOException {
|
protected Result getSingleScanResult(Table ht, Scan scan) throws IOException {
|
||||||
ResultScanner scanner = ht.getScanner(scan);
|
ResultScanner scanner = ht.getScanner(scan);
|
||||||
Result result = scanner.next();
|
Result result = scanner.next();
|
||||||
|
|
|
@ -161,6 +161,70 @@ public class TestMetaCache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClearsCacheOnScanException() throws Exception {
|
||||||
|
((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(
|
||||||
|
new RoundRobinExceptionInjector());
|
||||||
|
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
conf.set("hbase.client.retries.number", "1");
|
||||||
|
|
||||||
|
try (ConnectionImplementation conn =
|
||||||
|
(ConnectionImplementation) ConnectionFactory.createConnection(conf);
|
||||||
|
Table table = conn.getTable(TABLE_NAME)) {
|
||||||
|
|
||||||
|
byte[] row = Bytes.toBytes("row2");
|
||||||
|
|
||||||
|
Put put = new Put(row);
|
||||||
|
put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
|
||||||
|
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.withStartRow(row);
|
||||||
|
scan.setLimit(1);
|
||||||
|
scan.setCaching(1);
|
||||||
|
|
||||||
|
populateCache(table, row);
|
||||||
|
assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
|
||||||
|
assertTrue(executeUntilCacheClearingException(table, scan));
|
||||||
|
assertNull(conn.getCachedLocation(TABLE_NAME, row));
|
||||||
|
|
||||||
|
// repopulate cache so we can test with reverse scan too
|
||||||
|
populateCache(table, row);
|
||||||
|
assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
|
||||||
|
|
||||||
|
// run with reverse scan
|
||||||
|
scan.setReversed(true);
|
||||||
|
assertTrue(executeUntilCacheClearingException(table, scan));
|
||||||
|
assertNull(conn.getCachedLocation(TABLE_NAME, row));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void populateCache(Table table, byte[] row) {
|
||||||
|
for (int i = 0; i < 50; i++) {
|
||||||
|
try {
|
||||||
|
table.get(new Get(row));
|
||||||
|
return;
|
||||||
|
} catch (Exception e) {
|
||||||
|
// pass, we just want this to succeed so that region location will be cached
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean executeUntilCacheClearingException(Table table, Scan scan) {
|
||||||
|
for (int i = 0; i < 50; i++) {
|
||||||
|
try {
|
||||||
|
try (ResultScanner scanner = table.getScanner(scan)) {
|
||||||
|
scanner.next();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// Only keep track of the last exception that updated the meta cache
|
||||||
|
if (ClientExceptionsUtil.isMetaClearingException(ex)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCacheClearingOnCallQueueTooBig() throws Exception {
|
public void testCacheClearingOnCallQueueTooBig() throws Exception {
|
||||||
((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(
|
((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(
|
||||||
|
|
Loading…
Reference in New Issue