HBASE-17497 Add first async MetaTableAccessor impl and Implement tableExists method

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Guanghao Zhang 2017-01-19 18:35:19 +08:00 committed by Michael Stack
parent 31f3e8f833
commit 2ee3c73f76
4 changed files with 144 additions and 0 deletions

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RawAsyncTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* The asynchronous meta table accessor. Used to read/write region and assignment information store
* in <code>hbase:meta</code>.
*/
@InterfaceAudience.Private
public class AsyncMetaTableAccessor {
private static final Log LOG = LogFactory.getLog(AsyncMetaTableAccessor.class);
private static CompletableFuture<RawAsyncTable> getMetaTable(AsyncConnection conn) {
return CompletableFuture.completedFuture(conn.getRawTable(META_TABLE_NAME));
}
public static CompletableFuture<Boolean> tableExists(AsyncConnection conn, TableName tableName) {
if (tableName.equals(META_TABLE_NAME)) {
return CompletableFuture.completedFuture(true);
}
return getTableState(conn, tableName).thenApply(Optional::isPresent);
}
private static CompletableFuture<Optional<TableState>> getTableState(AsyncConnection conn,
TableName tableName) {
CompletableFuture<Optional<TableState>> future = new CompletableFuture<>();
getMetaTable(conn).thenAccept((metaTable) -> {
Get get = new Get(tableName.getName()).addColumn(getTableFamily(), getStateColumn());
long time = EnvironmentEdgeManager.currentTime();
try {
get.setTimeRange(0, time);
} catch (IOException ioe) {
future.completeExceptionally(ioe);
return;
}
metaTable.get(get).whenComplete((result, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
try {
future.complete(getTableState(result));
} catch (IOException e) {
future.completeExceptionally(e);
}
});
});
return future;
}
private static Optional<TableState> getTableState(Result r) throws IOException {
Cell cell = r.getColumnLatestCell(getTableFamily(), getStateColumn());
if (cell == null) return Optional.empty();
try {
return Optional.of(TableState.parseFrom(
TableName.valueOf(r.getRow()),
Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset()
+ cell.getValueLength())));
} catch (DeserializationException e) {
throw new IOException("Failed to parse table state from result: " + r, e);
}
}
/**
* Returns the column family used for table columns.
* @return HConstants.TABLE_FAMILY.
*/
private static byte[] getTableFamily() {
return HConstants.TABLE_FAMILY;
}
/**
* Returns the column qualifier for serialized table state
* @return HConstants.TABLE_STATE_QUALIFIER
*/
private static byte[] getStateColumn() {
return HConstants.TABLE_STATE_QUALIFIER;
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -30,6 +31,13 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceStability.Unstable
public interface AsyncAdmin {
/**
* @param tableName Table to check.
* @return True if table exists already. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
CompletableFuture<Boolean> tableExists(final TableName tableName);
/**
* Turn the load balancer on or off.
* @param on

View File

@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
@ -141,4 +143,9 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
(s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
.call();
}
@Override
public CompletableFuture<Boolean> tableExists(TableName tableName) {
return AsyncMetaTableAccessor.tableExists(connection, tableName);
}
}

View File

@ -23,8 +23,10 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -39,6 +41,7 @@ public class TestAsyncAdmin {
private static final Log LOG = LogFactory.getLog(TestAdmin1.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static AsyncConnection ASYNC_CONN;
private AsyncAdmin admin;
@ -63,6 +66,19 @@ public class TestAsyncAdmin {
this.admin = ASYNC_CONN.getAdmin();
}
@Test
public void testTableExist() throws Exception {
final TableName table = TableName.valueOf("testTableExist");
boolean exist;
exist = admin.tableExists(table).get();
assertEquals(false, exist);
TEST_UTIL.createTable(table, FAMILY);
exist = admin.tableExists(table).get();
assertEquals(true, exist);
exist = admin.tableExists(TableName.META_TABLE_NAME).get();
assertEquals(true, exist);
}
@Test(timeout = 30000)
public void testBalancer() throws Exception {
boolean initialState = admin.isBalancerEnabled().get();