HBASE-14108 Procedure V2 - Administrative Task: provide an API to abort a procedure (Stephen Yuan Jiang)

This commit is contained in:
Stephen Yuan Jiang 2015-09-03 07:01:30 -07:00
parent a2dab027f6
commit 90b8a3c894
13 changed files with 1631 additions and 153 deletions

View File

@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
@ -933,6 +934,33 @@ public interface Admin extends Abortable, Closeable {
HTableDescriptor[] getTableDescriptors(List<String> names)
throws IOException;
/**
* abort a procedure
* @param procId ID of the procedure to abort
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if aborted, false if procedure already completed or does not exist
* @throws IOException
*/
boolean abortProcedure(
final long procId,
final boolean mayInterruptIfRunning) throws IOException;
/**
* Abort a procedure but does not block and wait for it be completely removed.
* You can use Future.get(long, TimeUnit) to wait on the operation to complete.
* It may throw ExecutionException if there was an error while executing the operation
* or TimeoutException in case the wait timeout was not long enough to allow the
* operation to complete.
*
* @param procId ID of the procedure to abort
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if aborted, false if procedure already completed or does not exist
* @throws IOException
*/
Future<Boolean> abortProcedureAsync(
final long procId,
final boolean mayInterruptIfRunning) throws IOException;
/**
* Roll the log writer. I.e. for filesystem based write ahead logs, start writing to a new file.
*

View File

@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.*;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
@ -1719,6 +1720,12 @@ class ConnectionManager {
return new MasterKeepAliveConnection() {
MasterServiceState mss = masterServiceState;
@Override
public MasterProtos.AbortProcedureResponse abortProcedure(
RpcController controller,
MasterProtos.AbortProcedureRequest request) throws ServiceException {
return stub.abortProcedure(controller, request);
}
@Override
public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
throws ServiceException {
return stub.addColumn(controller, request);

View File

@ -92,6 +92,8 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescripti
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
@ -282,6 +284,86 @@ public class HBaseAdmin implements Admin {
return this.aborted;
}
/**
* Abort a procedure
* @param procId ID of the procedure to abort
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if aborted, false if procedure already completed or does not exist
* @throws IOException
*/
@Override
public boolean abortProcedure(
final long procId,
final boolean mayInterruptIfRunning) throws IOException {
Future<Boolean> future = abortProcedureAsync(procId, mayInterruptIfRunning);
try {
return future.get(syncWaitTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted when waiting for procedure to be cancelled");
} catch (TimeoutException e) {
throw new TimeoutIOException(e);
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw (IOException)e.getCause();
} else {
throw new IOException(e.getCause());
}
}
}
/**
* Abort a procedure but does not block and wait for it be completely removed.
* You can use Future.get(long, TimeUnit) to wait on the operation to complete.
* It may throw ExecutionException if there was an error while executing the operation
* or TimeoutException in case the wait timeout was not long enough to allow the
* operation to complete.
*
* @param procId ID of the procedure to abort
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if aborted, false if procedure already completed or does not exist
* @throws IOException
*/
@Override
public Future<Boolean> abortProcedureAsync(
final long procId,
final boolean mayInterruptIfRunning) throws IOException {
Boolean abortProcResponse = executeCallable(
new MasterCallable<AbortProcedureResponse>(getConnection()) {
@Override
public AbortProcedureResponse call(int callTimeout) throws ServiceException {
AbortProcedureRequest abortProcRequest =
AbortProcedureRequest.newBuilder().setProcId(procId).build();
return master.abortProcedure(null,abortProcRequest);
}
}).getIsProcedureAborted();
AbortProcedureFuture abortProcFuture =
new AbortProcedureFuture(this, procId, abortProcResponse);
return abortProcFuture;
}
private static class AbortProcedureFuture extends ProcedureFuture<Boolean> {
private boolean isAbortInProgress;
public AbortProcedureFuture(
final HBaseAdmin admin,
final Long procId,
final Boolean abortProcResponse) {
super(admin, procId);
this.isAbortInProgress = abortProcResponse;
}
@Override
public Boolean get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (!this.isAbortInProgress) {
return false;
}
super.get(timeout, unit);
return true;
}
}
/** @return HConnection used by this object. */
@Override
public HConnection getConnection() {
@ -4169,6 +4251,7 @@ public class HBaseAdmin implements Admin {
private ExecutionException exception = null;
private boolean procResultFound = false;
private boolean done = false;
private boolean cancelled = false;
private V result = null;
private final HBaseAdmin admin;
@ -4181,13 +4264,39 @@ public class HBaseAdmin implements Admin {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
AbortProcedureRequest abortProcRequest = AbortProcedureRequest.newBuilder()
.setProcId(procId).setMayInterruptIfRunning(mayInterruptIfRunning).build();
try {
cancelled = abortProcedureResult(abortProcRequest).getIsProcedureAborted();
if (cancelled) {
done = true;
}
} catch (IOException e) {
// Cancell thrown exception for some reason. At this time, we are not sure whether
// the cancell succeeds or fails. We assume that it is failed, but print out a warning
// for debugging purpose.
LOG.warn(
"Cancelling the procedure with procId=" + procId + " throws exception " + e.getMessage(),
e);
cancelled = false;
}
return cancelled;
}
@Override
public boolean isCancelled() {
// TODO: Abort not implemented yet
return false;
return cancelled;
}
protected AbortProcedureResponse abortProcedureResult(
final AbortProcedureRequest request) throws IOException {
return admin.executeCallable(new MasterCallable<AbortProcedureResponse>(
admin.getConnection()) {
@Override
public AbortProcedureResponse call(int callTimeout) throws ServiceException {
return master.abortProcedure(null, request);
}
});
}
@Override

View File

@ -696,9 +696,24 @@ public class ProcedureExecutor<TEnvironment> {
* @return true if the procedure exist and has received the abort, otherwise false.
*/
public boolean abort(final long procId) {
return abort(procId, true);
}
/**
* Send an abort notification the specified procedure.
* Depending on the procedure implementation the abort can be considered or ignored.
* @param procId the procedure to abort
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if the procedure exist and has received the abort, otherwise false.
*/
public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
Procedure proc = procedures.get(procId);
if (proc != null) {
return proc.abort(getEnvironment());
if (!mayInterruptIfRunning && proc.wasExecuted()) {
return false;
} else {
return proc.abort(getEnvironment());
}
}
return false;
}

View File

@ -37,21 +37,18 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@Category(SmallTests.class)
public class TestProcedureRecovery {
private static final Log LOG = LogFactory.getLog(TestProcedureRecovery.class);
private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
private static final Procedure NULL_PROC = null;
private static TestProcEnv procEnv;
private static ProcedureExecutor<TestProcEnv> procExecutor;

View File

@ -386,21 +386,21 @@ message IsMasterRunningResponse {
}
message ExecProcedureRequest {
required ProcedureDescription procedure = 1;
required ProcedureDescription procedure = 1;
}
message ExecProcedureResponse {
optional int64 expected_timeout = 1;
optional bytes return_data = 2;
optional int64 expected_timeout = 1;
optional bytes return_data = 2;
}
message IsProcedureDoneRequest {
optional ProcedureDescription procedure = 1;
optional ProcedureDescription procedure = 1;
}
message IsProcedureDoneResponse {
optional bool done = 1 [default = false];
optional ProcedureDescription snapshot = 2;
optional bool done = 1 [default = false];
optional ProcedureDescription snapshot = 2;
}
message GetProcedureResultRequest {
@ -421,6 +421,15 @@ message GetProcedureResultResponse {
optional ForeignExceptionMessage exception = 5;
}
message AbortProcedureRequest {
required uint64 proc_id = 1;
optional bool mayInterruptIfRunning = 2 [default = true];
}
message AbortProcedureResponse {
required bool is_procedure_aborted = 1;
}
message SetQuotaRequest {
optional string user_name = 1;
optional string user_group = 2;
@ -693,4 +702,8 @@ service MasterService {
/** Returns the security capabilities in effect on the cluster */
rpc getSecurityCapabilities(SecurityCapabilitiesRequest)
returns(SecurityCapabilitiesResponse);
/** Abort a procedure */
rpc AbortProcedure(AbortProcedureRequest)
returns(AbortProcedureResponse);
}

View File

@ -378,7 +378,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
// should we check encryption settings at master side, default true
this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);
this.metricsMaster = new MetricsMaster( new MetricsMasterWrapperImpl(this));
this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));
// preload table descriptor at startup
this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
@ -2477,6 +2477,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
return descriptors;
}
@Override
public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning) {
return this.procedureExecutor.abort(procId, mayInterruptIfRunning);
}
@Override
public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
ensureNamespaceExists(name);

