HBASE-13204 Procedure v2 - client create/delete table sync

This commit is contained in:
Matteo Bertozzi 2015-04-09 21:01:20 +01:00
parent b5f1f98a25
commit 6a6e3f46fd
11 changed files with 2734 additions and 367 deletions

View File

@ -1597,6 +1597,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return stub.isProcedureDone(controller, request);
}
@Override
public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller,
MasterProtos.GetProcedureResultRequest request) throws ServiceException {
return stub.getProcedureResult(controller, request);
}
@Override
public MasterProtos.IsMasterRunningResponse isMasterRunning(
RpcController controller, MasterProtos.IsMasterRunningRequest request)

View File

@ -31,6 +31,10 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@ -62,6 +66,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel;
@ -89,10 +94,12 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
@ -101,6 +108,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResp
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
@ -142,6 +151,7 @@ import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@ -186,6 +196,7 @@ public class HBaseAdmin implements Admin {
// numRetries is for 'normal' stuff... Multiply by this factor when
// want to wait a long time.
private final int retryLongerMultiplier;
private final int syncWaitTimeout;
private boolean aborted;
private boolean cleanupConnectionOnClose = false; // close the connection in close()
private boolean closed = false;
@ -242,6 +253,8 @@ public class HBaseAdmin implements Admin {
"hbase.client.retries.longer.multiplier", 10);
this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.syncWaitTimeout = this.conf.getInt(
"hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
}
@ -541,92 +554,23 @@ public class HBaseAdmin implements Admin {
*/
@Override
public void createTable(final HTableDescriptor desc, byte [][] splitKeys)
throws IOException {
throws IOException {
Future<Void> future = createTableAsyncV2(desc, splitKeys);
try {
createTableAsync(desc, splitKeys);
} catch (SocketTimeoutException ste) {
LOG.warn("Creating " + desc.getTableName() + " took too long", ste);
}
int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
int prevRegCount = 0;
boolean tableWasEnabled = false;
for (int tries = 0; tries < this.numRetries * this.retryLongerMultiplier;
++tries) {
if (tableWasEnabled) {
// Wait all table regions comes online
final AtomicInteger actualRegCount = new AtomicInteger(0);
MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
@Override
public boolean visit(Result rowResult) throws IOException {
RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
if (list == null) {
LOG.warn("No serialized HRegionInfo in " + rowResult);
return true;
}
HRegionLocation l = list.getRegionLocation();
if (l == null) {
return true;
}
if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
return false;
}
if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
HRegionLocation[] locations = list.getRegionLocations();
for (HRegionLocation location : locations) {
if (location == null) continue;
ServerName serverName = location.getServerName();
// Make sure that regions are assigned to server
if (serverName != null && serverName.getHostAndPort() != null) {
actualRegCount.incrementAndGet();
}
}
return true;
}
};
MetaTableAccessor.scanMetaForTableRegions(connection, visitor, desc.getTableName());
if (actualRegCount.get() < numRegs) {
if (tries == this.numRetries * this.retryLongerMultiplier - 1) {
throw new RegionOfflineException("Only " + actualRegCount.get() +
" of " + numRegs + " regions are online; retries exhausted.");
}
try { // Sleep
Thread.sleep(getPauseTime(tries));
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted when opening" +
" regions; " + actualRegCount.get() + " of " + numRegs +
" regions processed so far");
}
if (actualRegCount.get() > prevRegCount) { // Making progress
prevRegCount = actualRegCount.get();
tries = -1;
}
} else {
return;
}
// TODO: how long should we wait? spin forever?
future.get(syncWaitTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted when waiting" +
" for table to be enabled; meta scan was done");
} catch (TimeoutException e) {
throw new TimeoutIOException(e);
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw (IOException)e.getCause();
} else {
try {
tableWasEnabled = isTableAvailable(desc.getTableName());
} catch (TableNotFoundException tnfe) {
LOG.debug(
"Table " + desc.getTableName() + " was not enabled, sleeping, still " + numRetries
+ " retries left");
}
if (tableWasEnabled) {
// no we will scan meta to ensure all regions are online
tries = -1;
} else {
try { // Sleep
Thread.sleep(getPauseTime(tries));
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted when waiting" +
" for table to be enabled; meta scan was done");
}
}
throw new IOException(e.getCause());
}
}
throw new TableNotEnabledException(
"Retries exhausted while still waiting for table: "
+ desc.getTableName() + " to be enabled");
}
/**
@ -646,22 +590,42 @@ public class HBaseAdmin implements Admin {
* @throws IOException
*/
@Override
public void createTableAsync(
final HTableDescriptor desc, final byte [][] splitKeys)
throws IOException {
if(desc.getTableName() == null) {
public void createTableAsync(final HTableDescriptor desc, final byte [][] splitKeys)
throws IOException {
createTableAsyncV2(desc, splitKeys);
}
/**
* Creates a new table but does not block and wait for it to come online.
* You can use Future.get(long, TimeUnit) to wait on the operation to complete.
* It may throw ExecutionException if there was an error while executing the operation
* or TimeoutException in case the wait timeout was not long enough to allow the
* operation to complete.
*
* @param desc table descriptor for table
* @param splitKeys keys to check if the table has been created with all split keys
* @throws IllegalArgumentException Bad table name, if the split keys
* are repeated and if the split key has empty byte array.
* @throws IOException if a remote or network exception occurs
* @return the result of the async creation. You can use Future.get(long, TimeUnit)
* to wait on the operation to complete.
*/
// TODO: This should be called Async but it will break binary compatibility
private Future<Void> createTableAsyncV2(final HTableDescriptor desc, final byte[][] splitKeys)
throws IOException {
if (desc.getTableName() == null) {
throw new IllegalArgumentException("TableName cannot be null");
}
if(splitKeys != null && splitKeys.length > 0) {
if (splitKeys != null && splitKeys.length > 0) {
Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
// Verify there are no duplicate split keys
byte [] lastKey = null;
for(byte [] splitKey : splitKeys) {
byte[] lastKey = null;
for (byte[] splitKey : splitKeys) {
if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
throw new IllegalArgumentException(
"Empty split key must not be passed in the split keys.");
}
if(lastKey != null && Bytes.equals(splitKey, lastKey)) {
if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
throw new IllegalArgumentException("All split keys must be unique, " +
"found duplicate: " + Bytes.toStringBinary(splitKey) +
", " + Bytes.toStringBinary(lastKey));
@ -670,14 +634,127 @@ public class HBaseAdmin implements Admin {
}
}
executeCallable(new MasterCallable<Void>(getConnection()) {
CreateTableResponse response = executeCallable(
new MasterCallable<CreateTableResponse>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
public CreateTableResponse call(int callTimeout) throws ServiceException {
CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys);
master.createTable(null, request);
return null;
return master.createTable(null, request);
}
});
return new CreateTableFuture(this, desc, splitKeys, response);
}
private static class CreateTableFuture extends ProcedureFuture<Void> {
private final HTableDescriptor desc;
private final byte[][] splitKeys;
public CreateTableFuture(final HBaseAdmin admin, final HTableDescriptor desc,
final byte[][] splitKeys, final CreateTableResponse response) {
super(admin, (response != null && response.hasProcId()) ? response.getProcId() : null);
this.splitKeys = splitKeys;
this.desc = desc;
}
@Override
protected Void waitOperationResult(final long deadlineTs)
throws IOException, TimeoutException {
waitForTableEnabled(deadlineTs);
waitForAllRegionsOnline(deadlineTs);
return null;
}
@Override
protected Void postOperationResult(final Void result, final long deadlineTs)
throws IOException, TimeoutException {
LOG.info("Created " + desc.getTableName());
return result;
}
private void waitForTableEnabled(final long deadlineTs)
throws IOException, TimeoutException {
waitForState(deadlineTs, new WaitForStateCallable() {
@Override
public boolean checkState(int tries) throws IOException {
try {
if (getAdmin().isTableAvailable(desc.getTableName())) {
return true;
}
} catch (TableNotFoundException tnfe) {
LOG.debug("Table "+ desc.getTableName() +" was not enabled, sleeping. tries="+ tries);
}
return false;
}
@Override
public void throwInterruptedException() throws InterruptedIOException {
throw new InterruptedIOException("Interrupted when waiting for table " +
desc.getTableName() + " to be enabled");
}
@Override
public void throwTimeoutException(long elapsedTime) throws TimeoutException {
throw new TimeoutException("Table " + desc.getTableName() +
" not enabled after " + elapsedTime + "msec");
}
});
}
private void waitForAllRegionsOnline(final long deadlineTs)
throws IOException, TimeoutException {
final AtomicInteger actualRegCount = new AtomicInteger(0);
final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
@Override
public boolean visit(Result rowResult) throws IOException {
RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
if (list == null) {
LOG.warn("No serialized HRegionInfo in " + rowResult);
return true;
}
HRegionLocation l = list.getRegionLocation();
if (l == null) {
return true;
}
if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
return false;
}
if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
HRegionLocation[] locations = list.getRegionLocations();
for (HRegionLocation location : locations) {
if (location == null) continue;
ServerName serverName = location.getServerName();
// Make sure that regions are assigned to server
if (serverName != null && serverName.getHostAndPort() != null) {
actualRegCount.incrementAndGet();
}
}
return true;
}
};
int tries = 0;
IOException serverEx = null;
int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
actualRegCount.set(0);
MetaTableAccessor.scanMetaForTableRegions(
getAdmin().getConnection(), visitor, desc.getTableName());
if (actualRegCount.get() == numRegs) {
// all the regions are online
return;
}
try {
Thread.sleep(getAdmin().getPauseTime(tries++));
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted when opening" +
" regions; " + actualRegCount.get() + " of " + numRegs +
" regions processed so far");
}
}
throw new TimeoutException("Only " + actualRegCount.get() +
" of " + numRegs + " regions are online; retries exhausted.");
}
}
public void deleteTable(final String tableName) throws IOException {
@ -697,48 +774,93 @@ public class HBaseAdmin implements Admin {
*/
@Override
public void deleteTable(final TableName tableName) throws IOException {
boolean tableExists = true;
Future<Void> future = deleteTableAsyncV2(tableName);
try {
future.get(syncWaitTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted when waiting for table to be deleted");
} catch (TimeoutException e) {
throw new TimeoutIOException(e);
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw (IOException)e.getCause();
} else {
throw new IOException(e.getCause());
}
}
}
executeCallable(new MasterCallable<Void>(getConnection()) {
/**
* Deletes the table but does not block and wait for it be completely removed.
* You can use Future.get(long, TimeUnit) to wait on the operation to complete.
* It may throw ExecutionException if there was an error while executing the operation
* or TimeoutException in case the wait timeout was not long enough to allow the
* operation to complete.
*
* @param desc table descriptor for table
* @param tableName name of table to delete
* @throws IOException if a remote or network exception occurs
* @return the result of the async delete. You can use Future.get(long, TimeUnit)
* to wait on the operation to complete.
*/
// TODO: This should be called Async but it will break binary compatibility
private Future<Void> deleteTableAsyncV2(final TableName tableName) throws IOException {
DeleteTableResponse response = executeCallable(
new MasterCallable<DeleteTableResponse>(getConnection()) {
@Override
public Void call(int callTimeout) throws ServiceException {
public DeleteTableResponse call(int callTimeout) throws ServiceException {
DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName);
master.deleteTable(null,req);
return null;
return master.deleteTable(null,req);
}
});
return new DeleteTableFuture(this, tableName, response);
}
int failures = 0;
for (int tries = 0; tries < (this.numRetries * this.retryLongerMultiplier); tries++) {
try {
tableExists = tableExists(tableName);
if (!tableExists)
break;
} catch (IOException ex) {
failures++;
if(failures >= numRetries - 1) { // no more tries left
if (ex instanceof RemoteException) {
throw ((RemoteException) ex).unwrapRemoteException();
} else {
throw ex;
}
private static class DeleteTableFuture extends ProcedureFuture<Void> {
private final TableName tableName;
public DeleteTableFuture(final HBaseAdmin admin, final TableName tableName,
final DeleteTableResponse response) {
super(admin, (response != null && response.hasProcId()) ? response.getProcId() : null);
this.tableName = tableName;
}
@Override
protected Void waitOperationResult(final long deadlineTs)
throws IOException, TimeoutException {
waitTableNotFound(deadlineTs);
return null;
}
@Override
protected Void postOperationResult(final Void result, final long deadlineTs)
throws IOException, TimeoutException {
// Delete cached information to prevent clients from using old locations
getAdmin().getConnection().clearRegionCache(tableName);
LOG.info("Deleted " + tableName);
return result;
}
private void waitTableNotFound(final long deadlineTs)
throws IOException, TimeoutException {
waitForState(deadlineTs, new WaitForStateCallable() {
@Override
public boolean checkState(int tries) throws IOException {
return !getAdmin().tableExists(tableName);
}
}
try {
Thread.sleep(getPauseTime(tries));
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted when waiting" +
" for table to be deleted");
}
}
if (tableExists) {
throw new IOException("Retries exhausted, it took too long to wait"+
" for the table " + tableName + " to be deleted.");
@Override
public void throwInterruptedException() throws InterruptedIOException {
throw new InterruptedIOException("Interrupted when waiting for table to be deleted");
}
@Override
public void throwTimeoutException(long elapsedTime) throws TimeoutException {
throw new TimeoutException("Table " + tableName + " not yet deleted after " +
elapsedTime + "msec");
}
});
}
// Delete cached information to prevent clients from using old locations
this.connection.clearRegionCache(tableName);
LOG.info("Deleted " + tableName);
}
/**
@ -3834,4 +3956,236 @@ public class HBaseAdmin implements Admin {
}
});
}
/**
* Future that waits on a procedure result.
* Returned by the async version of the Admin calls,
* and used internally by the sync calls to wait on the result of the procedure.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
protected static class ProcedureFuture<V> implements Future<V> {
private ExecutionException exception = null;
private boolean procResultFound = false;
private boolean done = false;
private V result = null;
private final HBaseAdmin admin;
private final Long procId;
public ProcedureFuture(final HBaseAdmin admin, final Long procId) {
this.admin = admin;
this.procId = procId;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
}
@Override
public boolean isCancelled() {
// TODO: Abort not implemented yet
return false;
}
@Override
public V get() throws InterruptedException, ExecutionException {
// TODO: should we ever spin forever?
throw new UnsupportedOperationException();
}
@Override
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (!done) {
long deadlineTs = EnvironmentEdgeManager.currentTime() + unit.toMillis(timeout);
try {
try {
// if the master support procedures, try to wait the result
if (procId != null) {
result = waitProcedureResult(procId, deadlineTs);
}
// if we don't have a proc result, try the compatibility wait
if (!procResultFound) {
result = waitOperationResult(deadlineTs);
}
result = postOperationResult(result, deadlineTs);
done = true;
} catch (IOException e) {
result = postOpeartionFailure(e, deadlineTs);
done = true;
}
} catch (IOException e) {
exception = new ExecutionException(e);
done = true;
}
}
if (exception != null) {
throw exception;
}
return result;
}
@Override
public boolean isDone() {
return done;
}
protected HBaseAdmin getAdmin() {
return admin;
}
private V waitProcedureResult(long procId, long deadlineTs)
throws IOException, TimeoutException, InterruptedException {
GetProcedureResultRequest request = GetProcedureResultRequest.newBuilder()
.setProcId(procId)
.build();
int tries = 0;
IOException serviceEx = null;
while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
GetProcedureResultResponse response = null;
try {
// Try to fetch the result
response = getProcedureResult(request);
} catch (IOException e) {
serviceEx = unwrapException(e);
// the master may be down
LOG.warn("failed to get the procedure result procId=" + procId, serviceEx);
// Not much to do, if we have a DoNotRetryIOException
if (serviceEx instanceof DoNotRetryIOException) {
// TODO: looks like there is no way to unwrap this exception and get the proper
// UnsupportedOperationException aside from looking at the message.
// anyway, if we fail here we just failover to the compatibility side
// and that is always a valid solution.
LOG.warn("Proc-v2 is unsupported on this master: " + serviceEx.getMessage(), serviceEx);
procResultFound = false;
return null;
}
}
// If the procedure is no longer running, we should have a result
if (response != null && response.getState() != GetProcedureResultResponse.State.RUNNING) {
procResultFound = response.getState() != GetProcedureResultResponse.State.NOT_FOUND;
return convertResult(response);
}
try {
Thread.sleep(getAdmin().getPauseTime(tries++));
} catch (InterruptedException e) {
throw new InterruptedException(
"Interrupted while waiting for the result of proc " + procId);
}
}
if (serviceEx != null) {
throw serviceEx;
} else {
throw new TimeoutException("The procedure " + procId + " is still running");
}
}
private static IOException unwrapException(IOException e) {
if (e instanceof RemoteException) {
return ((RemoteException)e).unwrapRemoteException();
}
return e;
}
protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest request)
throws IOException {
return admin.executeCallable(new MasterCallable<GetProcedureResultResponse>(
admin.getConnection()) {
@Override
public GetProcedureResultResponse call(int callTimeout) throws ServiceException {
return master.getProcedureResult(null, request);
}
});
}
/**
* Convert the procedure result response to a specified type.
* @param response the procedure result object to parse
* @return the result data of the procedure.
*/
protected V convertResult(final GetProcedureResultResponse response) throws IOException {
if (response.hasException()) {
throw ForeignExceptionUtil.toIOException(response.getException());
}
return null;
}
/**
* Fallback implementation in case the procedure is not supported by the server.
* It should try to wait until the operation is completed.
* @param deadlineTs the timestamp after which this method should throw a TimeoutException
* @return the result data of the operation
*/
protected V waitOperationResult(final long deadlineTs)
throws IOException, TimeoutException {
return null;
}
/**
* Called after the operation is completed and the result fetched.
* this allows to perform extra steps after the procedure is completed.
* it allows to apply transformations to the result that will be returned by get().
* @param result the result of the procedure
* @param deadlineTs the timestamp after which this method should throw a TimeoutException
* @return the result of the procedure, which may be the same as the passed one
*/
protected V postOperationResult(final V result, final long deadlineTs)
throws IOException, TimeoutException {
return result;
}
/**
* Called after the operation is terminated with a failure.
* this allows to perform extra steps after the procedure is terminated.
* it allows to apply transformations to the result that will be returned by get().
* The default implementation will rethrow the exception
* @param exception the exception got from fetching the result
* @param deadlineTs the timestamp after which this method should throw a TimeoutException
* @return the result of the procedure, which may be the same as the passed one
*/
protected V postOpeartionFailure(final IOException exception, final long deadlineTs)
throws IOException, TimeoutException {
throw exception;
}
protected interface WaitForStateCallable {
boolean checkState(int tries) throws IOException;
void throwInterruptedException() throws InterruptedIOException;
void throwTimeoutException(long elapsed) throws TimeoutException;
}
protected void waitForState(final long deadlineTs, final WaitForStateCallable callable)
throws IOException, TimeoutException {
int tries = 0;
IOException serverEx = null;
long startTime = EnvironmentEdgeManager.currentTime();
while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
serverEx = null;
try {
if (callable.checkState(tries)) {
return;
}
} catch (IOException e) {
serverEx = e;
}
try {
Thread.sleep(getAdmin().getPauseTime(tries++));
} catch (InterruptedException e) {
callable.throwInterruptedException();
}
}
if (serverEx != null) {
throw unwrapException(serverEx);
} else {
callable.throwTimeoutException(EnvironmentEdgeManager.currentTime() - startTime);
}
}
}
}

View File

@ -0,0 +1,186 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@Category({ClientTests.class, SmallTests.class})
public class TestProcedureFuture {
private static class TestFuture extends HBaseAdmin.ProcedureFuture<Void> {
private boolean postOperationResultCalled = false;
private boolean waitOperationResultCalled = false;
private boolean getProcedureResultCalled = false;
private boolean convertResultCalled = false;
public TestFuture(final HBaseAdmin admin, final Long procId) {
super(admin, procId);
}
public boolean wasPostOperationResultCalled() {
return postOperationResultCalled;
}
public boolean wasWaitOperationResultCalled() {
return waitOperationResultCalled;
}
public boolean wasGetProcedureResultCalled() {
return getProcedureResultCalled;
}
public boolean wasConvertResultCalled() {
return convertResultCalled;
}
@Override
protected GetProcedureResultResponse getProcedureResult(
final GetProcedureResultRequest request) throws IOException {
getProcedureResultCalled = true;
return GetProcedureResultResponse.newBuilder()
.setState(GetProcedureResultResponse.State.FINISHED)
.build();
}
@Override
protected Void convertResult(final GetProcedureResultResponse response) throws IOException {
convertResultCalled = true;
return null;
}
@Override
protected Void waitOperationResult(final long deadlineTs)
throws IOException, TimeoutException {
waitOperationResultCalled = true;
return null;
}
@Override
protected Void postOperationResult(final Void result, final long deadlineTs)
throws IOException, TimeoutException {
postOperationResultCalled = true;
return result;
}
}
/**
* When a master return a result with procId,
* we are skipping the waitOperationResult() call,
* since we are getting the procedure result.
*/
@Test(timeout=60000)
public void testWithProcId() throws Exception {
HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
TestFuture f = new TestFuture(admin, 100L);
f.get(1, TimeUnit.MINUTES);
assertTrue("expected getProcedureResult() to be called", f.wasGetProcedureResultCalled());
assertTrue("expected convertResult() to be called", f.wasConvertResultCalled());
assertFalse("unexpected waitOperationResult() called", f.wasWaitOperationResultCalled());
assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
}
/**
* Verify that the spin loop for the procedure running works.
*/
@Test(timeout=60000)
public void testWithProcIdAndSpinning() throws Exception {
final AtomicInteger spinCount = new AtomicInteger(0);
HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
TestFuture f = new TestFuture(admin, 100L) {
@Override
protected GetProcedureResultResponse getProcedureResult(
final GetProcedureResultRequest request) throws IOException {
boolean done = spinCount.incrementAndGet() >= 10;
return GetProcedureResultResponse.newBuilder()
.setState(done ? GetProcedureResultResponse.State.FINISHED :
GetProcedureResultResponse.State.RUNNING)
.build();
}
};
f.get(1, TimeUnit.MINUTES);
assertEquals(10, spinCount.get());
assertTrue("expected convertResult() to be called", f.wasConvertResultCalled());
assertFalse("unexpected waitOperationResult() called", f.wasWaitOperationResultCalled());
assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
}
/**
* When a master return a result without procId,
* we are skipping the getProcedureResult() call.
*/
@Test(timeout=60000)
public void testWithoutProcId() throws Exception {
HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
TestFuture f = new TestFuture(admin, null);
f.get(1, TimeUnit.MINUTES);
assertFalse("unexpected getProcedureResult() called", f.wasGetProcedureResultCalled());
assertFalse("unexpected convertResult() called", f.wasConvertResultCalled());
assertTrue("expected waitOperationResult() to be called", f.wasWaitOperationResultCalled());
assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
}
/**
* When a new client with procedure support tries to ask an old-master without proc-support
* the procedure result we get a DoNotRetryIOException (which is an UnsupportedOperationException)
* The future should trap that and fallback to the waitOperationResult().
*
* This happens when the operation calls happens on a "new master" but while we are waiting
* the operation to be completed, we failover on an "old master".
*/
@Test(timeout=60000)
public void testOnServerWithNoProcedureSupport() throws Exception {
HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
TestFuture f = new TestFuture(admin, 100L) {
@Override
protected GetProcedureResultResponse getProcedureResult(
final GetProcedureResultRequest request) throws IOException {
super.getProcedureResult(request);
throw new DoNotRetryIOException(new UnsupportedOperationException("getProcedureResult"));
}
};
f.get(1, TimeUnit.MINUTES);
assertTrue("expected getProcedureResult() to be called", f.wasGetProcedureResultCalled());
assertFalse("unexpected convertResult() called", f.wasConvertResultCalled());
assertTrue("expected waitOperationResult() to be called", f.wasWaitOperationResultCalled());
assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
}
}

View File

@ -28,6 +28,7 @@ option optimize_for = SPEED;
import "HBase.proto";
import "Client.proto";
import "ClusterStatus.proto";
import "ErrorHandling.proto";
import "Quota.proto";
/* Column-level protobufs */
@ -108,6 +109,7 @@ message CreateTableRequest {
}
message CreateTableResponse {
optional uint64 proc_id = 1;
}
message DeleteTableRequest {
@ -115,6 +117,7 @@ message DeleteTableRequest {
}
message DeleteTableResponse {
optional uint64 proc_id = 1;
}
message TruncateTableRequest {
@ -380,6 +383,24 @@ message IsProcedureDoneResponse {
optional ProcedureDescription snapshot = 2;
}
message GetProcedureResultRequest {
required uint64 proc_id = 1;
}
message GetProcedureResultResponse {
enum State {
NOT_FOUND = 0;
RUNNING = 1;
FINISHED = 2;
}
required State state = 1;
optional uint64 start_time = 2;
optional uint64 last_update = 3;
optional bytes result = 4;
optional ForeignExceptionMessage exception = 5;
}
message SetQuotaRequest {
optional string user_name = 1;
optional string user_group = 2;
@ -634,4 +655,7 @@ service MasterService {
/** Returns the timestamp of the last major compaction */
rpc getLastMajorCompactionTimestampForRegion(MajorCompactionTimestampForRegionRequest)
returns(MajorCompactionTimestampResponse);
rpc getProcedureResult(GetProcedureResultRequest)
returns(GetProcedureResultResponse);
}

View File

@ -1326,7 +1326,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
}
@Override
public void createTable(HTableDescriptor hTableDescriptor,
public long createTable(HTableDescriptor hTableDescriptor,
byte [][] splitKeys) throws IOException {
if (isStopped()) {
throw new MasterNotRunningException();
@ -1357,9 +1357,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
cpHost.postCreateTable(hTableDescriptor, newRegions);
}
// TODO: change the interface to return the procId,
// and add it to the response protobuf.
//return procId;
return procId;
}
/**
@ -1571,7 +1569,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
}
@Override
public void deleteTable(final TableName tableName) throws IOException {
public long deleteTable(final TableName tableName) throws IOException {
checkInitialized();
if (cpHost != null) {
cpHost.preDeleteTable(tableName);
@ -1588,9 +1586,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
cpHost.postDeleteTable(tableName);
}
// TODO: change the interface to return the procId,
// and add it to the response protobuf.
//return procId;
return procId;
}
@Override

View File

@ -43,6 +43,8 @@ import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureResult;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@ -86,6 +88,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnaps
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
@ -158,6 +162,7 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
@ -405,11 +410,11 @@ public class MasterRpcServices extends RSRpcServices
HTableDescriptor hTableDescriptor = HTableDescriptor.convert(req.getTableSchema());
byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req);
try {
master.createTable(hTableDescriptor, splitKeys);
long procId = master.createTable(hTableDescriptor, splitKeys);
return CreateTableResponse.newBuilder().setProcId(procId).build();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
return CreateTableResponse.newBuilder().build();
}
@Override
@ -461,11 +466,11 @@ public class MasterRpcServices extends RSRpcServices
public DeleteTableResponse deleteTable(RpcController controller,
DeleteTableRequest request) throws ServiceException {
try {
master.deleteTable(ProtobufUtil.toTableName(request.getTableName()));
long procId = master.deleteTable(ProtobufUtil.toTableName(request.getTableName()));
return DeleteTableResponse.newBuilder().setProcId(procId).build();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
return DeleteTableResponse.newBuilder().build();
}
@Override
@ -961,6 +966,44 @@ public class MasterRpcServices extends RSRpcServices
}
}
@Override
public GetProcedureResultResponse getProcedureResult(RpcController controller,
GetProcedureResultRequest request) throws ServiceException {
LOG.debug("Checking to see if procedure is done procId=" + request.getProcId());
try {
master.checkInitialized();
GetProcedureResultResponse.Builder builder = GetProcedureResultResponse.newBuilder();
Pair<ProcedureResult, Procedure> v = master.getMasterProcedureExecutor()
.getResultOrProcedure(request.getProcId());
if (v.getFirst() != null) {
ProcedureResult result = v.getFirst();
builder.setState(GetProcedureResultResponse.State.FINISHED);
builder.setStartTime(result.getStartTime());
builder.setLastUpdate(result.getLastUpdate());
if (result.isFailed()) {
builder.setException(result.getException().convert());
}
if (result.hasResultData()) {
builder.setResult(ByteStringer.wrap(result.getResult()));
}
master.getMasterProcedureExecutor().removeResult(request.getProcId());
} else {
Procedure proc = v.getSecond();
if (proc == null) {
builder.setState(GetProcedureResultResponse.State.NOT_FOUND);
} else {
builder.setState(GetProcedureResultResponse.State.RUNNING);
builder.setStartTime(proc.getStartTime());
builder.setLastUpdate(proc.getLastUpdate());
}
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController c,
ListNamespaceDescriptorsRequest request) throws ServiceException {

View File

@ -105,7 +105,7 @@ public interface MasterServices extends Server {
* @param splitKeys Starting row keys for the initial table regions. If null
* a single region is created.
*/
void createTable(HTableDescriptor desc, byte[][] splitKeys)
long createTable(HTableDescriptor desc, byte[][] splitKeys)
throws IOException;
/**
@ -113,7 +113,7 @@ public interface MasterServices extends Server {
* @param tableName The table name
* @throws IOException
*/
void deleteTable(final TableName tableName) throws IOException;
long deleteTable(final TableName tableName) throws IOException;
/**
* Truncate a table

View File

@ -125,6 +125,7 @@ public class DeleteTableProcedure
LOG.debug("delete '" + getTableName() + "' from filesystem");
DeleteTableProcedure.deleteFromFs(env, getTableName(), regions, true);
setNextState(DeleteTableState.DELETE_TABLE_UPDATE_DESC_CACHE);
regions = null;
break;
case DELETE_TABLE_UPDATE_DESC_CACHE:
LOG.debug("delete '" + getTableName() + "' descriptor");

View File

@ -227,9 +227,10 @@ public class TestCatalogJanitor {
}
@Override
public void createTable(HTableDescriptor desc, byte[][] splitKeys)
public long createTable(HTableDescriptor desc, byte[][] splitKeys)
throws IOException {
// no-op
return -1;
}
@Override
@ -427,7 +428,9 @@ public class TestCatalogJanitor {
}
@Override
public void deleteTable(TableName tableName) throws IOException { }
public long deleteTable(TableName tableName) throws IOException {
return -1;
}
@Override
public void truncateTable(TableName tableName, boolean preserveSplits) throws IOException { }

View File

@ -166,7 +166,7 @@ public class TestHBaseFsck {
conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE);
conf.setInt("hbase.hconnection.threads.core", POOL_SIZE);
conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 2 * REGION_ONLINE_TIMEOUT);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT);
TEST_UTIL.startMiniCluster(3);
tableExecutorService = new ThreadPoolExecutor(1, POOL_SIZE, 60, TimeUnit.SECONDS,