HBASE-19139 Create Async Admin methods for Clear Block Cache
This commit is contained in:
parent
eb88b69845
commit
f458b89c05
|
@ -0,0 +1,42 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to merge CacheEvictionStats. Thread safe for concurrent accessing.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class CacheEvictionStatsAggregator {
|
||||||
|
|
||||||
|
private final CacheEvictionStatsBuilder builder;
|
||||||
|
|
||||||
|
public CacheEvictionStatsAggregator() {
|
||||||
|
this.builder = new CacheEvictionStatsBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void append(CacheEvictionStats stats) {
|
||||||
|
this.builder.append(stats);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized CacheEvictionStats sum() {
|
||||||
|
return this.builder.build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import com.google.protobuf.RpcChannel;
|
import com.google.protobuf.RpcChannel;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -27,6 +28,8 @@ import java.util.Set;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.CacheEvictionStats;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
||||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||||
|
@ -1211,4 +1214,14 @@ public interface AsyncAdmin {
|
||||||
* @return - returns a list of servers that not cleared wrapped by a {@link CompletableFuture}.
|
* @return - returns a list of servers that not cleared wrapped by a {@link CompletableFuture}.
|
||||||
*/
|
*/
|
||||||
CompletableFuture<List<ServerName>> clearDeadServers(final List<ServerName> servers);
|
CompletableFuture<List<ServerName>> clearDeadServers(final List<ServerName> servers);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear all the blocks corresponding to this table from BlockCache. For expert-admins. Calling
|
||||||
|
* this API will drop all the cached blocks specific to a table from BlockCache. This can
|
||||||
|
* significantly impact the query performance as the subsequent queries will have to retrieve the
|
||||||
|
* blocks from underlying filesystem.
|
||||||
|
* @param tableName table to clear block cache
|
||||||
|
* @return CacheEvictionStats related to the eviction wrapped by a {@link CompletableFuture}.
|
||||||
|
*/
|
||||||
|
CompletableFuture<CacheEvictionStats> clearBlockCache(final TableName tableName);
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,8 @@ import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.CacheEvictionStats;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
||||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||||
|
@ -734,4 +736,9 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
||||||
public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
|
public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
|
||||||
return wrap(rawAdmin.clearDeadServers(servers));
|
return wrap(rawAdmin.clearDeadServers(servers));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
|
||||||
|
return wrap(rawAdmin.clearBlockCache(tableName));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,8 @@ import java.util.stream.Stream;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
|
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
|
||||||
|
import org.apache.hadoop.hbase.CacheEvictionStats;
|
||||||
|
import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
||||||
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
|
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
|
||||||
|
@ -98,6 +100,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
|
||||||
|
@ -3397,4 +3401,51 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
||||||
});
|
});
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
|
||||||
|
CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
|
||||||
|
getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
|
||||||
|
if (err != null) {
|
||||||
|
future.completeExceptionally(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Map<ServerName, List<RegionInfo>> regionInfoByServerName =
|
||||||
|
locations.stream().filter(l -> l.getRegion() != null)
|
||||||
|
.filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
|
||||||
|
.collect(Collectors.groupingBy(l -> l.getServerName(),
|
||||||
|
Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
|
||||||
|
List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
|
||||||
|
CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
|
||||||
|
for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
|
||||||
|
futures
|
||||||
|
.add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
|
||||||
|
if (err2 != null) {
|
||||||
|
future.completeExceptionally(err2);
|
||||||
|
} else {
|
||||||
|
aggregator.append(stats);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
|
||||||
|
.whenComplete((ret, err3) -> {
|
||||||
|
if (err3 != null) {
|
||||||
|
future.completeExceptionally(err3);
|
||||||
|
} else {
|
||||||
|
future.complete(aggregator.sum());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
|
||||||
|
List<RegionInfo> hris) {
|
||||||
|
return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
|
||||||
|
.<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
|
||||||
|
controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
|
||||||
|
(s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
|
||||||
|
resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
|
||||||
|
.serverName(serverName).call();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,8 +55,6 @@ public class TestInterfaceAlign {
|
||||||
adminMethodNames.remove("getConfiguration");
|
adminMethodNames.remove("getConfiguration");
|
||||||
adminMethodNames.removeAll(getMethodNames(Abortable.class));
|
adminMethodNames.removeAll(getMethodNames(Abortable.class));
|
||||||
adminMethodNames.removeAll(getMethodNames(Closeable.class));
|
adminMethodNames.removeAll(getMethodNames(Closeable.class));
|
||||||
// TODO: Remove this after HBASE-19139
|
|
||||||
adminMethodNames.remove("clearBlockCache");
|
|
||||||
|
|
||||||
adminMethodNames.forEach(method -> {
|
adminMethodNames.forEach(method -> {
|
||||||
boolean contains = asyncAdminMethodNames.contains(method);
|
boolean contains = asyncAdminMethodNames.contains(method);
|
||||||
|
|
|
@ -19,11 +19,15 @@
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.CacheEvictionStats;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.AsyncAdmin;
|
||||||
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||||
|
@ -35,6 +39,8 @@ import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -44,6 +50,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
@Category(MediumTests.class)
|
@Category(MediumTests.class)
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class TestClearRegionBlockCache {
|
public class TestClearRegionBlockCache {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestClearRegionBlockCache.class);
|
||||||
private static final TableName TABLE_NAME = TableName.valueOf("testClearRegionBlockCache");
|
private static final TableName TABLE_NAME = TableName.valueOf("testClearRegionBlockCache");
|
||||||
private static final byte[] FAMILY = Bytes.toBytes("family");
|
private static final byte[] FAMILY = Bytes.toBytes("family");
|
||||||
private static final byte[][] SPLIT_KEY = new byte[][] { Bytes.toBytes("5") };
|
private static final byte[][] SPLIT_KEY = new byte[][] { Bytes.toBytes("5") };
|
||||||
|
@ -77,6 +84,9 @@ public class TestClearRegionBlockCache {
|
||||||
|
|
||||||
// Create table
|
// Create table
|
||||||
table = HTU.createTable(TABLE_NAME, FAMILY, SPLIT_KEY);
|
table = HTU.createTable(TABLE_NAME, FAMILY, SPLIT_KEY);
|
||||||
|
|
||||||
|
HTU.loadNumericRows(table, FAMILY, 1, 10);
|
||||||
|
HTU.flush(TABLE_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -86,9 +96,6 @@ public class TestClearRegionBlockCache {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testClearBlockCache() throws Exception {
|
public void testClearBlockCache() throws Exception {
|
||||||
HTU.loadNumericRows(table, FAMILY, 1, 10);
|
|
||||||
HTU.flush(TABLE_NAME);
|
|
||||||
|
|
||||||
BlockCache blockCache1 = rs1.getCacheConfig().getBlockCache();
|
BlockCache blockCache1 = rs1.getCacheConfig().getBlockCache();
|
||||||
BlockCache blockCache2 = rs2.getCacheConfig().getBlockCache();
|
BlockCache blockCache2 = rs2.getCacheConfig().getBlockCache();
|
||||||
|
|
||||||
|
@ -98,18 +105,65 @@ public class TestClearRegionBlockCache {
|
||||||
// scan will cause blocks to be added in BlockCache
|
// scan will cause blocks to be added in BlockCache
|
||||||
scanAllRegionsForRS(rs1);
|
scanAllRegionsForRS(rs1);
|
||||||
assertEquals(blockCache1.getBlockCount() - initialBlockCount1,
|
assertEquals(blockCache1.getBlockCount() - initialBlockCount1,
|
||||||
HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY));
|
HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY));
|
||||||
clearRegionBlockCache(rs1);
|
clearRegionBlockCache(rs1);
|
||||||
|
|
||||||
scanAllRegionsForRS(rs2);
|
scanAllRegionsForRS(rs2);
|
||||||
assertEquals(blockCache2.getBlockCount() - initialBlockCount2,
|
assertEquals(blockCache2.getBlockCount() - initialBlockCount2,
|
||||||
HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
|
HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
|
||||||
clearRegionBlockCache(rs2);
|
clearRegionBlockCache(rs2);
|
||||||
|
|
||||||
assertEquals(initialBlockCount1, blockCache1.getBlockCount());
|
assertEquals(initialBlockCount1, blockCache1.getBlockCount());
|
||||||
assertEquals(initialBlockCount2, blockCache2.getBlockCount());
|
assertEquals(initialBlockCount2, blockCache2.getBlockCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClearBlockCacheFromAdmin() throws Exception {
|
||||||
|
Admin admin = HTU.getAdmin();
|
||||||
|
|
||||||
|
// All RS run in a same process, so the block cache is same for rs1 and rs2
|
||||||
|
BlockCache blockCache = rs1.getCacheConfig().getBlockCache();
|
||||||
|
long initialBlockCount = blockCache.getBlockCount();
|
||||||
|
|
||||||
|
// scan will cause blocks to be added in BlockCache
|
||||||
|
scanAllRegionsForRS(rs1);
|
||||||
|
assertEquals(blockCache.getBlockCount() - initialBlockCount,
|
||||||
|
HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY));
|
||||||
|
scanAllRegionsForRS(rs2);
|
||||||
|
assertEquals(blockCache.getBlockCount() - initialBlockCount,
|
||||||
|
HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY)
|
||||||
|
+ HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
|
||||||
|
|
||||||
|
CacheEvictionStats stats = admin.clearBlockCache(TABLE_NAME);
|
||||||
|
assertEquals(stats.getEvictedBlocks(), HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY)
|
||||||
|
+ HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
|
||||||
|
assertEquals(initialBlockCount, blockCache.getBlockCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClearBlockCacheFromAsyncAdmin() throws Exception {
|
||||||
|
AsyncAdmin admin =
|
||||||
|
ConnectionFactory.createAsyncConnection(HTU.getConfiguration()).get().getAdmin();
|
||||||
|
|
||||||
|
// All RS run in a same process, so the block cache is same for rs1 and rs2
|
||||||
|
BlockCache blockCache = rs1.getCacheConfig().getBlockCache();
|
||||||
|
long initialBlockCount = blockCache.getBlockCount();
|
||||||
|
|
||||||
|
// scan will cause blocks to be added in BlockCache
|
||||||
|
scanAllRegionsForRS(rs1);
|
||||||
|
assertEquals(blockCache.getBlockCount() - initialBlockCount,
|
||||||
|
HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY));
|
||||||
|
scanAllRegionsForRS(rs2);
|
||||||
|
assertEquals(blockCache.getBlockCount() - initialBlockCount,
|
||||||
|
HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY)
|
||||||
|
+ HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
|
||||||
|
|
||||||
|
CacheEvictionStats stats = admin.clearBlockCache(TABLE_NAME).get();
|
||||||
|
assertEquals(stats.getEvictedBlocks(), HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY)
|
||||||
|
+ HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
|
||||||
|
assertEquals(initialBlockCount, blockCache.getBlockCount());
|
||||||
|
}
|
||||||
|
|
||||||
private void scanAllRegionsForRS(HRegionServer rs) throws IOException {
|
private void scanAllRegionsForRS(HRegionServer rs) throws IOException {
|
||||||
for (Region region : rs.getRegions(TABLE_NAME)) {
|
for (Region region : rs.getRegions(TABLE_NAME)) {
|
||||||
RegionScanner scanner = region.getScanner(new Scan());
|
RegionScanner scanner = region.getScanner(new Scan());
|
||||||
|
|
Loading…
Reference in New Issue