HBASE-17867: Implement async procedure RPC API(list/exec/abort/isFinished)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
huzheng 2017-05-02 15:56:27 +08:00 committed by Guanghao Zhang
parent 91995749c2
commit ff998ef74f
5 changed files with 316 additions and 42 deletions

View File

@ -25,6 +25,7 @@ import java.util.regex.Pattern;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
@ -741,4 +742,58 @@ public interface AsyncAdmin {
*/
CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
Pattern snapshotNamePattern);
/**
* Execute a distributed procedure on a cluster.
* @param signature A distributed procedure is uniquely identified by its signature (default the
* root ZK node name of the procedure).
* @param instance The instance name of the procedure. For some procedures, this parameter is
* optional.
* @param props Property/Value pairs of properties passing to the procedure
*/
CompletableFuture<Void> execProcedure(String signature, String instance,
Map<String, String> props);
/**
* Execute a distributed procedure on a cluster.
* @param signature A distributed procedure is uniquely identified by its signature (default the
* root ZK node name of the procedure).
* @param instance The instance name of the procedure. For some procedures, this parameter is
* optional.
* @param props Property/Value pairs of properties passing to the procedure
* @return data returned after procedure execution. null if no return data.
*/
CompletableFuture<byte[]> execProcedureWithRet(String signature, String instance,
Map<String, String> props);
/**
* Check the current state of the specified procedure. There are three possible states:
* <ol>
* <li>running - returns <tt>false</tt></li>
* <li>finished - returns <tt>true</tt></li>
* <li>finished with error - throws the exception that caused the procedure to fail</li>
* </ol>
* @param signature The signature that uniquely identifies a procedure
* @param instance The instance name of the procedure
* @param props Property/Value pairs of properties passing to the procedure
* @return true if the specified procedure is finished successfully, false if it is still running.
* The value is vrapped by {@link CompletableFuture}
*/
CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
Map<String, String> props);
/**
* abort a procedure
* @param procId ID of the procedure to abort
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if aborted, false if procedure already completed or does not exist. the value is
* wrapped by {@link CompletableFuture}
*/
CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning);
/**
* List procedures
* @return procedure list wrapped by {@link CompletableFuture}
*/
CompletableFuture<ProcedureInfo[]> listProcedures();
}

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
@ -82,7 +83,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegion
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
@ -101,6 +106,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
@ -119,10 +126,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
@ -1764,6 +1775,105 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
return future;
}
@Override
public CompletableFuture<Void> execProcedure(String signature, String instance,
Map<String, String> props) {
CompletableFuture<Void> future = new CompletableFuture<>();
ProcedureDescription procDesc =
ProtobufUtil.buildProcedureDescription(signature, instance, props);
this.<Long> newMasterCaller()
.action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
(s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
.call().whenComplete((expectedTimeout, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
TimerTask pollingTask = new TimerTask() {
int tries = 0;
long startTime = EnvironmentEdgeManager.currentTime();
long endTime = startTime + expectedTimeout;
long maxPauseTime = expectedTimeout / maxAttempts;
@Override
public void run(Timeout timeout) throws Exception {
if (EnvironmentEdgeManager.currentTime() < endTime) {
isProcedureFinished(signature, instance, props).whenComplete((done, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
if (done) {
future.complete(null);
} else {
// retry again after pauseTime.
long pauseTime = ConnectionUtils
.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
pauseTime = Math.min(pauseTime, maxPauseTime);
AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
TimeUnit.MICROSECONDS);
}
});
} else {
future.completeExceptionally(new IOException("Procedure '" + signature + " : "
+ instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
}
}
};
// Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
});
return future;
}
@Override
public CompletableFuture<byte[]> execProcedureWithRet(String signature, String instance,
Map<String, String> props) {
ProcedureDescription proDesc =
ProtobufUtil.buildProcedureDescription(signature, instance, props);
return this.<byte[]> newMasterCaller()
.action(
(controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
(s, c, req, done) -> s.execProcedureWithRet(c, req, done),
resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
.call();
}
@Override
public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
Map<String, String> props) {
ProcedureDescription proDesc =
ProtobufUtil.buildProcedureDescription(signature, instance, props);
return this.<Boolean> newMasterCaller()
.action((controller, stub) -> this
.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
(s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
.call();
}
@Override
public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
return this.<Boolean> newMasterCaller().action(
(controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
(s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
.call();
}
@Override
public CompletableFuture<ProcedureInfo[]> listProcedures() {
return this.<ProcedureInfo[]> newMasterCaller()
.action((controller, stub) -> this
.<ListProceduresRequest, ListProceduresResponse, ProcedureInfo[]> call(controller, stub,
ListProceduresRequest.newBuilder().build(),
(s, c, req, done) -> s.listProcedures(c, req, done), resp -> resp.getProcedureList()
.stream().map(ProtobufUtil::toProcedureInfo).toArray(ProcedureInfo[]::new)))
.call();
}
private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
return this.<Void> newMasterCaller()
.action((controller, stub) -> this

View File

@ -2657,24 +2657,17 @@ public class HBaseAdmin implements Admin {
@Override
public byte[] execProcedureWithRet(String signature, String instance, Map<String, String> props)
throws IOException {
ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
builder.setSignature(signature).setInstance(instance);
for (Entry<String, String> entry : props.entrySet()) {
NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
.setValue(entry.getValue()).build();
builder.addConfiguration(pair);
}
final ExecProcedureRequest request = ExecProcedureRequest.newBuilder()
.setProcedure(builder.build()).build();
ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props);
final ExecProcedureRequest request =
ExecProcedureRequest.newBuilder().setProcedure(desc).build();
// run the procedure on the master
ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
getConnection(), getRpcControllerFactory()) {
@Override
protected ExecProcedureResponse rpcCall() throws Exception {
return master.execProcedureWithRet(getRpcController(), request);
}
});
ExecProcedureResponse response = executeCallable(
new MasterCallable<ExecProcedureResponse>(getConnection(), getRpcControllerFactory()) {
@Override
protected ExecProcedureResponse rpcCall() throws Exception {
return master.execProcedureWithRet(getRpcController(), request);
}
});
return response.hasReturnData() ? response.getReturnData().toByteArray() : null;
}
@ -2682,16 +2675,9 @@ public class HBaseAdmin implements Admin {
@Override
public void execProcedure(String signature, String instance, Map<String, String> props)
throws IOException {
ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
builder.setSignature(signature).setInstance(instance);
for (Entry<String, String> entry : props.entrySet()) {
NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
.setValue(entry.getValue()).build();
builder.addConfiguration(pair);
}
final ExecProcedureRequest request = ExecProcedureRequest.newBuilder()
.setProcedure(builder.build()).build();
ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props);
final ExecProcedureRequest request =
ExecProcedureRequest.newBuilder().setProcedure(desc).build();
// run the procedure on the master
ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
getConnection(), getRpcControllerFactory()) {
@ -2732,22 +2718,15 @@ public class HBaseAdmin implements Admin {
@Override
public boolean isProcedureFinished(String signature, String instance, Map<String, String> props)
throws IOException {
final ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
builder.setSignature(signature).setInstance(instance);
for (Entry<String, String> entry : props.entrySet()) {
NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
.setValue(entry.getValue()).build();
builder.addConfiguration(pair);
}
final ProcedureDescription desc = builder.build();
ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props);
return executeCallable(
new MasterCallable<IsProcedureDoneResponse>(getConnection(), getRpcControllerFactory()) {
@Override
protected IsProcedureDoneResponse rpcCall() throws Exception {
return master.isProcedureDone(getRpcController(), IsProcedureDoneRequest
.newBuilder().setProcedure(desc).build());
}
}).getDone();
new MasterCallable<IsProcedureDoneResponse>(getConnection(), getRpcControllerFactory()) {
@Override
protected IsProcedureDoneResponse rpcCall() throws Exception {
return master.isProcedureDone(getRpcController(),
IsProcedureDoneRequest.newBuilder().setProcedure(desc).build());
}
}).getDone();
}
/**

View File

@ -148,6 +148,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.BytesBytesP
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ColumnFamilySchema;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@ -3267,6 +3268,17 @@ public final class ProtobufUtil {
return builder.build();
}
public static ProcedureDescription buildProcedureDescription(String signature, String instance,
Map<String, String> props) {
ProcedureDescription.Builder builder =
ProcedureDescription.newBuilder().setSignature(signature).setInstance(instance);
if (props != null && !props.isEmpty()) {
props.entrySet().forEach(entry -> builder.addConfiguration(
NameStringPair.newBuilder().setName(entry.getKey()).setValue(entry.getValue()).build()));
}
return builder.build();
}
/**
* Get a ServerName from the passed in data bytes.
* @param data Data with a serialize server name in it; can handle the old style

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.SimpleMasterProcedureManager;
import org.apache.hadoop.hbase.procedure.SimpleRSProcedureManager;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Class to test asynchronous procedure admin operations.
*/
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncProcedureAdminApi extends TestAsyncAdminBase {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PAUSE, 10);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 3000);
TEST_UTIL.getConfiguration().set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY,
SimpleMasterProcedureManager.class.getName());
TEST_UTIL.getConfiguration().set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY,
SimpleRSProcedureManager.class.getName());
TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
TEST_UTIL.startMiniCluster(2);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@Test
public void testExecProcedure() throws Exception {
TableName tableName = TableName.valueOf("testExecProcedure");
try {
Table table = TEST_UTIL.createTable(tableName, Bytes.toBytes("cf"));
for (int i = 0; i < 100; i++) {
Put put = new Put(Bytes.toBytes(i)).addColumn(Bytes.toBytes("cf"), null, Bytes.toBytes(i));
table.put(put);
}
// take a snapshot of the enabled table
String snapshotString = "offlineTableSnapshot";
Map<String, String> props = new HashMap<>();
props.put("table", tableName.getNameAsString());
admin.execProcedure(SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, snapshotString,
props).get();
LOG.debug("Snapshot completed.");
} finally {
TEST_UTIL.deleteTable(tableName);
}
}
@Test
public void testExecProcedureWithRet() throws Exception {
byte[] result = admin.execProcedureWithRet(SimpleMasterProcedureManager.SIMPLE_SIGNATURE,
"myTest2", new HashMap<>()).get();
assertArrayEquals("Incorrect return data from execProcedure",
SimpleMasterProcedureManager.SIMPLE_DATA.getBytes(), result);
}
@Test
public void listProcedure() throws Exception {
ProcedureInfo[] procList = admin.listProcedures().get();
assertTrue(procList.length >= 0);
}
@Test
public void isProcedureFinished() throws Exception {
boolean failed = false;
try {
admin.isProcedureFinished("fake-signature", "fake-instance", new HashMap<>()).get();
} catch (Exception e) {
failed = true;
}
Assert.assertTrue(failed);
}
@Test
public void abortProcedure() throws Exception {
Random randomGenerator = new Random();
long procId = randomGenerator.nextLong();
boolean abortResult = admin.abortProcedure(procId, true).get();
assertFalse(abortResult);
}
}