From 7465973068f45a78d66394cad7c0858bfeda1b46 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 30 Aug 2017 14:06:04 +0800 Subject: [PATCH] HBASE-15806 An endpoint-based export tool --- hbase-endpoint/pom.xml | 17 + .../hadoop/hbase/coprocessor/Export.java | 538 ++++++++++++++++++ hbase-endpoint/src/main/protobuf/Export.proto | 45 ++ .../hbase/coprocessor/TestImportExport.java | 56 ++ .../hbase/coprocessor/TestSecureExport.java | 439 ++++++++++++++ .../client/example/ExportEndpointExample.java | 86 +++ .../apache/hadoop/hbase/mapreduce/Export.java | 131 +---- .../hadoop/hbase/mapreduce/ExportUtils.java | 175 ++++++ .../hbase/mapreduce/TestImportExport.java | 202 ++++--- .../hbase/security/access/SecureTestUtil.java | 2 +- 10 files changed, 1513 insertions(+), 178 deletions(-) create mode 100644 hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java create mode 100644 hbase-endpoint/src/main/protobuf/Export.proto create mode 100644 hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestImportExport.java create mode 100644 hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/ExportEndpointExample.java create mode 100644 hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExportUtils.java diff --git a/hbase-endpoint/pom.xml b/hbase-endpoint/pom.xml index 29bd33b20f8..e37272f0527 100644 --- a/hbase-endpoint/pom.xml +++ b/hbase-endpoint/pom.xml @@ -166,6 +166,23 @@ test-jar test + + + + org.apache.hbase + hbase-mapreduce + + + org.apache.hbase + hbase-mapreduce + test-jar + test + + + org.apache.hadoop + hadoop-minikdc + test + commons-logging diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java new file mode 100644 index 00000000000..7a7cc00aa1a --- /dev/null +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.coprocessor; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import java.io.Closeable; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.mapreduce.ExportUtils; +import org.apache.hadoop.hbase.mapreduce.Import; +import org.apache.hadoop.hbase.mapreduce.ResultSerialization; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken; +import org.apache.hadoop.hbase.protobuf.generated.ExportProtos; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.token.FsDelegationToken; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.util.ArrayUtils; +import org.apache.hadoop.hbase.util.ByteStringer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Triple; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Export an HBase table. Writes content to sequence files up in HDFS. Use + * {@link Import} to read it back in again. It is implemented by the endpoint + * technique. 
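+ * <p>
+ * A minimal client-side sketch (the table name and output path below are illustrative only, and the
+ * endpoint must already be loaded on the target table, e.g. via hbase.coprocessor.region.classes):
+ * </p>
+ * <pre>
+ * Configuration conf = HBaseConfiguration.create();
+ * TableName table = TableName.valueOf("myTable");
+ * Path output = new Path("/tmp/myTable_export");
+ * Map&lt;byte[], Export.Response&gt; perRegion = Export.run(conf, table, new Scan(), output);
+ * long rows = perRegion.values().stream().mapToLong(Export.Response::getRowCount).sum();
+ * </pre>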
+ * + * @see org.apache.hadoop.hbase.mapreduce.Export + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public class Export extends ExportProtos.ExportService + implements Coprocessor, CoprocessorService { + + private static final Log LOG = LogFactory.getLog(Export.class); + private static final Class DEFAULT_CODEC = DefaultCodec.class; + private static final SequenceFile.CompressionType DEFAULT_TYPE = SequenceFile.CompressionType.RECORD; + private RegionCoprocessorEnvironment env = null; + private UserProvider userProvider; + + public static void main(String[] args) throws Throwable { + Map response = run(HBaseConfiguration.create(), args); + System.exit(response == null ? -1 : 0); + } + + @VisibleForTesting + static Map run(final Configuration conf, final String[] args) throws Throwable { + String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); + if (!ExportUtils.isValidArguements(args)) { + ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.length(otherArgs)); + return null; + } + Triple arguments = ExportUtils.getArgumentsFromCommandLine(conf, otherArgs); + return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird()); + } + + public static Map run(final Configuration conf, TableName tableName, Scan scan, Path dir) throws Throwable { + FileSystem fs = dir.getFileSystem(conf); + UserProvider userProvider = UserProvider.instantiate(conf); + checkDir(fs, dir); + FsDelegationToken fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); + fsDelegationToken.acquireDelegationToken(fs); + try { + final ExportProtos.ExportRequest request = getConfiguredRequest(conf, dir, + scan, fsDelegationToken.getUserToken()); + try (Connection con = ConnectionFactory.createConnection(conf); + Table table = con.getTable(tableName)) { + Map result = new TreeMap<>(Bytes.BYTES_COMPARATOR); + table.coprocessorService(ExportProtos.ExportService.class, + scan.getStartRow(), + scan.getStopRow(), + (ExportProtos.ExportService service) -> { + ServerRpcController controller = new ServerRpcController(); + Map rval = new TreeMap<>(Bytes.BYTES_COMPARATOR); + CoprocessorRpcUtils.BlockingRpcCallback + rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); + service.export(controller, request, rpcCallback); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + }).forEach((k, v) -> result.put(k, new Response(v))); + return result; + } catch (Throwable e) { + fs.delete(dir, true); + throw e; + } + } finally { + fsDelegationToken.releaseDelegationToken(); + } + } + + private static boolean getCompression(final ExportProtos.ExportRequest request) { + if (request.hasCompressed()) { + return request.getCompressed(); + } else { + return false; + } + } + + private static SequenceFile.CompressionType getCompressionType(final ExportProtos.ExportRequest request) { + if (request.hasCompressType()) { + return SequenceFile.CompressionType.valueOf(request.getCompressType()); + } else { + return DEFAULT_TYPE; + } + } + + private static CompressionCodec getCompressionCodec(final Configuration conf, final ExportProtos.ExportRequest request) { + try { + Class codecClass; + if (request.hasCompressCodec()) { + codecClass = conf.getClassByName(request.getCompressCodec()).asSubclass(CompressionCodec.class); + } else { + codecClass = DEFAULT_CODEC; + } + return ReflectionUtils.newInstance(codecClass, conf); + } catch (ClassNotFoundException e) { + throw new 
IllegalArgumentException("Compression codec " + + request.getCompressCodec() + " was not found.", e); + } + } + + private static SequenceFile.Writer.Option getOutputPath(final Configuration conf, + final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException { + Path file = new Path(request.getOutputPath(), "export-" + info.getEncodedName()); + FileSystem fs = file.getFileSystem(conf); + if (fs.exists(file)) { + throw new IOException(file + " exists"); + } + return SequenceFile.Writer.file(file); + } + + private static List getWriterOptions(final Configuration conf, + final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException { + List rval = new LinkedList<>(); + rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class)); + rval.add(SequenceFile.Writer.valueClass(Result.class)); + rval.add(getOutputPath(conf, info, request)); + if (getCompression(request)) { + rval.add(SequenceFile.Writer.compression(getCompressionType(request), getCompressionCodec(conf, request))); + } else { + rval.add(SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE)); + } + return rval; + } + + private static ExportProtos.ExportResponse processData(final Region region, final Configuration conf, + final UserProvider userProvider, final Scan scan, final Token userToken, + final List opts) throws IOException { + ScanCoprocessor cp = new ScanCoprocessor(region); + RegionScanner scanner = null; + try (RegionOp regionOp = new RegionOp(region); + SecureWriter out = new SecureWriter(conf, userProvider, userToken, opts)) { + scanner = cp.checkScannerOpen(scan); + ImmutableBytesWritable key = new ImmutableBytesWritable(); + long rowCount = 0; + long cellCount = 0; + List results = new ArrayList<>(); + List cells = new ArrayList<>(); + boolean hasMore; + do { + boolean bypass = cp.preScannerNext(scanner, results, scan.getBatch()); + if (bypass) { + hasMore = false; + } else { + hasMore = scanner.nextRaw(cells); + if (cells.isEmpty()) { + continue; + } + Cell firstCell = cells.get(0); + for (Cell cell : cells) { + if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength(), + cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) != 0) { + throw new IOException("Why the RegionScanner#nextRaw returns the data of different rows??" 
+ + " first row=" + Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength()) + + ", current row=" + Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); + } + } + results.add(Result.create(cells)); + cells.clear(); + cp.postScannerNext(scanner, results, scan.getBatch(), hasMore); + } + for (Result r : results) { + key.set(r.getRow()); + out.append(key, r); + ++rowCount; + cellCount += r.size(); + } + results.clear(); + } while (hasMore); + return ExportProtos.ExportResponse.newBuilder() + .setRowCount(rowCount) + .setCellCount(cellCount) + .build(); + } finally { + cp.checkScannerClose(scanner); + } + } + + private static void checkDir(final FileSystem fs, final Path dir) throws IOException { + if (fs.exists(dir)) { + throw new RuntimeException("The " + dir + " exists"); + } + if (!fs.mkdirs(dir)) { + throw new IOException("Failed to create the " + dir); + } + } + + private static ExportProtos.ExportRequest getConfiguredRequest(Configuration conf, + Path dir, final Scan scan, final Token userToken) throws IOException { + boolean compressed = conf.getBoolean(FileOutputFormat.COMPRESS, false); + String compressionType = conf.get(FileOutputFormat.COMPRESS_TYPE, + DEFAULT_TYPE.toString()); + String compressionCodec = conf.get(FileOutputFormat.COMPRESS_CODEC, + DEFAULT_CODEC.getName()); + DelegationToken protoToken = null; + if (userToken != null) { + protoToken = DelegationToken.newBuilder() + .setIdentifier(ByteStringer.wrap(userToken.getIdentifier())) + .setPassword(ByteStringer.wrap(userToken.getPassword())) + .setKind(userToken.getKind().toString()) + .setService(userToken.getService().toString()).build(); + } + LOG.info("compressed=" + compressed + + ", compression type=" + compressionType + + ", compression codec=" + compressionCodec + + ", userToken=" + userToken); + ExportProtos.ExportRequest.Builder builder = ExportProtos.ExportRequest.newBuilder() + .setScan(ProtobufUtil.toScan(scan)) + .setOutputPath(dir.toString()) + .setCompressed(compressed) + .setCompressCodec(compressionCodec) + .setCompressType(compressionType); + if (protoToken != null) { + builder.setFsToken(protoToken); + } + return builder.build(); + } + + + @Override + public void start(CoprocessorEnvironment environment) throws IOException { + if (environment instanceof RegionCoprocessorEnvironment) { + env = (RegionCoprocessorEnvironment) environment; + userProvider = UserProvider.instantiate(env.getConfiguration()); + } else { + throw new CoprocessorException("Must be loaded on a table region!"); + } + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + } + + @Override + public Service getService() { + return this; + } + + @Override + public void export(RpcController controller, ExportProtos.ExportRequest request, + RpcCallback done) { + Region region = env.getRegion(); + Configuration conf = HBaseConfiguration.create(env.getConfiguration()); + conf.setStrings("io.serializations", conf.get("io.serializations"), ResultSerialization.class.getName()); + try { + Scan scan = validateKey(region.getRegionInfo(), request); + Token userToken = null; + if (userProvider.isHadoopSecurityEnabled() && !request.hasFsToken()) { + LOG.warn("Hadoop security is enable, but no found of user token"); + } else if (userProvider.isHadoopSecurityEnabled()) { + userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), + request.getFsToken().getPassword().toByteArray(), + new Text(request.getFsToken().getKind()), + new 
Text(request.getFsToken().getService()));
+ }
+ ExportProtos.ExportResponse response = processData(region, conf, userProvider,
+ scan, userToken, getWriterOptions(conf, region.getRegionInfo(), request));
+ done.run(response);
+ } catch (IOException e) {
+ CoprocessorRpcUtils.setControllerException(controller, e);
+ LOG.error(e);
+ }
+ }
+
+ private Scan validateKey(final HRegionInfo region, final ExportProtos.ExportRequest request) throws IOException {
+ Scan scan = ProtobufUtil.toScan(request.getScan());
+ byte[] regionStartKey = region.getStartKey();
+ byte[] originStartKey = scan.getStartRow();
+ if (originStartKey == null
+ || Bytes.compareTo(originStartKey, regionStartKey) < 0) {
+ scan.setStartRow(regionStartKey);
+ }
+ byte[] regionEndKey = region.getEndKey();
+ byte[] originEndKey = scan.getStopRow();
+ if (originEndKey == null
+ || Bytes.compareTo(originEndKey, regionEndKey) > 0) {
+ scan.setStopRow(regionEndKey);
+ }
+ return scan;
+ }
+
+ private static class RegionOp implements Closeable {
+
+ private final Region region;
+
+ RegionOp(final Region region) throws IOException {
+ this.region = region;
+ region.startRegionOperation();
+ }
+
+ @Override
+ public void close() throws IOException {
+ region.closeRegionOperation();
+ }
+ }
+
+ private static class ScanCoprocessor {
+
+ private final Region region;
+
+ ScanCoprocessor(final Region region) {
+ this.region = region;
+ }
+
+ RegionScanner checkScannerOpen(final Scan scan) throws IOException {
+ RegionScanner scanner;
+ if (region.getCoprocessorHost() == null) {
+ scanner = region.getScanner(scan);
+ } else {
+ scanner = region.getCoprocessorHost().preScannerOpen(scan);
+ if (scanner == null) {
+ scanner = region.getScanner(scan);
+ }
+ scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
+ }
+ if (scanner == null) {
+ throw new IOException("Failed to open region scanner");
+ }
+ return scanner;
+ }
+
+ void checkScannerClose(final InternalScanner s) throws IOException {
+ if (s == null) {
+ return;
+ }
+ if (region.getCoprocessorHost() == null) {
+ s.close();
+ return;
+ }
+ if (region.getCoprocessorHost().preScannerClose(s)) {
+ return;
+ }
+ try {
+ s.close();
+ } finally {
+ region.getCoprocessorHost().postScannerClose(s);
+ }
+ }
+
+ boolean preScannerNext(final InternalScanner s,
+ final List results, final int limit) throws IOException {
+ if (region.getCoprocessorHost() == null) {
+ return false;
+ } else {
+ Boolean bypass = region.getCoprocessorHost().preScannerNext(s, results, limit);
+ return bypass == null ?
false : bypass; + } + } + + boolean postScannerNext(final InternalScanner s, + final List results, final int limit, boolean hasMore) + throws IOException { + if (region.getCoprocessorHost() == null) { + return false; + } else { + return region.getCoprocessorHost().postScannerNext(s, results, limit, hasMore); + } + } + } + + private static class SecureWriter implements Closeable { + private final PrivilegedWriter privilegedWriter; + + SecureWriter(final Configuration conf, final UserProvider userProvider, final Token userToken, + final List opts) throws IOException { + privilegedWriter = new PrivilegedWriter(getActiveUser(userProvider, userToken), + SequenceFile.createWriter(conf, opts.toArray(new SequenceFile.Writer.Option[opts.size()]))); + } + + void append(final Object key, final Object value) throws IOException { + privilegedWriter.append(key, value); + } + + private static User getActiveUser(final UserProvider userProvider, final Token userToken) throws IOException { + User user = RpcServer.getRequestUser(); + if (user == null) { + user = userProvider.getCurrent(); + } + if (user == null && userToken != null) { + LOG.warn("No found of user credentials, but a token was got from user request"); + } else if (user != null && userToken != null) { + user.addToken(userToken); + } + return user; + } + + @Override + public void close() throws IOException { + privilegedWriter.close(); + } + } + + private static class PrivilegedWriter implements PrivilegedExceptionAction, Closeable { + private final User user; + private final SequenceFile.Writer out; + private Object key; + private Object value; + + PrivilegedWriter(final User user, final SequenceFile.Writer out) { + this.user = user; + this.out = out; + } + + void append(final Object key, final Object value) throws IOException { + if (user == null) { + out.append(key, value); + } else { + this.key = key; + this.value = value; + try { + user.runAs(this); + } catch (InterruptedException ex) { + throw new IOException(ex); + } + } + } + + @Override + public Boolean run() throws Exception { + out.append(key, value); + return true; + } + + @Override + public void close() throws IOException { + out.close(); + } + } + + public static class Response { + + private final long rowCount; + private final long cellCount; + + private Response(ExportProtos.ExportResponse r) { + this.rowCount = r.getRowCount(); + this.cellCount = r.getCellCount(); + } + + public long getRowCount() { + return rowCount; + } + + public long getCellCount() { + return cellCount; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(35); + return builder.append("rowCount=") + .append(rowCount) + .append(", cellCount=") + .append(cellCount) + .toString(); + } + } +} diff --git a/hbase-endpoint/src/main/protobuf/Export.proto b/hbase-endpoint/src/main/protobuf/Export.proto new file mode 100644 index 00000000000..5e6c262e8ee --- /dev/null +++ b/hbase-endpoint/src/main/protobuf/Export.proto @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "ExportProtos"; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; +option java_generic_services = true; + +import "Client.proto"; + +service ExportService { + rpc export (ExportRequest) returns (ExportResponse); +} + +message ExportRequest { + required Scan scan = 1; + required string outputPath = 2; + optional bool compressed = 3 [default = false]; + optional string compressType = 4; + optional string compressCodec = 5; + optional DelegationToken fsToken = 6; +} +message ExportResponse { + required uint64 rowCount = 1; + required uint64 cellCount = 2; +} + diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestImportExport.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestImportExport.java new file mode 100644 index 00000000000..e0d4fd2a201 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestImportExport.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({MediumTests.class}) +public class TestImportExport extends org.apache.hadoop.hbase.mapreduce.TestImportExport { + + @BeforeClass + public static void beforeClass() throws Throwable { + UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + org.apache.hadoop.hbase.coprocessor.Export.class.getName()); + org.apache.hadoop.hbase.mapreduce.TestImportExport.beforeClass(); + } + + @Override + protected boolean runExport(String[] args) throws Throwable { + Export.run(new Configuration(UTIL.getConfiguration()), args); + return true; + } + + @Override + protected void runExportMain(String[] args) throws Throwable { + Export.main(args); + } + + /** + * Skip the test which is unrelated to the coprocessor.Export. 
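+ * It only replays a sequence file that was exported by HBase 0.94 through Import, so it never calls
+ * the overridden runExport and exercises nothing endpoint-specific.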
+ */ + @Test + @Ignore + public void testImport94Table() throws Throwable { + } +} diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java new file mode 100644 index 00000000000..66d99dde890 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import com.google.protobuf.ServiceException; +import java.io.File; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil; +import org.apache.hadoop.hbase.mapreduce.ExportUtils; +import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting; +import org.apache.hadoop.hbase.mapreduce.Import; +import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos; +import org.apache.hadoop.hbase.security.HBaseKerberosUtils; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.access.AccessControlConstants; +import org.apache.hadoop.hbase.security.access.AccessControlLists; +import org.apache.hadoop.hbase.security.access.SecureTestUtil; +import org.apache.hadoop.hbase.security.access.SecureTestUtil.AccessTestAction; +import org.apache.hadoop.hbase.security.access.Permission; +import org.apache.hadoop.hbase.security.visibility.Authorizations; +import org.apache.hadoop.hbase.security.visibility.CellVisibility; +import 
org.apache.hadoop.hbase.security.visibility.VisibilityClient; +import org.apache.hadoop.hbase.security.visibility.VisibilityConstants; +import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.After; +import org.junit.AfterClass; +import static org.junit.Assert.assertEquals; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({MediumTests.class}) +public class TestSecureExport { + private static final Log LOG = LogFactory.getLog(TestSecureExport.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static MiniKdc KDC; + private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath()); + private static String USERNAME; + private static String SERVER_PRINCIPAL; + private static String HTTP_PRINCIPAL; + private static final String FAMILYA_STRING = "fma"; + private static final String FAMILYB_STRING = "fma"; + private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING); + private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING); + private static final byte[] ROW1 = Bytes.toBytes("row1"); + private static final byte[] ROW2 = Bytes.toBytes("row2"); + private static final byte[] ROW3 = Bytes.toBytes("row3"); + private static final byte[] QUAL = Bytes.toBytes("qual"); + private static final String LOCALHOST = "localhost"; + private static final long NOW = System.currentTimeMillis(); + // user granted with all global permission + private static final String USER_ADMIN = "admin"; + // user is table owner. will have all permissions on table + private static final String USER_OWNER = "owner"; + // user with rx permissions. + private static final String USER_RX = "rxuser"; + // user with exe-only permissions. + private static final String USER_XO = "xouser"; + // user with read-only permissions. 
+ private static final String USER_RO = "rouser"; + // user with no permissions + private static final String USER_NONE = "noneuser"; + private static final String PRIVATE = "private"; + private static final String CONFIDENTIAL = "confidential"; + private static final String SECRET = "secret"; + private static final String TOPSECRET = "topsecret"; + @Rule + public final TestName name = new TestName(); + private static void setUpKdcServer() throws Exception { + Properties conf = MiniKdc.createConf(); + conf.put(MiniKdc.DEBUG, true); + File kdcFile = new File(UTIL.getDataTestDir("kdc").toUri().getPath()); + KDC = new MiniKdc(conf, kdcFile); + KDC.start(); + USERNAME = UserGroupInformation.getLoginUser().getShortUserName(); + SERVER_PRINCIPAL = USERNAME + "/" + LOCALHOST; + HTTP_PRINCIPAL = "HTTP/" + LOCALHOST; + KDC.createPrincipal(KEYTAB_FILE, + SERVER_PRINCIPAL, + HTTP_PRINCIPAL, + USER_ADMIN + "/" + LOCALHOST, + USER_OWNER + "/" + LOCALHOST, + USER_RX + "/" + LOCALHOST, + USER_RO + "/" + LOCALHOST, + USER_XO + "/" + LOCALHOST, + USER_NONE + "/" + LOCALHOST); + } + private static User getUserByLogin(final String user) throws IOException { + return User.create(UserGroupInformation.loginUserFromKeytabAndReturnUGI(getPrinciple(user), KEYTAB_FILE.getAbsolutePath())); + } + private static String getPrinciple(final String user) { + return user + "/" + LOCALHOST + "@" + KDC.getRealm(); + } + private static void setUpClusterKdc() throws Exception { + HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath()); + HBaseKerberosUtils.setPrincipalForTesting(SERVER_PRINCIPAL + "@" + KDC.getRealm()); + HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration()); + // if we drop support for hadoop-2.4.0 and hadoop-2.4.1, + // the following key should be changed. 
+ // 1) DFS_NAMENODE_USER_NAME_KEY -> DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY + // 2) DFS_DATANODE_USER_NAME_KEY -> DFS_DATANODE_KERBEROS_PRINCIPAL_KEY + UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm()); + UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm()); + UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); + UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); + // set yarn principal + UTIL.getConfiguration().set(YarnConfiguration.RM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm()); + UTIL.getConfiguration().set(YarnConfiguration.NM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm()); + UTIL.getConfiguration().set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, HTTP_PRINCIPAL + "@" + KDC.getRealm()); + UTIL.getConfiguration().setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + UTIL.getConfiguration().set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name()); + UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0"); + UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0"); + + File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath()); + keystoresDir.mkdirs(); + String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestSecureExport.class); + KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, UTIL.getConfiguration(), false); + + UTIL.getConfiguration().setBoolean("ignore.secure.ports.for.testing", true); + UserGroupInformation.setConfiguration(UTIL.getConfiguration()); + UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, UTIL.getConfiguration().get( + CoprocessorHost.REGION_COPROCESSOR_CONF_KEY) + "," + Export.class.getName()); + } + private static void addLabels(final Configuration conf, final List users, final List labels) throws Exception { + PrivilegedExceptionAction action + = () -> { + try (Connection conn = ConnectionFactory.createConnection(conf)) { + VisibilityClient.addLabels(conn, labels.toArray(new String[labels.size()])); + for (String user : users) { + VisibilityClient.setAuths(conn, labels.toArray(new String[labels.size()]), user); + } + } catch (Throwable t) { + throw new IOException(t); + } + return null; + }; + getUserByLogin(USER_ADMIN).runAs(action); + } + + @Before + public void announce() { + LOG.info("Running " + name.getMethodName()); + } + + @After + public void cleanup() throws IOException { + } + private static void clearOutput(Path path) throws IOException { + FileSystem fs = path.getFileSystem(UTIL.getConfiguration()); + if (fs.exists(path)) { + assertEquals(true, fs.delete(path, true)); + } + } + /** + * Sets the security firstly for getting the correct default realm. 
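+ * The testing UserProvider and the KDC are set up before security is enabled on the configuration,
+ * so the server and user principals created below all land in the test KDC's realm.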
+ * @throws Exception + */ + @BeforeClass + public static void beforeClass() throws Exception { + UserProvider.setUserProviderForTesting(UTIL.getConfiguration(), HadoopSecurityEnabledUserProviderForTesting.class); + setUpKdcServer(); + SecureTestUtil.enableSecurity(UTIL.getConfiguration()); + UTIL.getConfiguration().setBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY, true); + VisibilityTestUtil.enableVisiblityLabels(UTIL.getConfiguration()); + SecureTestUtil.verifyConfiguration(UTIL.getConfiguration()); + setUpClusterKdc(); + UTIL.startMiniCluster(); + UTIL.waitUntilAllRegionsAssigned(AccessControlLists.ACL_TABLE_NAME); + UTIL.waitUntilAllRegionsAssigned(VisibilityConstants.LABELS_TABLE_NAME); + UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME, 50000); + UTIL.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME, 50000); + SecureTestUtil.grantGlobal(UTIL, USER_ADMIN, + Permission.Action.ADMIN, + Permission.Action.CREATE, + Permission.Action.EXEC, + Permission.Action.READ, + Permission.Action.WRITE); + addLabels(UTIL.getConfiguration(), Arrays.asList(USER_OWNER), + Arrays.asList(PRIVATE, CONFIDENTIAL, SECRET, TOPSECRET)); + } + + @AfterClass + public static void afterClass() throws Exception { + if (KDC != null) { + KDC.stop(); + } + UTIL.shutdownMiniCluster(); + } + + /** + * Test the ExportEndpoint's access levels. The {@link Export} test is ignored + * since the access exceptions cannot be collected from the mappers. + * + * @throws java.io.IOException + */ + @Test + public void testAccessCase() throws IOException, Throwable { + final String exportTable = name.getMethodName(); + TableDescriptor exportHtd = TableDescriptorBuilder + .newBuilder(TableName.valueOf(name.getMethodName())) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYA)) + .setOwnerString(USER_OWNER) + .build(); + SecureTestUtil.createTable(UTIL, exportHtd, new byte[][]{Bytes.toBytes("s")}); + SecureTestUtil.grantOnTable(UTIL, USER_RO, + TableName.valueOf(exportTable), null, null, + Permission.Action.READ); + SecureTestUtil.grantOnTable(UTIL, USER_RX, + TableName.valueOf(exportTable), null, null, + Permission.Action.READ, + Permission.Action.EXEC); + SecureTestUtil.grantOnTable(UTIL, USER_XO, + TableName.valueOf(exportTable), null, null, + Permission.Action.EXEC); + assertEquals(4, AccessControlLists.getTablePermissions(UTIL.getConfiguration(), + TableName.valueOf(exportTable)).size()); + AccessTestAction putAction = () -> { + Put p = new Put(ROW1); + p.addColumn(FAMILYA, Bytes.toBytes("qual_0"), NOW, QUAL); + p.addColumn(FAMILYA, Bytes.toBytes("qual_1"), NOW, QUAL); + try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration()); + Table t = conn.getTable(TableName.valueOf(exportTable))) { + t.put(p); + } + return null; + }; + // no hdfs access. 
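+ // Only the global admin and the table owner hold WRITE permission on the table, so the put is
+ // expected to succeed for them and to be denied for the read-only, exec-only, read/exec and
+ // no-permission users.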
+ SecureTestUtil.verifyAllowed(putAction, + getUserByLogin(USER_ADMIN), + getUserByLogin(USER_OWNER)); + SecureTestUtil.verifyDenied(putAction, + getUserByLogin(USER_RO), + getUserByLogin(USER_XO), + getUserByLogin(USER_RX), + getUserByLogin(USER_NONE)); + + final FileSystem fs = UTIL.getDFSCluster().getFileSystem(); + final Path openDir = fs.makeQualified(new Path("testAccessCase")); + fs.mkdirs(openDir); + fs.setPermission(openDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + final Path output = fs.makeQualified(new Path(openDir, "output")); + AccessTestAction exportAction = () -> { + try { + String[] args = new String[]{exportTable, output.toString()}; + Map result + = Export.run(new Configuration(UTIL.getConfiguration()), args); + long rowCount = 0; + long cellCount = 0; + for (Export.Response r : result.values()) { + rowCount += r.getRowCount(); + cellCount += r.getCellCount(); + } + assertEquals(1, rowCount); + assertEquals(2, cellCount); + return null; + } catch (ServiceException | IOException ex) { + throw ex; + } catch (Throwable ex) { + LOG.error(ex); + throw new Exception(ex); + } finally { + clearOutput(output); + } + }; + SecureTestUtil.verifyDenied(exportAction, + getUserByLogin(USER_RO), + getUserByLogin(USER_XO), + getUserByLogin(USER_NONE)); + SecureTestUtil.verifyAllowed(exportAction, + getUserByLogin(USER_ADMIN), + getUserByLogin(USER_OWNER), + getUserByLogin(USER_RX)); + AccessTestAction deleteAction = () -> { + UTIL.deleteTable(TableName.valueOf(exportTable)); + return null; + }; + SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER)); + fs.delete(openDir, true); + } + @Test + public void testVisibilityLabels() throws IOException, Throwable { + final String exportTable = name.getMethodName() + "_export"; + final String importTable = name.getMethodName() + "_import"; + final TableDescriptor exportHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(exportTable)) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYA)) + .setOwnerString(USER_OWNER) + .build(); + SecureTestUtil.createTable(UTIL, exportHtd, new byte[][]{Bytes.toBytes("s")}); + AccessTestAction putAction = () -> { + Put p1 = new Put(ROW1); + p1.addColumn(FAMILYA, QUAL, NOW, QUAL); + p1.setCellVisibility(new CellVisibility(SECRET)); + Put p2 = new Put(ROW2); + p2.addColumn(FAMILYA, QUAL, NOW, QUAL); + p2.setCellVisibility(new CellVisibility(PRIVATE + " & " + CONFIDENTIAL)); + Put p3 = new Put(ROW3); + p3.addColumn(FAMILYA, QUAL, NOW, QUAL); + p3.setCellVisibility(new CellVisibility("!" + CONFIDENTIAL + " & " + TOPSECRET)); + try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration()); + Table t = conn.getTable(TableName.valueOf(exportTable))) { + t.put(p1); + t.put(p2); + t.put(p3); + } + return null; + }; + SecureTestUtil.verifyAllowed(putAction, getUserByLogin(USER_OWNER)); + List, Integer>> labelsAndRowCounts = new LinkedList<>(); + labelsAndRowCounts.add(new Pair<>(Arrays.asList(SECRET), 1)); + labelsAndRowCounts.add(new Pair<>(Arrays.asList(PRIVATE, CONFIDENTIAL), 1)); + labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET), 1)); + labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET, CONFIDENTIAL), 0)); + labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET, CONFIDENTIAL, PRIVATE, SECRET), 2)); + for (final Pair, Integer> labelsAndRowCount : labelsAndRowCounts) { + final List labels = labelsAndRowCount.getFirst(); + final int rowCount = labelsAndRowCount.getSecond(); + //create a open permission directory. 
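+ // The directory is created world-readable/writable so the export output can be written under it
+ // regardless of which principal ends up doing the HDFS writes.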
+ final Path openDir = new Path("testAccessCase"); + final FileSystem fs = openDir.getFileSystem(UTIL.getConfiguration()); + fs.mkdirs(openDir); + fs.setPermission(openDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + final Path output = fs.makeQualified(new Path(openDir, "output")); + AccessTestAction exportAction = () -> { + StringBuilder buf = new StringBuilder(); + labels.forEach(v -> buf.append(v).append(",")); + buf.deleteCharAt(buf.length() - 1); + try { + String[] args = new String[]{ + "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + buf.toString(), + exportTable, + output.toString(),}; + Export.run(new Configuration(UTIL.getConfiguration()), args); + return null; + } catch (ServiceException | IOException ex) { + throw ex; + } catch (Throwable ex) { + throw new Exception(ex); + } + }; + SecureTestUtil.verifyAllowed(exportAction, getUserByLogin(USER_OWNER)); + final TableDescriptor importHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(importTable)) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYB)) + .setOwnerString(USER_OWNER) + .build(); + SecureTestUtil.createTable(UTIL, importHtd, new byte[][]{Bytes.toBytes("s")}); + AccessTestAction importAction = () -> { + String[] args = new String[]{ + "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING, + importTable, + output.toString() + }; + assertEquals(0, ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args)); + return null; + }; + SecureTestUtil.verifyAllowed(importAction, getUserByLogin(USER_OWNER)); + AccessTestAction scanAction = () -> { + Scan scan = new Scan(); + scan.setAuthorizations(new Authorizations(labels)); + try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration()); + Table table = conn.getTable(importHtd.getTableName()); + ResultScanner scanner = table.getScanner(scan)) { + int count = 0; + for (Result r : scanner) { + ++count; + } + assertEquals(rowCount, count); + } + return null; + }; + SecureTestUtil.verifyAllowed(scanAction, getUserByLogin(USER_OWNER)); + AccessTestAction deleteAction = () -> { + UTIL.deleteTable(importHtd.getTableName()); + return null; + }; + SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER)); + clearOutput(output); + } + AccessTestAction deleteAction = () -> { + UTIL.deleteTable(exportHtd.getTableName()); + return null; + }; + SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER)); + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/ExportEndpointExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/ExportEndpointExample.java new file mode 100644 index 00000000000..cc06844595e --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/ExportEndpointExample.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.example; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.Export; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * A simple example on how to use {@link org.apache.hadoop.hbase.coprocessor.Export}. + * + *

+ * For the protocol buffer definition of the ExportService, see the source file located under + * hbase-endpoint/src/main/protobuf/Export.proto. + *

+ */ +public class ExportEndpointExample { + + public static void main(String[] args) throws Throwable { + int rowCount = 100; + byte[] family = Bytes.toBytes("family"); + Configuration conf = HBaseConfiguration.create(); + TableName tableName = TableName.valueOf("ExportEndpointExample"); + try (Connection con = ConnectionFactory.createConnection(conf); + Admin admin = con.getAdmin()) { + TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) + // MUST mount the export endpoint + .addCoprocessor(Export.class.getName()) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(family)) + .build(); + admin.createTable(desc); + + List puts = new ArrayList<>(rowCount); + for (int row = 0; row != rowCount; ++row) { + byte[] bs = Bytes.toBytes(row); + Put put = new Put(bs); + put.addColumn(family, bs, bs); + puts.add(put); + } + try (Table table = con.getTable(tableName)) { + table.put(puts); + } + + Path output = new Path("/tmp/ExportEndpointExample_output"); + Scan scan = new Scan(); + Map result = Export.run(conf, tableName, scan, output); + final long totalOutputRows = result.values().stream().mapToLong(v -> v.getRowCount()).sum(); + final long totalOutputCells = result.values().stream().mapToLong(v -> v.getCellCount()).sum(); + System.out.println("table:" + tableName); + System.out.println("output:" + output); + System.out.println("total rows:" + totalOutputRows); + System.out.println("total cells:" + totalOutputCells); + } + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java index de6cf3a88db..b2dc2541383 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java @@ -20,23 +20,17 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.IncompatibleFilterException; -import org.apache.hadoop.hbase.filter.PrefixFilter; -import org.apache.hadoop.hbase.filter.RegexStringComparator; -import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ArrayUtils; +import org.apache.hadoop.hbase.util.Triple; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; @@ -50,12 +44,8 @@ import org.apache.hadoop.util.ToolRunner; */ @InterfaceAudience.Public public class Export extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(Export.class); - final static String NAME = "export"; - final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows"; - final static 
String EXPORT_BATCHING = "hbase.export.scanner.batch"; - - private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + static final String NAME = "export"; + static final String JOB_NAME_CONF_KEY = "mapreduce.job.name"; /** * Sets up the actual job. @@ -67,13 +57,14 @@ public class Export extends Configured implements Tool { */ public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException { - String tableName = args[0]; - Path outputDir = new Path(args[1]); + Triple arguments = ExportUtils.getArgumentsFromCommandLine(conf, args); + String tableName = arguments.getFirst().getNameAsString(); + Path outputDir = arguments.getThird(); Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJobName(NAME + "_" + tableName); job.setJarByClass(Export.class); // Set optional scan parameters - Scan s = getConfiguredScanForJob(conf, args); + Scan s = arguments.getSecond(); IdentityTableMapper.initJob(tableName, s, IdentityTableMapper.class, job); // No reducers. Just write straight to output files. job.setNumReduceTasks(0); @@ -84,101 +75,15 @@ public class Export extends Configured implements Tool { return job; } - private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException { - Scan s = new Scan(); - // Optional arguments. - // Set Scan Versions - int versions = args.length > 2? Integer.parseInt(args[2]): 1; - s.setMaxVersions(versions); - // Set Scan Range - long startTime = args.length > 3? Long.parseLong(args[3]): 0L; - long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE; - s.setTimeRange(startTime, endTime); - // Set cache blocks - s.setCacheBlocks(false); - // set Start and Stop row - if (conf.get(TableInputFormat.SCAN_ROW_START) != null) { - s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START))); - } - if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) { - s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP))); - } - // Set Scan Column Family - boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN)); - if (raw) { - s.setRaw(raw); - } - for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) { - s.addFamily(Bytes.toBytes(columnFamily)); - } - // Set RowFilter or Prefix Filter if applicable. - Filter exportFilter = getExportFilter(args); - if (exportFilter!= null) { - LOG.info("Setting Scan Filter for Export."); - s.setFilter(exportFilter); - } - - int batching = conf.getInt(EXPORT_BATCHING, -1); - if (batching != -1){ - try { - s.setBatch(batching); - } catch (IncompatibleFilterException e) { - LOG.error("Batching could not be set", e); - } - } - LOG.info("versions=" + versions + ", starttime=" + startTime + - ", endtime=" + endTime + ", keepDeletedCells=" + raw); - return s; - } - - private static Filter getExportFilter(String[] args) { - Filter exportFilter = null; - String filterCriteria = (args.length > 5) ? args[5]: null; - if (filterCriteria == null) return null; - if (filterCriteria.startsWith("^")) { - String regexPattern = filterCriteria.substring(1, filterCriteria.length()); - exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern)); - } else { - exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria)); - } - return exportFilter; - } - - /* - * @param errorMsg Error message. Can be null. 
- */ - private static void usage(final String errorMsg) { - if (errorMsg != null && errorMsg.length() > 0) { - System.err.println("ERROR: " + errorMsg); - } - System.err.println("Usage: Export [-D ]* [ " + - "[ []] [^[regex pattern] or [Prefix] to filter]]\n"); - System.err.println(" Note: -D properties will be applied to the conf used. "); - System.err.println(" For example: "); - System.err.println(" -D mapreduce.output.fileoutputformat.compress=true"); - System.err.println(" -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec"); - System.err.println(" -D mapreduce.output.fileoutputformat.compress.type=BLOCK"); - System.err.println(" Additionally, the following SCAN properties can be specified"); - System.err.println(" to control/limit what is exported.."); - System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=,, ..."); - System.err.println(" -D " + RAW_SCAN + "=true"); - System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "="); - System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "="); - System.err.println(" -D " + JOB_NAME_CONF_KEY - + "=jobName - use the specified mapreduce job name for the export"); - System.err.println("For performance consider the following properties:\n" - + " -Dhbase.client.scanner.caching=100\n" - + " -Dmapreduce.map.speculative=false\n" - + " -Dmapreduce.reduce.speculative=false"); - System.err.println("For tables with very wide rows consider setting the batch size as below:\n" - + " -D" + EXPORT_BATCHING + "=10"); - } - - @Override public int run(String[] args) throws Exception { - if (args.length < 2) { - usage("Wrong number of arguments: " + args.length); + if (!ExportUtils.isValidArguements(args)) { + ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.length(args)); + System.err.println(" -D " + JOB_NAME_CONF_KEY + + "=jobName - use the specified mapreduce job name for the export"); + System.err.println("For MR performance consider the following properties:"); + System.err.println(" -D mapreduce.map.speculative=false"); + System.err.println(" -D mapreduce.reduce.speculative=false"); return -1; } Job job = createSubmittableJob(getConf(), args); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExportUtils.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExportUtils.java new file mode 100644 index 00000000000..9cc2a80cc77 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExportUtils.java @@ -0,0 +1,175 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.security.visibility.Authorizations;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Triple;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Some helper methods are used by {@link org.apache.hadoop.hbase.mapreduce.Export}
+ * and org.apache.hadoop.hbase.coprocessor.Export (in hbase-endpoint).
+ */
+@InterfaceAudience.Private
+public final class ExportUtils {
+  private static final Log LOG = LogFactory.getLog(ExportUtils.class);
+  public static final String RAW_SCAN = "hbase.mapreduce.include.deleted.rows";
+  public static final String EXPORT_BATCHING = "hbase.export.scanner.batch";
+  public static final String EXPORT_CACHING = "hbase.export.scanner.caching";
+  public static final String EXPORT_VISIBILITY_LABELS = "hbase.export.visibility.labels";
+  /**
+   * Common usage for other export tools.
+   * @param errorMsg Error message. Can be null.
+   */
+  public static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
+      "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n");
+    System.err.println("   Note: -D properties will be applied to the conf used. ");
+    System.err.println("   For example: ");
+    System.err.println("   -D " + FileOutputFormat.COMPRESS + "=true");
+    System.err.println("   -D " + FileOutputFormat.COMPRESS_CODEC + "=org.apache.hadoop.io.compress.GzipCodec");
+    System.err.println("   -D " + FileOutputFormat.COMPRESS_TYPE + "=BLOCK");
+    System.err.println("   Additionally, the following SCAN properties can be specified");
+    System.err.println("   to control/limit what is exported.");
+    System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
+    System.err.println("   -D " + RAW_SCAN + "=true");
+    System.err.println("   -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>");
+    System.err.println("   -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>");
+    System.err.println("   -D " + HConstants.HBASE_CLIENT_SCANNER_CACHING + "=100");
+    System.err.println("   -D " + EXPORT_VISIBILITY_LABELS + "=<labels>");
+    System.err.println("For tables with very wide rows consider setting the batch size as below:\n"
+        + "   -D " + EXPORT_BATCHING + "=10\n"
+        + "   -D " + EXPORT_CACHING + "=100");
+  }
+
+  private static Filter getExportFilter(String[] args) {
+    Filter exportFilter;
+    String filterCriteria = (args.length > 5) ? args[5]: null;
+    if (filterCriteria == null) return null;
+    if (filterCriteria.startsWith("^")) {
+      String regexPattern = filterCriteria.substring(1, filterCriteria.length());
+      exportFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexPattern));
+    } else {
+      exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
+    }
+    return exportFilter;
+  }
+
+  public static boolean isValidArguements(String[] args) {
+    return args != null && args.length >= 2;
+  }
+
+  public static Triple<TableName, Scan, Path> getArgumentsFromCommandLine(
+      Configuration conf, String[] args) throws IOException {
+    if (!isValidArguements(args)) {
+      return null;
+    }
+    return new Triple<>(TableName.valueOf(args[0]), getScanFromCommandLine(conf, args), new Path(args[1]));
+  }
+
+  @VisibleForTesting
+  static Scan getScanFromCommandLine(Configuration conf, String[] args) throws IOException {
+    Scan s = new Scan();
+    // Optional arguments.
+    // Set Scan Versions
+    int versions = args.length > 2? Integer.parseInt(args[2]): 1;
+    s.setMaxVersions(versions);
+    // Set Scan Range
+    long startTime = args.length > 3? Long.parseLong(args[3]): 0L;
+    long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE;
+    s.setTimeRange(startTime, endTime);
+    // Set cache blocks
+    s.setCacheBlocks(false);
+    // set Start and Stop row
+    if (conf.get(TableInputFormat.SCAN_ROW_START) != null) {
+      s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START)));
+    }
+    if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) {
+      s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP)));
+    }
+    // Set Scan Column Family
+    boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
+    if (raw) {
+      s.setRaw(raw);
+    }
+    for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) {
+      s.addFamily(Bytes.toBytes(columnFamily));
+    }
+    // Set RowFilter or Prefix Filter if applicable.
+ Filter exportFilter = getExportFilter(args); + if (exportFilter!= null) { + LOG.info("Setting Scan Filter for Export."); + s.setFilter(exportFilter); + } + List labels = null; + if (conf.get(EXPORT_VISIBILITY_LABELS) != null) { + labels = Arrays.asList(conf.getStrings(EXPORT_VISIBILITY_LABELS)); + if (!labels.isEmpty()) { + s.setAuthorizations(new Authorizations(labels)); + } + } + + int batching = conf.getInt(EXPORT_BATCHING, -1); + if (batching != -1) { + try { + s.setBatch(batching); + } catch (IncompatibleFilterException e) { + LOG.error("Batching could not be set", e); + } + } + + int caching = conf.getInt(EXPORT_CACHING, 100); + if (caching != -1) { + try { + s.setCaching(caching); + } catch (IncompatibleFilterException e) { + LOG.error("Caching could not be set", e); + } + } + LOG.info("versions=" + versions + ", starttime=" + startTime + + ", endtime=" + endTime + ", keepDeletedCells=" + raw + + ", visibility labels=" + labels); + return s; + } + + private ExportUtils() { + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java index 91d2696fbe7..60d88bc3dd7 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java @@ -44,12 +44,11 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -58,6 +57,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.PrefixFilter; @@ -72,6 +73,7 @@ import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.LauncherSecurityManager; import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.ToolRunner; import org.junit.After; import org.junit.AfterClass; @@ -90,8 +92,9 @@ import org.mockito.stubbing.Answer; */ @Category({VerySlowMapReduceTests.class, MediumTests.class}) public class TestImportExport { + private static final Log LOG = LogFactory.getLog(TestImportExport.class); - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1"); private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2"); private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3"); @@ 
-104,10 +107,12 @@ public class TestImportExport { private static String FQ_OUTPUT_DIR; private static final String EXPORT_BATCH_SIZE = "100"; - private static long now = System.currentTimeMillis(); + private static final long now = System.currentTimeMillis(); + private final TableName EXPORT_TABLE = TableName.valueOf("export_table"); + private final TableName IMPORT_TABLE = TableName.valueOf("import_table"); @BeforeClass - public static void beforeClass() throws Exception { + public static void beforeClass() throws Throwable { // Up the handlers; this test needs more than usual. UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); UTIL.startMiniCluster(); @@ -116,7 +121,7 @@ public class TestImportExport { } @AfterClass - public static void afterClass() throws Exception { + public static void afterClass() throws Throwable { UTIL.shutdownMiniCluster(); } @@ -128,11 +133,16 @@ public class TestImportExport { LOG.info("Running " + name.getMethodName()); } - @Before @After - public void cleanup() throws Exception { + public void cleanup() throws Throwable { FileSystem fs = FileSystem.get(UTIL.getConfiguration()); fs.delete(new Path(OUTPUT_DIR), true); + if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) { + UTIL.deleteTable(EXPORT_TABLE); + } + if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) { + UTIL.deleteTable(IMPORT_TABLE); + } } /** @@ -143,12 +153,16 @@ public class TestImportExport { * @throws InterruptedException * @throws ClassNotFoundException */ - boolean runExport(String[] args) throws Exception { + protected boolean runExport(String[] args) throws Throwable { // need to make a copy of the configuration because to make sure different temp dirs are used. int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args); return status == 0; } + protected void runExportMain(String[] args) throws Throwable { + Export.main(args); + } + /** * Runs an import job with the specified command line args * @param args @@ -157,7 +171,7 @@ public class TestImportExport { * @throws InterruptedException * @throws ClassNotFoundException */ - boolean runImport(String[] args) throws Exception { + boolean runImport(String[] args) throws Throwable { // need to make a copy of the configuration because to make sure different temp dirs are used. 
int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args); return status == 0; @@ -168,7 +182,7 @@ public class TestImportExport { * @throws Exception */ @Test - public void testSimpleCase() throws Exception { + public void testSimpleCase() throws Throwable { try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3);) { Put p = new Put(ROW1); p.addColumn(FAMILYA, QUAL, now, QUAL); @@ -223,21 +237,21 @@ public class TestImportExport { /** * Test export hbase:meta table * - * @throws Exception + * @throws Throwable */ @Test - public void testMetaExport() throws Exception { - String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString(); - String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" }; + public void testMetaExport() throws Throwable { + String[] args = new String[] { TableName.META_TABLE_NAME.getNameAsString(), + FQ_OUTPUT_DIR, "1", "0", "0" }; assertTrue(runExport(args)); } /** * Test import data from 0.94 exported file - * @throws Exception + * @throws Throwable */ @Test - public void testImport94Table() throws Exception { + public void testImport94Table() throws Throwable { final String name = "exportedTableIn94Format"; URL url = TestImportExport.class.getResource(name); File f = new File(url.toURI()); @@ -273,11 +287,13 @@ public class TestImportExport { * Test export scanner batching */ @Test - public void testExportScannerBatching() throws Exception { - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName())); - desc.addFamily(new HColumnDescriptor(FAMILYA) - .setMaxVersions(1) - ); + public void testExportScannerBatching() throws Throwable { + TableDescriptor desc = TableDescriptorBuilder + .newBuilder(TableName.valueOf(name.getMethodName())) + .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) + .setMaxVersions(1) + .build()) + .build(); UTIL.getAdmin().createTable(desc); try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { @@ -290,7 +306,7 @@ public class TestImportExport { t.put(p); String[] args = new String[] { - "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg. + "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg. 
name.getMethodName(), FQ_OUTPUT_DIR }; @@ -302,12 +318,14 @@ public class TestImportExport { } @Test - public void testWithDeletes() throws Exception { - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName())); - desc.addFamily(new HColumnDescriptor(FAMILYA) - .setMaxVersions(5) - .setKeepDeletedCells(KeepDeletedCells.TRUE) - ); + public void testWithDeletes() throws Throwable { + TableDescriptor desc = TableDescriptorBuilder + .newBuilder(TableName.valueOf(name.getMethodName())) + .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) + .setMaxVersions(5) + .setKeepDeletedCells(KeepDeletedCells.TRUE) + .build()) + .build(); UTIL.getAdmin().createTable(desc); try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { @@ -327,7 +345,7 @@ public class TestImportExport { } String[] args = new String[] { - "-D" + Export.RAW_SCAN + "=true", + "-D" + ExportUtils.RAW_SCAN + "=true", name.getMethodName(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to export @@ -335,11 +353,13 @@ public class TestImportExport { assertTrue(runExport(args)); final String IMPORT_TABLE = name.getMethodName() + "import"; - desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE)); - desc.addFamily(new HColumnDescriptor(FAMILYA) - .setMaxVersions(5) - .setKeepDeletedCells(KeepDeletedCells.TRUE) - ); + desc = TableDescriptorBuilder + .newBuilder(TableName.valueOf(IMPORT_TABLE)) + .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) + .setMaxVersions(5) + .setKeepDeletedCells(KeepDeletedCells.TRUE) + .build()) + .build(); UTIL.getAdmin().createTable(desc); try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { args = new String[] { @@ -366,13 +386,15 @@ public class TestImportExport { @Test - public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception { + public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable { final TableName exportTable = TableName.valueOf(name.getMethodName()); - HTableDescriptor desc = new HTableDescriptor(exportTable); - desc.addFamily(new HColumnDescriptor(FAMILYA) - .setMaxVersions(5) - .setKeepDeletedCells(KeepDeletedCells.TRUE) - ); + TableDescriptor desc = TableDescriptorBuilder + .newBuilder(TableName.valueOf(name.getMethodName())) + .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) + .setMaxVersions(5) + .setKeepDeletedCells(KeepDeletedCells.TRUE) + .build()) + .build(); UTIL.getAdmin().createTable(desc); Table exportT = UTIL.getConnection().getTable(exportTable); @@ -397,18 +419,20 @@ public class TestImportExport { String[] args = new String[] { - "-D" + Export.RAW_SCAN + "=true", exportTable.getNameAsString(), + "-D" + ExportUtils.RAW_SCAN + "=true", exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to export }; assertTrue(runExport(args)); final String importTable = name.getMethodName() + "import"; - desc = new HTableDescriptor(TableName.valueOf(importTable)); - desc.addFamily(new HColumnDescriptor(FAMILYA) - .setMaxVersions(5) - .setKeepDeletedCells(KeepDeletedCells.TRUE) - ); + desc = TableDescriptorBuilder + .newBuilder(TableName.valueOf(importTable)) + .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) + .setMaxVersions(5) + .setKeepDeletedCells(KeepDeletedCells.TRUE) + .build()) + .build(); UTIL.getAdmin().createTable(desc); Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable)); @@ -429,7 +453,7 @@ public class TestImportExport { 
Result exportedTResult = exportedTScanner.next(); try { Result.compareResults(exportedTResult, importedTResult); - } catch (Exception e) { + } catch (Throwable e) { fail("Original and imported tables data comparision failed with error:"+e.getMessage()); } finally { exportT.close(); @@ -442,10 +466,14 @@ public class TestImportExport { * attempt with invalid values. */ @Test - public void testWithFilter() throws Exception { + public void testWithFilter() throws Throwable { // Create simple table to export - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName())); - desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5)); + TableDescriptor desc = TableDescriptorBuilder + .newBuilder(TableName.valueOf(name.getMethodName())) + .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) + .setMaxVersions(5) + .build()) + .build(); UTIL.getAdmin().createTable(desc); Table exportTable = UTIL.getConnection().getTable(desc.getTableName()); @@ -468,8 +496,12 @@ public class TestImportExport { // Import to a new table final String IMPORT_TABLE = name.getMethodName() + "import"; - desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE)); - desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5)); + desc = TableDescriptorBuilder + .newBuilder(TableName.valueOf(IMPORT_TABLE)) + .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) + .setMaxVersions(5) + .build()) + .build(); UTIL.getAdmin().createTable(desc); Table importTable = UTIL.getConnection().getTable(desc.getTableName()); @@ -501,8 +533,6 @@ public class TestImportExport { /** * Count the number of keyvalues in the specified table for the given timerange - * @param start - * @param end * @param table * @return * @throws IOException @@ -523,7 +553,7 @@ public class TestImportExport { * test main method. 
Import should print help and call System.exit */ @Test - public void testImportMain() throws Exception { + public void testImportMain() throws Throwable { PrintStream oldPrintStream = System.err; SecurityManager SECURITY_MANAGER = System.getSecurityManager(); LauncherSecurityManager newSecurityManager= new LauncherSecurityManager(); @@ -548,11 +578,56 @@ public class TestImportExport { } } + @Test + public void testExportScan() throws Exception { + int version = 100; + long startTime = System.currentTimeMillis(); + long endTime = startTime + 1; + String prefix = "row"; + String label_0 = "label_0"; + String label_1 = "label_1"; + String[] args = { + "table", + "outputDir", + String.valueOf(version), + String.valueOf(startTime), + String.valueOf(endTime), + prefix + }; + Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args); + assertEquals(version, scan.getMaxVersions()); + assertEquals(startTime, scan.getTimeRange().getMin()); + assertEquals(endTime, scan.getTimeRange().getMax()); + assertEquals(true, (scan.getFilter() instanceof PrefixFilter)); + assertEquals(0, Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix))); + String[] argsWithLabels = { + "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1, + "table", + "outputDir", + String.valueOf(version), + String.valueOf(startTime), + String.valueOf(endTime), + prefix + }; + Configuration conf = new Configuration(UTIL.getConfiguration()); + // parse the "-D" options + String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs(); + Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs); + assertEquals(version, scanWithLabels.getMaxVersions()); + assertEquals(startTime, scanWithLabels.getTimeRange().getMin()); + assertEquals(endTime, scanWithLabels.getTimeRange().getMax()); + assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter)); + assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix))); + assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size()); + assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0)); + assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1)); + } + /** * test main method. 
Export should print help and call System.exit */ @Test - public void testExportMain() throws Exception { + public void testExportMain() throws Throwable { PrintStream oldPrintStream = System.err; SecurityManager SECURITY_MANAGER = System.getSecurityManager(); LauncherSecurityManager newSecurityManager= new LauncherSecurityManager(); @@ -562,7 +637,7 @@ public class TestImportExport { System.setErr(new PrintStream(data)); try { System.setErr(new PrintStream(data)); - Export.main(args); + runExportMain(args); fail("should be SecurityException"); } catch (SecurityException e) { assertEquals(-1, newSecurityManager.getExitCode()); @@ -574,10 +649,9 @@ public class TestImportExport { assertTrue( errMsg.contains("-D hbase.mapreduce.scan.column.family=,, ...")); assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true")); - assertTrue(errMsg.contains("-Dhbase.client.scanner.caching=100")); - assertTrue(errMsg.contains("-Dmapreduce.map.speculative=false")); - assertTrue(errMsg.contains("-Dmapreduce.reduce.speculative=false")); - assertTrue(errMsg.contains("-Dhbase.export.scanner.batch=10")); + assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100")); + assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10")); + assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100")); } finally { System.setErr(oldPrintStream); System.setSecurityManager(SECURITY_MANAGER); @@ -589,7 +663,7 @@ public class TestImportExport { */ @SuppressWarnings({ "unchecked", "rawtypes" }) @Test - public void testKeyValueImporter() throws Exception { + public void testKeyValueImporter() throws Throwable { KeyValueImporter importer = new KeyValueImporter(); Configuration configuration = new Configuration(); Context ctx = mock(Context.class); @@ -638,7 +712,7 @@ public class TestImportExport { } @Test - public void testDurability() throws Exception { + public void testDurability() throws Throwable { // Create an export table. String exportTableName = name.getMethodName() + "export"; try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java index 126c4e47fa8..19cbc38faa7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java @@ -168,7 +168,7 @@ public class SecureTestUtil { * To indicate the action was not allowed, either throw an AccessDeniedException * or return an empty list of KeyValues. */ - protected static interface AccessTestAction extends PrivilegedExceptionAction { } + public interface AccessTestAction extends PrivilegedExceptionAction { } /** This fails only in case of ADE or empty list for any of the actions. */ public static void verifyAllowed(User user, AccessTestAction... actions) throws Exception {
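
A note on the parsing refactor: the patch concentrates all command-line and Scan parsing in ExportUtils, so the MapReduce Export and the endpoint-based Export share one implementation. The sketch below is not part of the patch; the class name ExportArgsDemo and the printed fields are made up for illustration, while isValidArguements, usage and getArgumentsFromCommandLine are the ExportUtils methods added above. It shows how a caller could parse the same arguments the tool accepts.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.ExportUtils;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.util.GenericOptionsParser;

public class ExportArgsDemo {
  public static void main(String[] rawArgs) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // GenericOptionsParser moves the -D options into conf, the same way ToolRunner does for Export.
    String[] args = new GenericOptionsParser(conf, rawArgs).getRemainingArgs();
    if (!ExportUtils.isValidArguements(args)) {
      ExportUtils.usage("Expected at least a table name and an output directory");
      return;
    }
    // Triple = (table to export, Scan built from args[2..5] plus the -D scan properties, output dir).
    Triple<TableName, Scan, Path> parsed = ExportUtils.getArgumentsFromCommandLine(conf, args);
    System.out.println("table  = " + parsed.getFirst());
    System.out.println("scan   = " + parsed.getSecond());
    System.out.println("output = " + parsed.getThird());
  }
}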
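
The -D scan properties named in the usage text (hbase.mapreduce.include.deleted.rows, hbase.export.scanner.batch, hbase.export.scanner.caching, hbase.export.visibility.labels) reach the tool through GenericOptionsParser, which is also how runExport drives it in the tests. The following is a minimal sketch of launching the MapReduce Export programmatically with those properties; the table name my_table, the output directory /tmp/my_table_export and the label values are hypothetical, everything else comes from the patch or Hadoop's ToolRunner.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Export;
import org.apache.hadoop.hbase.mapreduce.ExportUtils;
import org.apache.hadoop.util.ToolRunner;

public class ExportToolDemo {
  public static void main(String[] ignored) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] args = new String[] {
        // ToolRunner hands these -D options to GenericOptionsParser before Export sees the args.
        "-D" + ExportUtils.RAW_SCAN + "=true",                           // keep delete markers and deleted cells
        "-D" + ExportUtils.EXPORT_BATCHING + "=10",                      // scanner batch size for very wide rows
        "-D" + ExportUtils.EXPORT_CACHING + "=100",                      // scanner caching
        "-D" + ExportUtils.EXPORT_VISIBILITY_LABELS + "=secret,private", // hypothetical label list
        "my_table",               // hypothetical table name
        "/tmp/my_table_export",   // hypothetical output directory
        "1000"                    // max versions per cell to export
    };
    System.exit(ToolRunner.run(conf, new Export(), args));
  }
}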
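
The test changes also move table creation from the deprecated HTableDescriptor/HColumnDescriptor mutators to the TableDescriptorBuilder and ColumnFamilyDescriptorBuilder API. A condensed sketch of that pattern follows; the class and method names are illustrative only, while the builder calls mirror the descriptors the raw-scan tests build (five versions, delete markers retained).

import java.io.IOException;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class BuilderStyleTableSetup {
  // Builds the same kind of descriptor the raw-scan tests use: several versions kept,
  // delete markers retained so a raw export (-D hbase.mapreduce.include.deleted.rows=true) can see them.
  static TableDescriptor exportTableDescriptor(TableName table, byte[] family) {
    return TableDescriptorBuilder.newBuilder(table)
        .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family)
            .setMaxVersions(5)
            .setKeepDeletedCells(KeepDeletedCells.TRUE)
            .build())
        .build();
  }

  static void createExportTable(Admin admin, TableName table, byte[] family) throws IOException {
    admin.createTable(exportTableDescriptor(table, family));
  }
}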