From 541f8ad8a8515886351cfaa1aacd9cfed1296bf9 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sun, 17 Dec 2017 01:59:19 +0800 Subject: [PATCH] HBASE-4224 Need a flush by regionserver rather than by table option --- .../org/apache/hadoop/hbase/client/Admin.java | 7 + .../hadoop/hbase/client/AsyncAdmin.java | 6 + .../hadoop/hbase/client/AsyncHBaseAdmin.java | 5 + .../hadoop/hbase/client/HBaseAdmin.java | 35 ++-- .../hbase/client/RawAsyncHBaseAdmin.java | 49 ++++- .../hbase/client/TestFlushFromClient.java | 176 ++++++++++++++++++ 6 files changed, 254 insertions(+), 24 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 67294733002..b8546fa4533 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -825,6 +825,13 @@ public interface Admin extends Abortable, Closeable { */ void flushRegion(byte[] regionName) throws IOException; + /** + * Flush all regions on the region server. Synchronous operation. + * @param serverName the region server name to flush + * @throws IOException if a remote or network exception occurs + */ + void flushRegionServer(ServerName serverName) throws IOException; + /** * Compact a table. Asynchronous operation in that this method requests that a * Compaction run and then it returns. It does not wait on the completion of Compaction diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index a375265bf68..35cdd3fd86b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -294,6 +294,12 @@ public interface AsyncAdmin { */ CompletableFuture flushRegion(byte[] regionName); + /** + * Flush all region on the region server. + * @param serverName server to flush + */ + CompletableFuture flushRegionServer(ServerName serverName); + /** * Compact a table. When the returned CompletableFuture is done, it only means the compact request * was sent to HBase and may need some time to finish the compact operation. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index d0d19c19ef4..9b2390c3198 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -242,6 +242,11 @@ class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.flushRegion(regionName)); } + @Override + public CompletableFuture flushRegionServer(ServerName sn) { + return wrap(rawAdmin.flushRegionServer(sn)); + } + @Override public CompletableFuture compact(TableName tableName, CompactType compactType) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 1a87b48dd4f..86859847be5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -1188,21 +1188,28 @@ public class HBaseAdmin implements Admin { if (regionServerPair.getSecond() == null) { throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); } - final RegionInfo hRegionInfo = regionServerPair.getFirst(); + final RegionInfo regionInfo = regionServerPair.getFirst(); ServerName serverName = regionServerPair.getSecond(); - final AdminService.BlockingInterface admin = this.connection.getAdmin(serverName); - Callable callable = new Callable() { - @Override - public Void call() throws Exception { - // TODO: There is no timeout on this controller. Set one! - HBaseRpcController controller = rpcControllerFactory.newController(); - FlushRegionRequest request = - RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName()); - admin.flushRegion(controller, request); - return null; - } - }; - ProtobufUtil.call(callable); + flush(this.connection.getAdmin(serverName), regionInfo); + } + + private void flush(AdminService.BlockingInterface admin, final RegionInfo info) + throws IOException { + ProtobufUtil.call(() -> { + // TODO: There is no timeout on this controller. Set one! + HBaseRpcController controller = rpcControllerFactory.newController(); + FlushRegionRequest request = + RequestConverter.buildFlushRegionRequest(info.getRegionName()); + admin.flushRegion(controller, request); + return null; + }); + } + + @Override + public void flushRegionServer(ServerName serverName) throws IOException { + for (RegionInfo region : getRegions(serverName)) { + flush(this.connection.getAdmin(serverName), region); + } } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 5a18afe7c2d..050bfe22294 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -827,25 +827,54 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { .toStringBinary(regionName))); return; } - - RegionInfo regionInfo = location.getRegion(); - this. newAdminCaller() - .serverName(serverName) - .action( - (controller, stub) -> this. adminCall( - controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo - .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), - resp -> null)).call().whenComplete((ret, err2) -> { + flush(serverName, location.getRegion()) + .whenComplete((ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); } else { future.complete(ret); } - }); + }); }); return future; } + private CompletableFuture flush(final ServerName serverName, final RegionInfo regionInfo) { + return this. newAdminCaller() + .serverName(serverName) + .action( + (controller, stub) -> this. adminCall( + controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo + .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), + resp -> null)) + .call(); + } + + @Override + public CompletableFuture flushRegionServer(ServerName sn) { + CompletableFuture future = new CompletableFuture<>(); + getRegions(sn).whenComplete((hRegionInfos, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + List> compactFutures = new ArrayList<>(); + if (hRegionInfos != null) { + hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region))); + } + CompletableFuture + .allOf(compactFutures.toArray(new CompletableFuture[compactFutures.size()])) + .whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + @Override public CompletableFuture compact(TableName tableName, CompactType compactType) { return compact(tableName, null, false, compactType); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java new file mode 100644 index 00000000000..9085fa5e322 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java @@ -0,0 +1,176 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.io.IOUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({MediumTests.class, ClientTests.class}) +public class TestFlushFromClient { + private static final Log LOG = LogFactory.getLog(TestFlushFromClient.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static AsyncConnection asyncConn; + private static final byte[][] SPLITS = new byte[][]{Bytes.toBytes("3"), Bytes.toBytes("7")}; + private static final List ROWS = Arrays.asList( + Bytes.toBytes("1"), + Bytes.toBytes("4"), + Bytes.toBytes("8")); + private static final byte[] FAMILY = Bytes.toBytes("f1"); + + @Rule + public TestName name = new TestName(); + + public TableName tableName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(ROWS.size()); + asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + IOUtils.cleanup(null, asyncConn); + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + tableName = TableName.valueOf(name.getMethodName()); + try (Table t = TEST_UTIL.createTable(tableName, FAMILY, SPLITS)) { + List puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList()); + for (int i = 0; i != 20; ++i) { + byte[] value = Bytes.toBytes(i); + puts.forEach(p -> p.addColumn(FAMILY, value, value)); + } + t.put(puts); + } + assertFalse(getRegionInfo().isEmpty()); + assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreSize() != 0)); + } + + @After + public void tearDown() throws Exception { + for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { + LOG.info("Tear down, remove table=" + htd.getTableName()); + TEST_UTIL.deleteTable(htd.getTableName()); + } + } + + @Test + public void testFlushTable() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.flush(tableName); + assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreSize() != 0)); + } + } + + @Test + public void testAsyncFlushTable() throws Exception { + AsyncAdmin admin = asyncConn.getAdmin(); + admin.flush(tableName).get(); + assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreSize() != 0)); + } + + @Test + public void testFlushRegion() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + for (HRegion r : getRegionInfo()) { + admin.flushRegion(r.getRegionInfo().getRegionName()); + TimeUnit.SECONDS.sleep(1); + assertEquals(0, r.getMemStoreSize()); + } + } + } + + @Test + public void testAsyncFlushRegion() throws Exception { + AsyncAdmin admin = asyncConn.getAdmin(); + for (HRegion r : getRegionInfo()) { + admin.flushRegion(r.getRegionInfo().getRegionName()).get(); + TimeUnit.SECONDS.sleep(1); + assertEquals(0, r.getMemStoreSize()); + } + } + + @Test + public void testFlushRegionServer() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + for (HRegionServer rs : TEST_UTIL.getHBaseCluster() + .getLiveRegionServerThreads() + .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) + .collect(Collectors.toList())) { + admin.flushRegionServer(rs.getServerName()); + assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreSize() != 0)); + } + } + } + + @Test + public void testAsyncFlushRegionServer() throws Exception { + AsyncAdmin admin = asyncConn.getAdmin(); + for (HRegionServer rs : TEST_UTIL.getHBaseCluster() + .getLiveRegionServerThreads() + .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) + .collect(Collectors.toList())) { + admin.flushRegionServer(rs.getServerName()).get(); + assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreSize() != 0)); + } + } + + private List getRegionInfo() { + return TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() + .map(JVMClusterUtil.RegionServerThread::getRegionServer) + .flatMap(r -> r.getRegions().stream()) + .filter(r -> r.getTableDescriptor().getTableName().equals(tableName)) + .collect(Collectors.toList()); + } + + private List getRegionInfo(HRegionServer rs) { + return rs.getRegions().stream() + .filter(v -> v.getTableDescriptor().getTableName().equals(tableName)) + .collect(Collectors.toList()); + } +}