HBASE-15806 An endpoint-based export tool (ChiaPing Tsai)
This commit is contained in:
parent
7130a222ce
commit
c03ea895c4
|
@ -180,6 +180,7 @@
|
||||||
<include>Comparator.proto</include>
|
<include>Comparator.proto</include>
|
||||||
<include>Encryption.proto</include>
|
<include>Encryption.proto</include>
|
||||||
<include>ErrorHandling.proto</include>
|
<include>ErrorHandling.proto</include>
|
||||||
|
<include>Export.proto</include>
|
||||||
<include>FS.proto</include>
|
<include>FS.proto</include>
|
||||||
<include>Filter.proto</include>
|
<include>Filter.proto</include>
|
||||||
<include>HBase.proto</include>
|
<include>HBase.proto</include>
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,44 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package hbase.pb;
|
||||||
|
|
||||||
|
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
|
||||||
|
option java_outer_classname = "ExportProtos";
|
||||||
|
option java_generate_equals_and_hash = true;
|
||||||
|
option optimize_for = SPEED;
|
||||||
|
option java_generic_services = true;
|
||||||
|
|
||||||
|
import "Client.proto";
|
||||||
|
|
||||||
|
// Coprocessor endpoint service used to export table data region by region.
// ExportEndpoint (org.apache.hadoop.hbase.coprocessor) extends the generated
// ExportProtos.ExportService class to implement it.
service ExportService {
|
||||||
|
rpc export (ExportRequest) returns (ExportResponse); // one call exports the rows of a single region
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parameters of one export call. The same request is sent to every region
// selected by the client; the endpoint clips the scan to its own region.
message ExportRequest {
|
||||||
|
required Scan scan = 1; // scan describing the data to export (Scan comes from Client.proto)
|
||||||
|
required string outputPath = 2; // directory under which each region writes its "export-<encoded region name>" file
|
||||||
|
optional bool compressed = 3 [default = false]; // when false or unset the endpoint adds no compression option to the writer
|
||||||
|
optional string compressType = 4; // name of a SequenceFile.CompressionType constant; parsed with valueOf()
|
||||||
|
optional string compressCodec = 5; // class name of the CompressionCodec, loaded by name on the server
|
||||||
|
}
|
||||||
|
// Per-region result of an export call.
message ExportResponse {
|
||||||
|
required uint64 rowCount = 1; // number of non-empty scanner batches (rows) appended to the sequence file
|
||||||
|
required uint64 cellCount = 2; // total number of cells contained in those rows
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,280 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import com.google.protobuf.RpcCallback;
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.Service;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.Coprocessor;
|
||||||
|
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
|
||||||
|
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||||
|
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.ExportProtos;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.Region;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
|
import org.apache.hadoop.io.compress.DefaultCodec;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
import org.apache.hadoop.hbase.mapreduce.Export;
|
||||||
|
import org.apache.hadoop.hbase.mapreduce.Import;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Export an HBase table.
|
||||||
|
* Writes content to sequence files up in HDFS. Use {@link Import} to read it
|
||||||
|
* back in again.
|
||||||
|
* It is implemented by the endpoint technique.
|
||||||
|
* @see Export
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class ExportEndpoint extends ExportProtos.ExportService
|
||||||
|
implements Coprocessor, CoprocessorService {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ExportEndpoint.class);
|
||||||
|
private RegionCoprocessorEnvironment env = null;
|
||||||
|
@Override
|
||||||
|
public void start(CoprocessorEnvironment environment) throws IOException {
|
||||||
|
if (environment instanceof RegionCoprocessorEnvironment) {
|
||||||
|
this.env = (RegionCoprocessorEnvironment) environment;
|
||||||
|
} else {
|
||||||
|
throw new CoprocessorException("Must be loaded on a table region!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void stop(CoprocessorEnvironment env) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Service getService() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
private static boolean getCompression(final ExportProtos.ExportRequest request) {
|
||||||
|
if (request.hasCompressed()) {
|
||||||
|
return request.getCompressed();
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
private static SequenceFile.CompressionType getCompressionType(final ExportProtos.ExportRequest request) {
|
||||||
|
if (!request.hasCompressType()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return SequenceFile.CompressionType.valueOf(request.getCompressType());
|
||||||
|
}
|
||||||
|
private static CompressionCodec getCompressionCodec(final Configuration conf, final ExportProtos.ExportRequest request) {
|
||||||
|
if (!request.hasCompressCodec()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Class<? extends CompressionCodec> codecClass = conf.getClassByName(request.getCompressCodec()).asSubclass(CompressionCodec.class);
|
||||||
|
return ReflectionUtils.newInstance(codecClass, conf);
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new IllegalArgumentException("Compression codec "
|
||||||
|
+ request.getCompressCodec()+ " was not found.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
private static SequenceFile.Writer.Option getOutputPath(final Configuration conf,
|
||||||
|
final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
|
||||||
|
Path file = new Path(request.getOutputPath(), "export-" + info.getEncodedName());
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
if (fs.exists(file)) {
|
||||||
|
throw new IOException(file + " exists");
|
||||||
|
}
|
||||||
|
return SequenceFile.Writer.file(file);
|
||||||
|
}
|
||||||
|
private static List<SequenceFile.Writer.Option> getWriterOptions(final Configuration conf,
|
||||||
|
final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
|
||||||
|
List<SequenceFile.Writer.Option> rval = new LinkedList<>();
|
||||||
|
rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class));
|
||||||
|
rval.add(SequenceFile.Writer.valueClass(Result.class));
|
||||||
|
rval.add(getOutputPath(conf, info, request));
|
||||||
|
boolean compressed = getCompression(request);
|
||||||
|
if (compressed) {
|
||||||
|
SequenceFile.CompressionType type = getCompressionType(request);
|
||||||
|
if (type != null) {
|
||||||
|
CompressionCodec codec = getCompressionCodec(conf, request);
|
||||||
|
rval.add(SequenceFile.Writer.compression(type, codec));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
private Scan validateKey(final HRegionInfo region, final Scan scan) {
|
||||||
|
byte[] regionStartKey = region.getStartKey();
|
||||||
|
byte[] originStartKey = scan.getStartRow();
|
||||||
|
if (originStartKey == null
|
||||||
|
|| Bytes.compareTo(originStartKey, regionStartKey) < 0) {
|
||||||
|
scan.setStartRow(regionStartKey);
|
||||||
|
}
|
||||||
|
byte[] regionEndKey = region.getEndKey();
|
||||||
|
byte[] originEndKey = scan.getStopRow();
|
||||||
|
if (originEndKey == null
|
||||||
|
|| Bytes.compareTo(originEndKey, regionEndKey) > 0) {
|
||||||
|
scan.setStartRow(regionEndKey);
|
||||||
|
}
|
||||||
|
return scan;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void export(RpcController controller, ExportProtos.ExportRequest request,
|
||||||
|
RpcCallback<ExportProtos.ExportResponse> done) {
|
||||||
|
Region region = env.getRegion();
|
||||||
|
Configuration conf = HBaseConfiguration.create(env.getConfiguration());
|
||||||
|
conf.setStrings("io.serializations", conf.get("io.serializations"), ResultSerialization.class.getName());
|
||||||
|
try {
|
||||||
|
Scan scan = validateKey(region.getRegionInfo(), ProtobufUtil.toScan(request.getScan()));
|
||||||
|
ExportProtos.ExportResponse response = processData(conf, region, scan,
|
||||||
|
getWriterOptions(conf, region.getRegionInfo(), request));
|
||||||
|
done.run(response);
|
||||||
|
} catch (IOException e) {
|
||||||
|
ResponseConverter.setControllerException(controller, e);
|
||||||
|
LOG.error(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
private static ExportProtos.ExportResponse processData(final Configuration conf,
|
||||||
|
final Region region, final Scan scan, final List<SequenceFile.Writer.Option> opts) throws IOException {
|
||||||
|
region.startRegionOperation();
|
||||||
|
try (SequenceFile.Writer out = SequenceFile.createWriter(conf,
|
||||||
|
opts.toArray(new SequenceFile.Writer.Option[opts.size()]));
|
||||||
|
RegionScanner scanner = region.getScanner(scan)) {
|
||||||
|
ImmutableBytesWritable key = new ImmutableBytesWritable();
|
||||||
|
long rowCount = 0;
|
||||||
|
long cellCount = 0;
|
||||||
|
List<Cell> buf = new ArrayList<>();
|
||||||
|
boolean hasMore;
|
||||||
|
do {
|
||||||
|
hasMore = scanner.nextRaw(buf);
|
||||||
|
if (!buf.isEmpty()) {
|
||||||
|
Cell firstCell = buf.get(0);
|
||||||
|
for (Cell cell : buf) {
|
||||||
|
if (Bytes.compareTo(
|
||||||
|
firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength(),
|
||||||
|
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) != 0) {
|
||||||
|
throw new IOException("Why the RegionScanner#nextRaw returns the data of different rows??");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
key.set(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength());
|
||||||
|
out.append(key, Result.create(buf));
|
||||||
|
++rowCount;
|
||||||
|
cellCount += buf.size();
|
||||||
|
buf.clear();
|
||||||
|
}
|
||||||
|
} while (hasMore);
|
||||||
|
return ExportProtos.ExportResponse.newBuilder()
|
||||||
|
.setRowCount(rowCount)
|
||||||
|
.setCellCount(cellCount)
|
||||||
|
.build();
|
||||||
|
} finally {
|
||||||
|
region.closeRegionOperation();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static void main(String[] args) throws IOException, Throwable {
|
||||||
|
run(HBaseConfiguration.create(), args);
|
||||||
|
}
|
||||||
|
public static Map<byte[], ExportProtos.ExportResponse> run(final Configuration conf,
|
||||||
|
final String[] args) throws IOException, Throwable {
|
||||||
|
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
|
||||||
|
if (!Export.checkArguments(otherArgs)) {
|
||||||
|
Export.usage("Wrong number of arguments: " + otherArgs.length);
|
||||||
|
System.exit(-1);
|
||||||
|
}
|
||||||
|
TableName tableName = TableName.valueOf(otherArgs[0]);
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
String dir = otherArgs[1];
|
||||||
|
checkDir(fs, dir);
|
||||||
|
Scan scan = Export.getConfiguredScanForJob(conf, otherArgs);
|
||||||
|
final ExportProtos.ExportRequest request = getConfiguredRequestForJob(conf, otherArgs, scan);
|
||||||
|
try (Connection con = ConnectionFactory.createConnection(conf);
|
||||||
|
Table table = con.getTable(tableName)) {
|
||||||
|
return table.coprocessorService(ExportProtos.ExportService.class,
|
||||||
|
scan.getStartRow(),
|
||||||
|
scan.getStopRow(), new Batch.Call<ExportProtos.ExportService, ExportProtos.ExportResponse>() {
|
||||||
|
@Override
|
||||||
|
public ExportProtos.ExportResponse call(ExportProtos.ExportService service) throws IOException {
|
||||||
|
ServerRpcController controller = new ServerRpcController();
|
||||||
|
BlockingRpcCallback<ExportProtos.ExportResponse> rpcCallback = new BlockingRpcCallback<>();
|
||||||
|
service.export(controller, request, rpcCallback);
|
||||||
|
if (controller.failedOnException()) {
|
||||||
|
throw controller.getFailedOn();
|
||||||
|
}
|
||||||
|
return rpcCallback.get();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Throwable e) {
|
||||||
|
fs.delete(new Path(dir), true);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
private static void checkDir(final FileSystem fs, final String path) throws IOException {
|
||||||
|
Path dir = fs.makeQualified(new Path(path));
|
||||||
|
if (fs.exists(dir)) {
|
||||||
|
throw new RuntimeException("The " + path + " exists");
|
||||||
|
}
|
||||||
|
fs.mkdirs(dir);
|
||||||
|
fs.setPermission(dir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
|
||||||
|
}
|
||||||
|
private static ExportProtos.ExportRequest getConfiguredRequestForJob(Configuration conf,
|
||||||
|
String[] args, final Scan scan) throws IOException {
|
||||||
|
String dir = args[1];
|
||||||
|
boolean compressed = conf.getBoolean(FileOutputFormat.COMPRESS, true);
|
||||||
|
String compressionType = conf.get(FileOutputFormat.COMPRESS_TYPE,
|
||||||
|
SequenceFile.CompressionType.RECORD.toString());
|
||||||
|
String compressionCodec = conf.get(FileOutputFormat.COMPRESS_CODEC,
|
||||||
|
DefaultCodec.class.getName());
|
||||||
|
LOG.info("compressed=" + compressed
|
||||||
|
+ ", compression type=" + compressionType
|
||||||
|
+ ", compression codec=" + compressionCodec);
|
||||||
|
return ExportProtos.ExportRequest.newBuilder()
|
||||||
|
.setScan(ProtobufUtil.toScan(scan))
|
||||||
|
.setOutputPath(dir)
|
||||||
|
.setCompressed(compressed)
|
||||||
|
.setCompressCodec(compressionCodec)
|
||||||
|
.setCompressType(compressionType)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -86,7 +86,7 @@ public class Export extends Configured implements Tool {
|
||||||
return job;
|
return job;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
|
public static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
|
||||||
Scan s = new Scan();
|
Scan s = new Scan();
|
||||||
// Optional arguments.
|
// Optional arguments.
|
||||||
// Set Scan Versions
|
// Set Scan Versions
|
||||||
|
@ -150,7 +150,7 @@ public class Export extends Configured implements Tool {
|
||||||
/*
|
/*
|
||||||
* @param errorMsg Error message. Can be null.
|
* @param errorMsg Error message. Can be null.
|
||||||
*/
|
*/
|
||||||
private static void usage(final String errorMsg) {
|
public static void usage(final String errorMsg) {
|
||||||
if (errorMsg != null && errorMsg.length() > 0) {
|
if (errorMsg != null && errorMsg.length() > 0) {
|
||||||
System.err.println("ERROR: " + errorMsg);
|
System.err.println("ERROR: " + errorMsg);
|
||||||
}
|
}
|
||||||
|
@ -176,11 +176,12 @@ public class Export extends Configured implements Tool {
|
||||||
System.err.println("For tables with very wide rows consider setting the batch size as below:\n"
|
System.err.println("For tables with very wide rows consider setting the batch size as below:\n"
|
||||||
+ " -D" + EXPORT_BATCHING + "=10");
|
+ " -D" + EXPORT_BATCHING + "=10");
|
||||||
}
|
}
|
||||||
|
public static boolean checkArguments(final String[] args) {
|
||||||
|
return args.length >= 2;
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public int run(String[] args) throws Exception {
|
public int run(String[] args) throws Exception {
|
||||||
if (args.length < 2) {
|
if (!checkArguments(args)) {
|
||||||
usage("Wrong number of arguments: " + args.length);
|
usage("Wrong number of arguments: " + args.length);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,13 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
import static org.mockito.Matchers.any;
|
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
@ -34,11 +28,10 @@ import java.net.URL;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NavigableMap;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
@ -59,6 +52,8 @@ import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.ExportEndpoint;
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||||
|
@ -72,7 +67,10 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
|
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
|
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
|
||||||
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
|
import org.apache.hadoop.io.compress.BZip2Codec;
|
||||||
import org.apache.hadoop.mapreduce.Mapper.Context;
|
import org.apache.hadoop.mapreduce.Mapper.Context;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
@ -85,6 +83,12 @@ import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests the table import and table export MR job functionality
|
* Tests the table import and table export MR job functionality
|
||||||
|
@ -106,14 +110,40 @@ public class TestImportExport {
|
||||||
private static final String EXPORT_BATCH_SIZE = "100";
|
private static final String EXPORT_BATCH_SIZE = "100";
|
||||||
|
|
||||||
private static long now = System.currentTimeMillis();
|
private static long now = System.currentTimeMillis();
|
||||||
|
private static final Exporter EXPORTER_MR = new Exporter() {
|
||||||
|
@Override
|
||||||
|
public boolean runExport(String[] args) throws Throwable {
|
||||||
|
// need to make a copy of the configuration because to make sure different temp dirs are used.
|
||||||
|
int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
|
||||||
|
return status == 0;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "MR-based export";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
private static final Exporter EXPORTER_ENDPOINT = new Exporter() {
|
||||||
|
@Override
|
||||||
|
public boolean runExport(String[] args) throws Throwable {
|
||||||
|
ExportEndpoint.run(new Configuration(UTIL.getConfiguration()), args);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "Endpoint-based export";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Every export implementation under test; the export tests loop over this list.
private static final List<Exporter> EXPORTERS = Arrays.asList(EXPORTER_MR, EXPORTER_ENDPOINT);
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void beforeClass() throws Exception {
|
public static void beforeClass() throws Exception {
|
||||||
// Up the handlers; this test needs more than usual.
|
// Up the handlers; this test needs more than usual.
|
||||||
|
UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||||
|
ExportEndpoint.class.getName());
|
||||||
UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
|
UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
|
||||||
UTIL.startMiniCluster();
|
UTIL.startMiniCluster();
|
||||||
|
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
||||||
FQ_OUTPUT_DIR =
|
FQ_OUTPUT_DIR =
|
||||||
new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
|
new Path(OUTPUT_DIR).makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
@ -131,47 +161,108 @@ public class TestImportExport {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
@After
|
@After
|
||||||
public void cleanup() throws Exception {
|
public void cleanup() throws IOException {
|
||||||
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
deleteOutput();
|
||||||
fs.delete(new Path(OUTPUT_DIR), true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private static void deleteOutput() throws IOException {
|
||||||
* Runs an export job with the specified command line args
|
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
||||||
* @param args
|
fs.delete(new Path(OUTPUT_DIR), true);
|
||||||
* @return true if job completed successfully
|
|
||||||
* @throws IOException
|
|
||||||
* @throws InterruptedException
|
|
||||||
* @throws ClassNotFoundException
|
|
||||||
*/
|
|
||||||
boolean runExport(String[] args) throws Exception {
|
|
||||||
// need to make a copy of the configuration because to make sure different temp dirs are used.
|
|
||||||
int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
|
|
||||||
return status == 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs an import job with the specified command line args
|
* Runs an import job with the specified command line args
|
||||||
* @param args
|
* @param args
|
||||||
* @return true if job completed successfully
|
* @return true if job completed successfully
|
||||||
* @throws IOException
|
* @throws Exception
|
||||||
* @throws InterruptedException
|
|
||||||
* @throws ClassNotFoundException
|
|
||||||
*/
|
*/
|
||||||
boolean runImport(String[] args) throws Exception {
|
boolean runImport(String[] args) throws Exception {
|
||||||
// need to make a copy of the configuration because to make sure different temp dirs are used.
|
// need to make a copy of the configuration because to make sure different temp dirs are used.
|
||||||
int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
|
int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
|
||||||
return status == 0;
|
return status == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test simple replication case with column mapping
|
* Test the writer's options.
|
||||||
* @throws Exception
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSimpleCase() throws Exception {
|
public void testOutputFileFormat() throws IOException, Throwable {
|
||||||
String EXPORT_TABLE = "exportSimpleCase";
|
String exportTable = "testOutputFileFormat";
|
||||||
try (Table t = UTIL.createTable(TableName.valueOf(EXPORT_TABLE), FAMILYA, 3);) {
|
for (Exporter exporter : EXPORTERS) {
|
||||||
|
testOutputFileFormat(exportTable, exporter);
|
||||||
|
UTIL.deleteTable(TableName.valueOf(exportTable));
|
||||||
|
deleteOutput();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Test the writer's options.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void testOutputFileFormat(final String exportTable, final Exporter exporter) throws IOException, Throwable {
|
||||||
|
String codec = BZip2Codec.class.getName();
|
||||||
|
String type = SequenceFile.CompressionType.RECORD.name();
|
||||||
|
try (Table t = UTIL.createTable(TableName.valueOf(exportTable), FAMILYA, 3);) {
|
||||||
|
Put p = new Put(ROW1);
|
||||||
|
p.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||||
|
t.put(p);
|
||||||
|
p = new Put(ROW2);
|
||||||
|
p.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||||
|
t.put(p);
|
||||||
|
p = new Put(ROW3);
|
||||||
|
p.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||||
|
t.put(p);
|
||||||
|
}
|
||||||
|
//use compress
|
||||||
|
String[] args = new String[] {
|
||||||
|
// Only export row1 & row2.
|
||||||
|
"-D" + FileOutputFormat.COMPRESS + "=true",
|
||||||
|
"-D" + FileOutputFormat.COMPRESS_CODEC + "=" + codec,
|
||||||
|
"-D" + FileOutputFormat.COMPRESS_TYPE + "=" + type,
|
||||||
|
exportTable,
|
||||||
|
FQ_OUTPUT_DIR
|
||||||
|
};
|
||||||
|
assertTrue(exporter.toString(), exporter.runExport(args));
|
||||||
|
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
||||||
|
List<FileStatus> files = Arrays.asList(fs.listStatus(new Path(FQ_OUTPUT_DIR)));
|
||||||
|
assertEquals(exporter.toString(), false, files.isEmpty());
|
||||||
|
Configuration copy = new Configuration(UTIL.getConfiguration());
|
||||||
|
//need to make a copy of the configuration because to make sure the Exporter has set the "io.serializations"
|
||||||
|
copy.setStrings("io.serializations", copy.get("io.serializations"),
|
||||||
|
ResultSerialization.class.getName());
|
||||||
|
for (FileStatus file : files) {
|
||||||
|
Path path = file.getPath();
|
||||||
|
//skips the MR meta output
|
||||||
|
if (path.getName().equals("_SUCCESS")) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
try (SequenceFile.Reader reader = new SequenceFile.Reader(
|
||||||
|
copy, SequenceFile.Reader.file(file.getPath()))) {
|
||||||
|
assertEquals(exporter.toString(), reader.getCompressionCodec().getClass().getName(), codec);
|
||||||
|
assertEquals(exporter.toString(), reader.getCompressionType().name(), type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Test simple replication case with column mapping
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSimpleCase() throws IOException, Throwable {
|
||||||
|
String exportTable = "exportSimpleCase";
|
||||||
|
String importTable = "importTableSimpleCase";
|
||||||
|
for (Exporter exporter : EXPORTERS) {
|
||||||
|
testSimpleCase(exportTable, importTable, exporter);
|
||||||
|
UTIL.deleteTable(TableName.valueOf(exportTable));
|
||||||
|
UTIL.deleteTable(TableName.valueOf(importTable));
|
||||||
|
deleteOutput();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Test simple replication case with column mapping.
|
||||||
|
*/
|
||||||
|
public void testSimpleCase(final String exportTable, final String importTable,
|
||||||
|
final Exporter exporter) throws IOException, Throwable {
|
||||||
|
try (Table t = UTIL.createTable(TableName.valueOf(exportTable), FAMILYA, 3);) {
|
||||||
Put p = new Put(ROW1);
|
Put p = new Put(ROW1);
|
||||||
p.addColumn(FAMILYA, QUAL, now, QUAL);
|
p.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||||
p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
|
p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
|
||||||
|
@ -189,53 +280,53 @@ public class TestImportExport {
|
||||||
t.put(p);
|
t.put(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
String[] args = new String[] {
|
String[] args = new String[] {
|
||||||
// Only export row1 & row2.
|
// Only export row1 & row2.
|
||||||
"-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
|
"-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
|
||||||
"-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3",
|
"-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3",
|
||||||
EXPORT_TABLE,
|
exportTable,
|
||||||
FQ_OUTPUT_DIR,
|
FQ_OUTPUT_DIR,
|
||||||
"1000", // max number of key versions per key to export
|
"1000", // max number of key versions per key to export
|
||||||
|
};
|
||||||
|
assertTrue(exporter.toString(), exporter.runExport(args));
|
||||||
|
|
||||||
|
try (Table t = UTIL.createTable(TableName.valueOf(importTable), FAMILYB, 3);) {
|
||||||
|
args = new String[] {
|
||||||
|
"-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
|
||||||
|
importTable,
|
||||||
|
FQ_OUTPUT_DIR
|
||||||
};
|
};
|
||||||
assertTrue(runExport(args));
|
assertTrue(exporter.toString(), runImport(args));
|
||||||
|
|
||||||
String IMPORT_TABLE = "importTableSimpleCase";
|
Get g = new Get(ROW1);
|
||||||
try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) {
|
g.setMaxVersions();
|
||||||
args = new String[] {
|
Result r = t.get(g);
|
||||||
"-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
|
assertEquals(exporter.toString(), 3, r.size());
|
||||||
IMPORT_TABLE,
|
g = new Get(ROW2);
|
||||||
FQ_OUTPUT_DIR
|
g.setMaxVersions();
|
||||||
};
|
r = t.get(g);
|
||||||
assertTrue(runImport(args));
|
assertEquals(exporter.toString(), 3, r.size());
|
||||||
|
g = new Get(ROW3);
|
||||||
Get g = new Get(ROW1);
|
r = t.get(g);
|
||||||
g.setMaxVersions();
|
assertEquals(exporter.toString(), 0, r.size());
|
||||||
Result r = t.get(g);
|
}
|
||||||
assertEquals(3, r.size());
|
|
||||||
g = new Get(ROW2);
|
|
||||||
g.setMaxVersions();
|
|
||||||
r = t.get(g);
|
|
||||||
assertEquals(3, r.size());
|
|
||||||
g = new Get(ROW3);
|
|
||||||
r = t.get(g);
|
|
||||||
assertEquals(0, r.size());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test export hbase:meta table
|
* Test export hbase:meta table
|
||||||
*
|
*
|
||||||
* @throws Exception
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testMetaExport() throws Exception {
|
public void testMetaExport() throws IOException, Throwable {
|
||||||
String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString();
|
String exportTable = TableName.META_TABLE_NAME.getNameAsString();
|
||||||
String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
|
String[] args = new String[] { exportTable, FQ_OUTPUT_DIR, "1", "0", "0" };
|
||||||
assertTrue(runExport(args));
|
for (Exporter exporter : EXPORTERS) {
|
||||||
|
assertTrue(exporter.toString(), exporter.runExport(args));
|
||||||
|
deleteOutput();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test import data from 0.94 exported file
|
* Test import data from 0.94 exported file.
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
|
@ -252,11 +343,11 @@ public class TestImportExport {
|
||||||
Path importPath = new Path(f.toURI());
|
Path importPath = new Path(f.toURI());
|
||||||
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
||||||
fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
|
fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
|
||||||
String IMPORT_TABLE = name;
|
String importTable = name;
|
||||||
try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) {
|
try (Table t = UTIL.createTable(TableName.valueOf(importTable), Bytes.toBytes("f1"), 3);) {
|
||||||
String[] args = new String[] {
|
String[] args = new String[] {
|
||||||
"-Dhbase.import.version=0.94" ,
|
"-Dhbase.import.version=0.94" ,
|
||||||
IMPORT_TABLE, FQ_OUTPUT_DIR
|
importTable, FQ_OUTPUT_DIR
|
||||||
};
|
};
|
||||||
assertTrue(runImport(args));
|
assertTrue(runImport(args));
|
||||||
/* exportedTableIn94Format contains 5 rows
|
/* exportedTableIn94Format contains 5 rows
|
||||||
|
@ -270,18 +361,28 @@ public class TestImportExport {
|
||||||
assertEquals(5, UTIL.countRows(t));
|
assertEquals(5, UTIL.countRows(t));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test export scanner batching
|
* Test export scanner batching
|
||||||
|
* @throws java.lang.IOException
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testExportScannerBatching() throws Exception {
|
public void testExportScannerBatching() throws IOException, Throwable {
|
||||||
String BATCH_TABLE = "exportWithBatch";
|
String exportTable = "exportWithBatch";
|
||||||
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(BATCH_TABLE));
|
for (Exporter exporter : EXPORTERS) {
|
||||||
|
testExportScannerBatching(exportTable, exporter);
|
||||||
|
UTIL.deleteTable(TableName.valueOf(exportTable));
|
||||||
|
deleteOutput();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Test export scanner batching.
|
||||||
|
*/
|
||||||
|
public void testExportScannerBatching(final String exportTable, final Exporter exporter) throws IOException, Throwable {
|
||||||
|
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(exportTable));
|
||||||
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
||||||
.setMaxVersions(1)
|
.setMaxVersions(1)
|
||||||
);
|
);
|
||||||
UTIL.getHBaseAdmin().createTable(desc);
|
UTIL.getAdmin().createTable(desc);
|
||||||
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
|
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
|
||||||
|
|
||||||
Put p = new Put(ROW1);
|
Put p = new Put(ROW1);
|
||||||
|
@ -294,25 +395,34 @@ public class TestImportExport {
|
||||||
|
|
||||||
String[] args = new String[] {
|
String[] args = new String[] {
|
||||||
"-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg.
|
"-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg.
|
||||||
BATCH_TABLE,
|
exportTable,
|
||||||
FQ_OUTPUT_DIR
|
FQ_OUTPUT_DIR
|
||||||
};
|
};
|
||||||
assertTrue(runExport(args));
|
assertTrue(exporter.toString(), exporter.runExport(args));
|
||||||
|
|
||||||
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
||||||
fs.delete(new Path(FQ_OUTPUT_DIR), true);
|
fs.delete(new Path(FQ_OUTPUT_DIR), true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWithDeletes() throws Exception {
|
public void testWithDeletes() throws IOException, Throwable {
|
||||||
String EXPORT_TABLE = "exportWithDeletes";
|
String exportTable = "exportWithDeletes";
|
||||||
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
|
String importTable = "importWithDeletes";
|
||||||
|
for (Exporter exporter : EXPORTERS) {
|
||||||
|
testWithDeletes(exportTable, importTable, exporter);
|
||||||
|
UTIL.deleteTable(TableName.valueOf(exportTable));
|
||||||
|
UTIL.deleteTable(TableName.valueOf(importTable));
|
||||||
|
deleteOutput();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public void testWithDeletes(final String exportTable, final String importTable,
|
||||||
|
final Exporter exporter) throws IOException, Throwable {
|
||||||
|
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(exportTable));
|
||||||
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
||||||
.setMaxVersions(5)
|
.setMaxVersions(5)
|
||||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||||
);
|
);
|
||||||
UTIL.getHBaseAdmin().createTable(desc);
|
UTIL.getAdmin().createTable(desc);
|
||||||
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
|
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
|
||||||
|
|
||||||
Put p = new Put(ROW1);
|
Put p = new Put(ROW1);
|
||||||
|
@ -332,25 +442,23 @@ public class TestImportExport {
|
||||||
|
|
||||||
String[] args = new String[] {
|
String[] args = new String[] {
|
||||||
"-D" + Export.RAW_SCAN + "=true",
|
"-D" + Export.RAW_SCAN + "=true",
|
||||||
EXPORT_TABLE,
|
exportTable,
|
||||||
FQ_OUTPUT_DIR,
|
FQ_OUTPUT_DIR,
|
||||||
"1000", // max number of key versions per key to export
|
"1000", // max number of key versions per key to export
|
||||||
};
|
};
|
||||||
assertTrue(runExport(args));
|
assertTrue(exporter.toString(), exporter.runExport(args));
|
||||||
|
desc = new HTableDescriptor(TableName.valueOf(importTable));
|
||||||
String IMPORT_TABLE = "importWithDeletes";
|
|
||||||
desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
|
|
||||||
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
||||||
.setMaxVersions(5)
|
.setMaxVersions(5)
|
||||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||||
);
|
);
|
||||||
UTIL.getHBaseAdmin().createTable(desc);
|
UTIL.getAdmin().createTable(desc);
|
||||||
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
|
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
|
||||||
args = new String[] {
|
args = new String[] {
|
||||||
IMPORT_TABLE,
|
importTable,
|
||||||
FQ_OUTPUT_DIR
|
FQ_OUTPUT_DIR
|
||||||
};
|
};
|
||||||
assertTrue(runImport(args));
|
assertTrue(exporter.toString(), runImport(args));
|
||||||
|
|
||||||
Scan s = new Scan();
|
Scan s = new Scan();
|
||||||
s.setMaxVersions();
|
s.setMaxVersions();
|
||||||
|
@ -358,29 +466,36 @@ public class TestImportExport {
|
||||||
ResultScanner scanner = t.getScanner(s);
|
ResultScanner scanner = t.getScanner(s);
|
||||||
Result r = scanner.next();
|
Result r = scanner.next();
|
||||||
Cell[] res = r.rawCells();
|
Cell[] res = r.rawCells();
|
||||||
assertTrue(CellUtil.isDeleteFamily(res[0]));
|
assertTrue(exporter.toString(), CellUtil.isDeleteFamily(res[0]));
|
||||||
assertEquals(now+4, res[1].getTimestamp());
|
assertEquals(exporter.toString(), now+4, res[1].getTimestamp());
|
||||||
assertEquals(now+3, res[2].getTimestamp());
|
assertEquals(exporter.toString(), now+3, res[2].getTimestamp());
|
||||||
assertTrue(CellUtil.isDelete(res[3]));
|
assertTrue(exporter.toString(), CellUtil.isDelete(res[3]));
|
||||||
assertEquals(now+2, res[4].getTimestamp());
|
assertEquals(exporter.toString(), now+2, res[4].getTimestamp());
|
||||||
assertEquals(now+1, res[5].getTimestamp());
|
assertEquals(exporter.toString(), now+1, res[5].getTimestamp());
|
||||||
assertEquals(now, res[6].getTimestamp());
|
assertEquals(exporter.toString(), now, res[6].getTimestamp());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception {
|
public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws IOException, Throwable {
|
||||||
TableName EXPORT_TABLE =
|
String exportTable = "exportWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
|
||||||
TableName.valueOf("exportWithMultipleDeleteFamilyMarkersOfSameRowSameFamily");
|
String importTable = "importWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
|
||||||
HTableDescriptor desc = new HTableDescriptor(EXPORT_TABLE);
|
for (Exporter exporter : EXPORTERS) {
|
||||||
|
testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily(exportTable, importTable, exporter);
|
||||||
|
UTIL.deleteTable(TableName.valueOf(exportTable));
|
||||||
|
UTIL.deleteTable(TableName.valueOf(importTable));
|
||||||
|
deleteOutput();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily(final String exportTable, final String importTable,
|
||||||
|
final Exporter exporter) throws IOException, Throwable {
|
||||||
|
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(exportTable));
|
||||||
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
||||||
.setMaxVersions(5)
|
.setMaxVersions(5)
|
||||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||||
);
|
);
|
||||||
UTIL.getHBaseAdmin().createTable(desc);
|
UTIL.getAdmin().createTable(desc);
|
||||||
|
|
||||||
Table exportT = UTIL.getConnection().getTable(EXPORT_TABLE);
|
Table exportT = UTIL.getConnection().getTable(desc.getTableName());
|
||||||
|
|
||||||
//Add first version of QUAL
|
//Add first version of QUAL
|
||||||
Put p = new Put(ROW1);
|
Put p = new Put(ROW1);
|
||||||
|
@ -402,26 +517,24 @@ public class TestImportExport {
|
||||||
|
|
||||||
|
|
||||||
String[] args = new String[] {
|
String[] args = new String[] {
|
||||||
"-D" + Export.RAW_SCAN + "=true", EXPORT_TABLE.getNameAsString(),
|
"-D" + Export.RAW_SCAN + "=true", exportTable,
|
||||||
FQ_OUTPUT_DIR,
|
FQ_OUTPUT_DIR,
|
||||||
"1000", // max number of key versions per key to export
|
"1000", // max number of key versions per key to export
|
||||||
};
|
};
|
||||||
assertTrue(runExport(args));
|
assertTrue(exporter.toString(), exporter.runExport(args));
|
||||||
|
desc = new HTableDescriptor(TableName.valueOf(importTable));
|
||||||
String IMPORT_TABLE = "importWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
|
|
||||||
desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
|
|
||||||
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
desc.addFamily(new HColumnDescriptor(FAMILYA)
|
||||||
.setMaxVersions(5)
|
.setMaxVersions(5)
|
||||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||||
);
|
);
|
||||||
UTIL.getHBaseAdmin().createTable(desc);
|
UTIL.getAdmin().createTable(desc);
|
||||||
|
|
||||||
Table importT = UTIL.getConnection().getTable(TableName.valueOf(IMPORT_TABLE));
|
Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable));
|
||||||
args = new String[] {
|
args = new String[] {
|
||||||
IMPORT_TABLE,
|
importTable,
|
||||||
FQ_OUTPUT_DIR
|
FQ_OUTPUT_DIR
|
||||||
};
|
};
|
||||||
assertTrue(runImport(args));
|
assertTrue(exporter.toString(), runImport(args));
|
||||||
|
|
||||||
Scan s = new Scan();
|
Scan s = new Scan();
|
||||||
s.setMaxVersions();
|
s.setMaxVersions();
|
||||||
|
@ -434,26 +547,39 @@ public class TestImportExport {
|
||||||
Result exportedTResult = exportedTScanner.next();
|
Result exportedTResult = exportedTScanner.next();
|
||||||
try {
|
try {
|
||||||
Result.compareResults(exportedTResult, importedTResult);
|
Result.compareResults(exportedTResult, importedTResult);
|
||||||
} catch (Exception e) {
|
} catch (IOException e) {
|
||||||
fail("Original and imported tables data comparision failed with error:"+e.getMessage());
|
fail("Original and imported tables data comparision failed with error:"+e.getMessage());
|
||||||
} finally {
|
} finally {
|
||||||
exportT.close();
|
exportT.close();
|
||||||
importT.close();
|
importT.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
|
* Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
|
||||||
* attempt with invalid values.
|
* attempt with invalid values.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testWithFilter() throws Exception {
|
public void testWithFilter() throws IOException, Throwable {
|
||||||
|
String exportTable = "exportSimpleCase_ImportWithFilter";
|
||||||
|
String importTable = "importWithFilter";
|
||||||
|
for (Exporter exporter : EXPORTERS) {
|
||||||
|
testWithFilter(exportTable, importTable, exporter);
|
||||||
|
UTIL.deleteTable(TableName.valueOf(exportTable));
|
||||||
|
UTIL.deleteTable(TableName.valueOf(importTable));
|
||||||
|
deleteOutput();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
|
||||||
|
* attempt with invalid values.
|
||||||
|
*/
|
||||||
|
public void testWithFilter(final String exportTable, final String importTable,
|
||||||
|
final Exporter exporter) throws IOException, Throwable {
|
||||||
// Create simple table to export
|
// Create simple table to export
|
||||||
String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter";
|
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(exportTable));
|
||||||
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
|
|
||||||
desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
|
desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
|
||||||
UTIL.getHBaseAdmin().createTable(desc);
|
UTIL.getAdmin().createTable(desc);
|
||||||
Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
|
Table exportT = UTIL.getConnection().getTable(desc.getTableName());
|
||||||
|
|
||||||
Put p1 = new Put(ROW1);
|
Put p1 = new Put(ROW1);
|
||||||
p1.addColumn(FAMILYA, QUAL, now, QUAL);
|
p1.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||||
|
@ -466,43 +592,42 @@ public class TestImportExport {
|
||||||
Put p2 = new Put(ROW2);
|
Put p2 = new Put(ROW2);
|
||||||
p2.addColumn(FAMILYA, QUAL, now, QUAL);
|
p2.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||||
|
|
||||||
exportTable.put(Arrays.asList(p1, p2));
|
exportT.put(Arrays.asList(p1, p2));
|
||||||
|
|
||||||
// Export the simple table
|
// Export the simple table
|
||||||
String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
|
String[] args = new String[] { exportTable, FQ_OUTPUT_DIR, "1000" };
|
||||||
assertTrue(runExport(args));
|
assertTrue(exporter.toString(), exporter.runExport(args));
|
||||||
|
|
||||||
// Import to a new table
|
// Import to a new table
|
||||||
String IMPORT_TABLE = "importWithFilter";
|
desc = new HTableDescriptor(TableName.valueOf(importTable));
|
||||||
desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
|
|
||||||
desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
|
desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
|
||||||
UTIL.getHBaseAdmin().createTable(desc);
|
UTIL.getAdmin().createTable(desc);
|
||||||
|
|
||||||
Table importTable = UTIL.getConnection().getTable(desc.getTableName());
|
Table importT = UTIL.getConnection().getTable(desc.getTableName());
|
||||||
args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
|
args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
|
||||||
"-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE,
|
"-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), importTable,
|
||||||
FQ_OUTPUT_DIR,
|
FQ_OUTPUT_DIR,
|
||||||
"1000" };
|
"1000" };
|
||||||
assertTrue(runImport(args));
|
assertTrue(exporter.toString(), runImport(args));
|
||||||
|
|
||||||
// get the count of the source table for that time range
|
// get the count of the source table for that time range
|
||||||
PrefixFilter filter = new PrefixFilter(ROW1);
|
PrefixFilter filter = new PrefixFilter(ROW1);
|
||||||
int count = getCount(exportTable, filter);
|
int count = getCount(exportT, filter);
|
||||||
|
|
||||||
Assert.assertEquals("Unexpected row count between export and import tables", count,
|
Assert.assertEquals("Unexpected row count between export(" + exporter.toString() + ") and import tables", count,
|
||||||
getCount(importTable, null));
|
getCount(importT, null));
|
||||||
|
|
||||||
// and then test that a broken command doesn't bork everything - easier here because we don't
|
// and then test that a broken command doesn't bork everything - easier here because we don't
|
||||||
// need to re-run the export job
|
// need to re-run the export job
|
||||||
|
|
||||||
args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
|
args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
|
||||||
"-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", EXPORT_TABLE,
|
"-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", exportTable,
|
||||||
FQ_OUTPUT_DIR, "1000" };
|
FQ_OUTPUT_DIR, "1000" };
|
||||||
assertFalse(runImport(args));
|
assertFalse(runImport(args));
|
||||||
|
|
||||||
// cleanup
|
// cleanup
|
||||||
exportTable.close();
|
exportT.close();
|
||||||
importTable.close();
|
importT.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -637,40 +762,50 @@ public class TestImportExport {
|
||||||
|
|
||||||
Import.addFilterAndArguments(configuration, FilterBase.class, args);
|
Import.addFilterAndArguments(configuration, FilterBase.class, args);
|
||||||
assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
|
assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
|
||||||
configuration.get(Import.FILTER_CLASS_CONF_KEY));
|
configuration.get(Import.FILTER_CLASS_CONF_KEY));
|
||||||
assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
|
assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDurability() throws Exception {
|
public void testDurability() throws IOException, Throwable {
|
||||||
|
String exportTable = "exporttestDurability";
|
||||||
|
String importTable = "importTestDurability1";
|
||||||
|
String importTableV2 = "importTestDurability2";
|
||||||
|
for (Exporter exporter : EXPORTERS) {
|
||||||
|
testDurability(exportTable, importTable, importTableV2, exporter);
|
||||||
|
UTIL.deleteTable(TableName.valueOf(exportTable));
|
||||||
|
UTIL.deleteTable(TableName.valueOf(importTable));
|
||||||
|
UTIL.deleteTable(TableName.valueOf(importTableV2));
|
||||||
|
deleteOutput();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public void testDurability(final String exportTable, final String importTable, final String importTable2,
|
||||||
|
final Exporter exporter) throws IOException, Throwable {
|
||||||
// Create an export table.
|
// Create an export table.
|
||||||
String exportTableName = "exporttestDurability";
|
try (Table exportT = UTIL.createTable(TableName.valueOf(exportTable), FAMILYA, 3);) {
|
||||||
try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) {
|
|
||||||
|
|
||||||
// Insert some data
|
// Insert some data
|
||||||
Put put = new Put(ROW1);
|
Put put = new Put(ROW1);
|
||||||
put.addColumn(FAMILYA, QUAL, now, QUAL);
|
put.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||||
put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
|
put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
|
||||||
put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
|
put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
|
||||||
exportTable.put(put);
|
exportT.put(put);
|
||||||
|
|
||||||
put = new Put(ROW2);
|
put = new Put(ROW2);
|
||||||
put.addColumn(FAMILYA, QUAL, now, QUAL);
|
put.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||||
put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
|
put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
|
||||||
put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
|
put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
|
||||||
exportTable.put(put);
|
exportT.put(put);
|
||||||
|
|
||||||
// Run the export
|
// Run the export
|
||||||
String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
|
String[] args = new String[] { exportTable, FQ_OUTPUT_DIR, "1000"};
|
||||||
assertTrue(runExport(args));
|
assertTrue(exporter.toString(), exporter.runExport(args));
|
||||||
|
|
||||||
// Create the table for import
|
// Create the table for import
|
||||||
String importTableName = "importTestDurability1";
|
Table importT = UTIL.createTable(TableName.valueOf(importTable), FAMILYA, 3);
|
||||||
Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
|
|
||||||
|
|
||||||
// Register the wal listener for the import table
|
// Register the wal listener for the import table
|
||||||
HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
|
HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
|
||||||
.getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
|
.getOnlineRegions(importT.getName()).get(0).getRegionInfo();
|
||||||
TableWALActionListener walListener = new TableWALActionListener(region);
|
TableWALActionListener walListener = new TableWALActionListener(region);
|
||||||
WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
|
WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
|
||||||
wal.registerWALActionsListener(walListener);
|
wal.registerWALActionsListener(walListener);
|
||||||
|
@ -678,27 +813,26 @@ public class TestImportExport {
|
||||||
// Run the import with SKIP_WAL
|
// Run the import with SKIP_WAL
|
||||||
args =
|
args =
|
||||||
new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
|
new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
|
||||||
importTableName, FQ_OUTPUT_DIR };
|
importTable, FQ_OUTPUT_DIR };
|
||||||
assertTrue(runImport(args));
|
assertTrue(exporter.toString(), runImport(args));
|
||||||
//Assert that the wal is not visisted
|
//Assert that the wal is not visisted
|
||||||
assertTrue(!walListener.isWALVisited());
|
assertTrue(exporter.toString(), !walListener.isWALVisited());
|
||||||
//Ensure that the count is 2 (only one version of key value is obtained)
|
//Ensure that the count is 2 (only one version of key value is obtained)
|
||||||
assertTrue(getCount(importTable, null) == 2);
|
assertTrue(exporter.toString(), getCount(importT, null) == 2);
|
||||||
|
|
||||||
// Run the import with the default durability option
|
// Run the import with the default durability option
|
||||||
importTableName = "importTestDurability2";
|
importT = UTIL.createTable(TableName.valueOf(importTable2), FAMILYA, 3);
|
||||||
importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
|
|
||||||
region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
|
region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
|
||||||
.getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
|
.getOnlineRegions(importT.getName()).get(0).getRegionInfo();
|
||||||
wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
|
wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
|
||||||
walListener = new TableWALActionListener(region);
|
walListener = new TableWALActionListener(region);
|
||||||
wal.registerWALActionsListener(walListener);
|
wal.registerWALActionsListener(walListener);
|
||||||
args = new String[] { importTableName, FQ_OUTPUT_DIR };
|
args = new String[] { importTable2, FQ_OUTPUT_DIR };
|
||||||
assertTrue(runImport(args));
|
assertTrue(exporter.toString(), runImport(args));
|
||||||
//Assert that the wal is visisted
|
//Assert that the wal is visisted
|
||||||
assertTrue(walListener.isWALVisited());
|
assertTrue(exporter.toString(), walListener.isWALVisited());
|
||||||
//Ensure that the count is 2 (only one version of key value is obtained)
|
//Ensure that the count is 2 (only one version of key value is obtained)
|
||||||
assertTrue(getCount(importTable, null) == 2);
|
assertTrue(exporter.toString(), getCount(importT, null) == 2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -727,4 +861,7 @@ public class TestImportExport {
|
||||||
return isVisited;
|
return isVisited;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
public interface Exporter {
|
||||||
|
boolean runExport(final String[] args) throws Throwable;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue