HBASE-4224 Need a flush by regionserver rather than by table option

This commit is contained in:
Chia-Ping Tsai 2017-12-17 01:59:19 +08:00
parent b1269ec57f
commit f952779ba2
6 changed files with 254 additions and 24 deletions

View File

@ -825,6 +825,13 @@ public interface Admin extends Abortable, Closeable {
*/ */
void flushRegion(byte[] regionName) throws IOException; void flushRegion(byte[] regionName) throws IOException;
/**
* Flush all regions on the region server. Synchronous operation.
* @param serverName the region server name to flush
* @throws IOException if a remote or network exception occurs
*/
void flushRegionServer(ServerName serverName) throws IOException;
/** /**
* Compact a table. Asynchronous operation in that this method requests that a * Compact a table. Asynchronous operation in that this method requests that a
* Compaction run and then it returns. It does not wait on the completion of Compaction * Compaction run and then it returns. It does not wait on the completion of Compaction

View File

@ -294,6 +294,12 @@ public interface AsyncAdmin {
*/ */
CompletableFuture<Void> flushRegion(byte[] regionName); CompletableFuture<Void> flushRegion(byte[] regionName);
/**
* Flush all region on the region server.
* @param serverName server to flush
*/
CompletableFuture<Void> flushRegionServer(ServerName serverName);
/** /**
* Compact a table. When the returned CompletableFuture is done, it only means the compact request * Compact a table. When the returned CompletableFuture is done, it only means the compact request
* was sent to HBase and may need some time to finish the compact operation. * was sent to HBase and may need some time to finish the compact operation.

View File

@ -242,6 +242,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
return wrap(rawAdmin.flushRegion(regionName)); return wrap(rawAdmin.flushRegion(regionName));
} }
@Override
public CompletableFuture<Void> flushRegionServer(ServerName sn) {
return wrap(rawAdmin.flushRegionServer(sn));
}
@Override @Override
public CompletableFuture<Void> compact(TableName tableName, public CompletableFuture<Void> compact(TableName tableName,
CompactType compactType) { CompactType compactType) {

View File

@ -1182,21 +1182,28 @@ public class HBaseAdmin implements Admin {
if (regionServerPair.getSecond() == null) { if (regionServerPair.getSecond() == null) {
throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
} }
final RegionInfo hRegionInfo = regionServerPair.getFirst(); final RegionInfo regionInfo = regionServerPair.getFirst();
ServerName serverName = regionServerPair.getSecond(); ServerName serverName = regionServerPair.getSecond();
final AdminService.BlockingInterface admin = this.connection.getAdmin(serverName); flush(this.connection.getAdmin(serverName), regionInfo);
Callable<Void> callable = new Callable<Void>() { }
@Override
public Void call() throws Exception { private void flush(AdminService.BlockingInterface admin, final RegionInfo info)
throws IOException {
ProtobufUtil.call(() -> {
// TODO: There is no timeout on this controller. Set one! // TODO: There is no timeout on this controller. Set one!
HBaseRpcController controller = rpcControllerFactory.newController(); HBaseRpcController controller = rpcControllerFactory.newController();
FlushRegionRequest request = FlushRegionRequest request =
RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName()); RequestConverter.buildFlushRegionRequest(info.getRegionName());
admin.flushRegion(controller, request); admin.flushRegion(controller, request);
return null; return null;
});
}
@Override
public void flushRegionServer(ServerName serverName) throws IOException {
for (RegionInfo region : getRegions(serverName)) {
flush(this.connection.getAdmin(serverName), region);
} }
};
ProtobufUtil.call(callable);
} }
/** /**

View File

@ -826,15 +826,44 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.toStringBinary(regionName))); .toStringBinary(regionName)));
return; return;
} }
flush(serverName, location.getRegion())
.whenComplete((ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}
});
});
return future;
}
RegionInfo regionInfo = location.getRegion(); private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) {
this.<Void> newAdminCaller() return this.<Void> newAdminCaller()
.serverName(serverName) .serverName(serverName)
.action( .action(
(controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall( (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
.getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
resp -> null)).call().whenComplete((ret, err2) -> { resp -> null))
.call();
}
@Override
public CompletableFuture<Void> flushRegionServer(ServerName sn) {
CompletableFuture<Void> future = new CompletableFuture<>();
getRegions(sn).whenComplete((hRegionInfos, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
if (hRegionInfos != null) {
hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
}
CompletableFuture
.allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
.whenComplete((ret, err2) -> {
if (err2 != null) { if (err2 != null) {
future.completeExceptionally(err2); future.completeExceptionally(err2);
} else { } else {

View File

@ -0,0 +1,176 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category({MediumTests.class, ClientTests.class})
public class TestFlushFromClient {
private static final Log LOG = LogFactory.getLog(TestFlushFromClient.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static AsyncConnection asyncConn;
private static final byte[][] SPLITS = new byte[][]{Bytes.toBytes("3"), Bytes.toBytes("7")};
private static final List<byte[]> ROWS = Arrays.asList(
Bytes.toBytes("1"),
Bytes.toBytes("4"),
Bytes.toBytes("8"));
private static final byte[] FAMILY = Bytes.toBytes("f1");
@Rule
public TestName name = new TestName();
public TableName tableName;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(ROWS.size());
asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
IOUtils.cleanup(null, asyncConn);
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setUp() throws Exception {
tableName = TableName.valueOf(name.getMethodName());
try (Table t = TEST_UTIL.createTable(tableName, FAMILY, SPLITS)) {
List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList());
for (int i = 0; i != 20; ++i) {
byte[] value = Bytes.toBytes(i);
puts.forEach(p -> p.addColumn(FAMILY, value, value));
}
t.put(puts);
}
assertFalse(getRegionInfo().isEmpty());
assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreSize() != 0));
}
@After
public void tearDown() throws Exception {
for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
LOG.info("Tear down, remove table=" + htd.getTableName());
TEST_UTIL.deleteTable(htd.getTableName());
}
}
@Test
public void testFlushTable() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
admin.flush(tableName);
assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreSize() != 0));
}
}
@Test
public void testAsyncFlushTable() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin();
admin.flush(tableName).get();
assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreSize() != 0));
}
@Test
public void testFlushRegion() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
for (HRegion r : getRegionInfo()) {
admin.flushRegion(r.getRegionInfo().getRegionName());
TimeUnit.SECONDS.sleep(1);
assertEquals(0, r.getMemStoreSize());
}
}
}
@Test
public void testAsyncFlushRegion() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin();
for (HRegion r : getRegionInfo()) {
admin.flushRegion(r.getRegionInfo().getRegionName()).get();
TimeUnit.SECONDS.sleep(1);
assertEquals(0, r.getMemStoreSize());
}
}
@Test
public void testFlushRegionServer() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
for (HRegionServer rs : TEST_UTIL.getHBaseCluster()
.getLiveRegionServerThreads()
.stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
.collect(Collectors.toList())) {
admin.flushRegionServer(rs.getServerName());
assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreSize() != 0));
}
}
}
@Test
public void testAsyncFlushRegionServer() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin();
for (HRegionServer rs : TEST_UTIL.getHBaseCluster()
.getLiveRegionServerThreads()
.stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
.collect(Collectors.toList())) {
admin.flushRegionServer(rs.getServerName()).get();
assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreSize() != 0));
}
}
private List<HRegion> getRegionInfo() {
return TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
.map(JVMClusterUtil.RegionServerThread::getRegionServer)
.flatMap(r -> r.getRegions().stream())
.filter(r -> r.getTableDescriptor().getTableName().equals(tableName))
.collect(Collectors.toList());
}
private List<HRegion> getRegionInfo(HRegionServer rs) {
return rs.getRegions().stream()
.filter(v -> v.getTableDescriptor().getTableName().equals(tableName))
.collect(Collectors.toList());
}
}