HBASE-15806 An endpoint-based export tool
This commit is contained in:
parent
314d357029
commit
7465973068
|
@ -166,6 +166,23 @@
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!-- The coprocessor.Export needs mapreduce.Import and mapreduce.Export to run the unit tests -->
|
||||
<!-- see org.apache.hadoop.hbase.coprocessor.TestImportExport -->
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-mapreduce</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-mapreduce</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-minikdc</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!-- General dependencies -->
|
||||
<dependency>
|
||||
<groupId>commons-logging</groupId>
|
||||
|
|
|
@ -0,0 +1,538 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
import org.apache.hadoop.hbase.mapreduce.ExportUtils;
|
||||
import org.apache.hadoop.hbase.mapreduce.Import;
|
||||
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ExportProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.util.ArrayUtils;
|
||||
import org.apache.hadoop.hbase.util.ByteStringer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Triple;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.io.compress.DefaultCodec;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* Export an HBase table. Writes content to sequence files up in HDFS. Use
|
||||
* {@link Import} to read it back in again. It is implemented by the endpoint
|
||||
* technique.
|
||||
*
|
||||
* @see org.apache.hadoop.hbase.mapreduce.Export
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
|
||||
@InterfaceStability.Evolving
|
||||
public class Export extends ExportProtos.ExportService
|
||||
implements Coprocessor, CoprocessorService {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(Export.class);
|
||||
private static final Class<? extends CompressionCodec> DEFAULT_CODEC = DefaultCodec.class;
|
||||
private static final SequenceFile.CompressionType DEFAULT_TYPE = SequenceFile.CompressionType.RECORD;
|
||||
private RegionCoprocessorEnvironment env = null;
|
||||
private UserProvider userProvider;
|
||||
|
||||
public static void main(String[] args) throws Throwable {
|
||||
Map<byte[], Response> response = run(HBaseConfiguration.create(), args);
|
||||
System.exit(response == null ? -1 : 0);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static Map<byte[], Response> run(final Configuration conf, final String[] args) throws Throwable {
|
||||
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
|
||||
if (!ExportUtils.isValidArguements(args)) {
|
||||
ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.length(otherArgs));
|
||||
return null;
|
||||
}
|
||||
Triple<TableName, Scan, Path> arguments = ExportUtils.getArgumentsFromCommandLine(conf, otherArgs);
|
||||
return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird());
|
||||
}
|
||||
|
||||
public static Map<byte[], Response> run(final Configuration conf, TableName tableName, Scan scan, Path dir) throws Throwable {
|
||||
FileSystem fs = dir.getFileSystem(conf);
|
||||
UserProvider userProvider = UserProvider.instantiate(conf);
|
||||
checkDir(fs, dir);
|
||||
FsDelegationToken fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
|
||||
fsDelegationToken.acquireDelegationToken(fs);
|
||||
try {
|
||||
final ExportProtos.ExportRequest request = getConfiguredRequest(conf, dir,
|
||||
scan, fsDelegationToken.getUserToken());
|
||||
try (Connection con = ConnectionFactory.createConnection(conf);
|
||||
Table table = con.getTable(tableName)) {
|
||||
Map<byte[], Response> result = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
table.coprocessorService(ExportProtos.ExportService.class,
|
||||
scan.getStartRow(),
|
||||
scan.getStopRow(),
|
||||
(ExportProtos.ExportService service) -> {
|
||||
ServerRpcController controller = new ServerRpcController();
|
||||
Map<byte[], ExportProtos.ExportResponse> rval = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
CoprocessorRpcUtils.BlockingRpcCallback<ExportProtos.ExportResponse>
|
||||
rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
||||
service.export(controller, request, rpcCallback);
|
||||
if (controller.failedOnException()) {
|
||||
throw controller.getFailedOn();
|
||||
}
|
||||
return rpcCallback.get();
|
||||
}).forEach((k, v) -> result.put(k, new Response(v)));
|
||||
return result;
|
||||
} catch (Throwable e) {
|
||||
fs.delete(dir, true);
|
||||
throw e;
|
||||
}
|
||||
} finally {
|
||||
fsDelegationToken.releaseDelegationToken();
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean getCompression(final ExportProtos.ExportRequest request) {
|
||||
if (request.hasCompressed()) {
|
||||
return request.getCompressed();
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static SequenceFile.CompressionType getCompressionType(final ExportProtos.ExportRequest request) {
|
||||
if (request.hasCompressType()) {
|
||||
return SequenceFile.CompressionType.valueOf(request.getCompressType());
|
||||
} else {
|
||||
return DEFAULT_TYPE;
|
||||
}
|
||||
}
|
||||
|
||||
private static CompressionCodec getCompressionCodec(final Configuration conf, final ExportProtos.ExportRequest request) {
|
||||
try {
|
||||
Class<? extends CompressionCodec> codecClass;
|
||||
if (request.hasCompressCodec()) {
|
||||
codecClass = conf.getClassByName(request.getCompressCodec()).asSubclass(CompressionCodec.class);
|
||||
} else {
|
||||
codecClass = DEFAULT_CODEC;
|
||||
}
|
||||
return ReflectionUtils.newInstance(codecClass, conf);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new IllegalArgumentException("Compression codec "
|
||||
+ request.getCompressCodec() + " was not found.", e);
|
||||
}
|
||||
}
|
||||
|
||||
private static SequenceFile.Writer.Option getOutputPath(final Configuration conf,
|
||||
final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
|
||||
Path file = new Path(request.getOutputPath(), "export-" + info.getEncodedName());
|
||||
FileSystem fs = file.getFileSystem(conf);
|
||||
if (fs.exists(file)) {
|
||||
throw new IOException(file + " exists");
|
||||
}
|
||||
return SequenceFile.Writer.file(file);
|
||||
}
|
||||
|
||||
private static List<SequenceFile.Writer.Option> getWriterOptions(final Configuration conf,
|
||||
final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
|
||||
List<SequenceFile.Writer.Option> rval = new LinkedList<>();
|
||||
rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class));
|
||||
rval.add(SequenceFile.Writer.valueClass(Result.class));
|
||||
rval.add(getOutputPath(conf, info, request));
|
||||
if (getCompression(request)) {
|
||||
rval.add(SequenceFile.Writer.compression(getCompressionType(request), getCompressionCodec(conf, request)));
|
||||
} else {
|
||||
rval.add(SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
private static ExportProtos.ExportResponse processData(final Region region, final Configuration conf,
|
||||
final UserProvider userProvider, final Scan scan, final Token userToken,
|
||||
final List<SequenceFile.Writer.Option> opts) throws IOException {
|
||||
ScanCoprocessor cp = new ScanCoprocessor(region);
|
||||
RegionScanner scanner = null;
|
||||
try (RegionOp regionOp = new RegionOp(region);
|
||||
SecureWriter out = new SecureWriter(conf, userProvider, userToken, opts)) {
|
||||
scanner = cp.checkScannerOpen(scan);
|
||||
ImmutableBytesWritable key = new ImmutableBytesWritable();
|
||||
long rowCount = 0;
|
||||
long cellCount = 0;
|
||||
List<Result> results = new ArrayList<>();
|
||||
List<Cell> cells = new ArrayList<>();
|
||||
boolean hasMore;
|
||||
do {
|
||||
boolean bypass = cp.preScannerNext(scanner, results, scan.getBatch());
|
||||
if (bypass) {
|
||||
hasMore = false;
|
||||
} else {
|
||||
hasMore = scanner.nextRaw(cells);
|
||||
if (cells.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
Cell firstCell = cells.get(0);
|
||||
for (Cell cell : cells) {
|
||||
if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength(),
|
||||
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) != 0) {
|
||||
throw new IOException("Why the RegionScanner#nextRaw returns the data of different rows??"
|
||||
+ " first row=" + Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength())
|
||||
+ ", current row=" + Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
|
||||
}
|
||||
}
|
||||
results.add(Result.create(cells));
|
||||
cells.clear();
|
||||
cp.postScannerNext(scanner, results, scan.getBatch(), hasMore);
|
||||
}
|
||||
for (Result r : results) {
|
||||
key.set(r.getRow());
|
||||
out.append(key, r);
|
||||
++rowCount;
|
||||
cellCount += r.size();
|
||||
}
|
||||
results.clear();
|
||||
} while (hasMore);
|
||||
return ExportProtos.ExportResponse.newBuilder()
|
||||
.setRowCount(rowCount)
|
||||
.setCellCount(cellCount)
|
||||
.build();
|
||||
} finally {
|
||||
cp.checkScannerClose(scanner);
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkDir(final FileSystem fs, final Path dir) throws IOException {
|
||||
if (fs.exists(dir)) {
|
||||
throw new RuntimeException("The " + dir + " exists");
|
||||
}
|
||||
if (!fs.mkdirs(dir)) {
|
||||
throw new IOException("Failed to create the " + dir);
|
||||
}
|
||||
}
|
||||
|
||||
private static ExportProtos.ExportRequest getConfiguredRequest(Configuration conf,
|
||||
Path dir, final Scan scan, final Token<?> userToken) throws IOException {
|
||||
boolean compressed = conf.getBoolean(FileOutputFormat.COMPRESS, false);
|
||||
String compressionType = conf.get(FileOutputFormat.COMPRESS_TYPE,
|
||||
DEFAULT_TYPE.toString());
|
||||
String compressionCodec = conf.get(FileOutputFormat.COMPRESS_CODEC,
|
||||
DEFAULT_CODEC.getName());
|
||||
DelegationToken protoToken = null;
|
||||
if (userToken != null) {
|
||||
protoToken = DelegationToken.newBuilder()
|
||||
.setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
|
||||
.setPassword(ByteStringer.wrap(userToken.getPassword()))
|
||||
.setKind(userToken.getKind().toString())
|
||||
.setService(userToken.getService().toString()).build();
|
||||
}
|
||||
LOG.info("compressed=" + compressed
|
||||
+ ", compression type=" + compressionType
|
||||
+ ", compression codec=" + compressionCodec
|
||||
+ ", userToken=" + userToken);
|
||||
ExportProtos.ExportRequest.Builder builder = ExportProtos.ExportRequest.newBuilder()
|
||||
.setScan(ProtobufUtil.toScan(scan))
|
||||
.setOutputPath(dir.toString())
|
||||
.setCompressed(compressed)
|
||||
.setCompressCodec(compressionCodec)
|
||||
.setCompressType(compressionType);
|
||||
if (protoToken != null) {
|
||||
builder.setFsToken(protoToken);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment environment) throws IOException {
|
||||
if (environment instanceof RegionCoprocessorEnvironment) {
|
||||
env = (RegionCoprocessorEnvironment) environment;
|
||||
userProvider = UserProvider.instantiate(env.getConfiguration());
|
||||
} else {
|
||||
throw new CoprocessorException("Must be loaded on a table region!");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(CoprocessorEnvironment env) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Service getService() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void export(RpcController controller, ExportProtos.ExportRequest request,
|
||||
RpcCallback<ExportProtos.ExportResponse> done) {
|
||||
Region region = env.getRegion();
|
||||
Configuration conf = HBaseConfiguration.create(env.getConfiguration());
|
||||
conf.setStrings("io.serializations", conf.get("io.serializations"), ResultSerialization.class.getName());
|
||||
try {
|
||||
Scan scan = validateKey(region.getRegionInfo(), request);
|
||||
Token userToken = null;
|
||||
if (userProvider.isHadoopSecurityEnabled() && !request.hasFsToken()) {
|
||||
LOG.warn("Hadoop security is enable, but no found of user token");
|
||||
} else if (userProvider.isHadoopSecurityEnabled()) {
|
||||
userToken = new Token(request.getFsToken().getIdentifier().toByteArray(),
|
||||
request.getFsToken().getPassword().toByteArray(),
|
||||
new Text(request.getFsToken().getKind()),
|
||||
new Text(request.getFsToken().getService()));
|
||||
}
|
||||
ExportProtos.ExportResponse response = processData(region, conf, userProvider,
|
||||
scan, userToken, getWriterOptions(conf, region.getRegionInfo(), request));
|
||||
done.run(response);
|
||||
} catch (IOException e) {
|
||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||
LOG.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
private Scan validateKey(final HRegionInfo region, final ExportProtos.ExportRequest request) throws IOException {
|
||||
Scan scan = ProtobufUtil.toScan(request.getScan());
|
||||
byte[] regionStartKey = region.getStartKey();
|
||||
byte[] originStartKey = scan.getStartRow();
|
||||
if (originStartKey == null
|
||||
|| Bytes.compareTo(originStartKey, regionStartKey) < 0) {
|
||||
scan.setStartRow(regionStartKey);
|
||||
}
|
||||
byte[] regionEndKey = region.getEndKey();
|
||||
byte[] originEndKey = scan.getStopRow();
|
||||
if (originEndKey == null
|
||||
|| Bytes.compareTo(originEndKey, regionEndKey) > 0) {
|
||||
scan.setStartRow(regionEndKey);
|
||||
}
|
||||
return scan;
|
||||
}
|
||||
|
||||
private static class RegionOp implements Closeable {
|
||||
|
||||
private final Region region;
|
||||
|
||||
RegionOp(final Region region) throws IOException {
|
||||
this.region = region;
|
||||
region.startRegionOperation();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
region.closeRegionOperation();
|
||||
}
|
||||
}
|
||||
|
||||
private static class ScanCoprocessor {
|
||||
|
||||
private final Region region;
|
||||
|
||||
ScanCoprocessor(final Region region) {
|
||||
this.region = region;
|
||||
}
|
||||
|
||||
RegionScanner checkScannerOpen(final Scan scan) throws IOException {
|
||||
RegionScanner scanner;
|
||||
if (region.getCoprocessorHost() == null) {
|
||||
scanner = region.getScanner(scan);
|
||||
} else {
|
||||
scanner = region.getCoprocessorHost().preScannerOpen(scan);
|
||||
if (scanner == null) {
|
||||
scanner = region.getScanner(scan);
|
||||
}
|
||||
scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
|
||||
}
|
||||
if (scanner == null) {
|
||||
throw new IOException("Failed to open region scanner");
|
||||
}
|
||||
return scanner;
|
||||
}
|
||||
|
||||
void checkScannerClose(final InternalScanner s) throws IOException {
|
||||
if (s == null) {
|
||||
return;
|
||||
}
|
||||
if (region.getCoprocessorHost() == null) {
|
||||
s.close();
|
||||
return;
|
||||
}
|
||||
if (region.getCoprocessorHost().preScannerClose(s)) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
s.close();
|
||||
} finally {
|
||||
region.getCoprocessorHost().postScannerClose(s);
|
||||
}
|
||||
}
|
||||
|
||||
boolean preScannerNext(final InternalScanner s,
|
||||
final List<Result> results, final int limit) throws IOException {
|
||||
if (region.getCoprocessorHost() == null) {
|
||||
return false;
|
||||
} else {
|
||||
Boolean bypass = region.getCoprocessorHost().preScannerNext(s, results, limit);
|
||||
return bypass == null ? false : bypass;
|
||||
}
|
||||
}
|
||||
|
||||
boolean postScannerNext(final InternalScanner s,
|
||||
final List<Result> results, final int limit, boolean hasMore)
|
||||
throws IOException {
|
||||
if (region.getCoprocessorHost() == null) {
|
||||
return false;
|
||||
} else {
|
||||
return region.getCoprocessorHost().postScannerNext(s, results, limit, hasMore);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class SecureWriter implements Closeable {
|
||||
private final PrivilegedWriter privilegedWriter;
|
||||
|
||||
SecureWriter(final Configuration conf, final UserProvider userProvider, final Token userToken,
|
||||
final List<SequenceFile.Writer.Option> opts) throws IOException {
|
||||
privilegedWriter = new PrivilegedWriter(getActiveUser(userProvider, userToken),
|
||||
SequenceFile.createWriter(conf, opts.toArray(new SequenceFile.Writer.Option[opts.size()])));
|
||||
}
|
||||
|
||||
void append(final Object key, final Object value) throws IOException {
|
||||
privilegedWriter.append(key, value);
|
||||
}
|
||||
|
||||
private static User getActiveUser(final UserProvider userProvider, final Token userToken) throws IOException {
|
||||
User user = RpcServer.getRequestUser();
|
||||
if (user == null) {
|
||||
user = userProvider.getCurrent();
|
||||
}
|
||||
if (user == null && userToken != null) {
|
||||
LOG.warn("No found of user credentials, but a token was got from user request");
|
||||
} else if (user != null && userToken != null) {
|
||||
user.addToken(userToken);
|
||||
}
|
||||
return user;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
privilegedWriter.close();
|
||||
}
|
||||
}
|
||||
|
||||
private static class PrivilegedWriter implements PrivilegedExceptionAction<Boolean>, Closeable {
|
||||
private final User user;
|
||||
private final SequenceFile.Writer out;
|
||||
private Object key;
|
||||
private Object value;
|
||||
|
||||
PrivilegedWriter(final User user, final SequenceFile.Writer out) {
|
||||
this.user = user;
|
||||
this.out = out;
|
||||
}
|
||||
|
||||
void append(final Object key, final Object value) throws IOException {
|
||||
if (user == null) {
|
||||
out.append(key, value);
|
||||
} else {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
try {
|
||||
user.runAs(this);
|
||||
} catch (InterruptedException ex) {
|
||||
throw new IOException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean run() throws Exception {
|
||||
out.append(key, value);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
out.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static class Response {
|
||||
|
||||
private final long rowCount;
|
||||
private final long cellCount;
|
||||
|
||||
private Response(ExportProtos.ExportResponse r) {
|
||||
this.rowCount = r.getRowCount();
|
||||
this.cellCount = r.getCellCount();
|
||||
}
|
||||
|
||||
public long getRowCount() {
|
||||
return rowCount;
|
||||
}
|
||||
|
||||
public long getCellCount() {
|
||||
return cellCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder(35);
|
||||
return builder.append("rowCount=")
|
||||
.append(rowCount)
|
||||
.append(", cellCount=")
|
||||
.append(cellCount)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package hbase.pb;
|
||||
|
||||
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
|
||||
option java_outer_classname = "ExportProtos";
|
||||
option java_generate_equals_and_hash = true;
|
||||
option optimize_for = SPEED;
|
||||
option java_generic_services = true;
|
||||
|
||||
import "Client.proto";
|
||||
|
||||
service ExportService {
|
||||
rpc export (ExportRequest) returns (ExportResponse);
|
||||
}
|
||||
|
||||
message ExportRequest {
|
||||
required Scan scan = 1;
|
||||
required string outputPath = 2;
|
||||
optional bool compressed = 3 [default = false];
|
||||
optional string compressType = 4;
|
||||
optional string compressCodec = 5;
|
||||
optional DelegationToken fsToken = 6;
|
||||
}
|
||||
message ExportResponse {
|
||||
required uint64 rowCount = 1;
|
||||
required uint64 cellCount = 2;
|
||||
}
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({MediumTests.class})
|
||||
public class TestImportExport extends org.apache.hadoop.hbase.mapreduce.TestImportExport {
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Throwable {
|
||||
UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
org.apache.hadoop.hbase.coprocessor.Export.class.getName());
|
||||
org.apache.hadoop.hbase.mapreduce.TestImportExport.beforeClass();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean runExport(String[] args) throws Throwable {
|
||||
Export.run(new Configuration(UTIL.getConfiguration()), args);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runExportMain(String[] args) throws Throwable {
|
||||
Export.main(args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Skip the test which is unrelated to the coprocessor.Export.
|
||||
*/
|
||||
@Test
|
||||
@Ignore
|
||||
public void testImport94Table() throws Throwable {
|
||||
}
|
||||
}
|
|
@ -0,0 +1,439 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
|
||||
import org.apache.hadoop.hbase.mapreduce.ExportUtils;
|
||||
import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
|
||||
import org.apache.hadoop.hbase.mapreduce.Import;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos;
|
||||
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.security.access.AccessControlConstants;
|
||||
import org.apache.hadoop.hbase.security.access.AccessControlLists;
|
||||
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
|
||||
import org.apache.hadoop.hbase.security.access.SecureTestUtil.AccessTestAction;
|
||||
import org.apache.hadoop.hbase.security.access.Permission;
|
||||
import org.apache.hadoop.hbase.security.visibility.Authorizations;
|
||||
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
|
||||
import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
|
||||
import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
|
||||
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
@Category({MediumTests.class})
|
||||
public class TestSecureExport {
|
||||
private static final Log LOG = LogFactory.getLog(TestSecureExport.class);
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private static MiniKdc KDC;
|
||||
private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
|
||||
private static String USERNAME;
|
||||
private static String SERVER_PRINCIPAL;
|
||||
private static String HTTP_PRINCIPAL;
|
||||
private static final String FAMILYA_STRING = "fma";
|
||||
private static final String FAMILYB_STRING = "fma";
|
||||
private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
|
||||
private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
|
||||
private static final byte[] ROW1 = Bytes.toBytes("row1");
|
||||
private static final byte[] ROW2 = Bytes.toBytes("row2");
|
||||
private static final byte[] ROW3 = Bytes.toBytes("row3");
|
||||
private static final byte[] QUAL = Bytes.toBytes("qual");
|
||||
private static final String LOCALHOST = "localhost";
|
||||
private static final long NOW = System.currentTimeMillis();
|
||||
// user granted with all global permission
|
||||
private static final String USER_ADMIN = "admin";
|
||||
// user is table owner. will have all permissions on table
|
||||
private static final String USER_OWNER = "owner";
|
||||
// user with rx permissions.
|
||||
private static final String USER_RX = "rxuser";
|
||||
// user with exe-only permissions.
|
||||
private static final String USER_XO = "xouser";
|
||||
// user with read-only permissions.
|
||||
private static final String USER_RO = "rouser";
|
||||
// user with no permissions
|
||||
private static final String USER_NONE = "noneuser";
|
||||
private static final String PRIVATE = "private";
|
||||
private static final String CONFIDENTIAL = "confidential";
|
||||
private static final String SECRET = "secret";
|
||||
private static final String TOPSECRET = "topsecret";
|
||||
@Rule
|
||||
public final TestName name = new TestName();
|
||||
private static void setUpKdcServer() throws Exception {
|
||||
Properties conf = MiniKdc.createConf();
|
||||
conf.put(MiniKdc.DEBUG, true);
|
||||
File kdcFile = new File(UTIL.getDataTestDir("kdc").toUri().getPath());
|
||||
KDC = new MiniKdc(conf, kdcFile);
|
||||
KDC.start();
|
||||
USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
|
||||
SERVER_PRINCIPAL = USERNAME + "/" + LOCALHOST;
|
||||
HTTP_PRINCIPAL = "HTTP/" + LOCALHOST;
|
||||
KDC.createPrincipal(KEYTAB_FILE,
|
||||
SERVER_PRINCIPAL,
|
||||
HTTP_PRINCIPAL,
|
||||
USER_ADMIN + "/" + LOCALHOST,
|
||||
USER_OWNER + "/" + LOCALHOST,
|
||||
USER_RX + "/" + LOCALHOST,
|
||||
USER_RO + "/" + LOCALHOST,
|
||||
USER_XO + "/" + LOCALHOST,
|
||||
USER_NONE + "/" + LOCALHOST);
|
||||
}
|
||||
private static User getUserByLogin(final String user) throws IOException {
|
||||
return User.create(UserGroupInformation.loginUserFromKeytabAndReturnUGI(getPrinciple(user), KEYTAB_FILE.getAbsolutePath()));
|
||||
}
|
||||
private static String getPrinciple(final String user) {
|
||||
return user + "/" + LOCALHOST + "@" + KDC.getRealm();
|
||||
}
|
||||
private static void setUpClusterKdc() throws Exception {
|
||||
HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
|
||||
HBaseKerberosUtils.setPrincipalForTesting(SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration());
|
||||
// if we drop support for hadoop-2.4.0 and hadoop-2.4.1,
|
||||
// the following key should be changed.
|
||||
// 1) DFS_NAMENODE_USER_NAME_KEY -> DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY
|
||||
// 2) DFS_DATANODE_USER_NAME_KEY -> DFS_DATANODE_KERBEROS_PRINCIPAL_KEY
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
|
||||
// set yarn principal
|
||||
UTIL.getConfiguration().set(YarnConfiguration.RM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().set(YarnConfiguration.NM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, HTTP_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0");
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0");
|
||||
|
||||
File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath());
|
||||
keystoresDir.mkdirs();
|
||||
String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestSecureExport.class);
|
||||
KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, UTIL.getConfiguration(), false);
|
||||
|
||||
UTIL.getConfiguration().setBoolean("ignore.secure.ports.for.testing", true);
|
||||
UserGroupInformation.setConfiguration(UTIL.getConfiguration());
|
||||
UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, UTIL.getConfiguration().get(
|
||||
CoprocessorHost.REGION_COPROCESSOR_CONF_KEY) + "," + Export.class.getName());
|
||||
}
|
||||
private static void addLabels(final Configuration conf, final List<String> users, final List<String> labels) throws Exception {
|
||||
PrivilegedExceptionAction<VisibilityLabelsProtos.VisibilityLabelsResponse> action
|
||||
= () -> {
|
||||
try (Connection conn = ConnectionFactory.createConnection(conf)) {
|
||||
VisibilityClient.addLabels(conn, labels.toArray(new String[labels.size()]));
|
||||
for (String user : users) {
|
||||
VisibilityClient.setAuths(conn, labels.toArray(new String[labels.size()]), user);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
throw new IOException(t);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
getUserByLogin(USER_ADMIN).runAs(action);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void announce() {
|
||||
LOG.info("Running " + name.getMethodName());
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() throws IOException {
|
||||
}
|
||||
private static void clearOutput(Path path) throws IOException {
|
||||
FileSystem fs = path.getFileSystem(UTIL.getConfiguration());
|
||||
if (fs.exists(path)) {
|
||||
assertEquals(true, fs.delete(path, true));
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Sets the security firstly for getting the correct default realm.
|
||||
* @throws Exception
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
UserProvider.setUserProviderForTesting(UTIL.getConfiguration(), HadoopSecurityEnabledUserProviderForTesting.class);
|
||||
setUpKdcServer();
|
||||
SecureTestUtil.enableSecurity(UTIL.getConfiguration());
|
||||
UTIL.getConfiguration().setBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY, true);
|
||||
VisibilityTestUtil.enableVisiblityLabels(UTIL.getConfiguration());
|
||||
SecureTestUtil.verifyConfiguration(UTIL.getConfiguration());
|
||||
setUpClusterKdc();
|
||||
UTIL.startMiniCluster();
|
||||
UTIL.waitUntilAllRegionsAssigned(AccessControlLists.ACL_TABLE_NAME);
|
||||
UTIL.waitUntilAllRegionsAssigned(VisibilityConstants.LABELS_TABLE_NAME);
|
||||
UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME, 50000);
|
||||
UTIL.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME, 50000);
|
||||
SecureTestUtil.grantGlobal(UTIL, USER_ADMIN,
|
||||
Permission.Action.ADMIN,
|
||||
Permission.Action.CREATE,
|
||||
Permission.Action.EXEC,
|
||||
Permission.Action.READ,
|
||||
Permission.Action.WRITE);
|
||||
addLabels(UTIL.getConfiguration(), Arrays.asList(USER_OWNER),
|
||||
Arrays.asList(PRIVATE, CONFIDENTIAL, SECRET, TOPSECRET));
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
if (KDC != null) {
|
||||
KDC.stop();
|
||||
}
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the ExportEndpoint's access levels. The {@link Export} test is ignored
|
||||
* since the access exceptions cannot be collected from the mappers.
|
||||
*
|
||||
* @throws java.io.IOException
|
||||
*/
|
||||
@Test
|
||||
public void testAccessCase() throws IOException, Throwable {
|
||||
final String exportTable = name.getMethodName();
|
||||
TableDescriptor exportHtd = TableDescriptorBuilder
|
||||
.newBuilder(TableName.valueOf(name.getMethodName()))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYA))
|
||||
.setOwnerString(USER_OWNER)
|
||||
.build();
|
||||
SecureTestUtil.createTable(UTIL, exportHtd, new byte[][]{Bytes.toBytes("s")});
|
||||
SecureTestUtil.grantOnTable(UTIL, USER_RO,
|
||||
TableName.valueOf(exportTable), null, null,
|
||||
Permission.Action.READ);
|
||||
SecureTestUtil.grantOnTable(UTIL, USER_RX,
|
||||
TableName.valueOf(exportTable), null, null,
|
||||
Permission.Action.READ,
|
||||
Permission.Action.EXEC);
|
||||
SecureTestUtil.grantOnTable(UTIL, USER_XO,
|
||||
TableName.valueOf(exportTable), null, null,
|
||||
Permission.Action.EXEC);
|
||||
assertEquals(4, AccessControlLists.getTablePermissions(UTIL.getConfiguration(),
|
||||
TableName.valueOf(exportTable)).size());
|
||||
AccessTestAction putAction = () -> {
|
||||
Put p = new Put(ROW1);
|
||||
p.addColumn(FAMILYA, Bytes.toBytes("qual_0"), NOW, QUAL);
|
||||
p.addColumn(FAMILYA, Bytes.toBytes("qual_1"), NOW, QUAL);
|
||||
try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
|
||||
Table t = conn.getTable(TableName.valueOf(exportTable))) {
|
||||
t.put(p);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
// no hdfs access.
|
||||
SecureTestUtil.verifyAllowed(putAction,
|
||||
getUserByLogin(USER_ADMIN),
|
||||
getUserByLogin(USER_OWNER));
|
||||
SecureTestUtil.verifyDenied(putAction,
|
||||
getUserByLogin(USER_RO),
|
||||
getUserByLogin(USER_XO),
|
||||
getUserByLogin(USER_RX),
|
||||
getUserByLogin(USER_NONE));
|
||||
|
||||
final FileSystem fs = UTIL.getDFSCluster().getFileSystem();
|
||||
final Path openDir = fs.makeQualified(new Path("testAccessCase"));
|
||||
fs.mkdirs(openDir);
|
||||
fs.setPermission(openDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
|
||||
final Path output = fs.makeQualified(new Path(openDir, "output"));
|
||||
AccessTestAction exportAction = () -> {
|
||||
try {
|
||||
String[] args = new String[]{exportTable, output.toString()};
|
||||
Map<byte[], Export.Response> result
|
||||
= Export.run(new Configuration(UTIL.getConfiguration()), args);
|
||||
long rowCount = 0;
|
||||
long cellCount = 0;
|
||||
for (Export.Response r : result.values()) {
|
||||
rowCount += r.getRowCount();
|
||||
cellCount += r.getCellCount();
|
||||
}
|
||||
assertEquals(1, rowCount);
|
||||
assertEquals(2, cellCount);
|
||||
return null;
|
||||
} catch (ServiceException | IOException ex) {
|
||||
throw ex;
|
||||
} catch (Throwable ex) {
|
||||
LOG.error(ex);
|
||||
throw new Exception(ex);
|
||||
} finally {
|
||||
clearOutput(output);
|
||||
}
|
||||
};
|
||||
SecureTestUtil.verifyDenied(exportAction,
|
||||
getUserByLogin(USER_RO),
|
||||
getUserByLogin(USER_XO),
|
||||
getUserByLogin(USER_NONE));
|
||||
SecureTestUtil.verifyAllowed(exportAction,
|
||||
getUserByLogin(USER_ADMIN),
|
||||
getUserByLogin(USER_OWNER),
|
||||
getUserByLogin(USER_RX));
|
||||
AccessTestAction deleteAction = () -> {
|
||||
UTIL.deleteTable(TableName.valueOf(exportTable));
|
||||
return null;
|
||||
};
|
||||
SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER));
|
||||
fs.delete(openDir, true);
|
||||
}
|
||||
@Test
|
||||
public void testVisibilityLabels() throws IOException, Throwable {
|
||||
final String exportTable = name.getMethodName() + "_export";
|
||||
final String importTable = name.getMethodName() + "_import";
|
||||
final TableDescriptor exportHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(exportTable))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYA))
|
||||
.setOwnerString(USER_OWNER)
|
||||
.build();
|
||||
SecureTestUtil.createTable(UTIL, exportHtd, new byte[][]{Bytes.toBytes("s")});
|
||||
AccessTestAction putAction = () -> {
|
||||
Put p1 = new Put(ROW1);
|
||||
p1.addColumn(FAMILYA, QUAL, NOW, QUAL);
|
||||
p1.setCellVisibility(new CellVisibility(SECRET));
|
||||
Put p2 = new Put(ROW2);
|
||||
p2.addColumn(FAMILYA, QUAL, NOW, QUAL);
|
||||
p2.setCellVisibility(new CellVisibility(PRIVATE + " & " + CONFIDENTIAL));
|
||||
Put p3 = new Put(ROW3);
|
||||
p3.addColumn(FAMILYA, QUAL, NOW, QUAL);
|
||||
p3.setCellVisibility(new CellVisibility("!" + CONFIDENTIAL + " & " + TOPSECRET));
|
||||
try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
|
||||
Table t = conn.getTable(TableName.valueOf(exportTable))) {
|
||||
t.put(p1);
|
||||
t.put(p2);
|
||||
t.put(p3);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
SecureTestUtil.verifyAllowed(putAction, getUserByLogin(USER_OWNER));
|
||||
List<Pair<List<String>, Integer>> labelsAndRowCounts = new LinkedList<>();
|
||||
labelsAndRowCounts.add(new Pair<>(Arrays.asList(SECRET), 1));
|
||||
labelsAndRowCounts.add(new Pair<>(Arrays.asList(PRIVATE, CONFIDENTIAL), 1));
|
||||
labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET), 1));
|
||||
labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET, CONFIDENTIAL), 0));
|
||||
labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET, CONFIDENTIAL, PRIVATE, SECRET), 2));
|
||||
for (final Pair<List<String>, Integer> labelsAndRowCount : labelsAndRowCounts) {
|
||||
final List<String> labels = labelsAndRowCount.getFirst();
|
||||
final int rowCount = labelsAndRowCount.getSecond();
|
||||
//create a open permission directory.
|
||||
final Path openDir = new Path("testAccessCase");
|
||||
final FileSystem fs = openDir.getFileSystem(UTIL.getConfiguration());
|
||||
fs.mkdirs(openDir);
|
||||
fs.setPermission(openDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
|
||||
final Path output = fs.makeQualified(new Path(openDir, "output"));
|
||||
AccessTestAction exportAction = () -> {
|
||||
StringBuilder buf = new StringBuilder();
|
||||
labels.forEach(v -> buf.append(v).append(","));
|
||||
buf.deleteCharAt(buf.length() - 1);
|
||||
try {
|
||||
String[] args = new String[]{
|
||||
"-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + buf.toString(),
|
||||
exportTable,
|
||||
output.toString(),};
|
||||
Export.run(new Configuration(UTIL.getConfiguration()), args);
|
||||
return null;
|
||||
} catch (ServiceException | IOException ex) {
|
||||
throw ex;
|
||||
} catch (Throwable ex) {
|
||||
throw new Exception(ex);
|
||||
}
|
||||
};
|
||||
SecureTestUtil.verifyAllowed(exportAction, getUserByLogin(USER_OWNER));
|
||||
final TableDescriptor importHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(importTable))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYB))
|
||||
.setOwnerString(USER_OWNER)
|
||||
.build();
|
||||
SecureTestUtil.createTable(UTIL, importHtd, new byte[][]{Bytes.toBytes("s")});
|
||||
AccessTestAction importAction = () -> {
|
||||
String[] args = new String[]{
|
||||
"-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING,
|
||||
importTable,
|
||||
output.toString()
|
||||
};
|
||||
assertEquals(0, ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args));
|
||||
return null;
|
||||
};
|
||||
SecureTestUtil.verifyAllowed(importAction, getUserByLogin(USER_OWNER));
|
||||
AccessTestAction scanAction = () -> {
|
||||
Scan scan = new Scan();
|
||||
scan.setAuthorizations(new Authorizations(labels));
|
||||
try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
|
||||
Table table = conn.getTable(importHtd.getTableName());
|
||||
ResultScanner scanner = table.getScanner(scan)) {
|
||||
int count = 0;
|
||||
for (Result r : scanner) {
|
||||
++count;
|
||||
}
|
||||
assertEquals(rowCount, count);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
SecureTestUtil.verifyAllowed(scanAction, getUserByLogin(USER_OWNER));
|
||||
AccessTestAction deleteAction = () -> {
|
||||
UTIL.deleteTable(importHtd.getTableName());
|
||||
return null;
|
||||
};
|
||||
SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER));
|
||||
clearOutput(output);
|
||||
}
|
||||
AccessTestAction deleteAction = () -> {
|
||||
UTIL.deleteTable(exportHtd.getTableName());
|
||||
return null;
|
||||
};
|
||||
SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client.example;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.coprocessor.Export;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A simple example on how to use {@link org.apache.hadoop.hbase.coprocessor.Export}.
|
||||
*
|
||||
* <p>
|
||||
* For the protocol buffer definition of the ExportService, see the source file located under
|
||||
* hbase-endpoint/src/main/protobuf/Export.proto.
|
||||
* </p>
|
||||
*/
|
||||
public class ExportEndpointExample {
|
||||
|
||||
public static void main(String[] args) throws Throwable {
|
||||
int rowCount = 100;
|
||||
byte[] family = Bytes.toBytes("family");
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
TableName tableName = TableName.valueOf("ExportEndpointExample");
|
||||
try (Connection con = ConnectionFactory.createConnection(conf);
|
||||
Admin admin = con.getAdmin()) {
|
||||
TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
|
||||
// MUST mount the export endpoint
|
||||
.addCoprocessor(Export.class.getName())
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(family))
|
||||
.build();
|
||||
admin.createTable(desc);
|
||||
|
||||
List<Put> puts = new ArrayList<>(rowCount);
|
||||
for (int row = 0; row != rowCount; ++row) {
|
||||
byte[] bs = Bytes.toBytes(row);
|
||||
Put put = new Put(bs);
|
||||
put.addColumn(family, bs, bs);
|
||||
puts.add(put);
|
||||
}
|
||||
try (Table table = con.getTable(tableName)) {
|
||||
table.put(puts);
|
||||
}
|
||||
|
||||
Path output = new Path("/tmp/ExportEndpointExample_output");
|
||||
Scan scan = new Scan();
|
||||
Map<byte[], Export.Response> result = Export.run(conf, tableName, scan, output);
|
||||
final long totalOutputRows = result.values().stream().mapToLong(v -> v.getRowCount()).sum();
|
||||
final long totalOutputCells = result.values().stream().mapToLong(v -> v.getCellCount()).sum();
|
||||
System.out.println("table:" + tableName);
|
||||
System.out.println("output:" + output);
|
||||
System.out.println("total rows:" + totalOutputRows);
|
||||
System.out.println("total cells:" + totalOutputCells);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,23 +20,17 @@ package org.apache.hadoop.hbase.mapreduce;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||
import org.apache.hadoop.hbase.filter.RegexStringComparator;
|
||||
import org.apache.hadoop.hbase.filter.RowFilter;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ArrayUtils;
|
||||
import org.apache.hadoop.hbase.util.Triple;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
|
||||
|
@ -50,12 +44,8 @@ import org.apache.hadoop.util.ToolRunner;
|
|||
*/
|
||||
@InterfaceAudience.Public
|
||||
public class Export extends Configured implements Tool {
|
||||
private static final Log LOG = LogFactory.getLog(Export.class);
|
||||
final static String NAME = "export";
|
||||
final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows";
|
||||
final static String EXPORT_BATCHING = "hbase.export.scanner.batch";
|
||||
|
||||
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
|
||||
static final String NAME = "export";
|
||||
static final String JOB_NAME_CONF_KEY = "mapreduce.job.name";
|
||||
|
||||
/**
|
||||
* Sets up the actual job.
|
||||
|
@ -67,13 +57,14 @@ public class Export extends Configured implements Tool {
|
|||
*/
|
||||
public static Job createSubmittableJob(Configuration conf, String[] args)
|
||||
throws IOException {
|
||||
String tableName = args[0];
|
||||
Path outputDir = new Path(args[1]);
|
||||
Triple<TableName, Scan, Path> arguments = ExportUtils.getArgumentsFromCommandLine(conf, args);
|
||||
String tableName = arguments.getFirst().getNameAsString();
|
||||
Path outputDir = arguments.getThird();
|
||||
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
|
||||
job.setJobName(NAME + "_" + tableName);
|
||||
job.setJarByClass(Export.class);
|
||||
// Set optional scan parameters
|
||||
Scan s = getConfiguredScanForJob(conf, args);
|
||||
Scan s = arguments.getSecond();
|
||||
IdentityTableMapper.initJob(tableName, s, IdentityTableMapper.class, job);
|
||||
// No reducers. Just write straight to output files.
|
||||
job.setNumReduceTasks(0);
|
||||
|
@ -84,101 +75,15 @@ public class Export extends Configured implements Tool {
|
|||
return job;
|
||||
}
|
||||
|
||||
private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
|
||||
Scan s = new Scan();
|
||||
// Optional arguments.
|
||||
// Set Scan Versions
|
||||
int versions = args.length > 2? Integer.parseInt(args[2]): 1;
|
||||
s.setMaxVersions(versions);
|
||||
// Set Scan Range
|
||||
long startTime = args.length > 3? Long.parseLong(args[3]): 0L;
|
||||
long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE;
|
||||
s.setTimeRange(startTime, endTime);
|
||||
// Set cache blocks
|
||||
s.setCacheBlocks(false);
|
||||
// set Start and Stop row
|
||||
if (conf.get(TableInputFormat.SCAN_ROW_START) != null) {
|
||||
s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START)));
|
||||
}
|
||||
if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) {
|
||||
s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP)));
|
||||
}
|
||||
// Set Scan Column Family
|
||||
boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
|
||||
if (raw) {
|
||||
s.setRaw(raw);
|
||||
}
|
||||
for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) {
|
||||
s.addFamily(Bytes.toBytes(columnFamily));
|
||||
}
|
||||
// Set RowFilter or Prefix Filter if applicable.
|
||||
Filter exportFilter = getExportFilter(args);
|
||||
if (exportFilter!= null) {
|
||||
LOG.info("Setting Scan Filter for Export.");
|
||||
s.setFilter(exportFilter);
|
||||
}
|
||||
|
||||
int batching = conf.getInt(EXPORT_BATCHING, -1);
|
||||
if (batching != -1){
|
||||
try {
|
||||
s.setBatch(batching);
|
||||
} catch (IncompatibleFilterException e) {
|
||||
LOG.error("Batching could not be set", e);
|
||||
}
|
||||
}
|
||||
LOG.info("versions=" + versions + ", starttime=" + startTime +
|
||||
", endtime=" + endTime + ", keepDeletedCells=" + raw);
|
||||
return s;
|
||||
}
|
||||
|
||||
private static Filter getExportFilter(String[] args) {
|
||||
Filter exportFilter = null;
|
||||
String filterCriteria = (args.length > 5) ? args[5]: null;
|
||||
if (filterCriteria == null) return null;
|
||||
if (filterCriteria.startsWith("^")) {
|
||||
String regexPattern = filterCriteria.substring(1, filterCriteria.length());
|
||||
exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern));
|
||||
} else {
|
||||
exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
|
||||
}
|
||||
return exportFilter;
|
||||
}
|
||||
|
||||
/*
|
||||
* @param errorMsg Error message. Can be null.
|
||||
*/
|
||||
private static void usage(final String errorMsg) {
|
||||
if (errorMsg != null && errorMsg.length() > 0) {
|
||||
System.err.println("ERROR: " + errorMsg);
|
||||
}
|
||||
System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
|
||||
"[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n");
|
||||
System.err.println(" Note: -D properties will be applied to the conf used. ");
|
||||
System.err.println(" For example: ");
|
||||
System.err.println(" -D mapreduce.output.fileoutputformat.compress=true");
|
||||
System.err.println(" -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec");
|
||||
System.err.println(" -D mapreduce.output.fileoutputformat.compress.type=BLOCK");
|
||||
System.err.println(" Additionally, the following SCAN properties can be specified");
|
||||
System.err.println(" to control/limit what is exported..");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
|
||||
System.err.println(" -D " + RAW_SCAN + "=true");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>");
|
||||
System.err.println(" -D " + JOB_NAME_CONF_KEY
|
||||
+ "=jobName - use the specified mapreduce job name for the export");
|
||||
System.err.println("For performance consider the following properties:\n"
|
||||
+ " -Dhbase.client.scanner.caching=100\n"
|
||||
+ " -Dmapreduce.map.speculative=false\n"
|
||||
+ " -Dmapreduce.reduce.speculative=false");
|
||||
System.err.println("For tables with very wide rows consider setting the batch size as below:\n"
|
||||
+ " -D" + EXPORT_BATCHING + "=10");
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
if (args.length < 2) {
|
||||
usage("Wrong number of arguments: " + args.length);
|
||||
if (!ExportUtils.isValidArguements(args)) {
|
||||
ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.length(args));
|
||||
System.err.println(" -D " + JOB_NAME_CONF_KEY
|
||||
+ "=jobName - use the specified mapreduce job name for the export");
|
||||
System.err.println("For MR performance consider the following properties:");
|
||||
System.err.println(" -D mapreduce.map.speculative=false");
|
||||
System.err.println(" -D mapreduce.reduce.speculative=false");
|
||||
return -1;
|
||||
}
|
||||
Job job = createSubmittableJob(getConf(), args);
|
||||
|
|
|
@ -0,0 +1,175 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||
import org.apache.hadoop.hbase.filter.RegexStringComparator;
|
||||
import org.apache.hadoop.hbase.filter.RowFilter;
|
||||
import org.apache.hadoop.hbase.security.visibility.Authorizations;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Triple;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
|
||||
/**
|
||||
* Some helper methods are used by {@link org.apache.hadoop.hbase.mapreduce.Export}
|
||||
* and org.apache.hadoop.hbase.coprocessor.Export (in hbase-endpooint).
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class ExportUtils {
|
||||
private static final Log LOG = LogFactory.getLog(ExportUtils.class);
|
||||
public static final String RAW_SCAN = "hbase.mapreduce.include.deleted.rows";
|
||||
public static final String EXPORT_BATCHING = "hbase.export.scanner.batch";
|
||||
public static final String EXPORT_CACHING = "hbase.export.scanner.caching";
|
||||
public static final String EXPORT_VISIBILITY_LABELS = "hbase.export.visibility.labels";
|
||||
/**
|
||||
* Common usage for other export tools.
|
||||
* @param errorMsg Error message. Can be null.
|
||||
*/
|
||||
public static void usage(final String errorMsg) {
|
||||
if (errorMsg != null && errorMsg.length() > 0) {
|
||||
System.err.println("ERROR: " + errorMsg);
|
||||
}
|
||||
System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
|
||||
"[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n");
|
||||
System.err.println(" Note: -D properties will be applied to the conf used. ");
|
||||
System.err.println(" For example: ");
|
||||
System.err.println(" -D " + FileOutputFormat.COMPRESS + "=true");
|
||||
System.err.println(" -D " + FileOutputFormat.COMPRESS_CODEC + "=org.apache.hadoop.io.compress.GzipCodec");
|
||||
System.err.println(" -D " + FileOutputFormat.COMPRESS_TYPE + "=BLOCK");
|
||||
System.err.println(" Additionally, the following SCAN properties can be specified");
|
||||
System.err.println(" to control/limit what is exported..");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
|
||||
System.err.println(" -D " + RAW_SCAN + "=true");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>");
|
||||
System.err.println(" -D " + HConstants.HBASE_CLIENT_SCANNER_CACHING + "=100");
|
||||
System.err.println(" -D " + EXPORT_VISIBILITY_LABELS + "=<labels>");
|
||||
System.err.println("For tables with very wide rows consider setting the batch size as below:\n"
|
||||
+ " -D " + EXPORT_BATCHING + "=10\n"
|
||||
+ " -D " + EXPORT_CACHING + "=100");
|
||||
}
|
||||
|
||||
private static Filter getExportFilter(String[] args) {
|
||||
Filter exportFilter;
|
||||
String filterCriteria = (args.length > 5) ? args[5]: null;
|
||||
if (filterCriteria == null) return null;
|
||||
if (filterCriteria.startsWith("^")) {
|
||||
String regexPattern = filterCriteria.substring(1, filterCriteria.length());
|
||||
exportFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexPattern));
|
||||
} else {
|
||||
exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
|
||||
}
|
||||
return exportFilter;
|
||||
}
|
||||
|
||||
public static boolean isValidArguements(String[] args) {
|
||||
return args != null && args.length >= 2;
|
||||
}
|
||||
|
||||
public static Triple<TableName, Scan, Path> getArgumentsFromCommandLine(
|
||||
Configuration conf, String[] args) throws IOException {
|
||||
if (!isValidArguements(args)) {
|
||||
return null;
|
||||
}
|
||||
return new Triple<>(TableName.valueOf(args[0]), getScanFromCommandLine(conf, args), new Path(args[1]));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static Scan getScanFromCommandLine(Configuration conf, String[] args) throws IOException {
|
||||
Scan s = new Scan();
|
||||
// Optional arguments.
|
||||
// Set Scan Versions
|
||||
int versions = args.length > 2? Integer.parseInt(args[2]): 1;
|
||||
s.setMaxVersions(versions);
|
||||
// Set Scan Range
|
||||
long startTime = args.length > 3? Long.parseLong(args[3]): 0L;
|
||||
long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE;
|
||||
s.setTimeRange(startTime, endTime);
|
||||
// Set cache blocks
|
||||
s.setCacheBlocks(false);
|
||||
// set Start and Stop row
|
||||
if (conf.get(TableInputFormat.SCAN_ROW_START) != null) {
|
||||
s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START)));
|
||||
}
|
||||
if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) {
|
||||
s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP)));
|
||||
}
|
||||
// Set Scan Column Family
|
||||
boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
|
||||
if (raw) {
|
||||
s.setRaw(raw);
|
||||
}
|
||||
for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) {
|
||||
s.addFamily(Bytes.toBytes(columnFamily));
|
||||
}
|
||||
// Set RowFilter or Prefix Filter if applicable.
|
||||
Filter exportFilter = getExportFilter(args);
|
||||
if (exportFilter!= null) {
|
||||
LOG.info("Setting Scan Filter for Export.");
|
||||
s.setFilter(exportFilter);
|
||||
}
|
||||
List<String> labels = null;
|
||||
if (conf.get(EXPORT_VISIBILITY_LABELS) != null) {
|
||||
labels = Arrays.asList(conf.getStrings(EXPORT_VISIBILITY_LABELS));
|
||||
if (!labels.isEmpty()) {
|
||||
s.setAuthorizations(new Authorizations(labels));
|
||||
}
|
||||
}
|
||||
|
||||
int batching = conf.getInt(EXPORT_BATCHING, -1);
|
||||
if (batching != -1) {
|
||||
try {
|
||||
s.setBatch(batching);
|
||||
} catch (IncompatibleFilterException e) {
|
||||
LOG.error("Batching could not be set", e);
|
||||
}
|
||||
}
|
||||
|
||||
int caching = conf.getInt(EXPORT_CACHING, 100);
|
||||
if (caching != -1) {
|
||||
try {
|
||||
s.setCaching(caching);
|
||||
} catch (IncompatibleFilterException e) {
|
||||
LOG.error("Caching could not be set", e);
|
||||
}
|
||||
}
|
||||
LOG.info("versions=" + versions + ", starttime=" + startTime
|
||||
+ ", endtime=" + endTime + ", keepDeletedCells=" + raw
|
||||
+ ", visibility labels=" + labels);
|
||||
return s;
|
||||
}
|
||||
|
||||
private ExportUtils() {
|
||||
}
|
||||
}
|
|
@ -44,12 +44,11 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeepDeletedCells;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
|
@ -58,6 +57,8 @@ import org.apache.hadoop.hbase.client.Result;
|
|||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||
|
@ -72,6 +73,7 @@ import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
|
||||
import org.apache.hadoop.mapreduce.Mapper.Context;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -90,8 +92,9 @@ import org.mockito.stubbing.Answer;
|
|||
*/
|
||||
@Category({VerySlowMapReduceTests.class, MediumTests.class})
|
||||
public class TestImportExport {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestImportExport.class);
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
|
||||
private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
|
||||
private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
|
||||
|
@ -104,10 +107,12 @@ public class TestImportExport {
|
|||
private static String FQ_OUTPUT_DIR;
|
||||
private static final String EXPORT_BATCH_SIZE = "100";
|
||||
|
||||
private static long now = System.currentTimeMillis();
|
||||
private static final long now = System.currentTimeMillis();
|
||||
private final TableName EXPORT_TABLE = TableName.valueOf("export_table");
|
||||
private final TableName IMPORT_TABLE = TableName.valueOf("import_table");
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
public static void beforeClass() throws Throwable {
|
||||
// Up the handlers; this test needs more than usual.
|
||||
UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
|
||||
UTIL.startMiniCluster();
|
||||
|
@ -116,7 +121,7 @@ public class TestImportExport {
|
|||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
public static void afterClass() throws Throwable {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
|
@ -128,11 +133,16 @@ public class TestImportExport {
|
|||
LOG.info("Running " + name.getMethodName());
|
||||
}
|
||||
|
||||
@Before
|
||||
@After
|
||||
public void cleanup() throws Exception {
|
||||
public void cleanup() throws Throwable {
|
||||
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
||||
fs.delete(new Path(OUTPUT_DIR), true);
|
||||
if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) {
|
||||
UTIL.deleteTable(EXPORT_TABLE);
|
||||
}
|
||||
if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) {
|
||||
UTIL.deleteTable(IMPORT_TABLE);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -143,12 +153,16 @@ public class TestImportExport {
|
|||
* @throws InterruptedException
|
||||
* @throws ClassNotFoundException
|
||||
*/
|
||||
boolean runExport(String[] args) throws Exception {
|
||||
protected boolean runExport(String[] args) throws Throwable {
|
||||
// need to make a copy of the configuration because to make sure different temp dirs are used.
|
||||
int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
|
||||
return status == 0;
|
||||
}
|
||||
|
||||
protected void runExportMain(String[] args) throws Throwable {
|
||||
Export.main(args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs an import job with the specified command line args
|
||||
* @param args
|
||||
|
@ -157,7 +171,7 @@ public class TestImportExport {
|
|||
* @throws InterruptedException
|
||||
* @throws ClassNotFoundException
|
||||
*/
|
||||
boolean runImport(String[] args) throws Exception {
|
||||
boolean runImport(String[] args) throws Throwable {
|
||||
// need to make a copy of the configuration because to make sure different temp dirs are used.
|
||||
int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
|
||||
return status == 0;
|
||||
|
@ -168,7 +182,7 @@ public class TestImportExport {
|
|||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testSimpleCase() throws Exception {
|
||||
public void testSimpleCase() throws Throwable {
|
||||
try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3);) {
|
||||
Put p = new Put(ROW1);
|
||||
p.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||
|
@ -223,21 +237,21 @@ public class TestImportExport {
|
|||
/**
|
||||
* Test export hbase:meta table
|
||||
*
|
||||
* @throws Exception
|
||||
* @throws Throwable
|
||||
*/
|
||||
@Test
|
||||
public void testMetaExport() throws Exception {
|
||||
String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString();
|
||||
String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
|
||||
public void testMetaExport() throws Throwable {
|
||||
String[] args = new String[] { TableName.META_TABLE_NAME.getNameAsString(),
|
||||
FQ_OUTPUT_DIR, "1", "0", "0" };
|
||||
assertTrue(runExport(args));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test import data from 0.94 exported file
|
||||
* @throws Exception
|
||||
* @throws Throwable
|
||||
*/
|
||||
@Test
|
||||
public void testImport94Table() throws Exception {
|
||||
public void testImport94Table() throws Throwable {
|
||||
final String name = "exportedTableIn94Format";
|
||||
URL url = TestImportExport.class.getResource(name);
|
||||
File f = new File(url.toURI());
|
||||
|
@ -273,11 +287,13 @@ public class TestImportExport {
|
|||
* Test export scanner batching
|
||||
*/
|
||||
@Test
|
||||
public void testExportScannerBatching() throws Exception {
|
||||
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
|
||||
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
||||
.setMaxVersions(1)
|
||||
);
|
||||
public void testExportScannerBatching() throws Throwable {
|
||||
TableDescriptor desc = TableDescriptorBuilder
|
||||
.newBuilder(TableName.valueOf(name.getMethodName()))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
|
||||
.setMaxVersions(1)
|
||||
.build())
|
||||
.build();
|
||||
UTIL.getAdmin().createTable(desc);
|
||||
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
|
||||
|
||||
|
@ -290,7 +306,7 @@ public class TestImportExport {
|
|||
t.put(p);
|
||||
|
||||
String[] args = new String[] {
|
||||
"-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg.
|
||||
"-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg.
|
||||
name.getMethodName(),
|
||||
FQ_OUTPUT_DIR
|
||||
};
|
||||
|
@ -302,12 +318,14 @@ public class TestImportExport {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testWithDeletes() throws Exception {
|
||||
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
|
||||
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
||||
.setMaxVersions(5)
|
||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||
);
|
||||
public void testWithDeletes() throws Throwable {
|
||||
TableDescriptor desc = TableDescriptorBuilder
|
||||
.newBuilder(TableName.valueOf(name.getMethodName()))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
|
||||
.setMaxVersions(5)
|
||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||
.build())
|
||||
.build();
|
||||
UTIL.getAdmin().createTable(desc);
|
||||
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
|
||||
|
||||
|
@ -327,7 +345,7 @@ public class TestImportExport {
|
|||
}
|
||||
|
||||
String[] args = new String[] {
|
||||
"-D" + Export.RAW_SCAN + "=true",
|
||||
"-D" + ExportUtils.RAW_SCAN + "=true",
|
||||
name.getMethodName(),
|
||||
FQ_OUTPUT_DIR,
|
||||
"1000", // max number of key versions per key to export
|
||||
|
@ -335,11 +353,13 @@ public class TestImportExport {
|
|||
assertTrue(runExport(args));
|
||||
|
||||
final String IMPORT_TABLE = name.getMethodName() + "import";
|
||||
desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
|
||||
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
||||
.setMaxVersions(5)
|
||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||
);
|
||||
desc = TableDescriptorBuilder
|
||||
.newBuilder(TableName.valueOf(IMPORT_TABLE))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
|
||||
.setMaxVersions(5)
|
||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||
.build())
|
||||
.build();
|
||||
UTIL.getAdmin().createTable(desc);
|
||||
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
|
||||
args = new String[] {
|
||||
|
@ -366,13 +386,15 @@ public class TestImportExport {
|
|||
|
||||
|
||||
@Test
|
||||
public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception {
|
||||
public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable {
|
||||
final TableName exportTable = TableName.valueOf(name.getMethodName());
|
||||
HTableDescriptor desc = new HTableDescriptor(exportTable);
|
||||
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
||||
.setMaxVersions(5)
|
||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||
);
|
||||
TableDescriptor desc = TableDescriptorBuilder
|
||||
.newBuilder(TableName.valueOf(name.getMethodName()))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
|
||||
.setMaxVersions(5)
|
||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||
.build())
|
||||
.build();
|
||||
UTIL.getAdmin().createTable(desc);
|
||||
|
||||
Table exportT = UTIL.getConnection().getTable(exportTable);
|
||||
|
@ -397,18 +419,20 @@ public class TestImportExport {
|
|||
|
||||
|
||||
String[] args = new String[] {
|
||||
"-D" + Export.RAW_SCAN + "=true", exportTable.getNameAsString(),
|
||||
"-D" + ExportUtils.RAW_SCAN + "=true", exportTable.getNameAsString(),
|
||||
FQ_OUTPUT_DIR,
|
||||
"1000", // max number of key versions per key to export
|
||||
};
|
||||
assertTrue(runExport(args));
|
||||
|
||||
final String importTable = name.getMethodName() + "import";
|
||||
desc = new HTableDescriptor(TableName.valueOf(importTable));
|
||||
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
||||
.setMaxVersions(5)
|
||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||
);
|
||||
desc = TableDescriptorBuilder
|
||||
.newBuilder(TableName.valueOf(importTable))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
|
||||
.setMaxVersions(5)
|
||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||
.build())
|
||||
.build();
|
||||
UTIL.getAdmin().createTable(desc);
|
||||
|
||||
Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable));
|
||||
|
@ -429,7 +453,7 @@ public class TestImportExport {
|
|||
Result exportedTResult = exportedTScanner.next();
|
||||
try {
|
||||
Result.compareResults(exportedTResult, importedTResult);
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
fail("Original and imported tables data comparision failed with error:"+e.getMessage());
|
||||
} finally {
|
||||
exportT.close();
|
||||
|
@ -442,10 +466,14 @@ public class TestImportExport {
|
|||
* attempt with invalid values.
|
||||
*/
|
||||
@Test
|
||||
public void testWithFilter() throws Exception {
|
||||
public void testWithFilter() throws Throwable {
|
||||
// Create simple table to export
|
||||
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
|
||||
desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
|
||||
TableDescriptor desc = TableDescriptorBuilder
|
||||
.newBuilder(TableName.valueOf(name.getMethodName()))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
|
||||
.setMaxVersions(5)
|
||||
.build())
|
||||
.build();
|
||||
UTIL.getAdmin().createTable(desc);
|
||||
Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
|
||||
|
||||
|
@ -468,8 +496,12 @@ public class TestImportExport {
|
|||
|
||||
// Import to a new table
|
||||
final String IMPORT_TABLE = name.getMethodName() + "import";
|
||||
desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
|
||||
desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
|
||||
desc = TableDescriptorBuilder
|
||||
.newBuilder(TableName.valueOf(IMPORT_TABLE))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
|
||||
.setMaxVersions(5)
|
||||
.build())
|
||||
.build();
|
||||
UTIL.getAdmin().createTable(desc);
|
||||
|
||||
Table importTable = UTIL.getConnection().getTable(desc.getTableName());
|
||||
|
@ -501,8 +533,6 @@ public class TestImportExport {
|
|||
|
||||
/**
|
||||
* Count the number of keyvalues in the specified table for the given timerange
|
||||
* @param start
|
||||
* @param end
|
||||
* @param table
|
||||
* @return
|
||||
* @throws IOException
|
||||
|
@ -523,7 +553,7 @@ public class TestImportExport {
|
|||
* test main method. Import should print help and call System.exit
|
||||
*/
|
||||
@Test
|
||||
public void testImportMain() throws Exception {
|
||||
public void testImportMain() throws Throwable {
|
||||
PrintStream oldPrintStream = System.err;
|
||||
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
|
||||
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
|
||||
|
@ -548,11 +578,56 @@ public class TestImportExport {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExportScan() throws Exception {
|
||||
int version = 100;
|
||||
long startTime = System.currentTimeMillis();
|
||||
long endTime = startTime + 1;
|
||||
String prefix = "row";
|
||||
String label_0 = "label_0";
|
||||
String label_1 = "label_1";
|
||||
String[] args = {
|
||||
"table",
|
||||
"outputDir",
|
||||
String.valueOf(version),
|
||||
String.valueOf(startTime),
|
||||
String.valueOf(endTime),
|
||||
prefix
|
||||
};
|
||||
Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args);
|
||||
assertEquals(version, scan.getMaxVersions());
|
||||
assertEquals(startTime, scan.getTimeRange().getMin());
|
||||
assertEquals(endTime, scan.getTimeRange().getMax());
|
||||
assertEquals(true, (scan.getFilter() instanceof PrefixFilter));
|
||||
assertEquals(0, Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
|
||||
String[] argsWithLabels = {
|
||||
"-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1,
|
||||
"table",
|
||||
"outputDir",
|
||||
String.valueOf(version),
|
||||
String.valueOf(startTime),
|
||||
String.valueOf(endTime),
|
||||
prefix
|
||||
};
|
||||
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||
// parse the "-D" options
|
||||
String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs();
|
||||
Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs);
|
||||
assertEquals(version, scanWithLabels.getMaxVersions());
|
||||
assertEquals(startTime, scanWithLabels.getTimeRange().getMin());
|
||||
assertEquals(endTime, scanWithLabels.getTimeRange().getMax());
|
||||
assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter));
|
||||
assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
|
||||
assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size());
|
||||
assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0));
|
||||
assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1));
|
||||
}
|
||||
|
||||
/**
|
||||
* test main method. Export should print help and call System.exit
|
||||
*/
|
||||
@Test
|
||||
public void testExportMain() throws Exception {
|
||||
public void testExportMain() throws Throwable {
|
||||
PrintStream oldPrintStream = System.err;
|
||||
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
|
||||
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
|
||||
|
@ -562,7 +637,7 @@ public class TestImportExport {
|
|||
System.setErr(new PrintStream(data));
|
||||
try {
|
||||
System.setErr(new PrintStream(data));
|
||||
Export.main(args);
|
||||
runExportMain(args);
|
||||
fail("should be SecurityException");
|
||||
} catch (SecurityException e) {
|
||||
assertEquals(-1, newSecurityManager.getExitCode());
|
||||
|
@ -574,10 +649,9 @@ public class TestImportExport {
|
|||
assertTrue(
|
||||
errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ..."));
|
||||
assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true"));
|
||||
assertTrue(errMsg.contains("-Dhbase.client.scanner.caching=100"));
|
||||
assertTrue(errMsg.contains("-Dmapreduce.map.speculative=false"));
|
||||
assertTrue(errMsg.contains("-Dmapreduce.reduce.speculative=false"));
|
||||
assertTrue(errMsg.contains("-Dhbase.export.scanner.batch=10"));
|
||||
assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100"));
|
||||
assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10"));
|
||||
assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100"));
|
||||
} finally {
|
||||
System.setErr(oldPrintStream);
|
||||
System.setSecurityManager(SECURITY_MANAGER);
|
||||
|
@ -589,7 +663,7 @@ public class TestImportExport {
|
|||
*/
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Test
|
||||
public void testKeyValueImporter() throws Exception {
|
||||
public void testKeyValueImporter() throws Throwable {
|
||||
KeyValueImporter importer = new KeyValueImporter();
|
||||
Configuration configuration = new Configuration();
|
||||
Context ctx = mock(Context.class);
|
||||
|
@ -638,7 +712,7 @@ public class TestImportExport {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testDurability() throws Exception {
|
||||
public void testDurability() throws Throwable {
|
||||
// Create an export table.
|
||||
String exportTableName = name.getMethodName() + "export";
|
||||
try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) {
|
||||
|
|
|
@ -168,7 +168,7 @@ public class SecureTestUtil {
|
|||
* To indicate the action was not allowed, either throw an AccessDeniedException
|
||||
* or return an empty list of KeyValues.
|
||||
*/
|
||||
protected static interface AccessTestAction extends PrivilegedExceptionAction<Object> { }
|
||||
public interface AccessTestAction extends PrivilegedExceptionAction<Object> { }
|
||||
|
||||
/** This fails only in case of ADE or empty list for any of the actions. */
|
||||
public static void verifyAllowed(User user, AccessTestAction... actions) throws Exception {
|
||||
|
|
Loading…
Reference in New Issue