View File

@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
@ -1030,6 +1032,17 @@ public class MasterRpcServices extends RSRpcServices
}
}
@Override
public AbortProcedureResponse abortProcedure(
RpcController rpcController,
AbortProcedureRequest request) {
AbortProcedureResponse.Builder response = AbortProcedureResponse.newBuilder();
boolean abortResult =
master.abortProcedure(request.getProcId(), request.getMayInterruptIfRunning());
response.setIsProcedureAborted(abortResult);
return response.build();
}
@Override
public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController c,
ListNamespaceDescriptorsRequest request) throws ServiceException {

View File

@ -315,6 +315,14 @@ public interface MasterServices extends Server {
final long nonceGroup,
final long nonce) throws IOException;
/**
* Abort a procedure.
* @param procId ID of the procedure
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if aborted, false if procedure already completed or does not exist
*/
public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning);
/**
* Get a namespace descriptor by name
* @param name name of namespace descriptor

View File

@ -25,6 +25,8 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@ -752,4 +754,13 @@ public class TestAdmin2 {
// Current state should be the original state again
assertEquals(initialState, admin.isBalancerEnabled());
}
@Test(timeout = 30000)
public void testAbortProcedureFail() throws Exception {
Random randomGenerator = new Random();
long procId = randomGenerator.nextLong();
boolean abortResult = admin.abortProcedure(procId, true);
assertFalse(abortResult);
}
}

View File

@ -418,6 +418,11 @@ public class TestCatalogJanitor {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
@Override
public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning) {
return false; //To change body of implemented methods use File | Settings | File Templates.
}
@Override
public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
return null; //To change body of implemented methods use File | Settings | File Templates.

View File

@ -0,0 +1,186 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
@Category(MediumTests.class)
public class TestProcedureAdmin {
private static final Log LOG = LogFactory.getLog(TestProcedureAdmin.class);
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private long nonceGroup = HConstants.NO_NONCE;
private long nonce = HConstants.NO_NONCE;
private static void setupConf(Configuration conf) {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
}
@BeforeClass
public static void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration());
UTIL.startMiniCluster(1);
}
@AfterClass
public static void cleanupTest() throws Exception {
try {
UTIL.shutdownMiniCluster();
} catch (Exception e) {
LOG.warn("failure shutting down cluster", e);
}
}
@Before
public void setup() throws Exception {
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
assertTrue("expected executor to be running", procExec.isRunning());
nonceGroup =
MasterProcedureTestingUtility.generateNonceGroup(UTIL.getHBaseCluster().getMaster());
nonce = MasterProcedureTestingUtility.generateNonce(UTIL.getHBaseCluster().getMaster());
}
@After
public void tearDown() throws Exception {
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false);
for (HTableDescriptor htd: UTIL.getHBaseAdmin().listTables()) {
LOG.info("Tear down, remove table=" + htd.getTableName());
UTIL.deleteTable(htd.getTableName());
}
}
@Test(timeout=60000)
public void testAbortProcedureSuccess() throws Exception {
final TableName tableName = TableName.valueOf("testAbortProcedureSuccess");
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
MasterProcedureTestingUtility.createTable(procExec, tableName, null, "f");
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
// Submit an abortable procedure
long procId = procExec.submitProcedure(
new DisableTableProcedure(procExec.getEnvironment(), tableName, false), nonceGroup, nonce);
boolean abortResult = procExec.abort(procId, true);
assertTrue(abortResult);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
ProcedureTestingUtility.restart(procExec);
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
// Validate the disable table procedure was aborted successfully
MasterProcedureTestingUtility.validateTableIsEnabled(
UTIL.getHBaseCluster().getMaster(),
tableName);
}
@Test(timeout=60000)
public void testAbortProcedureFailure() throws Exception {
final TableName tableName = TableName.valueOf("testAbortProcedureFailure");
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
HRegionInfo[] regions =
MasterProcedureTestingUtility.createTable(procExec, tableName, null, "f");
UTIL.getHBaseAdmin().disableTable(tableName);
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
// Submit an un-abortable procedure
long procId = procExec.submitProcedure(
new DeleteTableProcedure(procExec.getEnvironment(), tableName), nonceGroup, nonce);
boolean abortResult = procExec.abort(procId, true);
assertFalse(abortResult);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
ProcedureTestingUtility.restart(procExec);
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
// Validate the delete table procedure was not aborted
MasterProcedureTestingUtility.validateTableDeletion(
UTIL.getHBaseCluster().getMaster(), tableName, regions, "f");
}
@Test(timeout=60000)
public void testAbortProcedureInterruptedNotAllowed() throws Exception {
final TableName tableName = TableName.valueOf("testAbortProcedureInterruptedNotAllowed");
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
HRegionInfo[] regions =
MasterProcedureTestingUtility.createTable(procExec, tableName, null, "f");
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
// Submit a procedure
long procId = procExec.submitProcedure(
new DisableTableProcedure(procExec.getEnvironment(), tableName, true), nonceGroup, nonce);
// Wait for one step to complete
ProcedureTestingUtility.waitProcedure(procExec, procId);
// Set the mayInterruptIfRunning flag to false
boolean abortResult = procExec.abort(procId, false);
assertFalse(abortResult);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
ProcedureTestingUtility.restart(procExec);
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
// Validate the delete table procedure was not aborted
MasterProcedureTestingUtility.validateTableIsDisabled(
UTIL.getHBaseCluster().getMaster(), tableName);
}
@Test(timeout=60000)
public void testAbortNonExistProcedure() throws Exception {
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
Random randomGenerator = new Random();
long procId;
// Generate a non-existing procedure
do {
procId = randomGenerator.nextLong();
} while (procExec.getResult(procId) != null);
boolean abortResult = procExec.abort(procId, true);
assertFalse(abortResult);
}
private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
}
}