HBASE-19139 Create Async Admin methods for Clear Block Cache

This commit is contained in:
Guanghao Zhang 2018-01-08 14:56:13 +08:00
parent bc4e49ffaa
commit 895267d09c
6 changed files with 172 additions and 7 deletions

View File

@ -0,0 +1,42 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Used to merge CacheEvictionStats. Thread safe for concurrent accessing.
*/
@InterfaceAudience.Private
public class CacheEvictionStatsAggregator {
private final CacheEvictionStatsBuilder builder;
public CacheEvictionStatsAggregator() {
this.builder = new CacheEvictionStatsBuilder();
}
public synchronized void append(CacheEvictionStats stats) {
this.builder.append(stats);
}
public synchronized CacheEvictionStats sum() {
return this.builder.build();
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import com.google.protobuf.RpcChannel;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
@ -27,6 +28,8 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.NamespaceDescriptor;
@ -1211,4 +1214,14 @@ public interface AsyncAdmin {
* @return - returns a list of servers that not cleared wrapped by a {@link CompletableFuture}.
*/
CompletableFuture<List<ServerName>> clearDeadServers(final List<ServerName> servers);
/**
* Clear all the blocks corresponding to this table from BlockCache. For expert-admins. Calling
* this API will drop all the cached blocks specific to a table from BlockCache. This can
* significantly impact the query performance as the subsequent queries will have to retrieve the
* blocks from underlying filesystem.
* @param tableName table to clear block cache
* @return CacheEvictionStats related to the eviction wrapped by a {@link CompletableFuture}.
*/
CompletableFuture<CacheEvictionStats> clearBlockCache(final TableName tableName);
}

View File

@ -27,6 +27,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.NamespaceDescriptor;
@ -734,4 +736,9 @@ class AsyncHBaseAdmin implements AsyncAdmin {
public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
return wrap(rawAdmin.clearDeadServers(servers));
}
@Override
public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
return wrap(rawAdmin.clearBlockCache(tableName));
}
}

View File

@ -42,6 +42,8 @@ import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
@ -97,6 +99,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
@ -3387,4 +3391,51 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
});
return future;
}
@Override
public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
Map<ServerName, List<RegionInfo>> regionInfoByServerName =
locations.stream().filter(l -> l.getRegion() != null)
.filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
.collect(Collectors.groupingBy(l -> l.getServerName(),
Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
futures
.add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
aggregator.append(stats);
}
}));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.whenComplete((ret, err3) -> {
if (err3 != null) {
future.completeExceptionally(err3);
} else {
future.complete(aggregator.sum());
}
});
});
return future;
}
private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
List<RegionInfo> hris) {
return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
.<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
(s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
.serverName(serverName).call();
}
}

View File

@ -55,8 +55,6 @@ public class TestInterfaceAlign {
adminMethodNames.remove("getConfiguration");
adminMethodNames.removeAll(getMethodNames(Abortable.class));
adminMethodNames.removeAll(getMethodNames(Closeable.class));
// TODO: Remove this after HBASE-19139
adminMethodNames.remove("clearBlockCache");
adminMethodNames.forEach(method -> {
boolean contains = asyncAdminMethodNames.contains(method);

View File

@ -19,11 +19,15 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@ -35,6 +39,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@ -44,6 +50,7 @@ import static org.junit.Assert.assertEquals;
@Category(MediumTests.class)
@RunWith(Parameterized.class)
public class TestClearRegionBlockCache {
private static final Logger LOG = LoggerFactory.getLogger(TestClearRegionBlockCache.class);
private static final TableName TABLE_NAME = TableName.valueOf("testClearRegionBlockCache");
private static final byte[] FAMILY = Bytes.toBytes("family");
private static final byte[][] SPLIT_KEY = new byte[][] { Bytes.toBytes("5") };
@ -77,6 +84,9 @@ public class TestClearRegionBlockCache {
// Create table
table = HTU.createTable(TABLE_NAME, FAMILY, SPLIT_KEY);
HTU.loadNumericRows(table, FAMILY, 1, 10);
HTU.flush(TABLE_NAME);
}
@After
@ -86,9 +96,6 @@ public class TestClearRegionBlockCache {
@Test
public void testClearBlockCache() throws Exception {
HTU.loadNumericRows(table, FAMILY, 1, 10);
HTU.flush(TABLE_NAME);
BlockCache blockCache1 = rs1.getCacheConfig().getBlockCache();
BlockCache blockCache2 = rs2.getCacheConfig().getBlockCache();
@ -110,6 +117,53 @@ public class TestClearRegionBlockCache {
assertEquals(initialBlockCount2, blockCache2.getBlockCount());
}
@Test
public void testClearBlockCacheFromAdmin() throws Exception {
Admin admin = HTU.getAdmin();
// All RS run in a same process, so the block cache is same for rs1 and rs2
BlockCache blockCache = rs1.getCacheConfig().getBlockCache();
long initialBlockCount = blockCache.getBlockCount();
// scan will cause blocks to be added in BlockCache
scanAllRegionsForRS(rs1);
assertEquals(blockCache.getBlockCount() - initialBlockCount,
HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY));
scanAllRegionsForRS(rs2);
assertEquals(blockCache.getBlockCount() - initialBlockCount,
HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY)
+ HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
CacheEvictionStats stats = admin.clearBlockCache(TABLE_NAME);
assertEquals(stats.getEvictedBlocks(), HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY)
+ HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
assertEquals(initialBlockCount, blockCache.getBlockCount());
}
@Test
public void testClearBlockCacheFromAsyncAdmin() throws Exception {
AsyncAdmin admin =
ConnectionFactory.createAsyncConnection(HTU.getConfiguration()).get().getAdmin();
// All RS run in a same process, so the block cache is same for rs1 and rs2
BlockCache blockCache = rs1.getCacheConfig().getBlockCache();
long initialBlockCount = blockCache.getBlockCount();
// scan will cause blocks to be added in BlockCache
scanAllRegionsForRS(rs1);
assertEquals(blockCache.getBlockCount() - initialBlockCount,
HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY));
scanAllRegionsForRS(rs2);
assertEquals(blockCache.getBlockCount() - initialBlockCount,
HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY)
+ HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
CacheEvictionStats stats = admin.clearBlockCache(TABLE_NAME).get();
assertEquals(stats.getEvictedBlocks(), HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY)
+ HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
assertEquals(initialBlockCount, blockCache.getBlockCount());
}
private void scanAllRegionsForRS(HRegionServer rs) throws IOException {
for (Region region : rs.getRegions(TABLE_NAME)) {
RegionScanner scanner = region.getScanner(new Scan());