HBASE-13204 Procedure v2 - client create/delete table sync
This commit is contained in:
parent
58b1598b44
commit
67149d253b
|
@ -118,6 +118,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescripto
|
|||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
|
||||
|
@ -1906,6 +1908,12 @@ class ConnectionManager {
|
|||
return stub.isProcedureDone(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetProcedureResultResponse getProcedureResult(RpcController controller,
|
||||
GetProcedureResultRequest request) throws ServiceException {
|
||||
return stub.getProcedureResult(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IsMasterRunningResponse isMasterRunning(
|
||||
RpcController controller, IsMasterRunningRequest request)
|
||||
|
@ -1990,7 +1998,7 @@ class ConnectionManager {
|
|||
throws ServiceException {
|
||||
return stub.getClusterStatus(controller, request);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public SetQuotaResponse setQuota(RpcController controller, SetQuotaRequest request)
|
||||
throws ServiceException {
|
||||
|
|
|
@ -30,6 +30,10 @@ import java.util.Map;
|
|||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -63,6 +67,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel;
|
||||
|
@ -90,10 +95,12 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
|
||||
|
@ -102,6 +109,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResp
|
|||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
|
||||
|
@ -143,6 +152,7 @@ import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
|
|||
import org.apache.hadoop.hbase.util.Addressing;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
@ -186,6 +196,7 @@ public class HBaseAdmin implements Admin {
|
|||
// numRetries is for 'normal' stuff... Multiply by this factor when
|
||||
// want to wait a long time.
|
||||
private final int retryLongerMultiplier;
|
||||
private final int syncWaitTimeout;
|
||||
private boolean aborted;
|
||||
private boolean cleanupConnectionOnClose = false; // close the connection in close()
|
||||
private boolean closed = false;
|
||||
|
@ -244,6 +255,8 @@ public class HBaseAdmin implements Admin {
|
|||
"hbase.client.retries.longer.multiplier", 10);
|
||||
this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||
this.syncWaitTimeout = this.conf.getInt(
|
||||
"hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
|
||||
|
||||
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
|
||||
}
|
||||
|
@ -538,83 +551,23 @@ public class HBaseAdmin implements Admin {
|
|||
*/
|
||||
@Override
|
||||
public void createTable(final HTableDescriptor desc, byte [][] splitKeys)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
Future<Void> future = createTableAsyncV2(desc, splitKeys);
|
||||
try {
|
||||
createTableAsync(desc, splitKeys);
|
||||
} catch (SocketTimeoutException ste) {
|
||||
LOG.warn("Creating " + desc.getTableName() + " took too long", ste);
|
||||
}
|
||||
int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
|
||||
int prevRegCount = 0;
|
||||
boolean doneWithMetaScan = false;
|
||||
for (int tries = 0; tries < this.numRetries * this.retryLongerMultiplier;
|
||||
++tries) {
|
||||
if (!doneWithMetaScan) {
|
||||
// Wait for new table to come on-line
|
||||
final AtomicInteger actualRegCount = new AtomicInteger(0);
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
|
||||
@Override
|
||||
public boolean processRow(Result rowResult) throws IOException {
|
||||
RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
|
||||
if (list == null) {
|
||||
LOG.warn("No serialized HRegionInfo in " + rowResult);
|
||||
return true;
|
||||
}
|
||||
HRegionLocation l = list.getRegionLocation();
|
||||
if (l == null) {
|
||||
return true;
|
||||
}
|
||||
if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
|
||||
return false;
|
||||
}
|
||||
if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
|
||||
HRegionLocation[] locations = list.getRegionLocations();
|
||||
for (HRegionLocation location : locations) {
|
||||
if (location == null) continue;
|
||||
ServerName serverName = location.getServerName();
|
||||
// Make sure that regions are assigned to server
|
||||
if (serverName != null && serverName.getHostAndPort() != null) {
|
||||
actualRegCount.incrementAndGet();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
MetaScanner.metaScan(connection, visitor, desc.getTableName());
|
||||
if (actualRegCount.get() < numRegs) {
|
||||
if (tries == this.numRetries * this.retryLongerMultiplier - 1) {
|
||||
throw new RegionOfflineException("Only " + actualRegCount.get() +
|
||||
" of " + numRegs + " regions are online; retries exhausted.");
|
||||
}
|
||||
try { // Sleep
|
||||
Thread.sleep(getPauseTime(tries));
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Interrupted when opening" +
|
||||
" regions; " + actualRegCount.get() + " of " + numRegs +
|
||||
" regions processed so far");
|
||||
}
|
||||
if (actualRegCount.get() > prevRegCount) { // Making progress
|
||||
prevRegCount = actualRegCount.get();
|
||||
tries = -1;
|
||||
}
|
||||
} else {
|
||||
doneWithMetaScan = true;
|
||||
tries = -1;
|
||||
}
|
||||
} else if (isTableEnabled(desc.getTableName())) {
|
||||
return;
|
||||
// TODO: how long should we wait? spin forever?
|
||||
future.get(syncWaitTimeout, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Interrupted when waiting" +
|
||||
" for table to be enabled; meta scan was done");
|
||||
} catch (TimeoutException e) {
|
||||
throw new TimeoutIOException(e);
|
||||
} catch (ExecutionException e) {
|
||||
if (e.getCause() instanceof IOException) {
|
||||
throw (IOException)e.getCause();
|
||||
} else {
|
||||
try { // Sleep
|
||||
Thread.sleep(getPauseTime(tries));
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Interrupted when waiting" +
|
||||
" for table to be enabled; meta scan was done");
|
||||
}
|
||||
throw new IOException(e.getCause());
|
||||
}
|
||||
}
|
||||
throw new TableNotEnabledException(
|
||||
"Retries exhausted while still waiting for table: "
|
||||
+ desc.getTableName() + " to be enabled");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -634,22 +587,42 @@ public class HBaseAdmin implements Admin {
|
|||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void createTableAsync(
|
||||
final HTableDescriptor desc, final byte [][] splitKeys)
|
||||
throws IOException {
|
||||
if(desc.getTableName() == null) {
|
||||
public void createTableAsync(final HTableDescriptor desc, final byte [][] splitKeys)
|
||||
throws IOException {
|
||||
createTableAsyncV2(desc, splitKeys);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new table but does not block and wait for it to come online.
|
||||
* You can use Future.get(long, TimeUnit) to wait on the operation to complete.
|
||||
* It may throw ExecutionException if there was an error while executing the operation
|
||||
* or TimeoutException in case the wait timeout was not long enough to allow the
|
||||
* operation to complete.
|
||||
*
|
||||
* @param desc table descriptor for table
|
||||
* @param splitKeys keys to check if the table has been created with all split keys
|
||||
* @throws IllegalArgumentException Bad table name, if the split keys
|
||||
* are repeated and if the split key has empty byte array.
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @return the result of the async creation. You can use Future.get(long, TimeUnit)
|
||||
* to wait on the operation to complete.
|
||||
*/
|
||||
// TODO: This should be called Async but it will break binary compatibility
|
||||
private Future<Void> createTableAsyncV2(final HTableDescriptor desc, final byte[][] splitKeys)
|
||||
throws IOException {
|
||||
if (desc.getTableName() == null) {
|
||||
throw new IllegalArgumentException("TableName cannot be null");
|
||||
}
|
||||
if(splitKeys != null && splitKeys.length > 0) {
|
||||
if (splitKeys != null && splitKeys.length > 0) {
|
||||
Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
|
||||
// Verify there are no duplicate split keys
|
||||
byte [] lastKey = null;
|
||||
for(byte [] splitKey : splitKeys) {
|
||||
byte[] lastKey = null;
|
||||
for (byte[] splitKey : splitKeys) {
|
||||
if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"Empty split key must not be passed in the split keys.");
|
||||
}
|
||||
if(lastKey != null && Bytes.equals(splitKey, lastKey)) {
|
||||
if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
|
||||
throw new IllegalArgumentException("All split keys must be unique, " +
|
||||
"found duplicate: " + Bytes.toStringBinary(splitKey) +
|
||||
", " + Bytes.toStringBinary(lastKey));
|
||||
|
@ -658,14 +631,126 @@ public class HBaseAdmin implements Admin {
|
|||
}
|
||||
}
|
||||
|
||||
executeCallable(new MasterCallable<Void>(getConnection()) {
|
||||
CreateTableResponse response = executeCallable(
|
||||
new MasterCallable<CreateTableResponse>(getConnection()) {
|
||||
@Override
|
||||
public Void call(int callTimeout) throws ServiceException {
|
||||
public CreateTableResponse call(int callTimeout) throws ServiceException {
|
||||
CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys);
|
||||
master.createTable(null, request);
|
||||
return null;
|
||||
return master.createTable(null, request);
|
||||
}
|
||||
});
|
||||
return new CreateTableFuture(this, desc, splitKeys, response);
|
||||
}
|
||||
|
||||
private static class CreateTableFuture extends ProcedureFuture<Void> {
|
||||
private final HTableDescriptor desc;
|
||||
private final byte[][] splitKeys;
|
||||
|
||||
public CreateTableFuture(final HBaseAdmin admin, final HTableDescriptor desc,
|
||||
final byte[][] splitKeys, final CreateTableResponse response) {
|
||||
super(admin, (response != null && response.hasProcId()) ? response.getProcId() : null);
|
||||
this.splitKeys = splitKeys;
|
||||
this.desc = desc;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Void waitOperationResult(final long deadlineTs)
|
||||
throws IOException, TimeoutException {
|
||||
waitForTableEnabled(deadlineTs);
|
||||
waitForAllRegionsOnline(deadlineTs);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Void postOperationResult(final Void result, final long deadlineTs)
|
||||
throws IOException, TimeoutException {
|
||||
LOG.info("Created " + desc.getTableName());
|
||||
return result;
|
||||
}
|
||||
|
||||
private void waitForTableEnabled(final long deadlineTs)
|
||||
throws IOException, TimeoutException {
|
||||
waitForState(deadlineTs, new WaitForStateCallable() {
|
||||
@Override
|
||||
public boolean checkState(int tries) throws IOException {
|
||||
try {
|
||||
if (getAdmin().isTableAvailable(desc.getTableName())) {
|
||||
return true;
|
||||
}
|
||||
} catch (TableNotFoundException tnfe) {
|
||||
LOG.debug("Table "+ desc.getTableName() +" was not enabled, sleeping. tries="+ tries);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void throwInterruptedException() throws InterruptedIOException {
|
||||
throw new InterruptedIOException("Interrupted when waiting for table " +
|
||||
desc.getTableName() + " to be enabled");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void throwTimeoutException(long elapsedTime) throws TimeoutException {
|
||||
throw new TimeoutException("Table " + desc.getTableName() +
|
||||
" not enabled after " + elapsedTime + "msec");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void waitForAllRegionsOnline(final long deadlineTs)
|
||||
throws IOException, TimeoutException {
|
||||
final AtomicInteger actualRegCount = new AtomicInteger(0);
|
||||
final MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
|
||||
@Override
|
||||
public boolean processRow(Result rowResult) throws IOException {
|
||||
RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
|
||||
if (list == null) {
|
||||
LOG.warn("No serialized HRegionInfo in " + rowResult);
|
||||
return true;
|
||||
}
|
||||
HRegionLocation l = list.getRegionLocation();
|
||||
if (l == null) {
|
||||
return true;
|
||||
}
|
||||
if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
|
||||
return false;
|
||||
}
|
||||
if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
|
||||
HRegionLocation[] locations = list.getRegionLocations();
|
||||
for (HRegionLocation location : locations) {
|
||||
if (location == null) continue;
|
||||
ServerName serverName = location.getServerName();
|
||||
// Make sure that regions are assigned to server
|
||||
if (serverName != null && serverName.getHostAndPort() != null) {
|
||||
actualRegCount.incrementAndGet();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
int tries = 0;
|
||||
IOException serverEx = null;
|
||||
int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
|
||||
while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
|
||||
actualRegCount.set(0);
|
||||
MetaScanner.metaScan(getAdmin().getConnection(), visitor, desc.getTableName());
|
||||
if (actualRegCount.get() == numRegs) {
|
||||
// all the regions are online
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(getAdmin().getPauseTime(tries++));
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Interrupted when opening" +
|
||||
" regions; " + actualRegCount.get() + " of " + numRegs +
|
||||
" regions processed so far");
|
||||
}
|
||||
}
|
||||
throw new TimeoutException("Only " + actualRegCount.get() +
|
||||
" of " + numRegs + " regions are online; retries exhausted.");
|
||||
}
|
||||
}
|
||||
|
||||
public void deleteTable(final String tableName) throws IOException {
|
||||
|
@ -685,61 +770,93 @@ public class HBaseAdmin implements Admin {
|
|||
*/
|
||||
@Override
|
||||
public void deleteTable(final TableName tableName) throws IOException {
|
||||
boolean tableExists = true;
|
||||
Future<Void> future = deleteTableAsyncV2(tableName);
|
||||
try {
|
||||
future.get(syncWaitTimeout, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Interrupted when waiting for table to be deleted");
|
||||
} catch (TimeoutException e) {
|
||||
throw new TimeoutIOException(e);
|
||||
} catch (ExecutionException e) {
|
||||
if (e.getCause() instanceof IOException) {
|
||||
throw (IOException)e.getCause();
|
||||
} else {
|
||||
throw new IOException(e.getCause());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
executeCallable(new MasterCallable<Void>(getConnection()) {
|
||||
/**
|
||||
* Deletes the table but does not block and wait for it be completely removed.
|
||||
* You can use Future.get(long, TimeUnit) to wait on the operation to complete.
|
||||
* It may throw ExecutionException if there was an error while executing the operation
|
||||
* or TimeoutException in case the wait timeout was not long enough to allow the
|
||||
* operation to complete.
|
||||
*
|
||||
* @param desc table descriptor for table
|
||||
* @param tableName name of table to delete
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @return the result of the async delete. You can use Future.get(long, TimeUnit)
|
||||
* to wait on the operation to complete.
|
||||
*/
|
||||
// TODO: This should be called Async but it will break binary compatibility
|
||||
private Future<Void> deleteTableAsyncV2(final TableName tableName) throws IOException {
|
||||
DeleteTableResponse response = executeCallable(
|
||||
new MasterCallable<DeleteTableResponse>(getConnection()) {
|
||||
@Override
|
||||
public Void call(int callTimeout) throws ServiceException {
|
||||
public DeleteTableResponse call(int callTimeout) throws ServiceException {
|
||||
DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName);
|
||||
master.deleteTable(null,req);
|
||||
return null;
|
||||
return master.deleteTable(null,req);
|
||||
}
|
||||
});
|
||||
return new DeleteTableFuture(this, tableName, response);
|
||||
}
|
||||
|
||||
int failures = 0;
|
||||
// Wait until all regions deleted
|
||||
for (int tries = 0; tries < (this.numRetries * this.retryLongerMultiplier); tries++) {
|
||||
try {
|
||||
// Find whether all regions are deleted.
|
||||
List<RegionLocations> regionLations =
|
||||
MetaScanner.listTableRegionLocations(conf, connection, tableName);
|
||||
private static class DeleteTableFuture extends ProcedureFuture<Void> {
|
||||
private final TableName tableName;
|
||||
|
||||
// let us wait until hbase:meta table is updated and
|
||||
// HMaster removes the table from its HTableDescriptors
|
||||
if (regionLations == null || regionLations.size() == 0) {
|
||||
HTableDescriptor htd = getTableDescriptorByTableName(tableName);
|
||||
|
||||
if (htd == null) {
|
||||
// table could not be found in master - we are done.
|
||||
tableExists = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
failures++;
|
||||
if(failures >= numRetries - 1) { // no more tries left
|
||||
if (ex instanceof RemoteException) {
|
||||
throw ((RemoteException) ex).unwrapRemoteException();
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
}
|
||||
try {
|
||||
Thread.sleep(getPauseTime(tries));
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Interrupted when waiting" +
|
||||
" for table to be deleted");
|
||||
}
|
||||
public DeleteTableFuture(final HBaseAdmin admin, final TableName tableName,
|
||||
final DeleteTableResponse response) {
|
||||
super(admin, (response != null && response.hasProcId()) ? response.getProcId() : null);
|
||||
this.tableName = tableName;
|
||||
}
|
||||
|
||||
if (tableExists) {
|
||||
throw new IOException("Retries exhausted, it took too long to wait"+
|
||||
" for the table " + tableName + " to be deleted.");
|
||||
@Override
|
||||
protected Void waitOperationResult(final long deadlineTs)
|
||||
throws IOException, TimeoutException {
|
||||
waitTableNotFound(deadlineTs);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Void postOperationResult(final Void result, final long deadlineTs)
|
||||
throws IOException, TimeoutException {
|
||||
// Delete cached information to prevent clients from using old locations
|
||||
getAdmin().getConnection().clearRegionCache(tableName);
|
||||
LOG.info("Deleted " + tableName);
|
||||
return result;
|
||||
}
|
||||
|
||||
private void waitTableNotFound(final long deadlineTs)
|
||||
throws IOException, TimeoutException {
|
||||
waitForState(deadlineTs, new WaitForStateCallable() {
|
||||
@Override
|
||||
public boolean checkState(int tries) throws IOException {
|
||||
return !getAdmin().tableExists(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void throwInterruptedException() throws InterruptedIOException {
|
||||
throw new InterruptedIOException("Interrupted when waiting for table to be deleted");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void throwTimeoutException(long elapsedTime) throws TimeoutException {
|
||||
throw new TimeoutException("Table " + tableName + " not yet deleted after " +
|
||||
elapsedTime + "msec");
|
||||
}
|
||||
});
|
||||
}
|
||||
// Delete cached information to prevent clients from using old locations
|
||||
this.connection.clearRegionCache(tableName);
|
||||
LOG.info("Deleted " + tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3636,7 +3753,7 @@ public class HBaseAdmin implements Admin {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Apply the new quota settings.
|
||||
* @param quota the quota settings
|
||||
|
@ -3800,4 +3917,236 @@ public class HBaseAdmin implements Admin {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Future that waits on a procedure result.
|
||||
* Returned by the async version of the Admin calls,
|
||||
* and used internally by the sync calls to wait on the result of the procedure.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
protected static class ProcedureFuture<V> implements Future<V> {
|
||||
private ExecutionException exception = null;
|
||||
private boolean procResultFound = false;
|
||||
private boolean done = false;
|
||||
private V result = null;
|
||||
|
||||
private final HBaseAdmin admin;
|
||||
private final Long procId;
|
||||
|
||||
public ProcedureFuture(final HBaseAdmin admin, final Long procId) {
|
||||
this.admin = admin;
|
||||
this.procId = procId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
// TODO: Abort not implemented yet
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V get() throws InterruptedException, ExecutionException {
|
||||
// TODO: should we ever spin forever?
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public V get(long timeout, TimeUnit unit)
|
||||
throws InterruptedException, ExecutionException, TimeoutException {
|
||||
if (!done) {
|
||||
long deadlineTs = EnvironmentEdgeManager.currentTime() + unit.toMillis(timeout);
|
||||
try {
|
||||
try {
|
||||
// if the master support procedures, try to wait the result
|
||||
if (procId != null) {
|
||||
result = waitProcedureResult(procId, deadlineTs);
|
||||
}
|
||||
// if we don't have a proc result, try the compatibility wait
|
||||
if (!procResultFound) {
|
||||
result = waitOperationResult(deadlineTs);
|
||||
}
|
||||
result = postOperationResult(result, deadlineTs);
|
||||
done = true;
|
||||
} catch (IOException e) {
|
||||
result = postOpeartionFailure(e, deadlineTs);
|
||||
done = true;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
exception = new ExecutionException(e);
|
||||
done = true;
|
||||
}
|
||||
}
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
return done;
|
||||
}
|
||||
|
||||
protected HBaseAdmin getAdmin() {
|
||||
return admin;
|
||||
}
|
||||
|
||||
private V waitProcedureResult(long procId, long deadlineTs)
|
||||
throws IOException, TimeoutException, InterruptedException {
|
||||
GetProcedureResultRequest request = GetProcedureResultRequest.newBuilder()
|
||||
.setProcId(procId)
|
||||
.build();
|
||||
|
||||
int tries = 0;
|
||||
IOException serviceEx = null;
|
||||
while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
|
||||
GetProcedureResultResponse response = null;
|
||||
try {
|
||||
// Try to fetch the result
|
||||
response = getProcedureResult(request);
|
||||
} catch (IOException e) {
|
||||
serviceEx = unwrapException(e);
|
||||
|
||||
// the master may be down
|
||||
LOG.warn("failed to get the procedure result procId=" + procId, serviceEx);
|
||||
|
||||
// Not much to do, if we have a DoNotRetryIOException
|
||||
if (serviceEx instanceof DoNotRetryIOException) {
|
||||
// TODO: looks like there is no way to unwrap this exception and get the proper
|
||||
// UnsupportedOperationException aside from looking at the message.
|
||||
// anyway, if we fail here we just failover to the compatibility side
|
||||
// and that is always a valid solution.
|
||||
LOG.warn("Proc-v2 is unsupported on this master: " + serviceEx.getMessage(), serviceEx);
|
||||
procResultFound = false;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// If the procedure is no longer running, we should have a result
|
||||
if (response != null && response.getState() != GetProcedureResultResponse.State.RUNNING) {
|
||||
procResultFound = response.getState() != GetProcedureResultResponse.State.NOT_FOUND;
|
||||
return convertResult(response);
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(getAdmin().getPauseTime(tries++));
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedException(
|
||||
"Interrupted while waiting for the result of proc " + procId);
|
||||
}
|
||||
}
|
||||
if (serviceEx != null) {
|
||||
throw serviceEx;
|
||||
} else {
|
||||
throw new TimeoutException("The procedure " + procId + " is still running");
|
||||
}
|
||||
}
|
||||
|
||||
private static IOException unwrapException(IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
return ((RemoteException)e).unwrapRemoteException();
|
||||
}
|
||||
return e;
|
||||
}
|
||||
|
||||
protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest request)
|
||||
throws IOException {
|
||||
return admin.executeCallable(new MasterCallable<GetProcedureResultResponse>(
|
||||
admin.getConnection()) {
|
||||
@Override
|
||||
public GetProcedureResultResponse call(int callTimeout) throws ServiceException {
|
||||
return master.getProcedureResult(null, request);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the procedure result response to a specified type.
|
||||
* @param response the procedure result object to parse
|
||||
* @return the result data of the procedure.
|
||||
*/
|
||||
protected V convertResult(final GetProcedureResultResponse response) throws IOException {
|
||||
if (response.hasException()) {
|
||||
throw ForeignExceptionUtil.toIOException(response.getException());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fallback implementation in case the procedure is not supported by the server.
|
||||
* It should try to wait until the operation is completed.
|
||||
* @param deadlineTs the timestamp after which this method should throw a TimeoutException
|
||||
* @return the result data of the operation
|
||||
*/
|
||||
protected V waitOperationResult(final long deadlineTs)
|
||||
throws IOException, TimeoutException {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after the operation is completed and the result fetched.
|
||||
* this allows to perform extra steps after the procedure is completed.
|
||||
* it allows to apply transformations to the result that will be returned by get().
|
||||
* @param result the result of the procedure
|
||||
* @param deadlineTs the timestamp after which this method should throw a TimeoutException
|
||||
* @return the result of the procedure, which may be the same as the passed one
|
||||
*/
|
||||
protected V postOperationResult(final V result, final long deadlineTs)
|
||||
throws IOException, TimeoutException {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after the operation is terminated with a failure.
|
||||
* this allows to perform extra steps after the procedure is terminated.
|
||||
* it allows to apply transformations to the result that will be returned by get().
|
||||
* The default implementation will rethrow the exception
|
||||
* @param exception the exception got from fetching the result
|
||||
* @param deadlineTs the timestamp after which this method should throw a TimeoutException
|
||||
* @return the result of the procedure, which may be the same as the passed one
|
||||
*/
|
||||
protected V postOpeartionFailure(final IOException exception, final long deadlineTs)
|
||||
throws IOException, TimeoutException {
|
||||
throw exception;
|
||||
}
|
||||
|
||||
protected interface WaitForStateCallable {
|
||||
boolean checkState(int tries) throws IOException;
|
||||
void throwInterruptedException() throws InterruptedIOException;
|
||||
void throwTimeoutException(long elapsed) throws TimeoutException;
|
||||
}
|
||||
|
||||
protected void waitForState(final long deadlineTs, final WaitForStateCallable callable)
|
||||
throws IOException, TimeoutException {
|
||||
int tries = 0;
|
||||
IOException serverEx = null;
|
||||
long startTime = EnvironmentEdgeManager.currentTime();
|
||||
while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
|
||||
serverEx = null;
|
||||
try {
|
||||
if (callable.checkState(tries)) {
|
||||
return;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
serverEx = e;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(getAdmin().getPauseTime(tries++));
|
||||
} catch (InterruptedException e) {
|
||||
callable.throwInterruptedException();
|
||||
}
|
||||
}
|
||||
if (serverEx != null) {
|
||||
throw unwrapException(serverEx);
|
||||
} else {
|
||||
callable.throwTimeoutException(EnvironmentEdgeManager.currentTime() - startTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,185 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestProcedureFuture {
|
||||
private static class TestFuture extends HBaseAdmin.ProcedureFuture<Void> {
|
||||
private boolean postOperationResultCalled = false;
|
||||
private boolean waitOperationResultCalled = false;
|
||||
private boolean getProcedureResultCalled = false;
|
||||
private boolean convertResultCalled = false;
|
||||
|
||||
public TestFuture(final HBaseAdmin admin, final Long procId) {
|
||||
super(admin, procId);
|
||||
}
|
||||
|
||||
public boolean wasPostOperationResultCalled() {
|
||||
return postOperationResultCalled;
|
||||
}
|
||||
|
||||
public boolean wasWaitOperationResultCalled() {
|
||||
return waitOperationResultCalled;
|
||||
}
|
||||
|
||||
public boolean wasGetProcedureResultCalled() {
|
||||
return getProcedureResultCalled;
|
||||
}
|
||||
|
||||
public boolean wasConvertResultCalled() {
|
||||
return convertResultCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected GetProcedureResultResponse getProcedureResult(
|
||||
final GetProcedureResultRequest request) throws IOException {
|
||||
getProcedureResultCalled = true;
|
||||
return GetProcedureResultResponse.newBuilder()
|
||||
.setState(GetProcedureResultResponse.State.FINISHED)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Void convertResult(final GetProcedureResultResponse response) throws IOException {
|
||||
convertResultCalled = true;
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Void waitOperationResult(final long deadlineTs)
|
||||
throws IOException, TimeoutException {
|
||||
waitOperationResultCalled = true;
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Void postOperationResult(final Void result, final long deadlineTs)
|
||||
throws IOException, TimeoutException {
|
||||
postOperationResultCalled = true;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When a master return a result with procId,
|
||||
* we are skipping the waitOperationResult() call,
|
||||
* since we are getting the procedure result.
|
||||
*/
|
||||
@Test(timeout=60000)
|
||||
public void testWithProcId() throws Exception {
|
||||
HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
|
||||
TestFuture f = new TestFuture(admin, 100L);
|
||||
f.get(1, TimeUnit.MINUTES);
|
||||
|
||||
assertTrue("expected getProcedureResult() to be called", f.wasGetProcedureResultCalled());
|
||||
assertTrue("expected convertResult() to be called", f.wasConvertResultCalled());
|
||||
assertFalse("unexpected waitOperationResult() called", f.wasWaitOperationResultCalled());
|
||||
assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the spin loop for the procedure running works.
|
||||
*/
|
||||
@Test(timeout=60000)
|
||||
public void testWithProcIdAndSpinning() throws Exception {
|
||||
final AtomicInteger spinCount = new AtomicInteger(0);
|
||||
HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
|
||||
TestFuture f = new TestFuture(admin, 100L) {
|
||||
@Override
|
||||
protected GetProcedureResultResponse getProcedureResult(
|
||||
final GetProcedureResultRequest request) throws IOException {
|
||||
boolean done = spinCount.incrementAndGet() >= 10;
|
||||
return GetProcedureResultResponse.newBuilder()
|
||||
.setState(done ? GetProcedureResultResponse.State.FINISHED :
|
||||
GetProcedureResultResponse.State.RUNNING)
|
||||
.build();
|
||||
}
|
||||
};
|
||||
f.get(1, TimeUnit.MINUTES);
|
||||
|
||||
assertEquals(10, spinCount.get());
|
||||
assertTrue("expected convertResult() to be called", f.wasConvertResultCalled());
|
||||
assertFalse("unexpected waitOperationResult() called", f.wasWaitOperationResultCalled());
|
||||
assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
|
||||
}
|
||||
|
||||
/**
|
||||
* When a master return a result without procId,
|
||||
* we are skipping the getProcedureResult() call.
|
||||
*/
|
||||
@Test(timeout=60000)
|
||||
public void testWithoutProcId() throws Exception {
|
||||
HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
|
||||
TestFuture f = new TestFuture(admin, null);
|
||||
f.get(1, TimeUnit.MINUTES);
|
||||
|
||||
assertFalse("unexpected getProcedureResult() called", f.wasGetProcedureResultCalled());
|
||||
assertFalse("unexpected convertResult() called", f.wasConvertResultCalled());
|
||||
assertTrue("expected waitOperationResult() to be called", f.wasWaitOperationResultCalled());
|
||||
assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
|
||||
}
|
||||
|
||||
/**
|
||||
* When a new client with procedure support tries to ask an old-master without proc-support
|
||||
* the procedure result we get a DoNotRetryIOException (which is an UnsupportedOperationException)
|
||||
* The future should trap that and fallback to the waitOperationResult().
|
||||
*
|
||||
* This happens when the operation calls happens on a "new master" but while we are waiting
|
||||
* the operation to be completed, we failover on an "old master".
|
||||
*/
|
||||
@Test(timeout=60000)
|
||||
public void testOnServerWithNoProcedureSupport() throws Exception {
|
||||
HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
|
||||
TestFuture f = new TestFuture(admin, 100L) {
|
||||
@Override
|
||||
protected GetProcedureResultResponse getProcedureResult(
|
||||
final GetProcedureResultRequest request) throws IOException {
|
||||
super.getProcedureResult(request);
|
||||
throw new DoNotRetryIOException(new UnsupportedOperationException("getProcedureResult"));
|
||||
}
|
||||
};
|
||||
f.get(1, TimeUnit.MINUTES);
|
||||
|
||||
assertTrue("expected getProcedureResult() to be called", f.wasGetProcedureResultCalled());
|
||||
assertFalse("unexpected convertResult() called", f.wasConvertResultCalled());
|
||||
assertTrue("expected waitOperationResult() to be called", f.wasWaitOperationResultCalled());
|
||||
assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -28,6 +28,7 @@ option optimize_for = SPEED;
|
|||
import "HBase.proto";
|
||||
import "Client.proto";
|
||||
import "ClusterStatus.proto";
|
||||
import "ErrorHandling.proto";
|
||||
import "Quota.proto";
|
||||
|
||||
/* Column-level protobufs */
|
||||
|
@ -108,6 +109,7 @@ message CreateTableRequest {
|
|||
}
|
||||
|
||||
message CreateTableResponse {
|
||||
optional uint64 proc_id = 1;
|
||||
}
|
||||
|
||||
message DeleteTableRequest {
|
||||
|
@ -115,6 +117,7 @@ message DeleteTableRequest {
|
|||
}
|
||||
|
||||
message DeleteTableResponse {
|
||||
optional uint64 proc_id = 1;
|
||||
}
|
||||
|
||||
message TruncateTableRequest {
|
||||
|
@ -372,6 +375,24 @@ message IsProcedureDoneResponse {
|
|||
optional ProcedureDescription snapshot = 2;
|
||||
}
|
||||
|
||||
message GetProcedureResultRequest {
|
||||
required uint64 proc_id = 1;
|
||||
}
|
||||
|
||||
message GetProcedureResultResponse {
|
||||
enum State {
|
||||
NOT_FOUND = 0;
|
||||
RUNNING = 1;
|
||||
FINISHED = 2;
|
||||
}
|
||||
|
||||
required State state = 1;
|
||||
optional uint64 start_time = 2;
|
||||
optional uint64 last_update = 3;
|
||||
optional bytes result = 4;
|
||||
optional ForeignExceptionMessage exception = 5;
|
||||
}
|
||||
|
||||
message SetQuotaRequest {
|
||||
optional string user_name = 1;
|
||||
optional string user_group = 2;
|
||||
|
@ -622,4 +643,7 @@ service MasterService {
|
|||
/** Returns the timestamp of the last major compaction */
|
||||
rpc getLastMajorCompactionTimestampForRegion(MajorCompactionTimestampForRegionRequest)
|
||||
returns(MajorCompactionTimestampResponse);
|
||||
|
||||
rpc getProcedureResult(GetProcedureResultRequest)
|
||||
returns(GetProcedureResultResponse);
|
||||
}
|
||||
|
|
|
@ -1360,7 +1360,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void createTable(HTableDescriptor hTableDescriptor,
|
||||
public long createTable(HTableDescriptor hTableDescriptor,
|
||||
byte [][] splitKeys) throws IOException {
|
||||
if (isStopped()) {
|
||||
throw new MasterNotRunningException();
|
||||
|
@ -1391,9 +1391,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
cpHost.postCreateTable(hTableDescriptor, newRegions);
|
||||
}
|
||||
|
||||
// TODO: change the interface to return the procId,
|
||||
// and add it to the response protobuf.
|
||||
//return procId;
|
||||
return procId;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1604,7 +1602,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void deleteTable(final TableName tableName) throws IOException {
|
||||
public long deleteTable(final TableName tableName) throws IOException {
|
||||
checkInitialized();
|
||||
if (cpHost != null) {
|
||||
cpHost.preDeleteTable(tableName);
|
||||
|
@ -1621,9 +1619,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
cpHost.postDeleteTable(tableName);
|
||||
}
|
||||
|
||||
// TODO: change the interface to return the procId,
|
||||
// and add it to the response protobuf.
|
||||
//return procId;
|
||||
return procId;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
|
|||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureResult;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||
|
@ -85,6 +87,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnaps
|
|||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
|
||||
|
@ -157,6 +161,7 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
|||
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ByteStringer;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
|
@ -404,11 +409,11 @@ public class MasterRpcServices extends RSRpcServices
|
|||
HTableDescriptor hTableDescriptor = HTableDescriptor.convert(req.getTableSchema());
|
||||
byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req);
|
||||
try {
|
||||
master.createTable(hTableDescriptor, splitKeys);
|
||||
long procId = master.createTable(hTableDescriptor, splitKeys);
|
||||
return CreateTableResponse.newBuilder().setProcId(procId).build();
|
||||
} catch (IOException ioe) {
|
||||
throw new ServiceException(ioe);
|
||||
}
|
||||
return CreateTableResponse.newBuilder().build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -460,11 +465,11 @@ public class MasterRpcServices extends RSRpcServices
|
|||
public DeleteTableResponse deleteTable(RpcController controller,
|
||||
DeleteTableRequest request) throws ServiceException {
|
||||
try {
|
||||
master.deleteTable(ProtobufUtil.toTableName(request.getTableName()));
|
||||
long procId = master.deleteTable(ProtobufUtil.toTableName(request.getTableName()));
|
||||
return DeleteTableResponse.newBuilder().setProcId(procId).build();
|
||||
} catch (IOException ioe) {
|
||||
throw new ServiceException(ioe);
|
||||
}
|
||||
return DeleteTableResponse.newBuilder().build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -943,6 +948,44 @@ public class MasterRpcServices extends RSRpcServices
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetProcedureResultResponse getProcedureResult(RpcController controller,
|
||||
GetProcedureResultRequest request) throws ServiceException {
|
||||
LOG.debug("Checking to see if procedure is done procId=" + request.getProcId());
|
||||
try {
|
||||
master.checkInitialized();
|
||||
GetProcedureResultResponse.Builder builder = GetProcedureResultResponse.newBuilder();
|
||||
|
||||
Pair<ProcedureResult, Procedure> v = master.getMasterProcedureExecutor()
|
||||
.getResultOrProcedure(request.getProcId());
|
||||
if (v.getFirst() != null) {
|
||||
ProcedureResult result = v.getFirst();
|
||||
builder.setState(GetProcedureResultResponse.State.FINISHED);
|
||||
builder.setStartTime(result.getStartTime());
|
||||
builder.setLastUpdate(result.getLastUpdate());
|
||||
if (result.isFailed()) {
|
||||
builder.setException(result.getException().convert());
|
||||
}
|
||||
if (result.hasResultData()) {
|
||||
builder.setResult(ByteStringer.wrap(result.getResult()));
|
||||
}
|
||||
master.getMasterProcedureExecutor().removeResult(request.getProcId());
|
||||
} else {
|
||||
Procedure proc = v.getSecond();
|
||||
if (proc == null) {
|
||||
builder.setState(GetProcedureResultResponse.State.NOT_FOUND);
|
||||
} else {
|
||||
builder.setState(GetProcedureResultResponse.State.RUNNING);
|
||||
builder.setStartTime(proc.getStartTime());
|
||||
builder.setLastUpdate(proc.getLastUpdate());
|
||||
}
|
||||
}
|
||||
return builder.build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController c,
|
||||
ListNamespaceDescriptorsRequest request) throws ServiceException {
|
||||
|
|
|
@ -100,7 +100,7 @@ public interface MasterServices extends Server {
|
|||
* @param splitKeys Starting row keys for the initial table regions. If null
|
||||
* a single region is created.
|
||||
*/
|
||||
void createTable(HTableDescriptor desc, byte[][] splitKeys)
|
||||
long createTable(HTableDescriptor desc, byte[][] splitKeys)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -108,7 +108,7 @@ public interface MasterServices extends Server {
|
|||
* @param tableName The table name
|
||||
* @throws IOException
|
||||
*/
|
||||
void deleteTable(final TableName tableName) throws IOException;
|
||||
long deleteTable(final TableName tableName) throws IOException;
|
||||
|
||||
/**
|
||||
* Truncate a table
|
||||
|
|
|
@ -125,6 +125,7 @@ public class DeleteTableProcedure
|
|||
LOG.debug("delete '" + getTableName() + "' from filesystem");
|
||||
DeleteTableProcedure.deleteFromFs(env, getTableName(), regions, true);
|
||||
setNextState(DeleteTableState.DELETE_TABLE_UPDATE_DESC_CACHE);
|
||||
regions = null;
|
||||
break;
|
||||
case DELETE_TABLE_UPDATE_DESC_CACHE:
|
||||
LOG.debug("delete '" + getTableName() + "' descriptor");
|
||||
|
|
|
@ -224,9 +224,10 @@ public class TestCatalogJanitor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void createTable(HTableDescriptor desc, byte[][] splitKeys)
|
||||
public long createTable(HTableDescriptor desc, byte[][] splitKeys)
|
||||
throws IOException {
|
||||
// no-op
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -408,7 +409,9 @@ public class TestCatalogJanitor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void deleteTable(TableName tableName) throws IOException { }
|
||||
public long deleteTable(TableName tableName) throws IOException {
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void truncateTable(TableName tableName, boolean preserveSplits) throws IOException { }
|
||||
|
|
|
@ -172,6 +172,7 @@ public class TestHBaseFsck {
|
|||
conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE);
|
||||
conf.setInt("hbase.hconnection.threads.core", POOL_SIZE);
|
||||
conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT);
|
||||
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT);
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
|
||||
tableExecutorService = new ThreadPoolExecutor(1, POOL_SIZE, 60, TimeUnit.SECONDS,
|
||||
|
|
Loading…
Reference in New Issue