HBASE-14108 Procedure V2 Administrative Task: provide an API to abort a procedure (Stephen Yuan Jiang)

This commit is contained in:
Stephen Yuan Jiang 2015-09-02 19:18:46 -07:00
parent a5261b6f44
commit 3341f13e71
13 changed files with 1652 additions and 166 deletions

View File

@ -257,7 +257,8 @@ public interface Admin extends Abortable, Closeable {
* @return the result of the async creation. You can use Future.get(long, TimeUnit)
* to wait on the operation to complete.
*/
Future<Void> createTableAsync(final HTableDescriptor desc, final byte[][] splitKeys) throws IOException;
Future<Void> createTableAsync(final HTableDescriptor desc, final byte[][] splitKeys)
throws IOException;
/**
* Deletes a table. Synchronous operation.
@ -1016,6 +1017,33 @@ public interface Admin extends Abortable, Closeable {
HTableDescriptor[] getTableDescriptors(List<String> names)
throws IOException;
/**
* abort a procedure
* @param procId ID of the procedure to abort
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if aborted, false if procedure already completed or does not exist
* @throws IOException
*/
boolean abortProcedure(
final long procId,
final boolean mayInterruptIfRunning) throws IOException;
/**
* Abort a procedure but does not block and wait for it be completely removed.
* You can use Future.get(long, TimeUnit) to wait on the operation to complete.
* It may throw ExecutionException if there was an error while executing the operation
* or TimeoutException in case the wait timeout was not long enough to allow the
* operation to complete.
*
* @param procId ID of the procedure to abort
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if aborted, false if procedure already completed or does not exist
* @throws IOException
*/
Future<Boolean> abortProcedureAsync(
final long procId,
final boolean mayInterruptIfRunning) throws IOException;
/**
* Roll the log writer. I.e. for filesystem based write ahead logs, start writing to a new file.
*

View File

@ -1295,7 +1295,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (isDeadServer(sn)) {
throw new RegionServerStoppedException(sn + " is dead.");
}
String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), sn.getHostname(),
String key = getStubKey(
ClientProtos.ClientService.BlockingInterface.class.getName(), sn.getHostname(),
sn.getPort());
this.connectionLock.putIfAbsent(key, key);
ClientProtos.ClientService.BlockingInterface stub = null;
@ -1409,9 +1410,18 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
return new MasterKeepAliveConnection() {
MasterServiceState mss = masterServiceState;
@Override
public MasterProtos.AddColumnResponse addColumn(RpcController controller, MasterProtos.AddColumnRequest request)
throws ServiceException {
public MasterProtos.AbortProcedureResponse abortProcedure(
RpcController controller,
MasterProtos.AbortProcedureRequest request) throws ServiceException {
return stub.abortProcedure(controller, request);
}
@Override
public MasterProtos.AddColumnResponse addColumn(
RpcController controller,
MasterProtos.AddColumnRequest request) throws ServiceException {
return stub.addColumn(controller, request);
}
@ -1629,24 +1639,28 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
@Override
public MasterProtos.CreateNamespaceResponse createNamespace(
RpcController controller, MasterProtos.CreateNamespaceRequest request) throws ServiceException {
RpcController controller,
MasterProtos.CreateNamespaceRequest request) throws ServiceException {
return stub.createNamespace(controller, request);
}
@Override
public MasterProtos.DeleteNamespaceResponse deleteNamespace(
RpcController controller, MasterProtos.DeleteNamespaceRequest request) throws ServiceException {
RpcController controller,
MasterProtos.DeleteNamespaceRequest request) throws ServiceException {
return stub.deleteNamespace(controller, request);
}
@Override
public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor(
RpcController controller,
MasterProtos.GetNamespaceDescriptorRequest request) throws ServiceException {
return stub.getNamespaceDescriptor(controller, request);
}
@Override
public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors(
RpcController controller,
MasterProtos.ListNamespaceDescriptorsRequest request) throws ServiceException {
return stub.listNamespaceDescriptors(controller, request);
}
@ -2100,7 +2114,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
* point, which would be the case if all of its consumers close the
* connection. However, on the off chance that someone is unable to close
* the connection, perhaps because it bailed out prematurely, the method
* below will ensure that this {@link org.apache.hadoop.hbase.client.HConnection} instance is cleaned up.
* below will ensure that this {@link org.apache.hadoop.hbase.client.HConnection} instance
* is cleaned up.
* Caveat: The JVM may take an unknown amount of time to call finalize on an
* unreachable object, so our hope is that every consumer cleans up after
* itself, like any good citizen.

View File

@ -88,6 +88,8 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescripti
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
@ -279,6 +281,86 @@ public class HBaseAdmin implements Admin {
return this.aborted;
}
/**
* Abort a procedure
* @param procId ID of the procedure to abort
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if aborted, false if procedure already completed or does not exist
* @throws IOException
*/
@Override
public boolean abortProcedure(
final long procId,
final boolean mayInterruptIfRunning) throws IOException {
Future<Boolean> future = abortProcedureAsync(procId, mayInterruptIfRunning);
try {
return future.get(syncWaitTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted when waiting for procedure to be cancelled");
} catch (TimeoutException e) {
throw new TimeoutIOException(e);
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw (IOException)e.getCause();
} else {
throw new IOException(e.getCause());
}
}
}
/**
* Abort a procedure but does not block and wait for it be completely removed.
* You can use Future.get(long, TimeUnit) to wait on the operation to complete.
* It may throw ExecutionException if there was an error while executing the operation
* or TimeoutException in case the wait timeout was not long enough to allow the
* operation to complete.
*
* @param procId ID of the procedure to abort
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if aborted, false if procedure already completed or does not exist
* @throws IOException
*/
@Override
public Future<Boolean> abortProcedureAsync(
final long procId,
final boolean mayInterruptIfRunning) throws IOException {
Boolean abortProcResponse = executeCallable(
new MasterCallable<AbortProcedureResponse>(getConnection()) {
@Override
public AbortProcedureResponse call(int callTimeout) throws ServiceException {
AbortProcedureRequest abortProcRequest =
AbortProcedureRequest.newBuilder().setProcId(procId).build();
return master.abortProcedure(null,abortProcRequest);
}
}).getIsProcedureAborted();
AbortProcedureFuture abortProcFuture =
new AbortProcedureFuture(this, procId, abortProcResponse);
return abortProcFuture;
}
private static class AbortProcedureFuture extends ProcedureFuture<Boolean> {
private boolean isAbortInProgress;
public AbortProcedureFuture(
final HBaseAdmin admin,
final Long procId,
final Boolean abortProcResponse) {
super(admin, procId);
this.isAbortInProgress = abortProcResponse;
}
@Override
public Boolean get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (!this.isAbortInProgress) {
return false;
}
super.get(timeout, unit);
return true;
}
}
/** @return HConnection used by this object. */
@Override
public HConnection getConnection() {
@ -4257,6 +4339,7 @@ public class HBaseAdmin implements Admin {
private ExecutionException exception = null;
private boolean procResultFound = false;
private boolean done = false;
private boolean cancelled = false;
private V result = null;
private final HBaseAdmin admin;
@ -4269,13 +4352,39 @@ public class HBaseAdmin implements Admin {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
AbortProcedureRequest abortProcRequest = AbortProcedureRequest.newBuilder()
.setProcId(procId).setMayInterruptIfRunning(mayInterruptIfRunning).build();
try {
cancelled = abortProcedureResult(abortProcRequest).getIsProcedureAborted();
if (cancelled) {
done = true;
}
} catch (IOException e) {
// Cancell thrown exception for some reason. At this time, we are not sure whether
// the cancell succeeds or fails. We assume that it is failed, but print out a warning
// for debugging purpose.
LOG.warn(
"Cancelling the procedure with procId=" + procId + " throws exception " + e.getMessage(),
e);
cancelled = false;
}
return cancelled;
}
@Override
public boolean isCancelled() {
// TODO: Abort not implemented yet
return false;
return cancelled;
}
protected AbortProcedureResponse abortProcedureResult(
final AbortProcedureRequest request) throws IOException {
return admin.executeCallable(new MasterCallable<AbortProcedureResponse>(
admin.getConnection()) {
@Override
public AbortProcedureResponse call(int callTimeout) throws ServiceException {
return master.abortProcedure(null, request);
}
});
}
@Override

View File

@ -696,10 +696,25 @@ public class ProcedureExecutor<TEnvironment> {
* @return true if the procedure exist and has received the abort, otherwise false.
*/
public boolean abort(final long procId) {
return abort(procId, true);
}
/**
* Send an abort notification the specified procedure.
* Depending on the procedure implementation the abort can be considered or ignored.
* @param procId the procedure to abort
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if the procedure exist and has received the abort, otherwise false.
*/
public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
Procedure proc = procedures.get(procId);
if (proc != null) {
if (!mayInterruptIfRunning && proc.wasExecuted()) {
return false;
} else {
return proc.abort(getEnvironment());
}
}
return false;
}

View File

@ -38,21 +38,18 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@Category({MasterTests.class, SmallTests.class})
public class TestProcedureRecovery {
private static final Log LOG = LogFactory.getLog(TestProcedureRecovery.class);
private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
private static final Procedure NULL_PROC = null;
private static TestProcEnv procEnv;
private static ProcedureExecutor<TestProcEnv> procExecutor;

View File

@ -431,6 +431,15 @@ message GetProcedureResultResponse {
optional ForeignExceptionMessage exception = 5;
}
message AbortProcedureRequest {
required uint64 proc_id = 1;
optional bool mayInterruptIfRunning = 2 [default = true];
}
message AbortProcedureResponse {
required bool is_procedure_aborted = 1;
}
message SetQuotaRequest {
optional string user_name = 1;
optional string user_group = 2;
@ -707,4 +716,8 @@ service MasterService {
/** Returns the security capabilities in effect on the cluster */
rpc getSecurityCapabilities(SecurityCapabilitiesRequest)
returns(SecurityCapabilitiesResponse);
/** Abort a procedure */
rpc AbortProcedure(AbortProcedureRequest)
returns(AbortProcedureResponse);
}

View File

@ -2480,6 +2480,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
return descriptors;
}
@Override
public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning) {
return this.procedureExecutor.abort(procId, mayInterruptIfRunning);
}
@Override
public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
ensureNamespaceExists(name);

View File

@ -65,6 +65,8 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
@ -1060,6 +1062,17 @@ public class MasterRpcServices extends RSRpcServices
}
}
@Override
public AbortProcedureResponse abortProcedure(
RpcController rpcController,
AbortProcedureRequest request) {
AbortProcedureResponse.Builder response = AbortProcedureResponse.newBuilder();
boolean abortResult =
master.abortProcedure(request.getProcId(), request.getMayInterruptIfRunning());
response.setIsProcedureAborted(abortResult);
return response.build();
}
@Override
public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController c,
ListNamespaceDescriptorsRequest request) throws ServiceException {

View File

@ -320,6 +320,14 @@ public interface MasterServices extends Server {
final long nonceGroup,
final long nonce) throws IOException;
/**
* Abort a procedure.
* @param procId ID of the procedure
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if aborted, false if procedure already completed or does not exist
*/
public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning);
/**
* Get a namespace descriptor by name
* @param name name of namespace descriptor

View File

@ -25,6 +25,8 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@ -727,4 +729,13 @@ public class TestAdmin2 {
// Current state should be the original state again
assertEquals(initialState, admin.isBalancerEnabled());
}
@Test(timeout = 30000)
public void testAbortProcedureFail() throws Exception {
Random randomGenerator = new Random();
long procId = randomGenerator.nextLong();
boolean abortResult = admin.abortProcedure(procId, true);
assertFalse(abortResult);
}
}

View File

@ -436,6 +436,11 @@ public class TestCatalogJanitor {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
@Override
public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning) {
return false; //To change body of implemented methods use File | Settings | File Templates.
}
@Override
public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
return null; //To change body of implemented methods use File | Settings | File Templates.

View File

@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
@Category({MasterTests.class, MediumTests.class})
public class TestProcedureAdmin {
private static final Log LOG = LogFactory.getLog(TestProcedureAdmin.class);
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private long nonceGroup = HConstants.NO_NONCE;
private long nonce = HConstants.NO_NONCE;
private static void setupConf(Configuration conf) {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
}
@BeforeClass
public static void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration());
UTIL.startMiniCluster(1);
}
@AfterClass
public static void cleanupTest() throws Exception {
try {
UTIL.shutdownMiniCluster();
} catch (Exception e) {
LOG.warn("failure shutting down cluster", e);
}
}
@Before
public void setup() throws Exception {
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
assertTrue("expected executor to be running", procExec.isRunning());
nonceGroup =
MasterProcedureTestingUtility.generateNonceGroup(UTIL.getHBaseCluster().getMaster());
nonce = MasterProcedureTestingUtility.generateNonce(UTIL.getHBaseCluster().getMaster());
}
@After
public void tearDown() throws Exception {
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false);
for (HTableDescriptor htd: UTIL.getHBaseAdmin().listTables()) {
LOG.info("Tear down, remove table=" + htd.getTableName());
UTIL.deleteTable(htd.getTableName());
}
}
@Test(timeout=60000)
public void testAbortProcedureSuccess() throws Exception {
final TableName tableName = TableName.valueOf("testAbortProcedureSuccess");
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
MasterProcedureTestingUtility.createTable(procExec, tableName, null, "f");
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
// Submit an abortable procedure
long procId = procExec.submitProcedure(
new DisableTableProcedure(procExec.getEnvironment(), tableName, false), nonceGroup, nonce);
boolean abortResult = procExec.abort(procId, true);
assertTrue(abortResult);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
ProcedureTestingUtility.restart(procExec);
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
// Validate the disable table procedure was aborted successfully
MasterProcedureTestingUtility.validateTableIsEnabled(
UTIL.getHBaseCluster().getMaster(),
tableName);
}
@Test(timeout=60000)
public void testAbortProcedureFailure() throws Exception {
final TableName tableName = TableName.valueOf("testAbortProcedureFailure");
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
HRegionInfo[] regions =
MasterProcedureTestingUtility.createTable(procExec, tableName, null, "f");
UTIL.getHBaseAdmin().disableTable(tableName);
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
// Submit an un-abortable procedure
long procId = procExec.submitProcedure(
new DeleteTableProcedure(procExec.getEnvironment(), tableName), nonceGroup, nonce);
boolean abortResult = procExec.abort(procId, true);
assertFalse(abortResult);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
ProcedureTestingUtility.restart(procExec);
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
// Validate the delete table procedure was not aborted
MasterProcedureTestingUtility.validateTableDeletion(
UTIL.getHBaseCluster().getMaster(), tableName, regions, "f");
}
@Test(timeout=60000)
public void testAbortProcedureInterruptedNotAllowed() throws Exception {
final TableName tableName = TableName.valueOf("testAbortProcedureInterruptedNotAllowed");
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
HRegionInfo[] regions =
MasterProcedureTestingUtility.createTable(procExec, tableName, null, "f");
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
// Submit a procedure
long procId = procExec.submitProcedure(
new DisableTableProcedure(procExec.getEnvironment(), tableName, true), nonceGroup, nonce);
// Wait for one step to complete
ProcedureTestingUtility.waitProcedure(procExec, procId);
// Set the mayInterruptIfRunning flag to false
boolean abortResult = procExec.abort(procId, false);
assertFalse(abortResult);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
ProcedureTestingUtility.restart(procExec);
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
// Validate the delete table procedure was not aborted
MasterProcedureTestingUtility.validateTableIsDisabled(
UTIL.getHBaseCluster().getMaster(), tableName);
}
@Test(timeout=60000)
public void testAbortNonExistProcedure() throws Exception {
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
Random randomGenerator = new Random();
long procId;
// Generate a non-existing procedure
do {
procId = randomGenerator.nextLong();
} while (procExec.getResult(procId) != null);
boolean abortResult = procExec.abort(procId, true);
assertFalse(abortResult);
}
private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
}
}